[jboss-cvs] JBoss Messaging SVN: r4884 - in branches/Branch_JBMESSAGING-1314: src/main/org/jboss/messaging/core/paging and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Aug 27 23:50:18 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-08-27 23:50:18 -0400 (Wed, 27 Aug 2008)
New Revision: 4884
Modified:
branches/Branch_JBMESSAGING-1314/src/config/queues.xml
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/LastPageRecord.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/fakes/FakeManagerFactory.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
Log:
Few Tweaks, docs and making using the configuration on queue-max-size
Modified: branches/Branch_JBMESSAGING-1314/src/config/queues.xml
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/config/queues.xml 2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/src/config/queues.xml 2008-08-28 03:50:18 UTC (rev 4884)
@@ -93,6 +93,10 @@
<clustered>false</clustered>
</queue-settings>
+ <queue-settings match="queuejms.MyQueue">
+ <max-size-bytes>104857600</max-size-bytes>
+ </queue-settings>
+
<!--default for catch all-->
<queue-settings match="*">
<clustered>false</clustered>
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/LastPageRecord.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/LastPageRecord.java 2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/LastPageRecord.java 2008-08-28 03:50:18 UTC (rev 4884)
@@ -28,7 +28,7 @@
/**
*
- * Stores the last pageID processed during depage, to detect duplications during the delete
+ * Stores the last pageID processed during depage, to detect duplications after the delete
*
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
@@ -46,10 +46,10 @@
// Public --------------------------------------------------------
- /** Internal record with the primary key, used on the journal/database*/
+ /** Internal field with the primary key, used on the journal/database */
long getRecordId();
- /** Internal record with the primary key, used on the journal/database*/
+ /** Internal field with the primary key, used on the journal/database */
void setRecordId(long recordId);
SimpleString getDestination();
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java 2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java 2008-08-28 03:50:18 UTC (rev 4884)
@@ -97,7 +97,7 @@
* Or else the record could live forever on the journal.
* @throws Exception
* */
- void clearLastRecord(LastPageRecord lastRecord) throws Exception;
+ void clearLastPageRecord(LastPageRecord lastRecord) throws Exception;
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-08-28 03:50:18 UTC (rev 4884)
@@ -44,6 +44,13 @@
SimpleString getStoreName();
+ /** Maximum number of bytes allowed in memory */
+ long getMaxSizeBytes();
+
+ long getQueueSize();
+
+ long addQueueSize(long add);
+
/** @return true if paging was started, or false if paging was already started before this call */
boolean startPaging() throws Exception;
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java 2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java 2008-08-28 03:50:18 UTC (rev 4884)
@@ -23,8 +23,10 @@
package org.jboss.messaging.core.paging;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+
/**
* The integration point between the PagingManger and the File System (aka SequentialFiles)
*
@@ -34,6 +36,6 @@
public interface PagingStoreFactory
{
- PagingStore newStore(org.jboss.messaging.util.SimpleString destinationName);
+ PagingStore newStore(org.jboss.messaging.util.SimpleString destinationName, QueueSettings queueSettings);
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java 2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java 2008-08-28 03:50:18 UTC (rev 4884)
@@ -60,7 +60,6 @@
// Attributes ----------------------------------------------------
private final int pageId;
- private final PagingStoreFactory storeFactory;
private final AtomicInteger numberOfMessages = new AtomicInteger(0);
private final SequentialFile file;
private final SequentialFileFactory fileFactory;
@@ -71,12 +70,11 @@
// Constructors --------------------------------------------------
- public PageImpl(final SequentialFileFactory factory, final SequentialFile file, PagingStoreFactory storeFactory, final int pageId) throws Exception
+ public PageImpl(final SequentialFileFactory factory, final SequentialFile file,final int pageId) throws Exception
{
this.pageId = pageId;
this.file = file;
this.fileFactory = factory;
- this.storeFactory = storeFactory;
if (factory.isSupportsCallbacks())
{
callback = new PagingCallback();
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java 2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java 2008-08-28 03:50:18 UTC (rev 4884)
@@ -29,6 +29,7 @@
import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.paging.PagingStoreFactory;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.util.SimpleString;
/**
@@ -59,13 +60,13 @@
// Public --------------------------------------------------------
- public PagingStore newStore(final SimpleString destinationName)
+ public PagingStore newStore(final SimpleString destinationName, QueueSettings settings)
{
final String destinationDirectory = directory + "/" + destinationName.toString();
File destinationFile = new File(destinationDirectory);
destinationFile.mkdirs();
- return new PagingStoreImpl(newFileFactory(destinationDirectory), this, destinationName, pageSize);
+ return new PagingStoreImpl(newFileFactory(destinationDirectory), destinationName, pageSize, settings);
}
// Package protected ---------------------------------------------
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-08-28 03:50:18 UTC (rev 4884)
@@ -29,6 +29,8 @@
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.paging.PagingStoreFactory;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.util.SimpleString;
/**
@@ -46,15 +48,18 @@
private final ConcurrentMap<SimpleString, PagingStore> stores = new ConcurrentHashMap<SimpleString, PagingStore>();
+ private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
+
private final PagingStoreFactory pagingSPI;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public PagingManagerImpl(final PagingStoreFactory pagingSPI)
+ public PagingManagerImpl(final PagingStoreFactory pagingSPI, final HierarchicalRepository<QueueSettings> queueSettingsRepository)
{
this.pagingSPI = pagingSPI;
+ this.queueSettingsRepository = queueSettingsRepository;
}
// Public --------------------------------------------------------
@@ -113,7 +118,7 @@
private PagingStore newStore(final SimpleString destinationName)
{
- return pagingSPI.newStore(destinationName);
+ return pagingSPI.newStore(destinationName, this.queueSettingsRepository.getMatch(destinationName.toString()));
}
private void validateStarted()
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-08-28 03:50:18 UTC (rev 4884)
@@ -27,6 +27,7 @@
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -37,7 +38,7 @@
import org.jboss.messaging.core.paging.Page;
import org.jboss.messaging.core.paging.PageMessage;
import org.jboss.messaging.core.paging.Pager;
-import org.jboss.messaging.core.paging.PagingStoreFactory;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.util.SimpleString;
/**
@@ -59,28 +60,28 @@
private final SimpleString storeName;
- private final PagingStoreFactory storeFactory;
-
private final SequentialFileFactory fileFactory;
private final long maxPageSize;
- private volatile Thread dequeueThread;
+ private final QueueSettings queueSettings;
+ // Bytes consumed by the queue on the memory
+ private final AtomicLong sizeInBytes = new AtomicLong();
+ private volatile Thread dequeueThread;
private volatile int numberOfPages;
private volatile int firstPageId = Integer.MAX_VALUE;
private volatile int currentPageId;
private volatile Page currentPage;
- // This is supposed to perform better than synchronized methods
- // synchronizedBlockLock protects opening/closing and messing up with IDs
- private final Semaphore synchronizedBlockLock = new Semaphore(1);
+ // positioningGlobalLock protects opening/closing and messing up with positions (currentPage and IDs)
+ private final Semaphore positioningGlobalLock = new Semaphore(1);
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private volatile boolean initialized = false;
- private volatile LastPageRecord lastRecord;
+ private volatile LastPageRecord lastPageRecord;
// Static --------------------------------------------------------
@@ -88,12 +89,12 @@
// Constructors --------------------------------------------------
- public PagingStoreImpl(final SequentialFileFactory fileFactory, final PagingStoreFactory storeFactory, final SimpleString storeName, final long maxPageSize)
+ public PagingStoreImpl(final SequentialFileFactory fileFactory, final SimpleString storeName, final long maxPageSize, QueueSettings queueSettings)
{
- this.storeFactory = storeFactory;
this.fileFactory = fileFactory;
this.storeName = storeName;
this.maxPageSize = maxPageSize;
+ this.queueSettings = queueSettings;
}
@@ -101,7 +102,23 @@
// PagingStore implementation ------------------------------------
+ public long getQueueSize()
+ {
+ return sizeInBytes.get();
+ }
+ public long addQueueSize(long delta)
+ {
+ return sizeInBytes.addAndGet(delta);
+ }
+
+ /** Maximum number of bytes allowed in memory */
+ public long getMaxSizeBytes()
+ {
+ return queueSettings.getMaxSizeBytes();
+ }
+
+
public boolean isPaging()
{
lock.readLock().lock();
@@ -134,10 +151,8 @@
{
validateInit();
- // Read needs both global and writeLock
- synchronizedBlockLock.acquire(); // This is a replacement synchronized.
- // Can't change any IDs while depaging.
- lock.writeLock().lock(); // Wait pending writes to finish before depage.
+ positioningGlobalLock.acquire(); // Can't change currentPage or any of ids without a global lock
+ lock.writeLock().lock(); // Wait pending writes to finish before entering the block
try
{
@@ -194,7 +209,7 @@
finally
{
lock.writeLock().unlock();
- synchronizedBlockLock.release();
+ positioningGlobalLock.release();
}
}
@@ -206,11 +221,10 @@
int bytesToWrite = fileFactory.calculateBlockSize(message.getEncodeSize() + PageImpl.SIZE_RECORD);
- // This would be a synchronized block... (but using a Semaphore)
- synchronizedBlockLock.acquire();
+ // The only thing single-threaded done on paging is positioning and check-files (verifying if we need to open a new page file)
+ positioningGlobalLock.acquire();
- // The only thing single-threaded done on paging is positioning and check-files (verifying if we need to open a new page file)
- // After we have it allocated we keep all the threads working until we need to move to a new file (in which case we demand a writeLock, to wait for the writes to finish)
+ // After we have it locked we keep all the threads working until we need to move to a new file (in which case we demand a writeLock, to wait for the writes to finish)
try
{
if (currentPage == null)
@@ -220,7 +234,7 @@
if ((pageUsedSize.addAndGet(bytesToWrite) > maxPageSize && currentPage.getNumberOfMessages() > 0))
{
- // Wait any pending transaction on the current page to finish before we can open another page.
+ // Wait any pending write on the current page to finish before we can open another page.
lock.writeLock().lock();
try
{
@@ -241,7 +255,7 @@
}
finally
{
- synchronizedBlockLock.release();
+ positioningGlobalLock.release();
}
// End of a synchronized block..
@@ -315,12 +329,12 @@
public LastPageRecord getLastRecord()
{
- return lastRecord;
+ return lastPageRecord;
}
public void setLastRecord(LastPageRecord record)
{
- this.lastRecord = record;
+ this.lastPageRecord = record;
}
@@ -410,7 +424,16 @@
{
validateInit();
- synchronizedBlockLock.acquire();
+ positioningGlobalLock.acquire();
+
+ // StartPaging would change positioning (by changing currentPage), because of that it needs to be in a synchronized block.
+ // Case this lock becomes a contention, we will need to implement the dual-lock antipattern (which I tried to avoid):
+ // if (currentPage == null)
+ // {
+ // synchronizedBlockLock.acquire();
+ // if (currentPage == null) // this dual-verification should be fine as currentPage is volatile
+ // etc, etc...
+
try
{
if (currentPage == null)
@@ -425,7 +448,7 @@
}
finally
{
- synchronizedBlockLock.release();
+ positioningGlobalLock.release();
}
}
@@ -505,7 +528,7 @@
file.close();
- return new PageImpl(fileFactory, file, storeFactory, page);
+ return new PageImpl(fileFactory, file, page);
}
/**
@@ -555,11 +578,11 @@
Page page = depage();
if (page == null)
{
- if (lastRecord != null)
+ if (lastPageRecord != null)
{
- listener.clearLastRecord(lastRecord);
+ listener.clearLastPageRecord(lastPageRecord);
}
- lastRecord = null;
+ lastPageRecord = null;
break;
}
page.open();
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-08-28 03:50:18 UTC (rev 4884)
@@ -34,7 +34,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicLong;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.filter.Filter;
@@ -70,17 +69,17 @@
public class PostOfficeImpl implements PostOffice
{
- private static final long MAX_SIZE = 100 * 1024 * 1024;
-
private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
- private static final boolean isTrace = log.isTraceEnabled();
+ //private static final boolean isTrace = log.isTraceEnabled();
+ private static final boolean isTrace = true;
// This is just a debug tool method.
// During debugs you could make log.trace as log.info, and change the variable isTrace above
private static void trace(String message)
{
- log.trace(message);
+ //log.trace(message);
+ log.info(message);
}
//private final int nodeID;
@@ -93,10 +92,6 @@
private final ConcurrentMap<SimpleString, FlowController> flowControllers = new ConcurrentHashMap<SimpleString, FlowController>();
- private final ConcurrentMap<SimpleString, AtomicLong> queueSize = new ConcurrentHashMap<SimpleString, AtomicLong>();
-
- private final AtomicLong totalSize = new AtomicLong(0);
-
private final QueueFactory queueFactory;
private final boolean checkAllowable;
@@ -246,16 +241,7 @@
public List<MessageReference> route(final ServerMessage message) throws Exception
{
- if (pager.addSize(message) > MAX_SIZE)
- {
- // TODO: move this inside the Pager
- PagingStore store = pagingManager.getPageStore(message.getDestination());
-
- if (store.startPaging())
- {
- log.info("Starting paging on " + message.getDestination());
- }
- }
+ pager.addSize(message);
SimpleString address = message.getDestination();
@@ -328,30 +314,8 @@
return flowControllers.get(address);
}
- public long getSize(SimpleString destination)
- {
- return getQueueSize(destination).get();
- }
-
// Private -----------------------------------------------------------------
-
- private AtomicLong getQueueSize(SimpleString destination)
- {
- AtomicLong size = this.queueSize.get(destination);
- if (size == null)
- {
- size = new AtomicLong(0);
- AtomicLong oldSize = this.queueSize.putIfAbsent(destination, size);
- if (oldSize != null)
- {
- size = oldSize;
- }
- }
-
- return size;
- }
-
private Binding createBinding(final SimpleString address, final SimpleString name, final Filter filter,
final boolean durable, final boolean temporary)
{
@@ -453,23 +417,17 @@
for (SimpleString destination: dests)
{
PagingStore store = pagingManager.getPageStore(destination);
- startDepageThread(store);
+ store.startDequeueThread(pager);
}
}
- private boolean startDepageThread(PagingStore store) throws Exception
- {
- return store.startDequeueThread(this.pager);
- }
-
-
// TODO this probably will become a separate class?
private class PagerImpl implements Pager
{
private final ConcurrentMap</*TransactionID*/ Long , PageTransactionInfo> transactions = new ConcurrentHashMap<Long, PageTransactionInfo>();
- public void clearLastRecord(LastPageRecord lastRecord) throws Exception
+ public void clearLastPageRecord(LastPageRecord lastRecord) throws Exception
{
trace("Clearing lastRecord information " + lastRecord.getLastId());
storageManager.storeDelete(lastRecord.getRecordId());
@@ -573,7 +531,7 @@
ref.getQueue().addLast(ref);
}
- return PostOfficeImpl.this.getQueueSize(destination).get() < MAX_SIZE;
+ return pagingStore.getQueueSize() < pagingStore.getMaxSizeBytes();
}
public void loadLastPage(LastPageRecord lastPage) throws Exception
@@ -589,27 +547,11 @@
public void messageDone(ServerMessage message) throws Exception
{
- final long size = addSize(message.getDestination(), message.getEncodeSize() * -1);
-
- if (size < MAX_SIZE)
- {
- PagingStore store = pagingManager.getPageStore(message.getDestination());
-
- if (startDepageThread(store))
- {
- log.info("Starting depaging Thread, size = " + size);
- }
- }
+ addSize(message.getDestination(), message.getEncodeSize() * -1);
}
- private long addSize(SimpleString destination, long size)
- {
- totalSize.addAndGet(size);
- return getQueueSize(destination).addAndGet(size);
- }
-
/** To be called when a rollback is called after messageDone was called */
- public long addSize(ServerMessage message) throws Exception
+ public long addSize(final ServerMessage message) throws Exception
{
return addSize(message.getDestination(), message.getEncodeSize());
}
@@ -642,7 +584,6 @@
this.transactions.remove(pageTrans);
}
}
-
public void sync(Collection<SimpleString> destinationsToSync) throws Exception
{
@@ -651,7 +592,46 @@
pagingManager.getPageStore(destination).sync();
}
}
+
+
+
+ private long addSize(final SimpleString destination, final long size) throws Exception
+ {
+ final PagingStore store = pagingManager.getPageStore(destination);
+
+ final long addressSize = store.addQueueSize(size);
+
+ final long maxSize = store.getMaxSizeBytes();
+ if (size > 0)
+ {
+
+ if (maxSize > 0 && addressSize > maxSize)
+ {
+ if (store.startPaging())
+ {
+ if (isTrace)
+ {
+ trace("Starting paging on " + destination + ", size = " + addressSize + ", maxSize=" + maxSize);
+ }
+ }
+ }
+ }
+ else
+ {
+ if ( maxSize > 0 && addressSize < maxSize)
+ {
+ if (store.startDequeueThread(this))
+ {
+ log.info("Starting depaging Thread, size = " + addressSize);
+ }
+ }
+ }
+
+ return addressSize;
+ }
+
+
}
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-08-28 03:50:18 UTC (rev 4884)
@@ -34,14 +34,11 @@
import org.jboss.messaging.core.management.MessagingServerManagement;
import org.jboss.messaging.core.management.impl.MessagingServerManagementImpl;
import org.jboss.messaging.core.paging.PagingManager;
-import org.jboss.messaging.core.paging.PagingStoreFactory;
-import org.jboss.messaging.core.paging.impl.PageMessageImpl;
import org.jboss.messaging.core.paging.impl.PagingManagerFactoryNIO;
import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.postoffice.impl.PostOfficeImpl;
-import org.jboss.messaging.core.remoting.Interceptor;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.RemotingService;
@@ -90,11 +87,11 @@
// wired components
private SecurityStore securityStore;
- private HierarchicalRepository<QueueSettings> queueSettingsRepository = new HierarchicalObjectRepository<QueueSettings>();
+ private final HierarchicalRepository<QueueSettings> queueSettingsRepository = new HierarchicalObjectRepository<QueueSettings>();
private ScheduledExecutorService scheduledExecutor;
private QueueFactory queueFactory;
private PostOffice postOffice;
- private ExecutorFactory executorFactory = new OrderedExecutorFactory(Executors.newCachedThreadPool(new JBMThreadFactory("JBM-async-session-delivery-threads")));
+ private final ExecutorFactory executorFactory = new OrderedExecutorFactory(Executors.newCachedThreadPool(new JBMThreadFactory("JBM-async-session-delivery-threads")));
private HierarchicalRepository<Set<Role>> securityRepository;
private ResourceManager resourceManager;
private MessagingServerPacketHandler serverPacketHandler;
@@ -172,7 +169,7 @@
scheduledExecutor = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), new JBMThreadFactory("JBM-scheduled-threads"));
queueFactory = new QueueFactoryImpl(scheduledExecutor, queueSettingsRepository);
- PagingManager pagingManager = new PagingManagerImpl(new PagingManagerFactoryNIO(configuration.getPagingDirectory(), configuration.getPageFileSize()));
+ PagingManager pagingManager = new PagingManagerImpl(new PagingManagerFactoryNIO(configuration.getPagingDirectory(), configuration.getPageFileSize()), queueSettingsRepository);
postOffice = new PostOfficeImpl(storageManager, pagingManager,
queueFactory, configuration.isRequireDestinations());
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-08-28 03:50:18 UTC (rev 4884)
@@ -533,11 +533,12 @@
private synchronized HandleStatus add(final MessageReference ref, final boolean first)
{
- if (maxSizeBytes != -1 && sizeBytes.get() + ref.getMessage().getEncodeSize() >= maxSizeBytes)
- {
- return HandleStatus.BUSY;
- }
-
+ // TODO: Verify what this following clause means
+// if (maxSizeBytes != -1 && sizeBytes.get() + ref.getMessage().getEncodeSize() >= maxSizeBytes)
+// {
+// return HandleStatus.BUSY;
+// }
+
if (!first)
{
messagesAdded.incrementAndGet();
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2008-08-28 03:50:18 UTC (rev 4884)
@@ -35,6 +35,9 @@
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.HierarchicalObjectRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.tests.util.RandomUtil;
import org.jboss.messaging.tests.util.UnitTestCase;
import org.jboss.messaging.util.SimpleString;
@@ -61,8 +64,12 @@
public void testPagingManagerNIO() throws Exception
{
+ HierarchicalRepository<QueueSettings> queueSettings = new HierarchicalObjectRepository<QueueSettings>();
+ queueSettings.setDefault(new QueueSettings());
+
+
PagingManagerImpl managerImpl =
- new PagingManagerImpl(new PagingManagerFactoryNIO(journalDir, 1024*1024));
+ new PagingManagerImpl(new PagingManagerFactoryNIO(journalDir, 1024*1024), queueSettings);
managerImpl.start();
PagingStore store = managerImpl.getPageStore(new SimpleString("simple-test"));
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/fakes/FakeManagerFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/fakes/FakeManagerFactory.java 2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/fakes/FakeManagerFactory.java 2008-08-28 03:50:18 UTC (rev 4884)
@@ -1,63 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.tests.unit.core.paging.fakes;
-
-import org.jboss.messaging.core.journal.SequentialFileFactory;
-import org.jboss.messaging.core.paging.impl.PagingManagerFactoryNIO;
-import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
-
-public class FakeManagerFactory extends PagingManagerFactoryNIO
-{
-
- public FakeManagerFactory(long pageSize)
- {
- super("", pageSize);
- }
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
-
- @Override
- protected SequentialFileFactory newFileFactory(String destinationDirectory)
- {
- return new FakeSequentialFileFactory();
- }
-
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java 2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java 2008-08-28 03:50:18 UTC (rev 4884)
@@ -34,7 +34,6 @@
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
-import org.jboss.messaging.tests.unit.core.paging.fakes.FakeManagerFactory;
import org.jboss.messaging.tests.util.RandomUtil;
import org.jboss.messaging.tests.util.UnitTestCase;
import org.jboss.messaging.util.SimpleString;
@@ -67,7 +66,7 @@
SequentialFile file = factory.createSequentialFile("00010.page", 1);
- PageImpl impl = new PageImpl(factory, file, new FakeManagerFactory(1024), 10);
+ PageImpl impl = new PageImpl(factory, file, 10);
assertEquals(10, impl.getPageId());
@@ -107,7 +106,7 @@
file = factory.createSequentialFile("00010.page", 1);
file.open();
- impl = new PageImpl(factory, file, new FakeManagerFactory(1024), 10);
+ impl = new PageImpl(factory, file, 10);
PageMessage msgs[] = impl.read();
@@ -117,7 +116,7 @@
for (int i = 0; i < msgs.length; i++)
{
- assertEquals((long)i, msgs[i].getMessage().getMessageID());
+ assertEquals((long)0, msgs[i].getMessage().getMessageID());
assertEquals(simpleDestination, msgs[i].getMessage().getDestination());
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-08-28 03:50:18 UTC (rev 4884)
@@ -32,6 +32,9 @@
import org.jboss.messaging.core.paging.PagingStoreFactory;
import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.HierarchicalObjectRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.tests.util.UnitTestCase;
import org.jboss.messaging.util.SimpleString;
@@ -42,6 +45,15 @@
// Attributes ----------------------------------------------------
+
+ private static HierarchicalRepository<QueueSettings> repoSettings = new HierarchicalObjectRepository<QueueSettings>();
+ static
+ {
+ repoSettings.setDefault(new QueueSettings());
+ }
+
+
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -51,8 +63,11 @@
public void testGetStore() throws Exception
{
+ HierarchicalRepository<QueueSettings> queueSettings = new HierarchicalObjectRepository<QueueSettings>();
+ queueSettings.setDefault(new QueueSettings());
+
PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
- PagingManagerImpl manager = new PagingManagerImpl(spi);
+ PagingManagerImpl manager = new PagingManagerImpl(spi, queueSettings);
SimpleString destination = new SimpleString("some-destination");
@@ -69,7 +84,7 @@
PagingStore store = EasyMock.createNiceMock(PagingStore.class);
- EasyMock.expect(spi.newStore(destination)).andReturn(store);
+ EasyMock.expect(spi.newStore(EasyMock.eq(destination), EasyMock.isA(QueueSettings.class))).andReturn(store);
store.start();
@@ -105,7 +120,7 @@
public void testMultipleThreadsGetStore() throws Exception
{
PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
- final PagingManagerImpl manager = new PagingManagerImpl(spi);
+ final PagingManagerImpl manager = new PagingManagerImpl(spi, repoSettings);
final SimpleString destination = new SimpleString("some-destination");
@@ -113,9 +128,9 @@
EasyMock.expect(factory.listFiles(EasyMock.isA(String.class))).andStubReturn(new ArrayList<String>());
- PagingStoreImpl storeImpl = new PagingStoreImpl(factory, spi, destination, 1);
+ PagingStoreImpl storeImpl = new PagingStoreImpl(factory, destination, 1, new QueueSettings());
- EasyMock.expect(spi.newStore(destination)).andStubReturn(storeImpl);
+ EasyMock.expect(spi.newStore(EasyMock.eq(destination), EasyMock.isA(QueueSettings.class))).andStubReturn(storeImpl);
EasyMock.replay(spi, factory);
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-08-28 03:50:18 UTC (rev 4884)
@@ -34,8 +34,8 @@
import org.jboss.messaging.core.paging.impl.PageMessageImpl;
import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
-import org.jboss.messaging.tests.unit.core.paging.fakes.FakeManagerFactory;
import org.jboss.messaging.util.SimpleString;
/**
@@ -62,7 +62,7 @@
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
- PagingStore storeImpl = new PagingStoreImpl(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 2);
+ PagingStore storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 2, new QueueSettings());
storeImpl.start();
@@ -78,7 +78,7 @@
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
- PagingStore storeImpl = new PagingStoreImpl(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 2);
+ PagingStore storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 2, new QueueSettings());
storeImpl.start();
@@ -90,12 +90,12 @@
List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
- ByteBuffer buffer = createRandomBuffer(10);
+ ByteBuffer buffer = createRandomBuffer(0, 10);
buffers.add(buffer);
SimpleString destination = new SimpleString("test");
- PageMessageImpl msg = createMessage(1l, destination, buffer);
+ PageMessageImpl msg = createMessage(destination, buffer);
assertTrue(storeImpl.isPaging());
@@ -105,7 +105,7 @@
storeImpl.sync();
- storeImpl = new PagingStoreImpl(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 2);
+ storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 2, new QueueSettings());
storeImpl.start();
@@ -117,7 +117,7 @@
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
- PagingStore storeImpl = new PagingStoreImpl(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 10);
+ PagingStore storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 10, new QueueSettings());
storeImpl.start();
@@ -132,11 +132,11 @@
for (int i = 0; i < 10; i++)
{
- ByteBuffer buffer = createRandomBuffer(10);
+ ByteBuffer buffer = createRandomBuffer(i+1l, 10);
buffers.add(buffer);
- PageMessageImpl msg = createMessage(i+1l, destination, buffer);
+ PageMessageImpl msg = createMessage(destination, buffer);
assertTrue(storeImpl.page(msg));
}
@@ -163,7 +163,7 @@
for (int i = 0; i < 10; i++)
{
- assertEquals(i + 1l, msg[i].getMessage().getMessageID());
+ assertEquals(0, msg[i].getMessage().getMessageID());
assertEqualsByteArrays(buffers.get(i).array(), msg[i].getMessage().getBody().array());
}
@@ -173,7 +173,7 @@
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
- TestSupportPageStore storeImpl = new PagingStoreImpl(factory, new FakeManagerFactory(0), destinationTestName, 1024 * 10);
+ TestSupportPageStore storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 10, new QueueSettings());
storeImpl.start();
@@ -190,7 +190,7 @@
for (int i = 0; i < 10; i++)
{
- ByteBuffer buffer = createRandomBuffer(10);
+ ByteBuffer buffer = createRandomBuffer(i+1l, 10);
buffers.add(buffer);
@@ -200,7 +200,7 @@
}
- PageMessageImpl msg = createMessage(i+1l, destination, buffer);
+ PageMessageImpl msg = createMessage(destination, buffer);
assertTrue(storeImpl.page(msg));
}
@@ -224,7 +224,7 @@
for (int i = 0; i < 5; i++)
{
- assertEquals(pageNr*5 + i + 1l, msg[i].getMessage().getMessageID());
+ assertEquals(0, msg[i].getMessage().getMessageID());
assertEqualsByteArrays(buffers.get(pageNr*5 + i).array(), msg[i].getMessage().getBody().array());
}
}
@@ -233,7 +233,7 @@
assertTrue(storeImpl.isPaging());
- PageMessageImpl msg = createMessage(100, destination, buffers.get(0));
+ PageMessageImpl msg = createMessage(destination, buffers.get(0));
assertTrue(storeImpl.page(msg));
@@ -267,7 +267,7 @@
assertEquals(1, msgs.length);
- assertEquals(100l, msgs[0].getMessage().getMessageID());
+ assertEquals(0l, msgs[0].getMessage().getMessageID());
assertEqualsByteArrays(buffers.get(0).array(), msgs[0].getMessage().getBody().array());
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-08-27 16:36:27 UTC (rev 4883)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-08-28 03:50:18 UTC (rev 4884)
@@ -41,7 +41,7 @@
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
-import org.jboss.messaging.tests.unit.core.paging.fakes.FakeManagerFactory;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.tests.util.RandomUtil;
import org.jboss.messaging.tests.util.UnitTestCase;
import org.jboss.messaging.util.SimpleString;
@@ -83,7 +83,7 @@
final ArrayList<Page> readPages = new ArrayList<Page>();
- final TestSupportPageStore storeImpl = new PagingStoreImpl(factory, new FakeManagerFactory(0), new SimpleString("test"), MAX_SIZE);
+ final TestSupportPageStore storeImpl = new PagingStoreImpl(factory, new SimpleString("test"), MAX_SIZE, new QueueSettings());
storeImpl.start();
@@ -112,7 +112,7 @@
while (true)
{
long id = messageIdGenerator.incrementAndGet();
- PageMessageImpl msg = createMessage(id, destination, createRandomBuffer(5));
+ PageMessageImpl msg = createMessage(destination, createRandomBuffer(id, 5));
if (storeImpl.page(msg))
{
buffers.put(id, msg);
@@ -207,8 +207,12 @@
for (PageMessage msg : msgs)
{
- PageMessageImpl msgWritten = buffers.remove(msg.getMessage().getMessageID());
- buffers2.put(msg.getMessage().getMessageID(), msg);
+ msg.getMessage().getBody().rewind();
+ long id = msg.getMessage().getBody().getLong();
+ msg.getMessage().getBody().rewind();
+
+ PageMessageImpl msgWritten = buffers.remove(id);
+ buffers2.put(id, msg);
assertNotNull(msgWritten);
assertEquals (msg.getMessage().getDestination(), msgWritten.getMessage().getDestination());
assertEqualsByteArrays(msgWritten.getMessage().getBody().array(), msg.getMessage().getBody().array());
@@ -229,7 +233,7 @@
fileTmp.close();
}
- TestSupportPageStore storeImpl2 = new PagingStoreImpl(factory, new FakeManagerFactory(0), new SimpleString("test"), MAX_SIZE);
+ TestSupportPageStore storeImpl2 = new PagingStoreImpl(factory, new SimpleString("test"), MAX_SIZE, new QueueSettings());
storeImpl2.start();
int numberOfPages = storeImpl2.getNumberOfPages();
@@ -245,7 +249,7 @@
assertEquals(numberOfPages, storeImpl2.getNumberOfPages());
long lastMessageId = messageIdGenerator.incrementAndGet();
- PageMessage lastMsg = createMessage(lastMessageId, destination, createRandomBuffer(5));
+ PageMessage lastMsg = createMessage(destination, createRandomBuffer(lastMessageId, 5));
storeImpl2.page(lastMsg);
buffers2.put(lastMessageId, lastMsg);
@@ -270,7 +274,9 @@
for (PageMessage msg: msgs)
{
- PageMessage msgWritten = buffers2.remove(msg.getMessage().getMessageID());
+ msg.getMessage().getBody().rewind();
+ long id = msg.getMessage().getBody().getLong();
+ PageMessage msgWritten = buffers2.remove(id);
assertNotNull(msgWritten);
assertEquals (msg.getMessage().getDestination(), msgWritten.getMessage().getDestination());
assertEqualsByteArrays(msgWritten.getMessage().getBody().array(), msg.getMessage().getBody().array());
@@ -283,7 +289,8 @@
lastPage.close();
assertEquals(1, lastMessages.length);
- assertEquals(lastMessages[0].getMessage().getMessageID(), lastMessageId);
+ lastMessages[0].getMessage().getBody().rewind();
+ assertEquals(lastMessages[0].getMessage().getBody().getLong(), lastMessageId);
assertEqualsByteArrays(lastMessages[0].getMessage().getBody().array(), lastMsg.getMessage().getBody().array());
assertEquals(0, buffers2.size());
@@ -291,22 +298,22 @@
}
- protected PageMessageImpl createMessage(long messageId, SimpleString destination, ByteBuffer buffer)
+ protected PageMessageImpl createMessage(SimpleString destination, ByteBuffer buffer)
{
ServerMessage msg = new ServerMessageImpl((byte)1, true, 0,
System.currentTimeMillis(), (byte)0, new ByteBufferWrapper(buffer));
- msg.setMessageID((long)messageId);
-
msg.setDestination(destination);
return new PageMessageImpl(msg);
}
- protected ByteBuffer createRandomBuffer(int size)
+ protected ByteBuffer createRandomBuffer(long id, int size)
{
- ByteBuffer buffer = ByteBuffer.allocate(size);
+ ByteBuffer buffer = ByteBuffer.allocate(size + 8);
- for (int j = 0; j < buffer.limit(); j++)
+ buffer.putLong(id);
+
+ for (int j = 8; j < buffer.limit(); j++)
{
buffer.put(RandomUtil.randomByte());
}
More information about the jboss-cvs-commits
mailing list