[jboss-cvs] JBoss Messaging SVN: r5520 - in trunk: src/main/org/jboss/messaging/core/client/impl and 16 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Dec 11 22:59:02 EST 2008
Author: clebert.suconic at jboss.com
Date: 2008-12-11 22:59:01 -0500 (Thu, 11 Dec 2008)
New Revision: 5520
Modified:
trunk/src/config/jbm-configuration.xml
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
trunk/src/main/org/jboss/messaging/core/config/Configuration.java
trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/server/MessageReference.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java
trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1467 & Several tweaks on Paging, ExpiryDelivery and added a bunch of more tests
Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/config/jbm-configuration.xml 2008-12-12 03:59:01 UTC (rev 5520)
@@ -156,6 +156,8 @@
<!-- Paging configuration -->
+ <paging-max-threads>10</paging-max-threads>
+
<paging-directory>data/paging</paging-directory>
<paging-max-global-size-bytes>104857600</paging-max-global-size-bytes>
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -674,6 +674,8 @@
"-" +
getID() +
".jbm"));
+
+ cloneMessage.setFlowControlSize(message.getFlowControlSize());
addBytesBody(cloneMessage, message.getBody().array());
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -111,14 +111,11 @@
public int getFlowControlSize()
{
- if (flowControlSize > 0)
+ if (flowControlSize < 0)
{
- return flowControlSize;
+ throw new IllegalStateException("Flow Control hasn't been set");
}
- else
- {
- return getEncodeSize();
- }
+ return flowControlSize;
}
public void setFlowControlSize(final int flowControlSize)
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -81,6 +81,7 @@
}
else
{
+ message.getClientMessage().setFlowControlSize(packet.getPacketSize());
clientSession.handleReceiveMessage(message.getConsumerID(), message.getClientMessage());
}
Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -56,6 +56,7 @@
void setQueueActivationTimeout(long timeout);
+
int getScheduledThreadPoolMaxSize();
void setScheduledThreadPoolMaxSize(int maxSize);
@@ -178,6 +179,10 @@
// Paging Properties --------------------------------------------------------------------
+ int getPagingMaxThreads();
+
+ void setPagingMaxThread(int pagingMaxThreads);
+
String getPagingDirectory();
void setPagingDirectory(String dir);
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -65,6 +65,8 @@
public static final String DEFAULT_PAGING_DIR = "data/paging";
+ public static final int DEFAULT_PAGE_MAX_THREADS = 10;
+
public static final long DEFAULT_PAGE_SIZE = 10 * 1024 * 1024;
public static final String DEFAULT_LARGE_MESSAGES_DIR = "data/largemessages";
@@ -169,6 +171,8 @@
protected String pagingDirectory = DEFAULT_PAGING_DIR;
+ protected int pagingMaxThreads = DEFAULT_PAGE_MAX_THREADS;
+
// File related attributes -----------------------------------------------------------
@@ -402,7 +406,18 @@
{
return journalType;
}
+
+ public int getPagingMaxThreads()
+ {
+ return pagingMaxThreads;
+ }
+
+ public void setPagingMaxThread(final int pagingMaxThreads)
+ {
+ this.pagingMaxThreads = pagingMaxThreads;
+ }
+
public void setPagingDirectory(final String dir)
{
pagingDirectory = dir;
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -226,6 +226,8 @@
journalDirectory = getString(e, "journal-directory", journalDirectory);
+ pagingMaxThreads = getInteger(e, "paging-max-threads", pagingMaxThreads);
+
pagingDirectory = getString(e, "paging-directory", pagingDirectory);
pagingMaxGlobalSize = getLong(e, "paging-max-global-size-bytes", pagingMaxGlobalSize);
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -26,6 +26,7 @@
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.MessagingComponent;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.util.SimpleString;
@@ -103,7 +104,7 @@
* Point to inform/restoring Transactions used when the messages were added into paging
* */
void addTransaction(PageTransactionInfo pageTransaction);
-
+
/**
* Point to inform/restoring Transactions used when the messages were added into paging
* */
@@ -123,9 +124,13 @@
void messageDone(ServerMessage message) throws Exception;
/** To be called when an message is being added to the address.
- * @return the current size of the queue, or -1 if the queue is full and it should drop the message */
- long addSize(ServerMessage message) throws Exception;
+ * @return false is the address is full */
+ boolean addSize(ServerMessage message) throws Exception;
+ void removeSize(ServerMessage message) throws Exception;
+
+ void removeSize(MessageReference reference) throws Exception;
+
/** Sync current-pages on disk for these destinations */
void sync(Collection<SimpleString> destinationsToSync) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -82,5 +82,5 @@
* @return
* @throws Exception
*/
- long addSize(long memoryEstimate) throws Exception;
+ boolean addSize(long memoryEstimate) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -36,6 +36,7 @@
import org.jboss.messaging.core.paging.PagingStoreFactory;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -194,11 +195,21 @@
getPageStore(message.getDestination()).addSize(message.getMemoryEstimate() * -1);
}
- public long addSize(final ServerMessage message) throws Exception
+ public boolean addSize(final ServerMessage message) throws Exception
{
return getPageStore(message.getDestination()).addSize(message.getMemoryEstimate());
}
+ public void removeSize(final ServerMessage message) throws Exception
+ {
+ getPageStore(message.getDestination()).addSize(-message.getMemoryEstimate());
+ }
+
+ public void removeSize(final MessageReference reference) throws Exception
+ {
+ getPageStore(reference.getMessage().getDestination()).addSize(-reference.getMemoryEstimate());
+ }
+
public boolean page(final ServerMessage message, final long transactionId) throws Exception
{
// The sync on transactions is done on commit only
@@ -283,7 +294,7 @@
store.startDepaging(pagingSPI.getGlobalDepagerExecutor());
}
}
-
+
/* (non-Javadoc)
* @see org.jboss.messaging.core.paging.PagingManager#getGlobalSize()
*/
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -25,6 +25,8 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.jboss.messaging.core.journal.SequentialFileFactory;
@@ -70,11 +72,15 @@
// Constructors --------------------------------------------------
- public PagingStoreFactoryNIO(final String directory)
+ public PagingStoreFactoryNIO(final String directory, final int maxThreads)
{
+ System.out.println("maxThreads = " + maxThreads);
this.directory = directory;
- parentExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-depaging-threads"));
+ parentExecutor = new ThreadPoolExecutor(0, maxThreads,
+ 60L, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>(),
+ new JBMThreadFactory("JBM-depaging-threads"));
executorFactory = new OrderedExecutorFactory(parentExecutor);
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -37,6 +37,7 @@
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.paging.LastPageRecord;
import org.jboss.messaging.core.paging.Page;
import org.jboss.messaging.core.paging.PageTransactionInfo;
@@ -215,7 +216,7 @@
return storeName;
}
- public long addSize(final long size) throws Exception
+ public boolean addSize(final long size) throws Exception
{
final long maxSize = getMaxSizeBytes();
@@ -235,11 +236,12 @@
log.warn("Messages are being dropped on adress " + getStoreName());
}
- return -1l;
+ return false;
}
else
{
- return addAddressSize(size);
+ addAddressSize(size);
+ return true;
}
}
else
@@ -284,14 +286,14 @@
{
log.trace(" globalDepage = " + pagingManager.isGlobalPageMode() +
- "\n currentGlobalSize = " +
- currentGlobalSize +
- "\n defaultPageSize = " +
- pagingManager.getDefaultPageSize() +
- "\n maxGlobalSize = " +
- maxGlobalSize +
- "\n maxGlobalSize - defaultPageSize = " +
- (maxGlobalSize - pagingManager.getDefaultPageSize()));
+ "\n currentGlobalSize = " +
+ currentGlobalSize +
+ "\n defaultPageSize = " +
+ pagingManager.getDefaultPageSize() +
+ "\n maxGlobalSize = " +
+ maxGlobalSize +
+ "\n maxGlobalSize - defaultPageSize = " +
+ (maxGlobalSize - pagingManager.getDefaultPageSize()));
}
if (pagingManager.isGlobalPageMode() && currentGlobalSize < maxGlobalSize - pagingManager.getDefaultPageSize())
@@ -307,7 +309,7 @@
}
}
- return addressSize;
+ return true;
}
}
@@ -434,7 +436,8 @@
else
{
// startDepaging and clearDepage needs to be atomic.
- // We can't use writeLock to this operation as writeLock would still be used by another thread, and still being a valid usage
+ // We can't use writeLock to this operation as writeLock would still be used by another thread, and still
+ // being a valid usage
synchronized (this)
{
if (!depaging.get())
@@ -779,8 +782,17 @@
}
}
- refsToAdd.addAll(postOffice.route(pagedMessage));
+ List<MessageReference> routedReferences = postOffice.route(pagedMessage);
+ Long scheduledDeliveryTime = (Long)pagedMessage.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
+
+ if (scheduledDeliveryTime != null)
+ {
+ postOffice.scheduleReferences(depageTransactionID, scheduledDeliveryTime, routedReferences);
+ }
+
+ refsToAdd.addAll(routedReferences);
+
if (pagedMessage.getDurableRefCount() != 0)
{
storageManager.storeMessageTransactional(depageTransactionID, pagedMessage);
@@ -806,11 +818,7 @@
trace("Depage committed");
- for (MessageReference ref : refsToAdd)
- {
- ref.getQueue().addLast(ref);
- }
-
+ postOffice.deliver(refsToAdd);
}
/**
@@ -988,7 +996,8 @@
{
readPage();
}
- // Note: clearDepage is an atomic operation, it needs to be done even if readPage was not executed because the page was full
+ // Note: clearDepage is an atomic operation, it needs to be done even if readPage was not executed
+ // because the page was full
if (!clearDepage())
{
followingExecutor.execute(this);
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -51,6 +51,7 @@
* route to.
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+* @author <a href="mailto:csuconic at redhat.com">Clebert Suconic</a>
*
*/
public interface PostOffice extends MessagingComponent
@@ -70,8 +71,11 @@
Binding getBinding(SimpleString queueName);
+ /** Deliver references previously routed */
+ void deliver(List<MessageReference> references);
+
List<MessageReference> route(ServerMessage message) throws Exception;
-
+
//For testing only
Map<SimpleString, List<Binding>> getMappings();
@@ -84,4 +88,8 @@
SendLock getAddressLock(SimpleString address);
DuplicateIDCache getDuplicateIDCache(SimpleString address);
+
+ void scheduleReferences(long scheduledDeliveryTime, List<MessageReference> references) throws Exception;
+
+ void scheduleReferences( long transactionID, long scheduledDeliveryTime, List<MessageReference> references) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -37,7 +37,9 @@
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.management.ManagementService;
+import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.paging.PagingManager;
+import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.AddressManager;
import org.jboss.messaging.core.postoffice.Binding;
@@ -48,6 +50,7 @@
import org.jboss.messaging.core.server.QueueFactory;
import org.jboss.messaging.core.server.SendLock;
import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.impl.MessageReferenceImpl;
import org.jboss.messaging.core.server.impl.SendLockImpl;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -61,6 +64,7 @@
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="csuconic at redhat.com">Clebert Suconic</a>
*/
public class PostOfficeImpl implements PostOffice
{
@@ -97,7 +101,7 @@
private final ConcurrentMap<SimpleString, DuplicateIDCache> duplicateIDCaches = new ConcurrentHashMap<SimpleString, DuplicateIDCache>();
private final int idCacheSize;
-
+
private final boolean persistIDCache;
public PostOfficeImpl(final StorageManager storageManager,
@@ -144,7 +148,7 @@
this.backup = backup;
this.idCacheSize = idCacheSize;
-
+
this.persistIDCache = persistIDCache;
}
@@ -314,82 +318,96 @@
return addressManager.getBinding(queueName);
}
+ public void deliver(final List<MessageReference> references)
+ {
+ for (MessageReference ref : references)
+ {
+ ref.getQueue().addLast(ref);
+ }
+ }
+
public List<MessageReference> route(final ServerMessage message) throws Exception
{
- long size = pagingManager.addSize(message);
+ final PagingStore pagingStore = pagingManager.getPageStore(message.getDestination());
- if (size < 0)
+ SimpleString address = message.getDestination();
+
+ if (checkAllowable)
{
- return new ArrayList<MessageReference>();
- }
- else
- {
- SimpleString address = message.getDestination();
-
- if (checkAllowable)
+ if (!addressManager.containsDestination(address))
{
- if (!addressManager.containsDestination(address))
- {
- throw new MessagingException(MessagingException.ADDRESS_DOES_NOT_EXIST,
- "Cannot route to address " + address);
- }
+ throw new MessagingException(MessagingException.ADDRESS_DOES_NOT_EXIST,
+ "Cannot route to address " + address);
}
+ }
- List<Binding> bindings = addressManager.getBindings(address);
+ List<Binding> bindings = addressManager.getBindings(address);
- List<MessageReference> refs = new ArrayList<MessageReference>();
+ List<MessageReference> refs = new ArrayList<MessageReference>();
- if (bindings != null)
+ int refEstimate = 0;
+ if (bindings != null)
+ {
+ Binding theBinding = null;
+
+ long lowestRoutings = -1;
+
+ for (Binding binding : bindings)
{
- Binding theBinding = null;
+ Queue queue = binding.getQueue();
- long lowestRoutings = -1;
+ Filter filter = queue.getFilter();
- for (Binding binding : bindings)
+ if (filter == null || filter.match(message))
{
- Queue queue = binding.getQueue();
+ if (binding.isFanout())
+ {
+ // Fanout bindings always get the reference
+ MessageReference reference = message.createReference(queue);
- Filter filter = queue.getFilter();
+ refEstimate += reference.getMemoryEstimate();
- if (filter == null || filter.match(message))
+ refs.add(reference);
+ }
+ else
{
- if (binding.isFanout())
- {
- // Fanout bindings always get the reference
- MessageReference reference = message.createReference(queue);
+ // We choose the queue with the lowest routings value
+ // This gives us a weighted round robin, where the weight
+ // Can be determined from the number of consumers on the queue
+ long routings = binding.getRoutings();
- refs.add(reference);
- }
- else
+ if (routings < lowestRoutings || lowestRoutings == -1)
{
- // We choose the queue with the lowest routings value
- // This gives us a weighted round robin, where the weight
- // Can be determined from the number of consumers on the queue
- long routings = binding.getRoutings();
+ lowestRoutings = routings;
- if (routings < lowestRoutings || lowestRoutings == -1)
- {
- lowestRoutings = routings;
-
- theBinding = binding;
- }
+ theBinding = binding;
}
}
}
+ }
- if (theBinding != null)
- {
- MessageReference reference = message.createReference(theBinding.getQueue());
+ if (theBinding != null)
+ {
+ MessageReference reference = message.createReference(theBinding.getQueue());
- refs.add(reference);
+ refEstimate += reference.getMemoryEstimate();
- theBinding.incrementRoutings();
- }
+ refs.add(reference);
+ theBinding.incrementRoutings();
}
+ }
+
+ if (refs.size() > 0 && pagingStore.addSize(message.getMemoryEstimate() + refEstimate))
+ {
return refs;
}
+ else
+ {
+ return new ArrayList<MessageReference>();
+ }
+
}
public PagingManager getPagingManager()
@@ -457,7 +475,34 @@
return cache;
}
+
+
+ public void scheduleReferences(final long scheduledDeliveryTime, final List<MessageReference> references) throws Exception
+ {
+ scheduleReferences(-1, scheduledDeliveryTime, references);
+ }
+
+ public void scheduleReferences(final long transactionID, final long scheduledDeliveryTime, final List<MessageReference> references) throws Exception
+ {
+ for (MessageReference ref : references)
+ {
+ ref.setScheduledDeliveryTime(scheduledDeliveryTime);
+ if (ref.getMessage().isDurable() && ref.getQueue().isDurable())
+ {
+ if (transactionID >= 0)
+ {
+ storageManager.updateScheduledDeliveryTimeTransactional(transactionID, ref);
+ }
+ else
+ {
+ storageManager.updateScheduledDeliveryTime(ref);
+ }
+ }
+ }
+ }
+
+
// Private -----------------------------------------------------------------
private Binding createBinding(final SimpleString address,
@@ -536,13 +581,13 @@
Map<SimpleString, List<Pair<SimpleString, Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<SimpleString, Long>>>();
storageManager.loadMessageJournal(this, queues, resourceManager, duplicateIDMap);
-
- for (Map.Entry<SimpleString, List<Pair<SimpleString, Long>>> entry: duplicateIDMap.entrySet())
+
+ for (Map.Entry<SimpleString, List<Pair<SimpleString, Long>>> entry : duplicateIDMap.entrySet())
{
SimpleString address = entry.getKey();
-
+
DuplicateIDCache cache = getDuplicateIDCache(address);
-
+
if (persistIDCache)
{
cache.load(entry.getValue());
Modified: trunk/src/main/org/jboss/messaging/core/server/MessageReference.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessageReference.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/server/MessageReference.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -55,6 +55,7 @@
void setScheduledDeliveryTime(long scheduledDeliveryTime);
+ int getMemoryEstimate();
int getDeliveryCount();
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -37,6 +37,7 @@
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
+import org.jboss.messaging.util.DataConstants;
import org.jboss.messaging.util.SimpleString;
import java.util.List;
@@ -62,7 +63,11 @@
private ServerMessage message;
private Queue queue;
+
+ // Static --------------------------------------------------------
+
+
// Constructors --------------------------------------------------
public MessageReferenceImpl(final MessageReferenceImpl other, final Queue queue)
@@ -84,12 +89,19 @@
}
// MessageReference implementation -------------------------------
-
public MessageReference copy(final Queue queue)
{
return new MessageReferenceImpl(this, queue);
}
+ public int getMemoryEstimate()
+ {
+ // from few tests I have done, deliveryCount and scheduledDelivery will use two longs (because of alignment)
+ // and each of the references (messages and queue) will use the equivalent to two longs (because of long pointers).
+ // Anyway.. this is just an estimate
+ return DataConstants.SIZE_LONG * 4;
+ }
+
public int getDeliveryCount()
{
return deliveryCount;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -210,7 +210,8 @@
new JBMThreadFactory("JBM-scheduled-threads"));
queueFactory = new QueueFactoryImpl(scheduledExecutor, queueSettingsRepository);
- pagingManager = new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory()),
+ pagingManager = new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
+ configuration.getPagingMaxThreads()),
storageManager,
queueSettingsRepository,
configuration.getPagingMaxGlobalSizeBytes(),
@@ -295,7 +296,7 @@
clusterManager.start();
}
-
+
serverManagement = managementService.registerServer(postOffice,
storageManager,
configuration,
@@ -305,7 +306,6 @@
remotingService,
this);
-
log.info("Started messaging server");
started = true;
@@ -456,7 +456,7 @@
{
return started;
}
-
+
public ClusterManager getClusterManager()
{
return clusterManager;
@@ -620,7 +620,7 @@
if (backupConnectorFactory != null)
{
NoCacheConnectionLifeCycleListener listener = new NoCacheConnectionLifeCycleListener();
-
+
RemotingConnectionImpl replicatingConnection = (RemotingConnectionImpl)RemotingConnectionImpl.createConnection(backupConnectorFactory,
backupConnectorParams,
ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -27,6 +27,7 @@
import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.paging.PagingManager;
+import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.Consumer;
@@ -901,12 +902,22 @@
deliveringCount.decrementAndGet();
sizeBytes.addAndGet(-ref.getMessage().getEncodeSize());
+
+
+ // TODO: We could optimize this by storing the paging-store for the address on the Queue. We would need to know the Address for the Queue
+ PagingStore store = null;
+
+ if (pagingManager != null)
+ {
+ store = pagingManager.getPageStore(ref.getMessage().getDestination());
+ store.addSize(-ref.getMemoryEstimate());
+ }
if (ref.getMessage().decrementRefCount() == 0)
{
- if (pagingManager != null)
+ if (store != null)
{
- pagingManager.messageDone(ref.getMessage());
+ store.addSize(-ref.getMessage().getMemoryEstimate());
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -589,13 +589,15 @@
*/
private void sendStandardMessage(final MessageReference ref, final ServerMessage message)
{
+
+ final SessionReceiveMessage packet = new SessionReceiveMessage(id, message, ref.getDeliveryCount() + 1);
+
if (availableCredits != null)
{
- availableCredits.addAndGet(-message.getEncodeSize());
+ // RequiredBufferSize is the actual size for this packet
+ availableCredits.addAndGet(-packet.getRequiredBufferSize());
}
- final SessionReceiveMessage packet = new SessionReceiveMessage(id, message, ref.getDeliveryCount() + 1);
-
DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id, message.getMessageID()));
if (result == null)
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -2651,23 +2651,12 @@
storageManager.storeMessage(msg);
}
- // TODO - this code is also duplicated in transactionimpl and in depaging
- // it should all be centralised
-
- for (MessageReference ref : refs)
+ if (scheduledDeliveryTime != null)
{
- if (scheduledDeliveryTime != null)
- {
- ref.setScheduledDeliveryTime(scheduledDeliveryTime.longValue());
-
- if (ref.getMessage().isDurable() && ref.getQueue().isDurable())
- {
- storageManager.updateScheduledDeliveryTime(ref);
- }
- }
-
- ref.getQueue().addLast(ref);
+ postOffice.scheduleReferences(scheduledDeliveryTime, refs);
}
+
+ postOffice.deliver(refs);
}
}
else
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -17,6 +17,7 @@
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
import javax.transaction.xa.Xid;
@@ -59,6 +60,12 @@
private final List<MessageReference> acknowledgements = new ArrayList<MessageReference>();
+ /** List of destinations in page mode.
+ * Once a destination was considered in page, it should go toward paging until commit is called,
+ * even if the page-mode has changed, or messageOrder won't be respected */
+ private final Set<SimpleString> destinationsInPageMode = new HashSet<SimpleString>();
+
+ // FIXME: As part of https://jira.jboss.org/jira/browse/JBMESSAGING-1313
private final List<ServerMessage> pagedMessages = new ArrayList<ServerMessage>();
private PageTransactionInfo pageTransaction;
@@ -165,9 +172,12 @@
{
throw new IllegalStateException("Transaction is in invalid state " + state);
}
+
+ SimpleString destination = message.getDestination();
- if (pagingManager.isPaging(message.getDestination()))
+ if (destinationsInPageMode.contains(destination) || pagingManager.isPaging(destination))
{
+ destinationsInPageMode.add(destination);
pagedMessages.add(message);
}
else
@@ -310,10 +320,7 @@
storageManager.commit(id);
}
- for (MessageReference ref : refsToAdd)
- {
- ref.getQueue().addLast(ref);
- }
+ postOffice.deliver(refsToAdd);
// If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
// transaction until all the messages were added to the queue
@@ -433,6 +440,7 @@
{
containsPersistent = true;
refsToAdd.addAll(messages);
+
this.acknowledgements.addAll(acknowledgements);
this.pageTransaction = pageTransaction;
@@ -494,7 +502,22 @@
}
toCancel.add(ref);
}
+
+ HashSet<ServerMessage> messagesAdded = new HashSet<ServerMessage>();
+
+ // We need to remove the sizes added on paging manager, for the messages that only exist here on the Transaction
+ for (MessageReference ref: this.refsToAdd)
+ {
+ messagesAdded.add(ref.getMessage());
+ pagingManager.getPageStore(ref.getMessage().getDestination()).addSize(-ref.getMemoryEstimate());
+ }
+
+ for (ServerMessage msg: messagesAdded)
+ {
+ pagingManager.removeSize(msg);
+ }
+
clear();
return toCancel;
@@ -523,18 +546,10 @@
containsPersistent = true;
}
-
+
if (scheduledDeliveryTime != null)
{
- for (MessageReference ref : refs)
- {
- ref.setScheduledDeliveryTime(scheduledDeliveryTime);
-
- if (ref.getMessage().isDurable() && ref.getQueue().isDurable())
- {
- storageManager.updateScheduledDeliveryTimeTransactional(id, ref);
- }
- }
+ postOffice.scheduleReferences(id, scheduledDeliveryTime, refs);
}
return refs;
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -140,8 +140,6 @@
ClientSession session = sf.createSession(null, null, false, true, false, preAck, 0);
session.createQueue(ADDRESS, ADDRESS, null, true, false, true);
-
- long initialSize = messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize();
ClientProducer producer = session.createProducer(ADDRESS);
@@ -288,7 +286,7 @@
session.close();
long globalSize = messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize();
- assertTrue(globalSize == initialSize || globalSize == 0);
+ assertEquals(0l, globalSize);
assertEquals(0, messagingService.getServer().getPostOffice().getBinding(ADDRESS).getQueue().getDeliveringCount());
assertEquals(0, messagingService.getServer().getPostOffice().getBinding(ADDRESS).getQueue().getMessageCount());
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -66,7 +66,7 @@
HierarchicalRepository<QueueSettings> queueSettings = new HierarchicalObjectRepository<QueueSettings>();
queueSettings.setDefault(new QueueSettings());
- PagingManagerImpl managerImpl = new PagingManagerImpl(new PagingStoreFactoryNIO(getPageDir()),
+ PagingManagerImpl managerImpl = new PagingManagerImpl(new PagingStoreFactoryNIO(getPageDir(), 10),
null,
queueSettings,
-1,
@@ -116,7 +116,7 @@
queueSettings.addMatch("simple-test", simpleTestSettings);
- PagingManagerImpl managerImpl = new PagingManagerImpl(new PagingStoreFactoryNIO(getJournalDir()),
+ PagingManagerImpl managerImpl = new PagingManagerImpl(new PagingStoreFactoryNIO(getJournalDir(), 10),
null,
queueSettings,
-1,
@@ -128,22 +128,20 @@
ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(100));
- long currentSize = managerImpl.addSize(msg);
+ assertTrue(managerImpl.addSize(msg));
- assertTrue(currentSize > 0);
-
- assertEquals(currentSize, managerImpl.getPageStore(new SimpleString("simple-test")).getAddressSize());
-
for (int i = 0; i < 10; i++)
{
- assertTrue(managerImpl.addSize(msg) < 0);
+ long currentSize = managerImpl.getPageStore(new SimpleString("simple-test")).getAddressSize();
+ assertFalse(managerImpl.addSize(msg));
+ // should be unchanged
assertEquals(currentSize, managerImpl.getPageStore(new SimpleString("simple-test")).getAddressSize());
}
managerImpl.messageDone(msg);
- assertTrue(managerImpl.addSize(msg) > 0);
+ assertTrue(managerImpl.addSize(msg));
managerImpl.stop();
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -34,6 +34,7 @@
import org.jboss.messaging.core.client.ClientSessionFactory;
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.server.MessagingService;
@@ -141,6 +142,10 @@
sf = createInVMFactory();
+ System.out.println("Size = " + messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+
+ assertTrue(messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize() > 0);
+
session = sf.createSession(null, null, false, true, true, false, 0);
ClientConsumer consumer = session.createConsumer(ADDRESS);
@@ -174,6 +179,9 @@
consumer.close();
session.close();
+
+ assertEquals(0, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+
}
finally
{
@@ -188,6 +196,467 @@
}
+
+ /**
+ * - Make a destination in page mode
+ * - Add stuff to a transaction
+ * - Consume the entire destination (not in page mode any more)
+ * - Add stuff to a transaction again
+ * - Check order
+ *
+ */
+ public void testDepageDuringTransaction() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setPagingMaxGlobalSizeBytes(100 * 1024);
+ config.setPagingDefaultSize(10 * 1024);
+
+ MessagingService messagingService = createService(true, config, new HashMap<String, QueueSettings>());
+
+ messagingService.start();
+
+ final int numberOfIntegers = 256;
+
+ try
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
+
+ ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+
+ session.createQueue(ADDRESS, ADDRESS, null, true, false, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ByteBuffer ioBuffer = ByteBuffer.allocate(DataConstants.SIZE_INT * numberOfIntegers);
+ MessagingBuffer bodyLocal = new ByteBufferWrapper(ioBuffer);
+
+ ClientMessage message = null;
+
+ int numberOfMessages = 0;
+ while (true)
+ {
+ message = session.createClientMessage(true);
+ message.setBody(bodyLocal);
+
+ // Stop sending message as soon as we start paging
+ if (messagingService.getServer().getPostOffice().getPagingManager().isPaging(ADDRESS))
+ {
+ break;
+ }
+ numberOfMessages ++;
+
+
+ producer.send(message);
+ }
+
+
+
+ session.start();
+
+ ClientSession sessionTransacted = sf.createSession(null, null, false, false, false, false, 0);
+
+ ClientProducer producerTransacted = sessionTransacted.createProducer(ADDRESS);
+
+ for (int i = 0; i< 10; i++)
+ {
+ message = session.createClientMessage(true);
+ message.setBody(bodyLocal);
+ message.putIntProperty(new SimpleString("id"), i);
+
+ // Consume messages to force an eventual out of order delivery
+ if (i == 5)
+ {
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+ for (int j = 0; j < numberOfMessages; j++)
+ {
+ ClientMessage msg = consumer.receive(1000);
+ msg.acknowledge();
+ assertNotNull(msg);
+ }
+
+
+ assertNull(consumer.receive(100));
+ consumer.close();
+ }
+
+ Integer messageID = (Integer) message.getProperty(new SimpleString("id"));
+ assertNotNull(messageID);
+ assertEquals(messageID.intValue(), i);
+
+ producerTransacted.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ assertNull(consumer.receive(100));
+
+ sessionTransacted.commit();
+
+ sessionTransacted.close();
+
+ for (int i = 0; i < 10; i++)
+ {
+ message = consumer.receive(10000);
+
+ assertNotNull(message);
+
+ Integer messageID = (Integer) message.getProperty(new SimpleString("id"));
+
+ assertNotNull(messageID);
+ assertEquals("message received out of order", messageID.intValue(), i);
+
+ message.acknowledge();
+ }
+
+ assertNull(consumer.receive(100));
+
+ consumer.close();
+
+ session.close();
+
+ assertEquals(0, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+
+ }
+ finally
+ {
+ try
+ {
+ messagingService.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+
+
+
+ public void testPageOnSchedulingNoRestart() throws Exception
+ {
+ internalTestPageOnScheduling(false);
+ }
+
+
+
+ public void testPageOnSchedulingRestart() throws Exception
+ {
+ internalTestPageOnScheduling(true);
+ }
+
+
+ public void internalTestPageOnScheduling(final boolean restart) throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setPagingMaxGlobalSizeBytes(100 * 1024);
+ config.setPagingDefaultSize(10 * 1024);
+
+ MessagingService messagingService = createService(true, config, new HashMap<String, QueueSettings>());
+
+ messagingService.start();
+
+ final int numberOfIntegers = 256;
+
+ final int numberOfMessages = 10000;
+
+ try
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
+
+ ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+
+ session.createQueue(ADDRESS, ADDRESS, null, true, false, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ByteBuffer ioBuffer = ByteBuffer.allocate(DataConstants.SIZE_INT * numberOfIntegers);
+
+ ClientMessage message = null;
+
+ MessagingBuffer body = null;
+
+ long scheduledTime = System.currentTimeMillis() + 5000;
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ MessagingBuffer bodyLocal = new ByteBufferWrapper(ioBuffer);
+
+ for (int j = 1; j <= numberOfIntegers; j++)
+ {
+ bodyLocal.putInt(j);
+ }
+ bodyLocal.flip();
+
+ if (i == 0)
+ {
+ body = bodyLocal;
+ }
+ message = session.createClientMessage(true);
+ message.setBody(bodyLocal);
+ message.putIntProperty(new SimpleString("id"), i);
+
+ // Worse scenario possible... only schedule what's on pages
+ if (messagingService.getServer().getPostOffice().getPagingManager().isPaging(ADDRESS))
+ {
+ message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, scheduledTime);
+ }
+
+
+ producer.send(message);
+ }
+
+ if (restart)
+ {
+ session.close();
+
+ messagingService.stop();
+
+ messagingService = createService(true, config, new HashMap<String, QueueSettings>());
+ messagingService.start();
+
+ sf = createInVMFactory();
+
+ session = sf.createSession(null, null, false, true, true, false, 0);
+ }
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive(10000);
+
+ assertNotNull(message2);
+
+ message2.acknowledge();
+
+ assertNotNull(message2);
+
+ Long scheduled = (Long)message2.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
+ if (scheduled != null)
+ {
+ assertTrue("Scheduling didn't work", System.currentTimeMillis() >= scheduledTime);
+ }
+
+ try
+ {
+ assertEqualsByteArrays(body.limit(), body.array(), message2.getBody().array());
+ }
+ catch (AssertionFailedError e)
+ {
+ log.info("Expected buffer:" + dumbBytesHex(body.array(), 40));
+ log.info("Arriving buffer:" + dumbBytesHex(message2.getBody().array(), 40));
+ throw e;
+ }
+ }
+
+ consumer.close();
+
+ session.close();
+
+ assertEquals(0, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+
+ }
+ finally
+ {
+ try
+ {
+ messagingService.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+
+ public void testRollbackOnSend() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setPagingMaxGlobalSizeBytes(100 * 1024);
+ config.setPagingDefaultSize(10 * 1024);
+
+ MessagingService messagingService = createService(true, config, new HashMap<String, QueueSettings>());
+
+ messagingService.start();
+
+ final int numberOfIntegers = 256;
+
+ final int numberOfMessages = 10;
+
+ try
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
+
+ ClientSession session = sf.createSession(null, null, false, false, true, false, 0);
+
+ session.createQueue(ADDRESS, ADDRESS, null, true, false, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+
+ long initialSize = messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize();
+
+ ByteBuffer ioBuffer = ByteBuffer.allocate(DataConstants.SIZE_INT * numberOfIntegers);
+
+ ClientMessage message = null;
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ MessagingBuffer bodyLocal = new ByteBufferWrapper(ioBuffer);
+
+ for (int j = 1; j <= numberOfIntegers; j++)
+ {
+ bodyLocal.putInt(j);
+ }
+ bodyLocal.flip();
+
+ message = session.createClientMessage(true);
+ message.setBody(bodyLocal);
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ }
+
+ session.rollback();
+
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ assertNull(consumer.receive(500));
+
+ session.close();
+
+ assertEquals(initialSize, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+ }
+ finally
+ {
+ try
+ {
+ messagingService.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+
+
+ public void testCommitOnSend() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setPagingMaxGlobalSizeBytes(100 * 1024);
+ config.setPagingDefaultSize(10 * 1024);
+
+ MessagingService messagingService = createService(true, config, new HashMap<String, QueueSettings>());
+
+ messagingService.start();
+
+ final int numberOfIntegers = 10;
+
+ final int numberOfMessages = 10;
+
+ try
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
+
+ ClientSession session = sf.createSession(null, null, false, false, false, false, 0);
+
+ session.createQueue(ADDRESS, ADDRESS, null, true, false, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+
+ long initialSize = messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize();
+
+ ByteBuffer ioBuffer = ByteBuffer.allocate(DataConstants.SIZE_INT * numberOfIntegers);
+
+ ClientMessage message = null;
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ MessagingBuffer bodyLocal = new ByteBufferWrapper(ioBuffer);
+
+ for (int j = 1; j <= numberOfIntegers; j++)
+ {
+ bodyLocal.putInt(j);
+ }
+ bodyLocal.flip();
+
+ message = session.createClientMessage(true);
+ message.setBody(bodyLocal);
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ }
+
+ session.commit();
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = consumer.receive(500);
+ assertNotNull(msg);
+ msg.acknowledge();
+ }
+
+
+ session.commit();
+
+ session.close();
+
+ assertEquals(initialSize, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+ }
+ finally
+ {
+ try
+ {
+ messagingService.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -57,14 +57,17 @@
messagingService.getServer().getQueueSettingsRepository().addMatch(qName.toString(), queueSettings);
clientSession.createQueue(ea, eq, null, false, false, false);
clientSession.createQueue(qName, qName, null, false, false, false);
+
ClientProducer producer = clientSession.createProducer(qName);
ClientMessage clientMessage = createTextMessage("heyho!", clientSession);
clientMessage.setExpiration(System.currentTimeMillis());
producer.send(clientMessage);
+
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(qName);
ClientMessage m = clientConsumer.receive(500);
assertNull(m);
+ System.out.println("size3 = " + messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
m = clientConsumer.receive(500);
assertNull(m);
clientConsumer.close();
@@ -72,6 +75,10 @@
m = clientConsumer.receive(500);
assertNotNull(m);
assertEquals(m.getBody().getString(), "heyho!");
+ m.acknowledge();
+
+ // PageSize should be the same as when it started
+ assertEquals(0, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
}
public void testBasicSendToMultipleQueues() throws Exception
@@ -89,24 +96,49 @@
ClientProducer producer = clientSession.createProducer(qName);
ClientMessage clientMessage = createTextMessage("heyho!", clientSession);
clientMessage.setExpiration(System.currentTimeMillis());
+
+ System.out.println("initialPageSize = " + messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+
producer.send(clientMessage);
+
+ System.out.println("pageSize after message sent = " + messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(qName);
ClientMessage m = clientConsumer.receive(500);
+
+ System.out.println("pageSize after message received = " + messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+
assertNull(m);
+
clientConsumer.close();
+
clientConsumer = clientSession.createConsumer(eq);
+
m = clientConsumer.receive(500);
+
assertNotNull(m);
+
m.acknowledge();
+
assertEquals(m.getBody().getString(), "heyho!");
+
clientConsumer.close();
+
clientConsumer = clientSession.createConsumer(eq2);
+
m = clientConsumer.receive(500);
+
assertNotNull(m);
+
m.acknowledge();
+
assertEquals(m.getBody().getString(), "heyho!");
+
clientConsumer.close();
+
+ // PageGlobalSize should be untouched as the message expired
+ assertEquals(0, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
}
public void testBasicSendToNoQueue() throws Exception
@@ -188,7 +220,8 @@
messagingService.start();
// then we create a client as normal
ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
- clientSession = sessionFactory.createSession(true, true, false);
+ sessionFactory.setBlockOnAcknowledge(true); // There are assertions over sizes that needs to be done after the ACK was received on server
+ clientSession = sessionFactory.createSession(null, null, false, true, true, false, 0);
}
@Override
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -328,6 +328,8 @@
addSettings();
clientSession.createQueue(pageQueue, pageQueue, null, true, true, true);
+
+ long initialPageSize = this.messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize();
clientSession.start(xid, XAResource.TMNOFLAGS);
@@ -364,6 +366,10 @@
ClientConsumer pageConsumer = clientSession.createConsumer(pageQueue);
assertNull(pageConsumer.receive(100));
+
+ long globalSize = this.messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize();
+ // Management message (from createQueue) will not be taken into account again as it is nonPersistent
+ assertTrue(globalSize == initialPageSize || globalSize == 0l);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -32,6 +32,7 @@
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.DuplicateIDCache;
import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.QueueFactory;
import org.jboss.messaging.core.server.ServerMessage;
@@ -156,4 +157,26 @@
return null;
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.postoffice.PostOffice#deliver(java.util.List)
+ */
+ public void deliver(List<MessageReference> references)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.postoffice.PostOffice#scheduleReferences(long, java.util.List)
+ */
+ public void scheduleReferences(long scheduledDeliveryTime, List<MessageReference> references) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.postoffice.PostOffice#scheduleReferences(long, long, java.util.List)
+ */
+ public void scheduleReferences(long transactionID, long scheduledDeliveryTime, List<MessageReference> references) throws Exception
+ {
+ }
+
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -181,6 +181,8 @@
EasyMock.expect(po.route(serverMessage)).andReturn(new ArrayList<MessageReference>());
+ po.deliver((List<MessageReference>)EasyMock.anyObject());
+
EasyMock.expect(serverMessage.getDurableRefCount()).andReturn(0);
EasyMock.expect(serverMessage.decrementDurableRefCount()).andReturn(0);
@@ -227,6 +229,8 @@
EasyMock.expect(queue.isDurable()).andStubReturn(true);
EasyMock.expect(serverMessage.decrementDurableRefCount()).andReturn(0);
EasyMock.expect(sm.generateUniqueID()).andReturn(1l);
+
+ po.deliver((List<MessageReference>)EasyMock.anyObject());
EasyMock.replay(sm, po, repos, serverMessage, queue, pm);
messageReference.expire(sm, po, repos);
@@ -282,6 +286,8 @@
EasyMock.expect(po.route(serverMessage)).andReturn(new ArrayList<MessageReference>());
EasyMock.expect(serverMessage.getDurableRefCount()).andReturn(0);
EasyMock.expect(serverMessage.decrementDurableRefCount()).andReturn(0);
+
+ po.deliver((List<MessageReference>)EasyMock.anyObject());
EasyMock.replay(sm, po, repos, serverMessage, queue, expQBinding, pm);
@@ -334,6 +340,8 @@
EasyMock.expect(serverMessage.isDurable()).andStubReturn(false);
EasyMock.expect(serverMessage.getMessageID()).andReturn(messageID);
queue.referenceAcknowledged(messageReference);
+
+ postOffice.deliver((List<MessageReference>)EasyMock.anyObject());
EasyMock.replay(queue, toBinding, toQueue, postOffice, persistenceManager, serverMessage, copyMessage, pm);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -42,6 +42,7 @@
import org.easymock.EasyMock;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.paging.PagingManager;
+import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
@@ -1259,7 +1260,7 @@
storageManager.deleteMessageTransactional(EasyMock.anyLong(), EasyMock.eq(queue.getPersistenceID()), EasyMock.eq(messageID));
storageManager.commit(EasyMock.anyLong());
- PostOffice postOffice = createMock(PostOffice.class);
+ PostOffice postOffice = EasyMock.createNiceMock(PostOffice.class);
PagingManager pm = EasyMock.createNiceMock(PagingManager.class);
EasyMock.expect(pm.page(EasyMock.isA(ServerMessage.class))).andStubReturn(false);
EasyMock.expect(postOffice.getPagingManager()).andStubReturn(pm);
@@ -1319,7 +1320,7 @@
storageManager.deleteMessageTransactional(anyLong(), eq(queue.getPersistenceID()), eq(messageID));
storageManager.commit(anyLong());
- PostOffice postOffice = createMock(PostOffice.class);
+ PostOffice postOffice = EasyMock.createNiceMock(PostOffice.class);
PagingManager pm = EasyMock.createNiceMock(PagingManager.class);
EasyMock.expect(pm.page(EasyMock.isA(ServerMessage.class))).andStubReturn(false);
EasyMock.expect(postOffice.getPagingManager()).andStubReturn(pm);
@@ -1382,7 +1383,7 @@
storageManager.deleteMessageTransactional(EasyMock.anyLong(), EasyMock.eq(queue.getPersistenceID()), EasyMock.eq(messageID));
storageManager.commit(EasyMock.anyLong());
- PostOffice postOffice = EasyMock.createMock(PostOffice.class);
+ PostOffice postOffice = EasyMock.createNiceMock(PostOffice.class);
PagingManager pm = EasyMock.createNiceMock(PagingManager.class);
EasyMock.expect(pm.page(EasyMock.isA(ServerMessage.class))).andStubReturn(false);
@@ -1422,12 +1423,14 @@
/**
* @return
*/
- private PostOffice createMockPostOffice()
+ private PostOffice createMockPostOffice() throws Exception
{
+ PagingStore niceStore = EasyMock.createNiceMock(PagingStore.class);
PagingManager niceManager = EasyMock.createNiceMock(PagingManager.class);
PostOffice nicePostOffice = EasyMock.createNiceMock(PostOffice.class);
EasyMock.expect(nicePostOffice.getPagingManager()).andStubReturn(niceManager);
- EasyMock.replay(niceManager, nicePostOffice);
+ EasyMock.expect(niceManager.getPageStore((SimpleString)EasyMock.anyObject())).andStubReturn(niceStore);
+ EasyMock.replay(niceManager, nicePostOffice, niceStore);
return nicePostOffice;
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java 2008-12-11 17:32:17 UTC (rev 5519)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java 2008-12-12 03:59:01 UTC (rev 5520)
@@ -24,6 +24,7 @@
import static org.jboss.messaging.tests.util.RandomUtil.randomXid;
+import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -534,8 +535,8 @@
{
PagingManager pagingManager = EasyMock.createStrictMock(PagingManager.class);
- PostOffice postOffice = EasyMock.createMock(PostOffice.class);
- PagingStore pagingStore = EasyMock.createStrictMock(PagingStore.class);
+ PostOffice postOffice = EasyMock.createNiceMock(PostOffice.class);
+ PagingStore pagingStore = EasyMock.createNiceMock(PagingStore.class);
EasyMock.expect(pagingManager.getPageStore((SimpleString)EasyMock.anyObject())).andStubReturn(pagingStore);
EasyMock.expect(postOffice.getPagingManager()).andStubReturn(pagingManager);
@@ -572,8 +573,6 @@
StorageManager sm = EasyMock.createStrictMock(StorageManager.class);
- PostOffice po= EasyMock.createStrictMock(PostOffice.class);
-
final long txID = 123;
EasyMock.expect(sm.generateUniqueID()).andReturn(txID);
@@ -582,7 +581,7 @@
EasyMock.replay(sm, postOffice, pagingManager, pagingStore);
- Transaction tx = new TransactionImpl(sm, po);
+ Transaction tx = new TransactionImpl(sm, postOffice);
assertFalse(tx.isContainsPersistent());
@@ -632,12 +631,15 @@
//Expect:
- sm.commit(txID);
+ postOffice.deliver((List<MessageReference>)EasyMock.anyObject());
- pagingManager.messageDone(message1);
+ EasyMock.expectLastCall().anyTimes();
- pagingManager.messageDone(message2);
-
+ sm.commit(txID);
+
+ EasyMock.expect(pagingManager.getPageStore((SimpleString)EasyMock.anyObject())).andStubReturn(pagingStore);
+ EasyMock.expect(postOffice.getPagingManager()).andStubReturn(pagingManager);
+
EasyMock.replay(sm, postOffice, pagingManager, pagingStore);
tx.commit();
@@ -651,22 +653,30 @@
// Private -------------------------------------------------------------------------
- private Transaction createTransaction()
+ private Transaction createTransaction() throws Exception
{
StorageManager sm = EasyMock.createStrictMock(StorageManager.class);
- PostOffice po = EasyMock.createStrictMock(PostOffice.class);
+ PostOffice po = EasyMock.createNiceMock(PostOffice.class);
final long txID = 123L;
EasyMock.expect(sm.generateUniqueID()).andReturn(txID);
- EasyMock.replay(sm);
+ EasyMock.replay(sm, po);
Transaction tx = new TransactionImpl(sm, po);
- EasyMock.verify(sm);
+ EasyMock.verify(sm, po);
+ EasyMock.reset(po);
+
+ po.deliver((List<MessageReference>)EasyMock.anyObject());
+
+ EasyMock.expectLastCall().anyTimes();
+
+ EasyMock.replay(po);
+
return tx;
}
More information about the jboss-cvs-commits
mailing list