[jboss-cvs] JBoss Messaging SVN: r4938 - in trunk: src/main/org/jboss/messaging/core/config and 24 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Sep 12 23:51:33 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-09-12 23:51:32 -0400 (Fri, 12 Sep 2008)
New Revision: 4938
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/base/
trunk/tests/src/org/jboss/messaging/tests/integration/base/IntegrationTestBase.java
trunk/tests/src/org/jboss/messaging/tests/stress/paging/
trunk/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java
Modified:
trunk/src/config/jbm-configuration.xml
trunk/src/main/org/jboss/messaging/core/config/Configuration.java
trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java
trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java
trunk/src/main/org/jboss/messaging/core/server/Queue.java
trunk/src/main/org/jboss/messaging/core/server/QueueFactory.java
trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java
trunk/src/main/org/jboss/messaging/jms/server/management/SubscriptionInfo.java
trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java
trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java
trunk/src/main/org/jboss/messaging/util/TypedProperties.java
trunk/src/main/org/jboss/messaging/util/VariableLatch.java
trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
trunk/tests/src/org/jboss/messaging/tests/timing/core/server/impl/QueueImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossSessionTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/SubscriptionInfoTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/TopicControlTest.java
Log:
JBMESSAGING-1314 - Completing implementation on paging
Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/config/jbm-configuration.xml 2008-09-13 03:51:32 UTC (rev 4938)
@@ -94,6 +94,12 @@
</acceptor>
</remoting-acceptors>
+ <!-- Paging configuration -->
+
+ <paging-directory>data/paging</paging-directory>
+
+ <max-global-size-bytes>104857600</max-global-size-bytes>
+
<!-- Storage configuration -->
<bindings-directory>data/bindings</bindings-directory>
Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -140,5 +140,9 @@
boolean isCreateJournalDir();
void setCreateJournalDir(boolean create);
+
+ long getMaxGlobalSizeBytes();
+
+ void setMaxGlobalSizeBytes(long maxGlobalSize);
}
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -114,7 +114,12 @@
protected TransportConfiguration backupConnectorConfig;
+ // Paging related attributes
+
+ protected long maxGlobalSize = -1;
+ protected String pagingDirectory = DEFAULT_PAGING_DIR;
+
// Journal related attributes
protected String bindingsDirectory = DEFAULT_BINDINGS_DIRECTORY;
@@ -123,8 +128,6 @@
protected String journalDirectory = DEFAULT_JOURNAL_DIR;
- protected String pagingDirectory = DEFAULT_PAGING_DIR;
-
protected boolean createJournalDir = DEFAULT_CREATE_JOURNAL_DIR;
public JournalType journalType = DEFAULT_JOURNAL_TYPE;
@@ -393,6 +396,18 @@
return this.journalBufferReuseSize;
}
+
+ public long getMaxGlobalSizeBytes()
+ {
+ return this.maxGlobalSize;
+ }
+
+ public void setMaxGlobalSizeBytes(long maxGlobalSize)
+ {
+ this.maxGlobalSize = maxGlobalSize;
+ }
+
+
public boolean equals(Object other)
{
if (this == other)
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -313,6 +313,8 @@
journalDirectory = getString(e, "journal-directory", journalDirectory);
pagingDirectory = getString(e, "paging-directory", pagingDirectory);
+
+ maxGlobalSize = getLong(e, "max-global-size-bytes", maxGlobalSize);
createJournalDir = getBoolean(e, "create-journal-dir", createJournalDir);
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -40,8 +40,11 @@
public interface PagingManager extends MessagingComponent
{
+
+ boolean isGlobalPageMode();
+
/** To return the PageStore associated with the address */
- public PagingStore getPageStore(SimpleString address) throws Exception;
+ PagingStore getPageStore(SimpleString address) throws Exception;
/** An injection point for the PostOffice to inject itself */
void setPostOffice(PostOffice postOffice);
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -84,7 +84,7 @@
* @return false if a thread was already started, or if not in page mode
* @throws Exception
*/
- boolean startDepaging() throws Exception;
+ boolean startDepaging();
LastPageRecord getLastRecord();
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -23,6 +23,8 @@
package org.jboss.messaging.core.paging;
+import java.util.concurrent.Executor;
+
import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -38,6 +40,8 @@
PagingStore newStore(org.jboss.messaging.util.SimpleString destinationName, QueueSettings queueSettings);
+ Executor getPagingExecutor();
+
void setPagingManager(PagingManager manager);
}
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -73,6 +73,11 @@
// Public --------------------------------------------------------
+ public Executor getPagingExecutor()
+ {
+ return this.executor;
+ }
+
public PagingStore newStore(final SimpleString destinationName, QueueSettings settings)
{
final String destinationDirectory = directory + "/" + destinationName.toString();
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -29,6 +29,8 @@
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.paging.LastPageRecord;
@@ -57,11 +59,21 @@
// Constants -----------------------------------------------------
+ private static final long WATERMARK_GLOBAL_PAGE = QueueSettings.DEFAULT_PAGE_SIZE_BYTES;
// Attributes ----------------------------------------------------
+
private volatile boolean started = false;
+ private final long maxGlobalSize;
+
+ private final AtomicLong globalSize = new AtomicLong(0);
+
+ private final AtomicBoolean globalMode = new AtomicBoolean(false);
+
+ private final AtomicBoolean globalDepageRunning = new AtomicBoolean(false);
+
private final ConcurrentMap<SimpleString, PagingStore> stores = new ConcurrentHashMap<SimpleString, PagingStore>();
private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
@@ -95,17 +107,25 @@
// Constructors --------------------------------------------------------------------------------------------------------------------
public PagingManagerImpl(final PagingStoreFactory pagingSPI, StorageManager storageManager,
- final HierarchicalRepository<QueueSettings> queueSettingsRepository)
+ final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+ final long maxGlobalSize)
{
this.pagingSPI = pagingSPI;
this.queueSettingsRepository = queueSettingsRepository;
this.storageManager = storageManager;
+ this.maxGlobalSize = maxGlobalSize;
}
// Public ---------------------------------------------------------------------------------------------------------------------------
// PagingManager implementation -----------------------------------------------------------------------------------------------------
+
+ public boolean isGlobalPageMode()
+ {
+ return globalMode.get();
+ }
+
public PagingStore getPageStore(final SimpleString storeName) throws Exception
{
validateStarted();
@@ -246,7 +266,18 @@
ref.getQueue().addLast(ref);
}
- return pagingStore.getAddressSize() < pagingStore.getMaxSizeBytes();
+
+ if (globalMode.get())
+ {
+ return globalSize.get() < maxGlobalSize - WATERMARK_GLOBAL_PAGE &&
+ pagingStore.getMaxSizeBytes() <= 0 || pagingStore.getAddressSize() < pagingStore.getMaxSizeBytes();
+ }
+ else
+ {
+ // If Max-size is not configured (-1) it will aways return true, as this method was probably called by global-depage
+ return pagingStore.getMaxSizeBytes() <= 0 || pagingStore.getAddressSize() < pagingStore.getMaxSizeBytes();
+ }
+
}
public void setLastPage(LastPageRecord lastPage) throws Exception
@@ -262,12 +293,12 @@
public void messageDone(ServerMessage message) throws Exception
{
- addSize(message.getDestination(), message.getEncodeSize() * -1);
+ addSize(message.getDestination(), message.getMemoryEstimate() * -1);
}
public long addSize(final ServerMessage message) throws Exception
{
- return addSize(message.getDestination(), message.getEncodeSize());
+ return addSize(message.getDestination(), message.getMemoryEstimate());
}
public boolean page(ServerMessage message, long transactionId)
@@ -358,9 +389,11 @@
final long pageSize = store.getPageSizeBytes();
+
if (store.isDropWhenMaxSize() && size > 0)
{
- if (store.getAddressSize() + size > maxSize)
+ // if destination configured to drop messages && size is over the limit, we return -1 what means drop the message
+ if ((store.getAddressSize() + size > maxSize) || (maxGlobalSize > 0 && (globalSize.get() + size > maxGlobalSize)))
{
if (!store.isDroppedMessage())
{
@@ -377,12 +410,16 @@
}
else
{
+
+ long currentGlobalSize = globalSize.addAndGet(size);
+
final long addressSize = store.addAddressSize(size);
if (size > 0)
{
- if (maxSize > 0 && addressSize > maxSize)
+ if ((maxGlobalSize > 0 && (currentGlobalSize > maxGlobalSize)))
{
+ globalMode.set(true);
if (store.startPaging())
{
if (isTrace)
@@ -391,9 +428,26 @@
}
}
}
+ else
+ if ((maxSize > 0 && (addressSize > maxSize)))
+ {
+ if (store.startPaging())
+ {
+ if (isTrace)
+ {
+ trace("Starting paging on " + destination + ", size = " + addressSize + ", maxSize=" + maxSize);
+ }
+ }
+ }
}
else
{
+ // When in Global mode, we use the default page size as the minimal watermark to start depage
+ if (globalMode.get() && currentGlobalSize < maxGlobalSize - QueueSettings.DEFAULT_PAGE_SIZE_BYTES)
+ {
+ startGlobalDepage();
+ }
+ else
if ( maxSize > 0 && addressSize < (maxSize - pageSize))
{
if (store.startDepaging())
@@ -407,7 +461,71 @@
}
}
+
+ private void startGlobalDepage()
+ {
+ if (globalDepageRunning.compareAndSet(false, true))
+ {
+ Runnable globalDepageRunnable = new GlobalDepager();
+ pagingSPI.getPagingExecutor().execute(globalDepageRunnable);
+ }
+ }
+
// Inner classes -------------------------------------------------
+
+ class GlobalDepager implements Runnable
+ {
+ public void run()
+ {
+ try
+ {
+ while (globalSize.get() < maxGlobalSize)
+ {
+ boolean depaged = false;
+ // Round robin depaging one page at the time from each destination
+ for (PagingStore store : stores.values())
+ {
+ if (globalSize.get() < maxGlobalSize)
+ {
+ if (store.isPaging())
+ {
+ depaged = true;
+ try
+ {
+ store.readPage();
+ }
+ catch (Exception e)
+ {
+ log.error(e.getMessage(), e);
+ }
+ }
+ }
+ }
+ if (!depaged)
+ {
+ break;
+ }
+ }
+
+ if (globalSize.get() < maxGlobalSize)
+ {
+
+ globalMode.set(false);
+ // Clearing possible messages still in page-mode
+ for (PagingStore store : stores.values())
+ {
+ store.startDepaging();
+ }
+ }
+ }
+ finally
+ {
+ PagingManagerImpl.this.globalDepageRunning.set(false);
+ }
+ }
+
+ }
+
}
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -44,6 +44,8 @@
/**
*
+ * @see PagingStore
+ *
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
@@ -197,10 +199,10 @@
}
page.open();
PageMessage messages[] = page.read();
- boolean addressFull = pagingManager.onDepage(page.getPageId(), PagingStoreImpl.this.storeName, PagingStoreImpl.this, messages);
+ boolean addressNotFull = pagingManager.onDepage(page.getPageId(), PagingStoreImpl.this.storeName, PagingStoreImpl.this, messages);
page.delete();
- return addressFull;
+ return addressNotFull;
}
@@ -363,7 +365,7 @@
}
}
- public boolean startDepaging() throws Exception
+ public boolean startDepaging()
{
lock.readLock().lock();
try
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -110,6 +110,9 @@
pagingManager.start();
}
+ // Injecting the postoffice (itself) on queueFactory for paging-control
+ queueFactory.setPostOffice(this);
+
loadBindings();
started = true;
@@ -439,8 +442,11 @@
for (SimpleString destination: dests)
{
- PagingStore store = pagingManager.getPageStore(destination);
- store.startDepaging();
+ if (!pagingManager.isGlobalPageMode())
+ {
+ PagingStore store = pagingManager.getPageStore(destination);
+ store.startDepaging();
+ }
}
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -38,8 +38,6 @@
private boolean durable;
- private int maxSize;
-
private int consumerCount;
private int messageCount;
@@ -48,19 +46,19 @@
private SimpleString address;
- public SessionQueueQueryResponseMessage(final boolean durable, final int maxSize,
+ public SessionQueueQueryResponseMessage(final boolean durable,
final int consumerCount, final int messageCount, final SimpleString filterString,
final SimpleString address)
{
- this(durable, maxSize, consumerCount, messageCount, filterString, address, true);
+ this(durable, consumerCount, messageCount, filterString, address, true);
}
public SessionQueueQueryResponseMessage()
{
- this(false, 0, 0, 0, null, null, false);
+ this(false, 0, 0, null, null, false);
}
- private SessionQueueQueryResponseMessage(final boolean durable, final int maxSize,
+ private SessionQueueQueryResponseMessage(final boolean durable,
final int consumerCount, final int messageCount, final SimpleString filterString, final SimpleString address,
final boolean exists)
{
@@ -68,8 +66,6 @@
this.durable = durable;
- this.maxSize = maxSize;
-
this.consumerCount = consumerCount;
this.messageCount = messageCount;
@@ -96,11 +92,6 @@
return durable;
}
- public int getMaxSize()
- {
- return maxSize;
- }
-
public int getConsumerCount()
{
return consumerCount;
@@ -125,7 +116,6 @@
{
buffer.putBoolean(exists);
buffer.putBoolean(durable);
- buffer.putInt(maxSize);
buffer.putInt(consumerCount);
buffer.putInt(messageCount);
buffer.putNullableSimpleString(filterString);
@@ -136,7 +126,6 @@
{
exists = buffer.getBoolean();
durable = buffer.getBoolean();
- maxSize = buffer.getInt();
consumerCount = buffer.getInt();
messageCount = buffer.getInt();
filterString = buffer.getNullableSimpleString();
@@ -154,7 +143,6 @@
return super.equals(other) && this.exists == r.exists &&
this.durable == r.durable &&
- this.maxSize == r.maxSize &&
this.consumerCount == r.consumerCount &&
this.messageCount == r.messageCount &&
this.filterString == null ? r.filterString == null : this.filterString.equals(r.filterString) &&
Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -51,8 +51,6 @@
HandleStatus addFirst(MessageReference ref);
- QueueSettings getSettings();
-
/**
* This method is used to add a List of MessageReferences atomically at the head of the list.
* Useful when cancelling messages and guaranteeing ordering
Modified: trunk/src/main/org/jboss/messaging/core/server/QueueFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/QueueFactory.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/server/QueueFactory.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -23,6 +23,7 @@
package org.jboss.messaging.core.server;
import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.util.SimpleString;
/**
@@ -39,4 +40,10 @@
{
Queue createQueue(long persistenceID, SimpleString name, Filter filter,
boolean durable);
+
+ /**
+ * This is required for delete-all-reference to work correctly with paging
+ * @param postOffice
+ */
+ void setPostOffice(PostOffice postOffice);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -52,5 +52,7 @@
int getRefCount();
ServerMessage copy();
+
+ int getMemoryEstimate();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -49,8 +49,6 @@
private static final Logger log = Logger.getLogger(MessageReferenceImpl.class);
// Attributes ----------------------------------------------------
-
- private boolean trace = log.isTraceEnabled();
private volatile int deliveryCount;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -193,7 +193,7 @@
PagingStoreFactory storeFactory = new PagingManagerFactoryNIO(configuration.getPagingDirectory());
- pagingManager = new PagingManagerImpl(storeFactory, storageManager, queueSettingsRepository);
+ pagingManager = new PagingManagerImpl(storeFactory, storageManager, queueSettingsRepository, configuration.getMaxGlobalSizeBytes());
storeFactory.setPagingManager(pagingManager);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -25,6 +25,7 @@
import java.util.concurrent.ScheduledExecutorService;
import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.QueueFactory;
import org.jboss.messaging.core.settings.HierarchicalRepository;
@@ -45,6 +46,9 @@
private final ScheduledExecutorService scheduledExecutor;
+ /** This is required for delete-all-reference to work correctly with paging, and controlling global-size */
+ private PostOffice postOffice;
+
public QueueFactoryImpl(final ScheduledExecutorService scheduledExecutor,
final HierarchicalRepository<QueueSettings> queueSettingsRepository)
{
@@ -53,12 +57,17 @@
this.scheduledExecutor = scheduledExecutor;
}
+ public void setPostOffice(PostOffice postOffice)
+ {
+ this.postOffice = postOffice;
+ }
+
public Queue createQueue(final long persistenceID, final SimpleString name, final Filter filter,
final boolean durable)
{
QueueSettings queueSettings = queueSettingsRepository.getMatch(name.toString());
- Queue queue = new QueueImpl(persistenceID, name, filter, queueSettings.isClustered(), durable, queueSettings, scheduledExecutor);
+ Queue queue = new QueueImpl(persistenceID, name, filter, queueSettings.isClustered(), durable, scheduledExecutor, postOffice);
queue.setDistributionPolicy(queueSettings.getDistributionPolicy());
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -88,6 +88,8 @@
private final boolean durable;
private final ScheduledExecutorService scheduledExecutor;
+
+ private final PostOffice postOffice;
private final PriorityLinkedList<MessageReference> messageReferences =
new PriorityLinkedListImpl<MessageReference>(NUM_PRIORITIES);
@@ -118,14 +120,12 @@
private final Lock lock = new ReentrantLock(false);
- private final QueueSettings settings;
-
private volatile boolean backup;
public QueueImpl(final long persistenceID, final SimpleString name,
final Filter filter, final boolean clustered, final boolean durable,
- final QueueSettings settings,
- final ScheduledExecutorService scheduledExecutor)
+ final ScheduledExecutorService scheduledExecutor,
+ final PostOffice postOffice)
{
this.persistenceID = persistenceID;
@@ -139,7 +139,7 @@
this.scheduledExecutor = scheduledExecutor;
- this.settings = settings;
+ this.postOffice = postOffice;
direct = true;
}
@@ -148,11 +148,6 @@
// -------------------------------------------------------------------
- public QueueSettings getSettings()
- {
- return this.settings;
- }
-
public boolean isClustered()
{
return clustered;
@@ -472,7 +467,7 @@
public synchronized void deleteAllReferences(final StorageManager storageManager) throws Exception
{
- Transaction tx = new TransactionImpl(storageManager, null);
+ Transaction tx = new TransactionImpl(storageManager, postOffice);
Iterator<MessageReference> iter = messageReferences.iterator();
@@ -508,7 +503,7 @@
{
boolean deleted = false;
- Transaction tx = new TransactionImpl(storageManager, null);
+ Transaction tx = new TransactionImpl(storageManager, postOffice);
Iterator<MessageReference> iter = messageReferences.iterator();
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -137,7 +137,17 @@
{
return refCount.get();
}
+
+ public int getMemoryEstimate()
+ {
+ // This is just an estimate...
+ // due to memory alignments and JVM implementation this could be very different from reality
+ return getEncodeSize() +
+ (16 + 4) * 2 +// Each AtomicInteger consumes 16 bytes for the Object and ObjectReference + 4 bytes for the internal integer
+ 8; // MessageID
+ }
+
public ServerMessage copy()
{
return new ServerMessageImpl(this);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -1096,11 +1096,8 @@
SimpleString filterString = filter == null ? null : filter
.getFilterString();
-
- QueueSettings settings = queue.getSettings();
-
// TODO: Remove MAX-SIZE-BYTES from SessionQueueQueryResponse.
- response = new SessionQueueQueryResponseMessage(queue.isDurable(), settings.getMaxSizeBytes(),
+ response = new SessionQueueQueryResponseMessage(queue.isDurable(),
queue.getConsumerCount(), queue.getMessageCount(),
filterString, binding.getAddress());
}
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -57,8 +57,6 @@
long getSizeBytes();
- int getMaxSizeBytes();
-
int getMessageCount();
long getScheduledCount();
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/SubscriptionInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/SubscriptionInfo.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/SubscriptionInfo.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -50,13 +50,13 @@
private static final String SUBSCRIPTION_TYPE_NAME = "SubscriptionInfo";
private static final String SUBSCRIPTION_TABULAR_TYPE_NAME = "SubscriptionTabularInfo";
private static final String[] ITEM_NAMES = new String[] { "queueName", "clientID",
- "name", "durable", "selector", "messageCount", "maxSizeBytes" };
+ "name", "durable", "selector", "messageCount" };
private static final String[] ITEM_DESCRIPTIONS = new String[] {
"ID of the subscription", "ClientID of the subscription",
"name of the subscription", "Is the subscriber durable?", "Selector",
- "Number of messages", "Maximum size in bytes" };
+ "Number of messages" };
private static final OpenType[] ITEM_TYPES = new OpenType[] { STRING,
- STRING, STRING, BOOLEAN, STRING, INTEGER, INTEGER };
+ STRING, STRING, BOOLEAN, STRING, INTEGER};
static
{
@@ -78,7 +78,6 @@
private final boolean durable;
private final String selector;
private final int messageCount;
- private final int maxSizeBytes;
// Static --------------------------------------------------------
@@ -111,7 +110,7 @@
public SubscriptionInfo(final String queueName, final String clientID,
final String name, final boolean durable, final String selector,
- final int messageCount, final int maxSizeBytes)
+ final int messageCount)
{
this.queueName = queueName;
this.clientID = clientID;
@@ -119,7 +118,6 @@
this.durable = durable;
this.selector = selector;
this.messageCount = messageCount;
- this.maxSizeBytes = maxSizeBytes;
}
// Public --------------------------------------------------------
@@ -154,17 +152,12 @@
return messageCount;
}
- public int getMaxSizeBytes()
- {
- return maxSizeBytes;
- }
-
public CompositeData toCompositeData()
{
try
{
return new CompositeDataSupport(TYPE, ITEM_NAMES, new Object[] { queueName,
- clientID, name, durable, selector, messageCount, maxSizeBytes });
+ clientID, name, durable, selector, messageCount});
} catch (OpenDataException e)
{
return null;
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -149,11 +149,6 @@
return coreQueue.getDeliveringCount();
}
- public int getMaxSizeBytes()
- {
- return coreQueue.getSettings().getMaxSizeBytes();
- }
-
public long getScheduledCount()
{
return coreQueue.getScheduledCount();
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -245,14 +245,12 @@
clientID = pair.a;
subName = pair.b;
}
-
- QueueSettings queueSettings = queue.getSettings();
String filter = queue.getFilter() != null ? queue.getFilter()
.getFilterString().toString() : null;
SubscriptionInfo info = new SubscriptionInfo(queue.getName().toString(),
clientID, subName, queue.isDurable(), filter, queue
- .getMessageCount(), queueSettings.getMaxSizeBytes());
+ .getMessageCount());
subInfos.add(info);
}
return (SubscriptionInfo[]) subInfos.toArray(new SubscriptionInfo[subInfos
Modified: trunk/src/main/org/jboss/messaging/util/TypedProperties.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/TypedProperties.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/util/TypedProperties.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -48,7 +48,6 @@
import java.util.Map;
import java.util.Set;
-import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
@@ -62,7 +61,7 @@
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
-public class TypedProperties implements EncodingSupport
+public class TypedProperties
{
private static final Logger log = Logger.getLogger(TypedProperties.class);
Modified: trunk/src/main/org/jboss/messaging/util/VariableLatch.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/VariableLatch.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/src/main/org/jboss/messaging/util/VariableLatch.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -28,16 +28,17 @@
/**
*
- * This class will use the framework provided to by AbstractQueuedSynchronizer.
- * AbstractQueuedSynchronizer is the framework for any sort of concurrent synchronization, such as Semaphores, events, etc, based on AtomicIntegers.
+ * <p>This class will use the framework provided to by AbstractQueuedSynchronizer.</p>
+ * <p>AbstractQueuedSynchronizer is the framework for any sort of concurrent synchronization, such as Semaphores, events, etc, based on AtomicIntegers.</p>
*
- * The idea is, instead of providing each user specific Latch/Synchronization, java.util.concurrent provides the framework for reuses, based on an AtomicInteger (getState())
+ * <p>The idea is, instead of providing each user specific Latch/Synchronization, java.util.concurrent provides the framework for reuses, based on an AtomicInteger (getState())</p>
*
- * On JBossMessaging we have the requirement of increment and decrement a counter until the user fires a ready event (commit). At that point we just act as a regular countDown.
+ * <p>On JBossMessaging we have the requirement of increment and decrement a counter until the user fires a ready event (commit). At that point we just act as a regular countDown.</p>
*
- * Note: This latch is reusable. Once it reaches zero, you can call up again, and reuse it on further waits.
- * For example: prepareTransaction will wait for the current completions, and further adds will be called on the latch. Later on when commit is called you can reuse the same latch.
+ * <p>Note: This latch is reusable. Once it reaches zero, you can call up again, and reuse it on further waits.</p>
*
+ * <p>For example: prepareTransaction will wait for the current completions, and further adds will be called on the latch. Later on when commit is called you can reuse the same latch.</p>
+ *
* @author Clebert Suconic
* */
public class VariableLatch
Added: trunk/tests/src/org/jboss/messaging/tests/integration/base/IntegrationTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/base/IntegrationTestBase.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/base/IntegrationTestBase.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -0,0 +1,155 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.integration.base;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.jms.client.JBossBytesMessage;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+/**
+ *
+ * Base class with basic utilities on starting up a basic server
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class IntegrationTestBase extends UnitTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ protected static final String ACCEPTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory";
+ protected static final String CONNECTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
+
+ protected String journalDir = System.getProperty("java.io.tmpdir", "/tmp") + "/integration-test/journal";
+ protected String bindingsDir = System.getProperty("java.io.tmpdir", "/tmp") + "/integration-test/bindings";
+ protected String pageDir = System.getProperty("java.io.tmpdir", "/tmp") + "/integration-test/page";
+ protected MessagingService messagingService;
+
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void clearData()
+ {
+ File file = new File(journalDir);
+ File file2 = new File(bindingsDir);
+ File file3 = new File(pageDir);
+ deleteDirectory(file);
+ file.mkdirs();
+ deleteDirectory(file2);
+ file2.mkdirs();
+ deleteDirectory(file3);
+ file3.mkdirs();
+ }
+
+
+ protected MessagingService createService(Configuration configuration, Map<String, QueueSettings> settings)
+ {
+ TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+ configuration.getAcceptorConfigurations().add(transportConfig);
+ MessagingService service = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+
+ for (Map.Entry<String, QueueSettings> setting: settings.entrySet())
+ {
+ service.getServer().getQueueSettingsRepository().addMatch(setting.getKey(), setting.getValue());
+ }
+
+
+ return service;
+ }
+
+ protected MessagingService createService()
+ {
+ return createService(createDefaultConfig(), new HashMap<String, QueueSettings>());
+ }
+
+
+ protected Configuration createDefaultConfig()
+ {
+ Configuration configuration = new ConfigurationImpl();
+ configuration.setSecurityEnabled(false);
+ configuration.setJournalMinFiles(2);
+ configuration.setPagingDirectory(pageDir);
+
+ return configuration;
+ }
+
+
+ protected ClientSessionFactory createFactory()
+ {
+ return new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ }
+
+ protected ClientMessage createTextMessage(ClientSession session, String s)
+ {
+ return createTextMessage(session, s, true);
+ }
+
+ protected ClientMessage createTextMessage(ClientSession session, String s, boolean durable)
+ {
+ ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, durable, 0, System.currentTimeMillis(), (byte) 1);
+ message.getBody().putString(s);
+ message.getBody().flip();
+ return message;
+ }
+
+ protected ClientMessage createBytesMessage(ClientSession session, byte[] b, boolean durable)
+ {
+ ClientMessage message = session.createClientMessage(JBossBytesMessage.TYPE, durable, 0, System.currentTimeMillis(), (byte) 1);
+ message.getBody().putBytes(b);
+ message.getBody().flip();
+ return message;
+ }
+
+
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -69,7 +69,7 @@
PagingManagerImpl managerImpl =
- new PagingManagerImpl(new PagingManagerFactoryNIO(journalDir), null, queueSettings);
+ new PagingManagerImpl(new PagingManagerFactoryNIO(journalDir), null, queueSettings, -1);
managerImpl.start();
PagingStore store = managerImpl.getPageStore(new SimpleString("simple-test"));
@@ -108,12 +108,12 @@
QueueSettings simpleTestSettings = new QueueSettings();
simpleTestSettings.setDropMessagesWhenFull(true);
- simpleTestSettings.setMaxSizeBytes(150);
+ simpleTestSettings.setMaxSizeBytes(200);
queueSettings.addMatch("simple-test", simpleTestSettings);
PagingManagerImpl managerImpl =
- new PagingManagerImpl(new PagingManagerFactoryNIO(journalDir), null, queueSettings);
+ new PagingManagerImpl(new PagingManagerFactoryNIO(journalDir), null, queueSettings, -1);
managerImpl.start();
ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(100));
Added: trunk/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -0,0 +1,211 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.stress.paging;
+
+import java.util.HashMap;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.tests.integration.base.IntegrationTestBase;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ *
+ * TODO: This is an integration-tests that will take some time to run.
+ * Maybe it should be placed on stress-tests?
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class MultipleDestinationPagingTest extends IntegrationTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ MessagingService service;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+
+ public void testGlobalPage() throws Exception
+ {
+ testPage(true);
+ }
+
+ public void testRegularPage() throws Exception
+ {
+ testPage(false);
+ }
+
+ public void testPage(boolean globalPage) throws Exception
+ {
+ Configuration config = createDefaultConfig();
+
+ HashMap <String, QueueSettings> settings = new HashMap<String, QueueSettings>();
+
+ if (globalPage)
+ {
+ config.setMaxGlobalSizeBytes(20*1024*1024);
+ }
+ else
+ {
+ QueueSettings setting = new QueueSettings();
+ setting.setMaxSizeBytes(20*1024*1024);
+ settings.put("page-adr", setting);
+ }
+
+ service = createService(config, settings);
+ service.start();
+
+ ClientSessionFactory factory = createFactory();
+ ClientSession session = null;
+
+ try
+ {
+
+ session = factory.createSession(false, false, false, -1, false);
+
+ SimpleString address = new SimpleString("page-adr");
+ SimpleString queue[] = new SimpleString[]{new SimpleString("queue1"), new SimpleString("queue2")};
+
+ session.createQueue(address, queue[0], null, true, false);
+ session.createQueue(address, queue[1], null, true, false);
+
+ ClientProducer prod = session.createProducer(address);
+
+ ClientMessage message = createBytesMessage(session, new byte[700], false);
+
+ int NUMBER_OF_MESSAGES = 60000;
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
+ {
+ if (i % 10000 == 0) System.out.println(i);
+ prod.send(message);
+ }
+
+ session.commit();
+
+ session.start();
+
+ int counters[] = new int[2];
+
+ ClientConsumer consumers[] = new ClientConsumer[] {session.createConsumer(queue[0]), session.createConsumer(queue[1])};
+
+ int reads = 0;
+
+ while (true)
+ {
+ int msgs1 = readMessages(session, consumers[0], queue[0]);
+ if (reads ++ == 0)
+ {
+ assertTrue(reads > 0 && reads < NUMBER_OF_MESSAGES);
+ }
+ int msgs2 = readMessages(session, consumers[1], queue[1]);
+ counters[0] += msgs1;
+ counters[1] += msgs2;
+
+ System.out.println("msgs1 = " + msgs1 + " msgs2 = " + msgs2);
+
+ if (msgs1 + msgs2 == 0)
+ {
+ break;
+ }
+ }
+
+ consumers[0].close();
+ consumers[1].close();
+
+ assertEquals(NUMBER_OF_MESSAGES, counters[0]);
+ assertEquals(NUMBER_OF_MESSAGES, counters[1]);
+ }
+ finally
+ {
+ session.close();
+ service.stop();
+ }
+
+ }
+
+
+
+ private int readMessages(ClientSession session, ClientConsumer consumer, SimpleString queue)
+ throws MessagingException
+ {
+ session.start();
+ int msgs = 0;
+
+ ClientMessage msg = null;
+ do
+ {
+ msg = consumer.receive(1000);
+ session.acknowledge();
+ if (msg != null)
+ {
+ if (++msgs % 10000 == 0)
+ {
+ System.out.println("received " + msgs);
+ session.commit();
+
+ }
+ }
+ } while (msg != null);
+
+ session.commit();
+
+ return msgs;
+ }
+
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ clearData();
+ }
+
+
+
+
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/timing/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/server/impl/QueueImplTest.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/server/impl/QueueImplTest.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -37,7 +37,6 @@
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.impl.QueueImpl;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.tests.unit.core.server.impl.fakes.FakeConsumer;
import org.jboss.messaging.tests.util.UnitTestCase;
import org.jboss.messaging.util.SimpleString;
@@ -83,7 +82,7 @@
public void testScheduledNoConsumer() throws Exception
{
- Queue queue = new QueueImpl(1, new SimpleString("queue1"), null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, new SimpleString("queue1"), null, false, true, scheduledExecutor, null);
//Send one scheduled
@@ -149,7 +148,7 @@
private void testScheduled(boolean direct)
{
- Queue queue = new QueueImpl(1, new SimpleString("queue1"), null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, new SimpleString("queue1"), null, false, true, scheduledExecutor, null);
FakeConsumer consumer = null;
@@ -246,7 +245,7 @@
public void testDeleteAllReferences() throws Exception
{
- Queue queue = new QueueImpl(1, new SimpleString("queue1"), null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, new SimpleString("queue1"), null, false, true, scheduledExecutor, null);
StorageManager storageManager = EasyMock.createStrictMock(StorageManager.class);
@@ -337,7 +336,7 @@
public void testDeliveryScheduled() throws Exception
{
Consumer consumer = EasyMock.createStrictMock(Consumer.class);
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
MessageReference messageReference = generateReference(queue, 1);
final CountDownLatch countDownLatch = new CountDownLatch(1);
EasyMock.expect(consumer.handle(messageReference)).andAnswer(new IAnswer<HandleStatus>()
@@ -362,7 +361,7 @@
public void testDeliveryScheduledBusyConsumer() throws Exception
{
Consumer consumer = EasyMock.createStrictMock(Consumer.class);
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
MessageReference messageReference = generateReference(queue, 1);
final CountDownLatch countDownLatch = new CountDownLatch(1);
EasyMock.expect(consumer.handle(messageReference)).andAnswer(new IAnswer<HandleStatus>()
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -68,7 +68,7 @@
queueSettings.setDefault(new QueueSettings());
PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
- PagingManagerImpl manager = new PagingManagerImpl(spi, null, queueSettings);
+ PagingManagerImpl manager = new PagingManagerImpl(spi, null, queueSettings, -1);
SimpleString destination = new SimpleString("some-destination");
@@ -121,7 +121,7 @@
public void testMultipleThreadsGetStore() throws Exception
{
PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
- final PagingManagerImpl manager = new PagingManagerImpl(spi, null, repoSettings);
+ final PagingManagerImpl manager = new PagingManagerImpl(spi, null, repoSettings, -1);
final SimpleString destination = new SimpleString("some-destination");
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -68,6 +68,8 @@
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
EasyMock.replay(pm, qf);
@@ -85,6 +87,8 @@
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
EasyMock.replay(pm, qf);
@@ -116,6 +120,8 @@
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
EasyMock.expectLastCall().andAnswer(new LoadBindingsIAnswer(bindingArrayList, null));
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
@@ -161,6 +167,9 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
EasyMock.expectLastCall().andAnswer(new LoadBindingsIAnswer(bindingArrayList, null));
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
@@ -204,6 +213,9 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
EasyMock.expectLastCall().andAnswer(new LoadBindingsIAnswer(bindingArrayList, null));
@@ -253,6 +265,9 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
EasyMock.expectLastCall().andAnswer(new LoadBindingsIAnswer(bindingArrayList, null));
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
@@ -296,6 +311,9 @@
EasyMock.expect(pgm.getPageStore(address1)).andReturn(store);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
EasyMock.expectLastCall().andAnswer(new LoadBindingsIAnswer(bindingArrayList, dests));
EasyMock.expect(pm.addDestination(address1)).andReturn(true);
@@ -325,6 +343,9 @@
PagingStore pgstore = EasyMock.createNiceMock(PagingStore.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+ qf.setPostOffice(postOffice);
+
ArrayList<Binding> bindingArrayList = new ArrayList<Binding>();
List<SimpleString> dests = new ArrayList<SimpleString>();
Binding[] bindings = new Binding[100];
@@ -385,6 +406,9 @@
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+ qf.setPostOffice(postOffice);
+
ArrayList<Binding> bindingArrayList = new ArrayList<Binding>();
List<SimpleString> dests = new ArrayList<SimpleString>();
Binding[] bindings = new Binding[100];
@@ -442,6 +466,10 @@
PagingStore pgstore = EasyMock.createNiceMock(PagingStore.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+ qf.setPostOffice(postOffice);
+
+
ArrayList<Binding> bindingArrayList = new ArrayList<Binding>();
List<SimpleString> dests = new ArrayList<SimpleString>();
Binding[] bindings = new Binding[100];
@@ -500,6 +528,10 @@
PagingStore pgstore = EasyMock.createNiceMock(PagingStore.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+ qf.setPostOffice(postOffice);
+
+
ArrayList<Binding> bindingArrayList = new ArrayList<Binding>();
List<SimpleString> dests = new ArrayList<SimpleString>();
Binding[] bindings = new Binding[100];
@@ -736,6 +768,9 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
EasyMock.expect(pm.addDestination(address)).andReturn(true);
@@ -759,6 +794,9 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
EasyMock.expect(pm.addDestination(address)).andReturn(true);
@@ -788,6 +826,9 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
EasyMock.expect(pm.addDestination(address)).andReturn(true);
@@ -814,6 +855,9 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
EasyMock.expect(pm.addDestination(address)).andReturn(true);
@@ -848,13 +892,16 @@
{
SimpleString queueName = new SimpleString("testQueueName");
StorageManager pm = EasyMock.createStrictMock(StorageManager.class);
- QueueFactory qf = EasyMock.createStrictMock(QueueFactory.class);
+ QueueFactory qf = EasyMock.createMock(QueueFactory.class);
Filter filter = EasyMock.createStrictMock(Filter.class);
Queue queue = EasyMock.createStrictMock(Queue.class);
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
EasyMock.expect(qf.createQueue(-1, queueName, filter, true)).andReturn(queue);
@@ -885,6 +932,9 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
EasyMock.expect(qf.createQueue(-1, queueName, filter, true)).andReturn(queue);
@@ -925,6 +975,9 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
EasyMock.expect(qf.createQueue(-1, queueName, filter, false)).andReturn(queue);
@@ -954,6 +1007,9 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
EasyMock.expect(qf.createQueue(-1, queueName, filter, false)).andReturn(queue);
@@ -991,6 +1047,9 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
EasyMock.expect(qf.createQueue(-1, queueName, filter, true)).andReturn(queue);
@@ -1028,6 +1087,9 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
EasyMock.expect(qf.createQueue(-1, queueName, filter, true)).andReturn(queue);
@@ -1062,6 +1124,9 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
EasyMock.expect(qf.createQueue(-1, queueName, filter, true)).andReturn(queue);
@@ -1110,6 +1175,9 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
EasyMock.expect(qf.createQueue(-1, queueName, filter, false)).andReturn(queue);
@@ -1142,6 +1210,9 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
EasyMock.expect(qf.createQueue(-1, queueName, filter, false)).andReturn(queue);
@@ -1185,6 +1256,9 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
EasyMock.expect(queue.getName()).andStubReturn(queueName);
@@ -1214,6 +1288,9 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null);
+
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
EasyMock.expect(message.getDestination()).andStubReturn(new SimpleString("testtDestination"));
@@ -1242,6 +1319,9 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null);
+
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
EasyMock.expect(message.getDestination()).andStubReturn(new SimpleString("testtDestination"));
@@ -1266,6 +1346,9 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null);
+
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
SimpleString address = new SimpleString("testtDestination");
@@ -1310,7 +1393,10 @@
EasyMock.expect(pgm.addSize(EasyMock.isA(ServerMessage.class))).andReturn(-1l);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null);
+
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
@@ -1347,6 +1433,9 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null);
+
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
SimpleString address = new SimpleString("testtDestination");
@@ -1381,6 +1470,9 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null);
+
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
SimpleString address = new SimpleString("testtDestination");
@@ -1418,6 +1510,9 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null);
+
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
SimpleString address = new SimpleString("testtDestination");
@@ -1474,6 +1569,9 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null);
+
+ qf.setPostOffice(postOffice);
+
pm.loadBindings(EasyMock.eq(qf), (List<Binding>) EasyMock.anyObject(), (List<SimpleString>) EasyMock.anyObject());
pm.loadMessages(EasyMock.eq(postOffice), (Map<Long, Queue>) EasyMock.anyObject(), (ResourceManager) EasyMock.anyObject());
SimpleString address = new SimpleString("testtDestination");
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -78,7 +78,7 @@
{
final long id = 123;
- Queue queue = new QueueImpl(id, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(id, queue1, null, false, true, scheduledExecutor, null);
assertEquals(id, queue.getPersistenceID());
@@ -93,29 +93,29 @@
{
final SimpleString name = new SimpleString("oobblle");
- Queue queue = new QueueImpl(1, name, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, name, null, false, true, scheduledExecutor, null);
assertEquals(name, queue.getName());
}
public void testClustered()
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
assertFalse(queue.isClustered());
- queue = new QueueImpl(1, queue1, null, true, true, new QueueSettings(), scheduledExecutor);
+ queue = new QueueImpl(1, queue1, null, true, true, scheduledExecutor, null);
assertTrue(queue.isClustered());
}
public void testDurable()
{
- Queue queue = new QueueImpl(1, queue1, null, false, false, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, false, scheduledExecutor, null);
assertFalse(queue.isDurable());
- queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
assertTrue(queue.isDurable());
}
@@ -128,7 +128,7 @@
Consumer cons3 = new FakeConsumer();
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
assertEquals(0, queue.getConsumerCount());
@@ -169,7 +169,7 @@
public void testGetSetDistributionPolicy()
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
assertNotNull(queue.getDistributionPolicy());
@@ -184,14 +184,14 @@
public void testGetFilter()
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
assertNull(queue.getFilter());
Filter filter = createMock(Filter.class);
replay(filter);
- queue = new QueueImpl(1, queue1, filter, false, true, new QueueSettings(), scheduledExecutor);
+ queue = new QueueImpl(1, queue1, filter, false, true, scheduledExecutor, null);
assertEquals(filter, queue.getFilter());
@@ -200,7 +200,7 @@
public void testSimpleAddLast()
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
final int numMessages = 10;
@@ -219,7 +219,7 @@
public void testSimpleDirectDelivery()
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
FakeConsumer consumer = new FakeConsumer();
@@ -247,7 +247,7 @@
public void testSimpleNonDirectDelivery()
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
final int numMessages = 10;
@@ -285,7 +285,7 @@
public void testBusyConsumer()
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
FakeConsumer consumer = new FakeConsumer();
@@ -329,7 +329,7 @@
public void testBusyConsumerThenAddMoreMessages()
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
FakeConsumer consumer = new FakeConsumer();
@@ -396,7 +396,7 @@
public void testAddFirstAddLast()
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
final int numMessages = 10;
@@ -451,7 +451,7 @@
public void testChangeConsumersAndDeliver() throws Exception
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
final int numMessages = 10;
@@ -605,7 +605,7 @@
public void testConsumerReturningNull()
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
class NullConsumer implements Consumer
{
@@ -633,7 +633,7 @@
public void testRoundRobinWithQueueing()
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
assertTrue(queue.getDistributionPolicy() instanceof RoundRobinDistributionPolicy);
@@ -678,7 +678,7 @@
public void testRoundRobinDirect()
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
assertTrue(queue.getDistributionPolicy() instanceof RoundRobinDistributionPolicy);
@@ -721,7 +721,7 @@
public void testDeleteAllReferences() throws Exception
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
StorageManager storageManager = EasyMock.createStrictMock(StorageManager.class);
@@ -805,7 +805,7 @@
public void testWithPriorities()
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
final int numMessages = 10;
@@ -872,7 +872,7 @@
public void testConsumerWithFilterAddAndRemove()
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
Filter filter = new FakeFilter("fruit", "orange");
@@ -881,7 +881,7 @@
public void testList()
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
final int numMessages = 20;
@@ -905,7 +905,7 @@
public void testListWithFilter()
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
final int numMessages = 20;
@@ -941,7 +941,7 @@
public void testConsumeWithFiltersAddAndRemoveConsumer() throws Exception
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
Filter filter = new FakeFilter("fruit", "orange");
@@ -1014,7 +1014,7 @@
private void testConsumerWithFilters(boolean direct) throws Exception
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
Filter filter = new FakeFilter("fruit", "orange");
@@ -1103,7 +1103,7 @@
public void testMessageOrder() throws Exception
{
Consumer consumer = EasyMock.createStrictMock(Consumer.class);
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1121,7 +1121,7 @@
public void testMessagesAdded() throws Exception
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1134,7 +1134,7 @@
public void testAddLastWhenLocked() throws Exception
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
MessageReference messageReference = generateReference(queue, 1);
queue.lock();
CountDownLatch countDownLatch = new CountDownLatch(1);
@@ -1150,7 +1150,7 @@
public void testAddLastWhenLockedMultiple() throws Exception
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1176,7 +1176,7 @@
public void testAddFirstWhenLocked() throws Exception
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
MessageReference messageReference = generateReference(queue, 1);
queue.lock();
CountDownLatch countDownLatch = new CountDownLatch(1);
@@ -1192,7 +1192,7 @@
public void testAddFirstWhenLockedMultiple() throws Exception
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1218,7 +1218,7 @@
public void testAddListFirst() throws Exception
{
Consumer consumer = EasyMock.createStrictMock(Consumer.class);
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1239,7 +1239,7 @@
public void testRemoveReferenceWithId() throws Exception
{
Consumer consumer = EasyMock.createStrictMock(Consumer.class);
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1260,7 +1260,7 @@
public void testGetReference() throws Exception
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1273,7 +1273,7 @@
public void testGetNonExistentReference() throws Exception
{
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1287,7 +1287,7 @@
public void testConsumerRemovedAfterException() throws Exception
{
Consumer consumer = EasyMock.createStrictMock(Consumer.class);
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1307,7 +1307,7 @@
public void testDeliveryAsync() throws Exception
{
Consumer consumer = EasyMock.createStrictMock(Consumer.class);
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1336,7 +1336,7 @@
{
long messageID = randomLong();
final SimpleString expiryQueue = new SimpleString("expiryQueue");
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
MessageReference messageReference = generateReference(queue, messageID);
StorageManager storageManager = EasyMock.createMock(StorageManager.class);
EasyMock.expect(storageManager.generateTransactionID()).andReturn(randomLong());
@@ -1394,7 +1394,7 @@
{
long messageID = randomLong();
final SimpleString dlqName = new SimpleString("dlq");
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
MessageReference messageReference = generateReference(queue, messageID);
StorageManager storageManager = createMock(StorageManager.class);
expect(storageManager.generateTransactionID()).andReturn(randomLong());
@@ -1453,7 +1453,7 @@
long newMessageID = randomLong();
long tid = randomLong();
final SimpleString toQueueName = new SimpleString("toQueueName");
- Queue queue = new QueueImpl(1, queue1, null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue = new QueueImpl(1, queue1, null, false, true, scheduledExecutor, null);
Queue toQueue = createMock(Queue.class);
MessageReference messageReference = generateReference(queue, messageID);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeQueueFactory.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeQueueFactory.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -26,10 +26,10 @@
import java.util.concurrent.ScheduledExecutorService;
import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.QueueFactory;
import org.jboss.messaging.core.server.impl.QueueImpl;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.util.SimpleString;
/**
@@ -42,11 +42,19 @@
public class FakeQueueFactory implements QueueFactory
{
private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+
+ private PostOffice postOffice;
public Queue createQueue(long persistenceID, SimpleString name, Filter filter,
boolean durable)
{
- return new QueueImpl(persistenceID, name, filter, false, durable, new QueueSettings(), scheduledExecutor);
+ return new QueueImpl(persistenceID, name, filter, false, durable, scheduledExecutor, postOffice);
}
+ public void setPostOffice(PostOffice postOffice)
+ {
+ this.postOffice = postOffice;
+
+ }
+
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -535,13 +535,13 @@
public void testAckCommit() throws Exception
{
//Durable queue
- Queue queue1 = new QueueImpl(12, new SimpleString("queue1"), null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue1 = new QueueImpl(12, new SimpleString("queue1"), null, false, true, scheduledExecutor, null);
//Durable queue
- Queue queue2 = new QueueImpl(34, new SimpleString("queue2"), null, false, true, new QueueSettings(), scheduledExecutor);
+ Queue queue2 = new QueueImpl(34, new SimpleString("queue2"), null, false, true, scheduledExecutor, null);
//Non durable queue
- Queue queue3 = new QueueImpl(65, new SimpleString("queue3"), null, false, false, new QueueSettings(), scheduledExecutor);
+ Queue queue3 = new QueueImpl(65, new SimpleString("queue3"), null, false, false, scheduledExecutor, null);
//Some refs to ack
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossSessionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossSessionTest.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossSessionTest.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -672,7 +672,7 @@
// isExists() will return true
SessionQueueQueryResponseMessage resp = new SessionQueueQueryResponseMessage(
- false, -1, 0, 1, null, destination.getSimpleAddress());
+ false, 0, 1, null, destination.getSimpleAddress());
expect(mockClientSession.queueQuery(destination.getSimpleAddress()))
.andReturn(resp);
expect(
@@ -1032,7 +1032,7 @@
.andReturn(bindingResp);
// already 1 durable subscriber
SessionQueueQueryResponseMessage queryResp =
- new SessionQueueQueryResponseMessage(true, -1, 1, 0, null, topic.getSimpleAddress());
+ new SessionQueueQueryResponseMessage(true, 1, 0, null, topic.getSimpleAddress());
expect(mockClientSession.queueQuery(isA(SimpleString.class))).andReturn(queryResp);
replay(sf, mockClientSession);
@@ -1109,7 +1109,7 @@
.andReturn(bindingResp);
// isExists will return true
SessionQueueQueryResponseMessage queryResp =
- new SessionQueueQueryResponseMessage(true, -1, 0, 0, null, topic.getSimpleAddress());
+ new SessionQueueQueryResponseMessage(true, 0, 0, null, topic.getSimpleAddress());
expect(mockClientSession.queueQuery(isA(SimpleString.class))).andReturn(queryResp);
expect(
mockClientSession.createConsumer(isA(SimpleString.class),
@@ -1148,7 +1148,7 @@
.andReturn(bindingResp);
// isExists will return true
SessionQueueQueryResponseMessage queryResp =
- new SessionQueueQueryResponseMessage(true, -1, 0, 0, null, oldTopic.getSimpleAddress());
+ new SessionQueueQueryResponseMessage(true, 0, 0, null, oldTopic.getSimpleAddress());
expect(mockClientSession.queueQuery(isA(SimpleString.class))).andReturn(queryResp);
// queue address of the old topic
mockClientSession.deleteQueue(isA(SimpleString.class));
@@ -1191,7 +1191,7 @@
.andReturn(bindingResp);
// isExists will return true
SessionQueueQueryResponseMessage queryResp =
- new SessionQueueQueryResponseMessage(true, -1, 0, 0, oldSelector, topic.getSimpleAddress());
+ new SessionQueueQueryResponseMessage(true, 0, 0, oldSelector, topic.getSimpleAddress());
expect(mockClientSession.queueQuery(isA(SimpleString.class))).andReturn(queryResp);
// queue address of the old topic
mockClientSession.deleteQueue(isA(SimpleString.class));
@@ -1377,7 +1377,7 @@
// isExists() will return true
SessionQueueQueryResponseMessage resp = new SessionQueueQueryResponseMessage(
- false, -1, -1, 1, null, queueAddress);
+ false, -1, 1, null, queueAddress);
expect(mockClientSession.queueQuery(queueAddress)).andReturn(resp);
replay(sf, mockClientSession);
@@ -1674,7 +1674,7 @@
// isExists() will return true
SessionQueueQueryResponseMessage resp = new SessionQueueQueryResponseMessage(
- false, -1, -1, 1, null, queueAddress);
+ false, -1, 1, null, queueAddress);
expect(mockClientSession.queueQuery(queueAddress)).andReturn(resp);
mockClientSession.removeDestination(queueAddress, false);
mockClientSession.deleteQueue(queueAddress);
@@ -1724,7 +1724,7 @@
SessionQueueQueryResponseMessage resp =
- new SessionQueueQueryResponseMessage(false, 0, consumerCount, 0, null, queueAddress);
+ new SessionQueueQueryResponseMessage(false, consumerCount, 0, null, queueAddress);
expect(mockClientSession.queueQuery(queueAddress)).andReturn(resp);
replay(sf, mockClientSession);
@@ -1946,7 +1946,7 @@
String subName = randomString();
String clientID = randomString();
SimpleString queueAddres = new SimpleString(JBossTopic.createQueueNameForDurableSubscription(clientID, subName));
- SessionQueueQueryResponseMessage resp = new SessionQueueQueryResponseMessage(false, 0, 0, 0, null, queueAddres);
+ SessionQueueQueryResponseMessage resp = new SessionQueueQueryResponseMessage(false, 0, 0, null, queueAddres);
expect(mockClientSession.queueQuery(queueAddres)).andReturn(resp );
mockClientSession.deleteQueue(queueAddres);
replay(sf, mockClientSession);
@@ -1991,7 +1991,7 @@
int consumerCount = 1;
SessionQueueQueryResponseMessage resp =
- new SessionQueueQueryResponseMessage(true, 0, consumerCount, 0, null, queueAddres);
+ new SessionQueueQueryResponseMessage(true, consumerCount, 0, null, queueAddres);
expect(mockClientSession.queueQuery(isA(SimpleString.class))).andReturn(resp );
replay(sf, mockClientSession);
@@ -2083,7 +2083,7 @@
// isExists() will return true
SessionQueueQueryResponseMessage resp = new SessionQueueQueryResponseMessage(
- false, -1, 0, 1, null, destination.getSimpleAddress());
+ false, 0, 1, null, destination.getSimpleAddress());
expect(mockClientSession.queueQuery(destination.getSimpleAddress()))
.andReturn(resp);
expect(
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/SubscriptionInfoTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/SubscriptionInfoTest.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/SubscriptionInfoTest.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -58,7 +58,6 @@
assertEquals(expected.isDurable(), actual.get("durable"));
assertEquals(expected.getSelector(), actual.get("selector"));
assertEquals(expected.getMessageCount(), actual.get("messageCount"));
- assertEquals(expected.getMaxSizeBytes(), actual.get("maxSizeBytes"));
}
// Constructors --------------------------------------------------
@@ -68,8 +67,7 @@
public void testToCompositeData() throws Exception
{
SubscriptionInfo info = new SubscriptionInfo(randomString(), randomString(),
- randomString(), randomBoolean(), randomString(), randomInt(),
- randomInt());
+ randomString(), randomBoolean(), randomString(), randomInt());
CompositeData data = info.toCompositeData();
assertEquals(info, data);
@@ -78,11 +76,9 @@
public void testToTabularData() throws Exception
{
SubscriptionInfo info_1 = new SubscriptionInfo(randomString(), randomString(),
- randomString(), randomBoolean(), randomString(), randomInt(),
- randomInt());
+ randomString(), randomBoolean(), randomString(), randomInt());
SubscriptionInfo info_2 = new SubscriptionInfo(randomString(), randomString(),
- randomString(), randomBoolean(), randomString(), randomInt(),
- randomInt());
+ randomString(), randomBoolean(), randomString(), randomInt());
SubscriptionInfo[] infos = new SubscriptionInfo[] { info_1, info_2 };
TabularData data = SubscriptionInfo.toTabularData(infos);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/TopicControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/TopicControlTest.java 2008-09-12 04:51:48 UTC (rev 4937)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/TopicControlTest.java 2008-09-13 03:51:32 UTC (rev 4938)
@@ -284,11 +284,8 @@
JBossTopic topic = new JBossTopic(name);
PostOffice postOffice = createMock(PostOffice.class);
StorageManager storageManager = createMock(StorageManager.class);
-
- QueueSettings settings = new QueueSettings();
Queue durableQueue = createMock(Queue.class);
- expect(durableQueue.getSettings()).andStubReturn(settings);
expect(durableQueue.getName()).andStubReturn(
JBossTopic.createAddressFromName(randomString()));
expect(durableQueue.getFilter()).andStubReturn(null);
@@ -298,7 +295,6 @@
expect(bindingForDurableQueue.getQueue()).andStubReturn(durableQueue);
Queue nonDurableQueue = createMock(Queue.class);
- expect(nonDurableQueue.getSettings()).andStubReturn(settings);
expect(nonDurableQueue.getName()).andStubReturn(
JBossTopic.createAddressFromName(randomString()));
expect(nonDurableQueue.getFilter()).andStubReturn(null);
@@ -342,7 +338,6 @@
StorageManager storageManager = createMock(StorageManager.class);
Queue durableQueue = createMock(Queue.class);
- expect(durableQueue.getSettings()).andStubReturn(settings);
expect(durableQueue.getName()).andStubReturn(
JBossTopic.createAddressFromName(randomString()));
expect(durableQueue.getFilter()).andStubReturn(null);
@@ -352,7 +347,6 @@
expect(bindingForDurableQueue.getQueue()).andStubReturn(durableQueue);
Queue nonDurableQueue = createMock(Queue.class);
- expect(nonDurableQueue.getSettings()).andStubReturn(settings);
expect(nonDurableQueue.getName()).andStubReturn(
JBossTopic.createAddressFromName(randomString()));
expect(nonDurableQueue.getFilter()).andStubReturn(null);
More information about the jboss-cvs-commits
mailing list