[jboss-cvs] JBoss Messaging SVN: r6771 - in trunk: src/main/org/jboss/messaging/core/persistence/impl/journal and 10 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed May 13 11:16:39 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-05-13 11:16:38 -0400 (Wed, 13 May 2009)
New Revision: 6771
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/OrderingOnBackupTest.java
Removed:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PageOrderingOnBackupTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/server/Queue.java
trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
trunk/src/main/org/jboss/messaging/core/transaction/TransactionOperation.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PreserveOrderDuringFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/xa/XaTimeoutTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/BindingImplTest.java
trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
trunk/tests/src/org/jboss/messaging/tests/util/SpawnedVMSupport.java
trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1558 - Order of Queues on backup node during rollback and session.close (redelivery)
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-05-13 15:16:38 UTC (rev 6771)
@@ -129,6 +129,11 @@
log.trace(message);
}
+ private static void trace(final String message, Exception t)
+ {
+ log.trace(message, t);
+ }
+
// Constructors --------------------------------------------------
public PagingStoreImpl(final PagingManager pagingManager,
@@ -389,6 +394,17 @@
}
currentPageLock.readLock().lock();
+ if (isTrace)
+ {
+ if (pagingManager.isBackup())
+ {
+ trace("Paging Reference[" + message.getMessage(null).getMessageID() + "] on Backup");
+ }
+ else
+ {
+ trace("Paging Reference[" + message.getMessage(null).getMessageID() + "] on Live");
+ }
+ }
try
{
@@ -469,6 +485,11 @@
{
if (!depaging.get())
{
+ if (isTrace)
+ {
+ trace("Starting depaging for " + this.getStoreName(), new Exception ("trace"));
+ }
+
depaging.set(true);
Runnable depageAction = new DepageRunnable(executor);
executor.execute(depageAction);
@@ -936,6 +957,11 @@
final boolean globalFull = isGlobalFull(getPageSizeBytes());
if (pageFull || globalFull || !isPaging())
{
+ if (isTrace)
+ {
+ trace("clearDepage::true");
+ }
+
depaging.set(false);
if (!globalFull)
{
@@ -945,6 +971,11 @@
}
else
{
+ if (isTrace)
+ {
+ trace("clearDepage::false");
+ }
+
return false;
}
}
@@ -1033,13 +1064,16 @@
// the lock and this would dead lock
if (running && !clearDepage())
{
+ if (isTrace)
+ {
+ trace("Scheduling to depage " + PagingStoreImpl.this.getStoreName());
+ }
followingExecutor.execute(this);
}
}
}
catch (Throwable e)
{
- e.printStackTrace();
log.error(e, e);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-05-13 15:16:38 UTC (rev 6771)
@@ -28,6 +28,8 @@
import java.io.File;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -1584,6 +1586,14 @@
{
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.transaction.TransactionOperation#getDistinctQueues()
+ */
+ public Collection<Queue> getDistinctQueues()
+ {
+ return Collections.emptySet();
+ }
+
}
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java 2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java 2009-05-13 15:16:38 UTC (rev 6771)
@@ -23,12 +23,15 @@
package org.jboss.messaging.core.postoffice.impl;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.DuplicateIDCache;
+import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.TransactionOperation;
import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
@@ -242,6 +245,14 @@
{
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.transaction.TransactionOperation#getDistinctQueues()
+ */
+ public Collection<Queue> getDistinctQueues()
+ {
+ return Collections.emptySet();
+ }
+
}
private static final class ByteArrayHolder
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-05-13 15:16:38 UTC (rev 6771)
@@ -23,6 +23,8 @@
package org.jboss.messaging.core.postoffice.impl;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -969,6 +971,16 @@
messagesToPage.add(message);
}
+
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.transaction.TransactionOperation#getDistinctQueues()
+ */
+ public Collection<Queue> getDistinctQueues()
+ {
+ return Collections.emptySet();
+ }
+
public void afterCommit(final Transaction tx) throws Exception
{
// If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java 2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java 2009-05-13 15:16:38 UTC (rev 6771)
@@ -147,4 +147,8 @@
void deliverNow();
boolean checkDLQ(MessageReference ref) throws Exception;
+
+ void lockDelivery();
+
+ void unlockDelivery();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java 2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java 2009-05-13 15:16:38 UTC (rev 6771)
@@ -40,6 +40,8 @@
//void handleClose(Packet packet);
void close() throws Exception;
+
+ int getCountOfPendingDeliveries();
List<MessageReference> cancelRefs(boolean lastConsumedAsDelivered, Transaction tx) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-05-13 15:16:38 UTC (rev 6771)
@@ -17,6 +17,7 @@
import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ORIG_MESSAGE_ID;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -27,6 +28,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -112,6 +114,8 @@
private final PagingManager pagingManager;
+ private final Semaphore lock = new Semaphore(1);
+
private volatile PagingStore pagingStore;
private final StorageManager storageManager;
@@ -328,6 +332,48 @@
// Queue implementation ----------------------------------------------------------------------------------------
+ public void lockDelivery()
+ {
+ if (backup)
+ {
+ return;
+ }
+
+ if (trace)
+ {
+ log.trace("Trying to lock queue=" + this.name + ", backup=" + this.backup, new Exception("trace"));
+ }
+
+ try
+ {
+ lock.acquire();
+ }
+ catch (InterruptedException e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+
+ if (trace)
+ {
+ log.trace("Locked, queue=" + this.name + ", backup=" + this.backup, new Exception("trace"));
+ }
+ }
+
+ public void unlockDelivery()
+ {
+ if (backup)
+ {
+ return;
+ }
+
+ lock.release();
+
+ if (trace)
+ {
+ log.trace("UN-Locked, queue=" + this.name + ", backup = " + this.backup, new Exception("trace"));
+ }
+ }
+
public boolean isDurable()
{
return durable;
@@ -359,8 +405,13 @@
}
public void addLast(final MessageReference ref)
- {
- add(ref, false);
+ {
+
+ if (trace)
+ {
+ log.trace("AddLast(" + this.getName() + (backup?"@Backup":"@Live") + "::" + ref);
+ }
+ add(ref, false);
}
public void addFirst(final MessageReference ref)
@@ -425,11 +476,11 @@
if (delay > 0)
{
if (consumers.size() == 0 && messageReferences.size() > 0)
- {
+ {
DelayedAddRedistributor dar = new DelayedAddRedistributor(executor, replicatingChannel);
-
+
future = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS);
-
+
futures.add(future);
}
}
@@ -989,6 +1040,12 @@
// Public
// -----------------------------------------------------------------------------
+ /** To be used on tests only. Do not use it otherwise */
+ public PriorityLinkedList<MessageReference> getReferencesList()
+ {
+ return this.messageReferences;
+ }
+
@Override
public boolean equals(final Object other)
{
@@ -1196,7 +1253,7 @@
/*
* Attempt to deliver all the messages in the queue
*/
- private void deliver()
+ private synchronized void deliver()
{
// We don't do actual delivery if the queue is on a backup node - this is
// because it's async and could get out of step
@@ -1470,10 +1527,15 @@
// Must be set to false *before* executing to avoid race
waitingToDeliver.set(false);
- synchronized (QueueImpl.this)
+ QueueImpl.this.lockDelivery();
+ try
{
deliver();
}
+ finally
+ {
+ QueueImpl.this.unlockDelivery();
+ }
}
}
@@ -1535,6 +1597,26 @@
}
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.transaction.TransactionOperation#getDistinctQueues()
+ */
+ public synchronized Collection<Queue> getDistinctQueues()
+ {
+ HashSet<Queue> queues = new HashSet<Queue>();
+
+ for (MessageReference ref : refsToAck)
+ {
+ queues.add(ref.getQueue());
+ }
+
+ for (MessageReference ref : refsToAdd)
+ {
+ queues.add(ref.getQueue());
+ }
+
+ return queues;
+ }
+
public void afterCommit(final Transaction tx) throws Exception
{
for (MessageReference ref : refsToAdd)
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-05-13 15:16:38 UTC (rev 6771)
@@ -240,9 +240,18 @@
{
MessageReference ref = iter.next();
+ if (trace)
+ {
+ log.trace("Adding reference " + ref + " into a Transaction for close/cancel");
+ }
+
ref.getQueue().cancel(tx, ref);
}
+ if (trace)
+ {
+ log.trace("***************** tx.rollback being called now *****************");
+ }
tx.rollback();
if (!browseOnly)
@@ -267,6 +276,11 @@
}
}
+ public int getCountOfPendingDeliveries()
+ {
+ return deliveringRefs.size();
+ }
+
public LinkedList<MessageReference> cancelRefs(final boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
{
boolean performACK = lastConsumedAsDelivered;
@@ -363,11 +377,12 @@
{
throw new IllegalStateException(System.identityHashCode(this) + " Could not find reference on consumerID=" +
id +
- ", messageId " +
+ ", messageId = " +
messageID +
- " backup " +
+ " backup = " +
messageQueue.isBackup() +
- " closed " +
+ " queue = " + messageQueue.getName() +
+ " closed = " +
closed);
}
@@ -424,6 +439,11 @@
public void deliverReplicated(final long messageID) throws Exception
{
+ if (trace)
+ {
+ log.trace("Replicating delivery Reference[" + messageID + "] queueOnConsumer=" + messageQueue.getName());
+ }
+
MessageReference ref = messageQueue.removeFirstReference(messageID);
if (ref == null)
@@ -434,8 +454,8 @@
// force a depage
if (!store.readPage()) // This returns false if there are no pages
{
- throw new IllegalStateException("Cannot find ref " + messageID +
- " in queue " +
+ throw new IllegalStateException("Cannot find Reference[" + messageID +
+ "] in queue " +
messageQueue.getName());
}
else
@@ -444,7 +464,8 @@
if (ref == null)
{
- throw new IllegalStateException("Cannot find ref after depaging");
+ throw new IllegalStateException("Cannot find Reference[" + messageID +
+ "] after depaging on Queue " + messageQueue.getName());
}
}
}
@@ -486,12 +507,30 @@
// Public ---------------------------------------------------------------------------------------
-
- /** Only use this on tests */
+ /** To be used on tests only */
public AtomicInteger getAvailableCredits()
{
return availableCredits;
}
+
+ /** To be used on tests only */
+ public java.util.Queue<MessageReference> getDeliveringRefs()
+ {
+ return deliveringRefs;
+ }
+
+ /** To be used on tests only */
+ public ServerSession getSession()
+ {
+ return session;
+ }
+
+ /** To be used on tests only */
+ public long getReplicatedSessionID()
+ {
+ return replicatedSessionID;
+ }
+
// Private --------------------------------------------------------------------------------------
private void promptDelivery()
@@ -642,7 +681,7 @@
if (replicatingChannel == null)
{
- // it doesn't need lock because deliverLargeMesasge is already inside the lock.lock()
+ // it doesn't need lock because deliverLargeMesasge is already inside the lock()
largeMessageDeliverer = localDeliverer;
largeMessageDeliverer.deliver();
}
@@ -697,6 +736,10 @@
{
// Not replicated - just send now
+ if (trace)
+ {
+ log.trace("delivering Message " + ref + " on backup");
+ }
channel.send(packet);
}
else
@@ -708,6 +751,10 @@
{
public void run()
{
+ if (trace)
+ {
+ log.trace("delivering Message " + ref + " on live");
+ }
channel.send(packet);
}
});
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-05-13 15:16:38 UTC (rev 6771)
@@ -178,7 +178,7 @@
private final QueueFactory queueFactory;
private final SimpleString nodeID;
-
+
private boolean backup;
// The current currentLargeMessage being processed
@@ -263,7 +263,7 @@
this.nodeID = server.getNodeID();
this.replicatingChannel = replicatingChannel;
-
+
this.backup = backup;
}
@@ -316,7 +316,7 @@
throw new IllegalStateException("Cannot find consumer with id " + consumer.getID() + " to remove");
}
}
-
+
public void close() throws Exception
{
if (tx != null && tx.getXid() == null)
@@ -348,7 +348,7 @@
log.error("Failed to delete large message file", error);
}
}
-
+
remotingConnection.removeFailureListener(this);
}
@@ -378,15 +378,15 @@
public void handleCreateQueue(final CreateQueueMessage packet)
{
if (replicatingChannel == null)
- {
+ {
doHandleCreateQueue(packet);
}
else
- {
+ {
replicatingChannel.replicatePacket(packet, oppositeChannelID, new Runnable()
{
public void run()
- {
+ {
doHandleCreateQueue(packet);
}
});
@@ -487,7 +487,7 @@
{
if (replicatingChannel == null)
{
- doHandleCommit(packet);
+ doHandleCommit(packet);
}
else
{
@@ -502,18 +502,32 @@
}
public void handleRollback(final RollbackMessage packet)
- {
+ {
+
+
if (replicatingChannel == null)
{
doHandleRollback(packet);
}
else
{
+ final HashSet<Queue> queues = lockUsedQueues();
+
replicatingChannel.replicatePacket(packet, oppositeChannelID, new Runnable()
{
public void run()
{
- doHandleRollback(packet);
+ try
+ {
+ doHandleRollback(packet);
+ }
+ finally
+ {
+ for (Queue queue : queues)
+ {
+ queue.unlockDelivery();
+ }
+ }
}
});
}
@@ -734,7 +748,7 @@
});
}
}
-
+
private void lockConsumers()
{
for (ServerConsumer consumer : consumers.values())
@@ -750,23 +764,29 @@
consumer.unlock();
}
}
-
+
public void handleStart(final Packet packet)
{
- if (replicatingChannel != null)
- {
+ if (replicatingChannel == null)
+ {
+ setStarted(true);
+
+ channel.confirm(packet);
+ }
+ else
+ {
lockConsumers();
-
+
try
- {
+ {
setStarted(true);
-
+
replicatingChannel.replicatePacket(packet, oppositeChannelID, new Runnable()
{
public void run()
{
- //setStarted(true);
-
+ // setStarted(true);
+
channel.confirm(packet);
}
});
@@ -776,12 +796,6 @@
unlockConsumers();
}
}
- else
- {
- setStarted(true);
-
- channel.confirm(packet);
- }
}
public void handleStop(final Packet packet)
@@ -797,23 +811,31 @@
// delivery is processed on backup
// it's stopped so barfs and cannot process delivery
- if (replicatingChannel != null)
+ if (replicatingChannel == null)
{
+ setStarted(false);
+
+ channel.confirm(packet);
+
+ channel.send(response);
+ }
+ else
+ {
lockConsumers();
-
+
try
{
-
+
setStarted(false);
-
+
replicatingChannel.replicatePacket(packet, oppositeChannelID, new Runnable()
{
public void run()
{
channel.confirm(packet);
-
+
channel.send(response);
-
+
}
});
}
@@ -822,14 +844,6 @@
unlockConsumers();
}
}
- else
- {
- setStarted(false);
-
- channel.confirm(packet);
-
- channel.send(response);
- }
}
public void handleFailedOver(final Packet packet)
@@ -844,12 +858,15 @@
public void handleClose(final Packet packet)
{
+
if (replicatingChannel == null)
{
doHandleClose(packet);
}
else
{
+ final HashSet<Queue> queues = lockUsedQueues();
+
// We need to stop the consumers first before replicating, to ensure no deliveries occur after this,
// but we need to process the actual close() when the replication response returns, otherwise things
// can happen like acks can come in after close
@@ -863,22 +880,46 @@
{
public void run()
{
- doHandleClose(packet);
+ try
+ {
+ doHandleClose(packet);
+ }
+ finally
+ {
+ for (Queue queue : queues)
+ {
+ queue.unlockDelivery();
+ }
+ }
}
});
}
}
public void handleCloseConsumer(final SessionConsumerCloseMessage packet)
- {
+ {
final ServerConsumer consumer = consumers.get(packet.getConsumerID());
-
+
+ consumer.setStarted(false);
+
if (replicatingChannel == null)
- {
+ {
doHandleCloseConsumer(packet, consumer);
}
else
{
+ final Queue queue;
+
+ if (consumer.getCountOfPendingDeliveries() > 0)
+ {
+ queue = consumer.getQueue();
+ queue.lockDelivery();
+ }
+ else
+ {
+ queue = null;
+ }
+
// We need to stop the consumer first before replicating, to ensure no deliveries occur after this,
// but we need to process the actual close() when the replication response returns, otherwise things
// can happen like acks can come in after close
@@ -889,14 +930,22 @@
{
public void run()
{
- doHandleCloseConsumer(packet, consumer);
+ try
+ {
+ doHandleCloseConsumer(packet, consumer);
+ }
+ finally
+ {
+ if (queue != null)
+ {
+ queue.unlockDelivery();
+ }
+ }
}
});
}
}
-
-
public void handleReceiveConsumerCredits(final SessionConsumerFlowCreditMessage packet)
{
if (replicatingChannel == null)
@@ -957,17 +1006,17 @@
}
currentLargeMessage = msg;
-
+
doSendLargeMessage(packet);
}
});
}
}
-
+
public void handleSend(final SessionSendMessage packet)
{
if (replicatingChannel == null)
- {
+ {
doSend(packet);
}
else
@@ -1003,7 +1052,7 @@
public void handleReplicatedDelivery(final SessionReplicateDeliveryMessage packet)
{
ServerConsumer consumer = consumers.get(packet.getConsumerID());
-
+
if (consumer == null)
{
throw new IllegalStateException("Cannot handle replicated delivery, consumer is closed " + packet.getConsumerID() +
@@ -1044,21 +1093,21 @@
{
// Put the id back to the original client session id
this.id = this.oppositeChannelID;
-
+
this.oppositeChannelID = -1;
-
+
backup = false;
}
-
+
remotingConnection.removeFailureListener(this);
-
- //Note. We do not destroy the replicating connection here. In the case the live server has really crashed
- //then the connection will get cleaned up anyway when the server ping timeout kicks in.
- //In the case the live server is really still up, i.e. a split brain situation (or in tests), then closing
- //the replicating connection will cause the outstanding responses to be be replayed on the live server,
- //if these reach the client who then subsequently fails over, on reconnection to backup, it will have
- //received responses that the backup did not know about.
+ // Note. We do not destroy the replicating connection here. In the case the live server has really crashed
+ // then the connection will get cleaned up anyway when the server ping timeout kicks in.
+ // In the case the live server is really still up, i.e. a split brain situation (or in tests), then closing
+ // the replicating connection will cause the outstanding responses to be be replayed on the live server,
+ // if these reach the client who then subsequently fails over, on reconnection to backup, it will have
+ // received responses that the backup did not know about.
+
channel.transferConnection(newConnection, this.id, replicatingChannel);
newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
@@ -1068,7 +1117,7 @@
remotingConnection.addFailureListener(this);
int serverLastReceivedCommandID = channel.getLastReceivedCommandID();
-
+
channel.replayCommands(lastReceivedCommandID, this.id);
if (wasStarted)
@@ -1078,7 +1127,7 @@
return serverLastReceivedCommandID;
}
-
+
public Channel getChannel()
{
return channel;
@@ -1139,6 +1188,7 @@
}
}
+
// Public
// ----------------------------------------------------------------------------
@@ -1146,7 +1196,7 @@
{
return tx;
}
-
+
// Private
// ----------------------------------------------------------------------------
@@ -1176,9 +1226,9 @@
channel.confirm(packet);
- channel.send(response);
+ channel.send(response);
}
-
+
private void doHandleCreateConsumer(final SessionCreateConsumerMessage packet)
{
SimpleString name = packet.getQueueName();
@@ -1188,7 +1238,7 @@
boolean browseOnly = packet.isBrowseOnly();
Packet response = null;
-
+
try
{
Binding binding = postOffice.getBinding(name);
@@ -1231,7 +1281,8 @@
theQueue = (Queue)binding.getBindable();
}
- ServerConsumer consumer = new ServerConsumerImpl(server, idGenerator.generateID(),
+ ServerConsumer consumer = new ServerConsumerImpl(server,
+ idGenerator.generateID(),
oppositeChannelID,
this,
(QueueBinding)binding,
@@ -1262,7 +1313,7 @@
props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, theQueue.getConsumerCount());
-
+
if (filterString != null)
{
props.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
@@ -1473,7 +1524,7 @@
List<SimpleString> names = new ArrayList<SimpleString>();
Bindings bindings = postOffice.getMatchingBindings(address);
-
+
for (Binding binding : bindings.getBindings())
{
if (binding.getType() == BindingType.LOCAL_QUEUE)
@@ -1510,7 +1561,7 @@
try
{
ServerConsumer consumer = consumers.get(packet.getConsumerID());
-
+
consumer.acknowledge(autoCommitAcks, tx, packet.getMessageID());
if (packet.isRequiresResponse())
@@ -2163,7 +2214,7 @@
channel.confirm(packet);
- //We flush the confirmations to make sure any send confirmations get handled on the client side
+ // We flush the confirmations to make sure any send confirmations get handled on the client side
channel.flushConfirmations();
channel.send(response);
@@ -2200,7 +2251,6 @@
log.error("Failed to create large message", e);
Packet response = null;
-
channel.confirm(packet);
if (response != null)
{
@@ -2228,7 +2278,7 @@
try
{
long id = storageManager.generateUniqueID();
-
+
currentLargeMessage.setMessageID(id);
}
catch (Exception e)
@@ -2246,9 +2296,9 @@
try
{
ServerMessage message = packet.getServerMessage();
-
+
long id = storageManager.generateUniqueID();
-
+
message.setMessageID(id);
if (message.getDestination().equals(managementAddress))
@@ -2315,7 +2365,7 @@
currentLargeMessage = null;
message.complete();
-
+
send(message);
}
@@ -2380,7 +2430,7 @@
{
LargeServerMessage largeMessage = storageManager.createLargeMessage();
- MessagingBuffer headerBuffer = ChannelBuffers.wrappedBuffer(header);
+ MessagingBuffer headerBuffer = ChannelBuffers.wrappedBuffer(header);
largeMessage.decodeProperties(headerBuffer);
@@ -2458,7 +2508,33 @@
postOffice.route(msg, tx);
}
}
+
+ /**
+ * We need to avoid delivery when rolling back while doing replication, or the backup node could be on a different order
+ * @return
+ */
+ private HashSet<Queue> lockUsedQueues()
+ {
+ final HashSet<Queue> queues = new HashSet<Queue>();
+
+ for (ServerConsumer consumer : consumers.values())
+ {
+ queues.add(consumer.getQueue());
+ }
+
+ if (tx != null)
+ {
+ queues.addAll(tx.getDistinctQueues());
+ }
+
+ for (Queue queue : queues)
+ {
+ queue.lockDelivery();
+ }
+ return queues;
+ }
+
private void doSecurity(final ServerMessage msg) throws Exception
{
try
Modified: trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2009-05-13 15:16:38 UTC (rev 6771)
@@ -22,9 +22,12 @@
package org.jboss.messaging.core.transaction;
+import java.util.Set;
+
import javax.transaction.xa.Xid;
import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.server.Queue;
/**
* A JBoss Messaging internal transaction
@@ -67,6 +70,8 @@
void putProperty(int index, Object property);
Object getProperty(int index);
+
+ Set<Queue> getDistinctQueues();
static enum State
{
Modified: trunk/src/main/org/jboss/messaging/core/transaction/TransactionOperation.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/TransactionOperation.java 2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/src/main/org/jboss/messaging/core/transaction/TransactionOperation.java 2009-05-13 15:16:38 UTC (rev 6771)
@@ -22,6 +22,10 @@
package org.jboss.messaging.core.transaction;
+import java.util.Collection;
+
+import org.jboss.messaging.core.server.Queue;
+
/**
*
* A TransactionOperation
@@ -31,6 +35,10 @@
*/
public interface TransactionOperation
{
+
+ /** rollback will need a distinct list of Queues in order to lock those queues before calling rollback */
+ Collection<Queue> getDistinctQueues();
+
void beforePrepare(Transaction tx) throws Exception;
void beforeCommit(Transaction tx) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2009-05-13 15:16:38 UTC (rev 6771)
@@ -13,7 +13,10 @@
package org.jboss.messaging.core.transaction.impl;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import javax.transaction.xa.Xid;
@@ -21,6 +24,7 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.TransactionOperation;
import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
@@ -37,11 +41,11 @@
private List<TransactionOperation> operations;
private static final Logger log = Logger.getLogger(TransactionImpl.class);
-
+
private static final int INITIAL_NUM_PROPERTIES = 10;
-
+
private Object[] properties = new Object[INITIAL_NUM_PROPERTIES];
-
+
private final StorageManager storageManager;
private final Xid xid;
@@ -55,7 +59,7 @@
private final Object timeoutLock = new Object();
private final long createTime;
-
+
public TransactionImpl(final StorageManager storageManager)
{
this.storageManager = storageManager;
@@ -92,6 +96,29 @@
// Transaction implementation
// -----------------------------------------------------------
+ public Set<Queue> getDistinctQueues()
+ {
+ HashSet<Queue> queues = new HashSet<Queue>();
+
+ if (operations != null)
+ {
+ for (TransactionOperation op : operations)
+ {
+ Collection<Queue> q = op.getDistinctQueues();
+ if (q == null)
+ {
+ log.warn("Operation " + op + " returned null getDistinctQueues");
+ }
+ else
+ {
+ queues.addAll(q);
+ }
+ }
+ }
+
+ return queues;
+ }
+
public long getID()
{
return id;
@@ -114,7 +141,7 @@
}
else
{
- //Do nothing
+ // Do nothing
return;
}
}
@@ -127,7 +154,7 @@
{
throw new IllegalStateException("Cannot prepare non XA transaction");
}
-
+
if (operations != null)
{
for (TransactionOperation operation : operations)
@@ -146,12 +173,12 @@
{
operation.afterPrepare(this);
}
- }
+ }
}
}
public void commit() throws Exception
- {
+ {
commit(true);
}
@@ -167,7 +194,7 @@
}
else
{
- //Do nothing
+ // Do nothing
return;
}
}
@@ -176,7 +203,7 @@
{
if (onePhase)
{
- if(state == State.ACTIVE)
+ if (state == State.ACTIVE)
{
prepare();
}
@@ -202,7 +229,7 @@
}
}
- if ((getProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT) != null) || (xid != null && state == State.PREPARED) )
+ if ((getProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT) != null) || (xid != null && state == State.PREPARED))
{
storageManager.commit(id);
}
@@ -237,7 +264,7 @@
throw new IllegalStateException("Transaction is in invalid state " + state);
}
}
-
+
if (operations != null)
{
for (TransactionOperation operation : operations)
@@ -256,7 +283,7 @@
{
operation.afterRollback(this);
}
- }
+ }
}
}
@@ -282,7 +309,7 @@
{
return state;
}
-
+
public void setState(final State state)
{
this.state = state;
@@ -313,7 +340,7 @@
operations.remove(operation);
}
-
+
public int getOperationsCount()
{
return operations.size();
@@ -324,20 +351,20 @@
if (index >= properties.length)
{
Object[] newProperties = new Object[index];
-
+
System.arraycopy(properties, 0, newProperties, 0, properties.length);
-
+
properties = newProperties;
}
-
- properties[index] = property;
+
+ properties[index] = property;
}
-
+
public Object getProperty(int index)
{
return properties[index];
}
-
+
// Private
// -------------------------------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java 2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java 2009-05-13 15:16:38 UTC (rev 6771)
@@ -929,6 +929,8 @@
{
Thread.sleep(10);
}
+
+ assertNull(consumerImpl.getAvailableCredits());
}
}
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java 2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java 2009-05-13 15:16:38 UTC (rev 6771)
@@ -1281,7 +1281,7 @@
protected int getLatchWait()
{
- return 20000;
+ return 60000;
}
protected int getNumIterations()
Copied: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/OrderingOnBackupTest.java (from rev 6694, trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PageOrderingOnBackupTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/OrderingOnBackupTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/OrderingOnBackupTest.java 2009-05-13 15:16:38 UTC (rev 6771)
@@ -0,0 +1,924 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.messaging.core.buffers.ChannelBuffers;
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.Page;
+import org.jboss.messaging.core.paging.PagedMessage;
+import org.jboss.messaging.core.paging.PagingManager;
+import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
+import org.jboss.messaging.core.postoffice.QueueBinding;
+import org.jboss.messaging.core.server.Consumer;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.impl.QueueImpl;
+import org.jboss.messaging.core.server.impl.ServerConsumerImpl;
+import org.jboss.messaging.tests.util.RandomUtil;
+import org.jboss.messaging.tests.util.SpawnedVMSupport;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ *
+ * It validates if the messages are in the same ordering on the page system between the backup and live nodes.
+ *
+ * This test is valid as long as we want to guarantee strict ordering on both nodes for paged messages between backup and live nodes.
+ *
+ * If we change this concept anyway this test may become invalid and we would need to delete it.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class OrderingOnBackupTest extends FailoverTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(OrderingOnBackupTest.class);
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ private static void debug(String message)
+ {
+ log.info(message);
+ }
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+ public void testPageOrderingLiveAndBackupProducerOnly() throws Exception
+ {
+ internalTestPageOrderingLiveAndBackup(false);
+ }
+
+ public void testPageOrderingLiveAndBackupConsume() throws Exception
+ {
+ internalTestPageOrderingLiveAndBackup(true);
+ }
+
+ private void internalTestPageOrderingLiveAndBackup(boolean consumeMessages) throws Exception
+ {
+ final SimpleString threadIDKey = new SimpleString("THREAD_ID");
+ final SimpleString sequenceIDKey = new SimpleString("SEQUENCE_ID");
+ final SimpleString ADDRESS = new SimpleString("SOME_QUEUE");
+
+ final int NUMBER_OF_THREADS = 100;
+ final int NUMBER_OF_MESSAGES = 200;
+
+ final int NUMBER_OF_HANDLERS = consumeMessages ? NUMBER_OF_THREADS : 0;
+
+ setUpFailoverServers(true, 100 * 1024, 50 * 1024);
+
+ final ClientSessionFactory factory = createFailoverFactory();
+
+ ClientSession session = factory.createSession(false, true, true);
+ for (int i = 0; i < NUMBER_OF_THREADS; i++)
+ {
+ session.createQueue(ADDRESS, ADDRESS.concat("-" + i), true);
+ }
+ session.close();
+
+ MyHandler handlers[] = new MyHandler[NUMBER_OF_HANDLERS];
+
+ for (int i = 0; i < handlers.length; i++)
+ {
+ handlers[i] = new MyHandler(factory, ADDRESS.concat("-" + i), NUMBER_OF_MESSAGES * 10);
+ }
+
+ final CountDownLatch flagAlign = new CountDownLatch(NUMBER_OF_THREADS);
+ final CountDownLatch flagStart = new CountDownLatch(1);
+
+ class ProducerThread extends Thread
+ {
+ Throwable e;
+
+ final int threadID;
+
+ ProducerThread(int threadID)
+ {
+ this.threadID = threadID;
+ }
+
+ public void run()
+ {
+ try
+ {
+ ClientSession session = factory.createSession(false, true, true);
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ // I want to jinx all this by having everybody start sending at the same time
+ flagAlign.countDown();
+ flagStart.await();
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
+ {
+ ClientMessage msg = session.createClientMessage(true);
+ msg.setBody(ChannelBuffers.wrappedBuffer(new byte[512]));
+ msg.getProperties().putIntProperty(threadIDKey, this.threadID);
+ msg.getProperties().putIntProperty(sequenceIDKey, i);
+ producer.send(msg);
+ }
+
+ session.close();
+
+ }
+ catch (Throwable e)
+ {
+ // System.out => Hudson/JUNIT reports
+ e.printStackTrace();
+ this.e = e;
+ }
+
+ }
+ }
+
+ ProducerThread threads[] = new ProducerThread[NUMBER_OF_THREADS];
+
+ for (int i = 0; i < threads.length; i++)
+ {
+ threads[i] = new ProducerThread(i);
+ threads[i].start();
+ }
+
+ assertTrue("Error initializing some of the threads", flagAlign.await(10, TimeUnit.SECONDS));
+
+ flagStart.countDown();
+
+ for (ProducerThread t : threads)
+ {
+ t.join();
+ }
+
+ for (ProducerThread t : threads)
+ {
+ if (t.e != null)
+ {
+ throw new Exception("Test Failed", t.e);
+ }
+ }
+
+ for (MyHandler handler : handlers)
+ {
+ handler.close();
+ if (handler.failure != null)
+ {
+ throw new Exception("Failure on consumer", handler.failure);
+ }
+ }
+
+ PagingManager livePagingManager = liveServer.getPostOffice().getPagingManager();
+ PagingManager backupPagingManager = backupServer.getPostOffice().getPagingManager();
+
+ TestSupportPageStore livePagingStore = (TestSupportPageStore)livePagingManager.getPageStore(ADDRESS);
+ TestSupportPageStore backupPagingStore = (TestSupportPageStore)backupPagingManager.getPageStore(ADDRESS);
+
+ debug("Pages: " + livePagingStore.getNumberOfPages() + " on backup: " + backupPagingStore.getNumberOfPages());
+
+ if (consumeMessages)
+ {
+ if (livePagingStore.getNumberOfPages() == backupPagingStore.getNumberOfPages() - 1)
+ {
+ // The live node may have one extra page in front of the backup
+ backupPagingStore.depage();
+ }
+ }
+
+ assertEquals(livePagingStore.getNumberOfPages(), backupPagingStore.getNumberOfPages());
+
+ Page livePage = null;
+ Page backupPage = null;
+
+ while (true)
+ {
+ livePage = livePagingStore.depage();
+
+ if (livePage == null)
+ {
+ assertNull(backupPagingStore.depage());
+ break;
+ }
+
+ backupPage = backupPagingStore.depage();
+
+ assertNotNull(backupPage);
+
+ livePage.open();
+ backupPage.open();
+
+ List<PagedMessage> liveMessages = livePage.read();
+ List<PagedMessage> backupMessages = backupPage.read();
+
+ livePage.close();
+ backupPage.close();
+
+ assertEquals(liveMessages.size(), backupMessages.size());
+
+ Iterator<PagedMessage> backupIterator = backupMessages.iterator();
+
+ for (PagedMessage liveMsg : liveMessages)
+ {
+ PagedMessage backupMsg = backupIterator.next();
+ assertNotNull(backupMsg);
+
+ ServerMessage liveSrvMsg = liveMsg.getMessage(null);
+ ServerMessage backupSrvMsg = liveMsg.getMessage(null);
+
+ assertEquals(liveSrvMsg.getMessageID(), backupSrvMsg.getMessageID());
+ assertEquals(liveSrvMsg.getProperty(threadIDKey), backupSrvMsg.getProperty(threadIDKey));
+ assertEquals(liveSrvMsg.getProperty(sequenceIDKey), backupSrvMsg.getProperty(sequenceIDKey));
+ }
+ }
+
+ }
+
+ public void testDeliveryOrderOnTransactionalRollbackMultiThread() throws Exception
+ {
+
+ final SimpleString ADDRESS = new SimpleString("TEST");
+ final SimpleString PROPERTY_KEY = new SimpleString("KEY-STR");
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ int NTHREADS = 30;
+ final int NMESSAGES = 1000;
+
+ class ProdThread extends Thread
+ {
+ final CountDownLatch latchAlign;
+
+ final CountDownLatch latchStart;
+
+ final ClientSessionFactory sf;
+
+ ProdThread(final CountDownLatch latchAlign, final CountDownLatch latchStart, final ClientSessionFactory sf)
+ {
+ this.latchAlign = latchAlign;
+ this.latchStart = latchStart;
+ this.sf = sf;
+ }
+
+ @Override
+ public void run()
+ {
+ ClientSession sess = null;
+ try
+ {
+ latchAlign.countDown();
+ latchStart.await();
+
+ sess = sf.createSession(false, false, false);
+
+ ClientProducer prod = sess.createProducer(ADDRESS);
+
+ for (int i = 0; i < NMESSAGES; i++)
+ {
+ ClientMessage msg = createTextMessage(sess, "test" + i, false);
+ msg.putStringProperty(PROPERTY_KEY, RandomUtil.randomSimpleString());
+ prod.send(msg);
+ }
+
+ sess.commit();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ finally
+ {
+ try
+ {
+ sess.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+ };
+
+ class ConsumerThread extends Thread
+ {
+ final ClientSessionFactory sf;
+
+ volatile ClientSession sess;
+
+ final CountDownLatch latchAlign;
+
+ final CountDownLatch latchStart;
+
+ final boolean rollback;
+
+ ConsumerThread(final ClientSessionFactory sf,
+ final CountDownLatch latchAlign,
+ final CountDownLatch latchStart,
+ final boolean rollback)
+ {
+ this.sf = sf;
+ this.latchAlign = latchAlign;
+ this.latchStart = latchStart;
+ this.rollback = rollback;
+ }
+
+ @Override
+ public void run()
+ {
+ ClientConsumer cons = null;
+ try
+ {
+ latchAlign.countDown();
+ latchStart.await();
+
+ sess = sf.createSession(false, false, false);
+
+ cons = sess.createConsumer(ADDRESS);
+
+ sess.start();
+
+ ClientMessage msg = null;
+
+ while ((msg = cons.receive(1000)) != null)
+ {
+ msg.acknowledge();
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ finally
+ {
+ try
+ {
+ if (rollback)
+ {
+ sess.rollback();
+ cons.close();
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+
+ }
+ }
+ };
+
+ this.setUpFailoverServers(false, -1, 512);
+
+ ClientSessionFactory sf = createFailoverFactory();
+ sf.setConsumerWindowSize(-1);
+
+ ClientSession s = sf.createSession(false, true, true);
+
+ s.createQueue(ADDRESS, ADDRESS, true);
+
+ s.close();
+
+ CountDownLatch latchAlign = new CountDownLatch(NTHREADS);
+
+ CountDownLatch latchStart = new CountDownLatch(1);
+
+ ProdThread pthreads[] = new ProdThread[NTHREADS];
+
+ for (int i = 0; i < NTHREADS; i++)
+ {
+ pthreads[i] = new ProdThread(latchAlign, latchStart, sf);
+ pthreads[i].start();
+ }
+
+ latchAlign.await();
+ latchStart.countDown();
+
+ for (Thread t : pthreads)
+ {
+ t.join();
+ }
+
+ assertEquals(0, errors.get());
+
+ compareQueues(ADDRESS, PROPERTY_KEY, NTHREADS * NMESSAGES);
+
+ ConsumerThread cthreads[] = new ConsumerThread[NTHREADS];
+
+ log.info("********************** Consuming messages ****************************");
+
+ latchAlign = new CountDownLatch(NTHREADS);
+
+ latchStart = new CountDownLatch(1);
+
+ for (int i = 0; i < NTHREADS; i++)
+ {
+ // 50% of the consumers will close the session without ACKing messages what cause them to be redelivered.
+ // This shouldn't affect delivery on backup
+ cthreads[i] = new ConsumerThread(sf, latchAlign, latchStart, i % 2 == 0);
+ cthreads[i].start();
+ }
+
+ latchAlign.await();
+ latchStart.countDown();
+
+ for (ConsumerThread t : cthreads)
+ {
+ t.join();
+ }
+
+ assertEquals(0, errors.get());
+
+ compareConsumers(ADDRESS, PROPERTY_KEY, NTHREADS / 2);
+
+ for (ConsumerThread t : cthreads)
+ {
+ if (t.sess != null)
+ {
+ t.sess.close();
+ }
+ }
+
+ sf.close();
+
+ compareQueues(ADDRESS, PROPERTY_KEY, NTHREADS * NMESSAGES);
+
+ stopServers();
+ // ClientProducer p = s
+
+ }
+
+ public void testDeliveryOrderOnRedeliveryMultiThread() throws Exception
+ {
+
+ final SimpleString ADDRESS = new SimpleString("TEST");
+ final SimpleString PROPERTY_KEY = new SimpleString("KEY-STR");
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ int NTHREADS = 30;
+ final int NMESSAGES = 1000;
+
+ class ProdThread extends Thread
+ {
+ final CountDownLatch latchAlign;
+
+ final CountDownLatch latchStart;
+
+ final ClientSessionFactory sf;
+
+ ProdThread(final CountDownLatch latchAlign, final CountDownLatch latchStart, final ClientSessionFactory sf)
+ {
+ this.latchAlign = latchAlign;
+ this.latchStart = latchStart;
+ this.sf = sf;
+ }
+
+ @Override
+ public void run()
+ {
+ ClientSession sess = null;
+ try
+ {
+ latchAlign.countDown();
+ latchStart.await();
+
+ sess = sf.createSession(false, true, true);
+
+ ClientProducer prod = sess.createProducer(ADDRESS);
+
+ for (int i = 0; i < NMESSAGES; i++)
+ {
+ ClientMessage msg = createTextMessage(sess, "test" + i, false);
+ msg.putStringProperty(PROPERTY_KEY, RandomUtil.randomSimpleString());
+ prod.send(msg);
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ finally
+ {
+ try
+ {
+ sess.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+ };
+
+ class ConsumerThread extends Thread
+ {
+ final ClientSessionFactory sf;
+
+ volatile ClientSession sess;
+
+ final CountDownLatch latchAlign;
+
+ final CountDownLatch latchStart;
+
+ final boolean closeSession;
+
+ ConsumerThread(final ClientSessionFactory sf,
+ final CountDownLatch latchAlign,
+ final CountDownLatch latchStart,
+ final boolean closeSession)
+ {
+ this.sf = sf;
+ this.latchAlign = latchAlign;
+ this.latchStart = latchStart;
+ this.closeSession = closeSession;
+ }
+
+ @Override
+ public void run()
+ {
+ ClientConsumer cons = null;
+ try
+ {
+ latchAlign.countDown();
+ latchStart.await();
+
+ sess = sf.createSession(false, true, true);
+
+ cons = sess.createConsumer(ADDRESS);
+
+ sess.start();
+
+ ClientMessage msg = null;
+
+ while ((msg = cons.receive(1000)) != null)
+ {
+ // do not ack. Forcing it to come back to head of queue thorugh cancel & rollback
+ // debug("Received Msg = " + getTextMessage(msg));
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ finally
+ {
+ try
+ {
+ sess.commit();
+ }
+ catch (MessagingException e)
+ {
+ e.printStackTrace();
+ }
+ if (closeSession)
+ {
+ try
+ {
+ cons.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+ }
+ };
+
+ this.setUpFailoverServers(false, -1, 512);
+
+ ClientSessionFactory sf = createFailoverFactory();
+ sf.setConsumerWindowSize(-1);
+
+ ClientSession s = sf.createSession(false, true, true);
+
+ s.createQueue(ADDRESS, ADDRESS, true);
+
+ s.close();
+
+ CountDownLatch latchAlign = new CountDownLatch(NTHREADS);
+
+ CountDownLatch latchStart = new CountDownLatch(1);
+
+ ProdThread pthreads[] = new ProdThread[NTHREADS];
+
+ for (int i = 0; i < NTHREADS; i++)
+ {
+ pthreads[i] = new ProdThread(latchAlign, latchStart, sf);
+ pthreads[i].start();
+ }
+
+ latchAlign.await();
+ latchStart.countDown();
+
+ for (Thread t : pthreads)
+ {
+ t.join();
+ }
+
+ assertEquals(0, errors.get());
+
+ compareQueues(ADDRESS, PROPERTY_KEY, NTHREADS * NMESSAGES);
+
+ ConsumerThread cthreads[] = new ConsumerThread[NTHREADS];
+
+ log.info("********************** Consuming messages ****************************");
+
+ latchAlign = new CountDownLatch(NTHREADS);
+
+ latchStart = new CountDownLatch(1);
+
+ for (int i = 0; i < NTHREADS; i++)
+ {
+ // 50% of the consumers will close the session without ACKing messages what cause them to be redelivered.
+ // This shouldn't affect delivery on backup
+ cthreads[i] = new ConsumerThread(sf, latchAlign, latchStart, i % 2 == 0);
+ cthreads[i].start();
+ }
+
+ latchAlign.await();
+ latchStart.countDown();
+
+ for (ConsumerThread t : cthreads)
+ {
+ t.join();
+ }
+
+ assertEquals(0, errors.get());
+
+ compareConsumers(ADDRESS, PROPERTY_KEY, NTHREADS / 2);
+
+ for (ConsumerThread t : cthreads)
+ {
+ if (t.sess != null)
+ {
+ t.sess.close();
+ }
+ }
+
+ sf.close();
+
+ compareQueues(ADDRESS, PROPERTY_KEY, NTHREADS * NMESSAGES);
+
+ stopServers();
+ // ClientProducer p = s
+
+ }
+
+ /**
+ * Compare if Consumers between Backup and Live server are identical
+ * @param ADDRESS
+ * @param propertyToAssert
+ * @param NTHREADS
+ * @param NMESSAGES
+ * @throws Exception
+ */
+ private void compareConsumers(final SimpleString ADDRESS,
+ final SimpleString propertyToAssert,
+ int expectedNumberOfConsumers) throws Exception
+ {
+ List<QueueBinding> blive = getLocalQueueBindings(liveServer.getPostOffice(), ADDRESS.toString());
+ List<QueueBinding> bbackup = getLocalQueueBindings(backupServer.getPostOffice(), ADDRESS.toString());
+
+ assertEquals(1, blive.size());
+ assertEquals(1, bbackup.size());
+
+ QueueImpl qlive = (QueueImpl)blive.get(0).getQueue();
+ QueueImpl qbackup = (QueueImpl)bbackup.get(0).getQueue();
+
+ assertEquals(expectedNumberOfConsumers, qlive.getConsumerCount());
+
+ assertEquals(expectedNumberOfConsumers, qbackup.getConsumerCount());
+
+ debug("*****************************************************************************************");
+ debug("LiveConsumers:");
+ for (Consumer c : qlive.getConsumers())
+ {
+ ServerConsumerImpl sc = ((ServerConsumerImpl)c);
+ debug("ID: " + sc.getID() +
+ " SessionID = " +
+ sc.getSession().getID() +
+ " ReplicateSession = " +
+ sc.getReplicatedSessionID());
+ }
+
+ debug("*****************************************************************************************");
+ debug("BackupConsumers:");
+ for (Consumer c : qbackup.getConsumers())
+ {
+ ServerConsumerImpl sc = ((ServerConsumerImpl)c);
+ debug("ID: " + sc.getID() +
+ " SessionID = " +
+ sc.getSession().getID() +
+ " ReplicateSession = " +
+ sc.getReplicatedSessionID());
+ }
+
+ for (Consumer c1 : qlive.getConsumers())
+ {
+ ServerConsumerImpl liveConsumer = (ServerConsumerImpl)c1;
+
+ ServerConsumerImpl backupConsumer = null;
+
+ for (Consumer c2 : qbackup.getConsumers())
+ {
+ ServerConsumerImpl tmp2 = (ServerConsumerImpl)c2;
+ if (liveConsumer.getID() == tmp2.getID() && tmp2.getSession().getID() == liveConsumer.getReplicatedSessionID())
+ {
+ backupConsumer = tmp2;
+ break;
+ }
+ }
+
+ assertNotNull("Couldn't find a consumerID=" + liveConsumer.getID() + " on the backup node", backupConsumer);
+
+ long timeout = System.currentTimeMillis() + 5000;
+
+ // This is async, so a timed out check
+ while (System.currentTimeMillis() < timeout && liveConsumer.getDeliveringRefs().size() != backupConsumer.getDeliveringRefs()
+ .size())
+ {
+ Thread.sleep(10);
+ }
+
+ assertEquals("Consumer ID = " + liveConsumer.getID() +
+ " didn't have the same number of deliveries between live and backup node",
+ liveConsumer.getDeliveringRefs().size(),
+ backupConsumer.getDeliveringRefs().size());
+
+ Iterator<MessageReference> iterBackup = backupConsumer.getDeliveringRefs().iterator();
+ for (MessageReference refLive : liveConsumer.getDeliveringRefs())
+ {
+ MessageReference refBackup = iterBackup.next();
+
+ assertEquals(refLive.getMessage().getMessageID(), refBackup.getMessage().getMessageID());
+
+ // debug("Property on live = " + refLive.getMessage().getProperty(propertyToAssert));
+ // debug("Property on backup = " + refBackup.getMessage().getProperty(propertyToAssert));
+
+ assertNotNull(refLive.getMessage().getProperty(propertyToAssert));
+ assertTrue(refLive.getMessage()
+ .getProperty(propertyToAssert)
+ .equals(refBackup.getMessage().getProperty(propertyToAssert)));
+ }
+
+ assertFalse(iterBackup.hasNext());
+ }
+ }
+
+ /**
+ * Compare if a Queue on Backup and Live server are identical
+ * @param ADDRESS
+ * @param propertyToAssert
+ * @param NTHREADS
+ * @param NMESSAGES
+ * @throws Exception
+ */
+ private void compareQueues(final SimpleString ADDRESS,
+ final SimpleString propertyToAssert,
+ int expectedNumberOfMessages) throws Exception
+ {
+ List<QueueBinding> blive = getLocalQueueBindings(liveServer.getPostOffice(), ADDRESS.toString());
+ List<QueueBinding> bbackup = getLocalQueueBindings(backupServer.getPostOffice(), ADDRESS.toString());
+
+ assertEquals(1, blive.size());
+ assertEquals(1, bbackup.size());
+
+ QueueImpl qlive = (QueueImpl)blive.get(0).getQueue();
+ QueueImpl qbackup = (QueueImpl)bbackup.get(0).getQueue();
+
+ assertEquals(expectedNumberOfMessages, qlive.getReferencesList().size());
+
+ assertEquals(expectedNumberOfMessages, qbackup.getReferencesList().size());
+
+ Iterator<MessageReference> iterBackup = qbackup.getReferencesList().iterator();
+
+ for (MessageReference refLive : qlive.getReferencesList())
+ {
+ assertTrue(iterBackup.hasNext());
+ MessageReference refBackup = iterBackup.next();
+
+ assertEquals(refLive.getMessage().getMessageID(), refBackup.getMessage().getMessageID());
+ assertNotNull(refLive.getMessage().getProperty(propertyToAssert));
+ assertEquals(refLive.getMessage().getProperty(propertyToAssert), refBackup.getMessage()
+ .getProperty(propertyToAssert));
+ }
+
+ assertFalse(iterBackup.hasNext());
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ class MyHandler implements MessageHandler
+ {
+ final ClientSession session;
+
+ final ClientConsumer consumer;
+
+ volatile boolean started = true;
+
+ final int msgs;
+
+ volatile int receivedMsgs = 0;
+
+ final CountDownLatch latch;
+
+ Throwable failure;
+
+ MyHandler(ClientSessionFactory sf, SimpleString address, final int msgs) throws Exception
+ {
+ this.session = sf.createSession(null, null, false, true, true, false, 0);
+ this.consumer = session.createConsumer(address);
+ consumer.setMessageHandler(this);
+ this.session.start();
+ this.msgs = msgs;
+ latch = new CountDownLatch(msgs);
+ }
+
+ public synchronized void close() throws Exception
+ {
+ session.close();
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.client.MessageHandler#onMessage(org.jboss.messaging.core.client.ClientMessage)
+ */
+ public synchronized void onMessage(ClientMessage message)
+ {
+ try
+ {
+ if (!started)
+ {
+ throw new IllegalStateException("Stopped Handler received message");
+ }
+
+ if (receivedMsgs++ == msgs)
+ {
+ debug("done");
+ started = false;
+ session.stop();
+ }
+
+ message.acknowledge();
+
+ if (!started)
+ {
+ latch.countDown();
+ }
+
+ }
+ catch (Throwable e)
+ {
+ this.failure = e;
+ }
+ }
+
+ }
+}
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PageOrderingOnBackupTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PageOrderingOnBackupTest.java 2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PageOrderingOnBackupTest.java 2009-05-13 15:16:38 UTC (rev 6771)
@@ -1,331 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.cluster.failover;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.jboss.messaging.core.buffers.ChannelBuffers;
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.MessageHandler;
-import org.jboss.messaging.core.paging.Page;
-import org.jboss.messaging.core.paging.PagedMessage;
-import org.jboss.messaging.core.paging.PagingManager;
-import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- *
- * It validates if the messages are in the same ordering on the page system between the backup and live nodes.
- *
- * This test is valid as long as we want to guarantee strict ordering on both nodes for paged messages between backup and live nodes.
- *
- * If we change this concept anyway this test may become invalid and we would need to delete it.
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class PageOrderingOnBackupTest extends FailoverTestBase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
- public void testPageOrderingLiveAndBackupProducerOnly() throws Exception
- {
- internalTestPageOrderingLiveAndBackup(false);
- }
-
- public void testPageOrderingLiveAndBackupConsume() throws Exception
- {
- internalTestPageOrderingLiveAndBackup(true);
- }
-
- private void internalTestPageOrderingLiveAndBackup(boolean consumeMessages) throws Exception
- {
- final SimpleString threadIDKey = new SimpleString("THREAD_ID");
- final SimpleString sequenceIDKey = new SimpleString("SEQUENCE_ID");
- final SimpleString ADDRESS = new SimpleString("SOME_QUEUE");
-
- final int NUMBER_OF_THREADS = 100;
- final int NUMBER_OF_MESSAGES = 200;
-
- final int NUMBER_OF_HANDLERS = consumeMessages ? NUMBER_OF_THREADS : 0;
-
- setUpFailoverServers(true, 100 * 1024, 50 * 1024);
-
- final ClientSessionFactory factory = createFailoverFactory();
-
- ClientSession session = factory.createSession(false, true, true);
- for (int i = 0; i < NUMBER_OF_THREADS; i++)
- {
- session.createQueue(ADDRESS, ADDRESS.concat("-" + i), true);
- }
- session.close();
-
- MyHandler handlers[] = new MyHandler[NUMBER_OF_HANDLERS];
-
- for (int i = 0; i < handlers.length; i++)
- {
- handlers[i] = new MyHandler(factory, ADDRESS.concat("-" + i), NUMBER_OF_MESSAGES * 10);
- }
-
- final CountDownLatch flagAlign = new CountDownLatch(NUMBER_OF_THREADS);
- final CountDownLatch flagStart = new CountDownLatch(1);
-
- class ProducerThread extends Thread
- {
- Throwable e;
-
- final int threadID;
-
- ProducerThread(int threadID)
- {
- this.threadID = threadID;
- }
-
- public void run()
- {
- try
- {
- ClientSession session = factory.createSession(false, true, true);
- ClientProducer producer = session.createProducer(ADDRESS);
-
- // I want to jinx all this by having everybody start sending at the same time
- flagAlign.countDown();
- flagStart.await();
-
- for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
- {
- ClientMessage msg = session.createClientMessage(true);
- msg.setBody(ChannelBuffers.wrappedBuffer(new byte[512]));
- msg.getProperties().putIntProperty(threadIDKey, this.threadID);
- msg.getProperties().putIntProperty(sequenceIDKey, i);
- producer.send(msg);
- }
-
- session.close();
-
- }
- catch (Throwable e)
- {
- // System.out => Hudson/JUNIT reports
- e.printStackTrace();
- this.e = e;
- }
-
- }
- }
-
- ProducerThread threads[] = new ProducerThread[NUMBER_OF_THREADS];
-
- for (int i = 0; i < threads.length; i++)
- {
- threads[i] = new ProducerThread(i);
- threads[i].start();
- }
-
- assertTrue("Error initializing some of the threads", flagAlign.await(10, TimeUnit.SECONDS));
-
- flagStart.countDown();
-
- for (ProducerThread t : threads)
- {
- t.join();
- }
-
- for (ProducerThread t : threads)
- {
- if (t.e != null)
- {
- throw new Exception("Test Failed", t.e);
- }
- }
-
- Thread.sleep(5000);
-
- for (MyHandler handler : handlers)
- {
- handler.close();
- if (handler.failure != null)
- {
- throw new Exception("Failure on consumer", handler.failure);
- }
- }
-
- PagingManager livePagingManager = liveServer.getPostOffice().getPagingManager();
- PagingManager backupPagingManager = backupServer.getPostOffice().getPagingManager();
-
- TestSupportPageStore livePagingStore = (TestSupportPageStore)livePagingManager.getPageStore(ADDRESS);
- TestSupportPageStore backupPagingStore = (TestSupportPageStore)backupPagingManager.getPageStore(ADDRESS);
-
- System.out.println("Pages: " + livePagingStore.getNumberOfPages() +
- " on backup: " +
- backupPagingStore.getNumberOfPages());
-
-
- if (consumeMessages)
- {
- if (livePagingStore.getNumberOfPages() == backupPagingStore.getNumberOfPages() - 1)
- {
- // The live node may have one extra page in front of the backup
- backupPagingStore.depage();
- }
- }
-
- assertEquals(livePagingStore.getNumberOfPages(), backupPagingStore.getNumberOfPages());
-
- Page livePage = null;
- Page backupPage = null;
-
- while (true)
- {
- livePage = livePagingStore.depage();
-
- if (livePage == null)
- {
- assertNull(backupPagingStore.depage());
- break;
- }
-
- backupPage = backupPagingStore.depage();
-
- assertNotNull(backupPage);
-
- livePage.open();
- backupPage.open();
-
- List<PagedMessage> liveMessages = livePage.read();
- List<PagedMessage> backupMessages = backupPage.read();
-
- livePage.close();
- backupPage.close();
-
- assertEquals(liveMessages.size(), backupMessages.size());
-
- Iterator<PagedMessage> backupIterator = backupMessages.iterator();
-
- for (PagedMessage liveMsg : liveMessages)
- {
- PagedMessage backupMsg = backupIterator.next();
- assertNotNull(backupMsg);
-
- ServerMessage liveSrvMsg = liveMsg.getMessage(null);
- ServerMessage backupSrvMsg = liveMsg.getMessage(null);
-
- assertEquals(liveSrvMsg.getMessageID(), backupSrvMsg.getMessageID());
- assertEquals(liveSrvMsg.getProperty(threadIDKey), backupSrvMsg.getProperty(threadIDKey));
- assertEquals(liveSrvMsg.getProperty(sequenceIDKey), backupSrvMsg.getProperty(sequenceIDKey));
- }
- }
-
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
- class MyHandler implements MessageHandler
- {
- final ClientSession session;
-
- final ClientConsumer consumer;
-
- volatile boolean started = true;
-
- final int msgs;
-
- volatile int receivedMsgs = 0;
-
- final CountDownLatch latch;
-
- Throwable failure;
-
- MyHandler(ClientSessionFactory sf, SimpleString address, final int msgs) throws Exception
- {
- this.session = sf.createSession(null, null, false, true, true, false, 0);
- this.consumer = session.createConsumer(address);
- consumer.setMessageHandler(this);
- this.session.start();
- this.msgs = msgs;
- latch = new CountDownLatch(msgs);
- }
-
- public synchronized void close() throws Exception
- {
- session.close();
- }
-
- /* (non-Javadoc)
- * @see org.jboss.messaging.core.client.MessageHandler#onMessage(org.jboss.messaging.core.client.ClientMessage)
- */
- public synchronized void onMessage(ClientMessage message)
- {
- try
- {
- if (!started)
- {
- throw new IllegalStateException("Stopped Handler received message");
- }
-
- if (receivedMsgs++ == msgs)
- {
- System.out.println("done");
- started = false;
- session.stop();
- }
-
- message.acknowledge();
-
- if (!started)
- {
- latch.countDown();
- }
-
- }
- catch (Throwable e)
- {
- this.failure = e;
- }
- }
-
- }
-}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java 2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java 2009-05-13 15:16:38 UTC (rev 6771)
@@ -25,6 +25,8 @@
import java.io.File;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@@ -45,12 +47,15 @@
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.postoffice.QueueBinding;
import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
import org.jboss.messaging.core.server.JournalType;
+import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Messaging;
import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.impl.QueueImpl;
import org.jboss.messaging.core.settings.impl.AddressSettings;
import org.jboss.messaging.jms.client.JBossBytesMessage;
import org.jboss.messaging.utils.SimpleString;
@@ -68,7 +73,6 @@
// Constants -----------------------------------------------------
private static final int RECEIVE_TIMEOUT = 20000;
-
final int PAGE_SIZE = 512;
final int MAX_GLOBAL = 40 * PAGE_SIZE;
@@ -87,9 +91,9 @@
protected static final SimpleString ADDRESS_GLOBAL = new SimpleString("FailoverTestAddress");
- protected MessagingServer liveService;
+ protected MessagingServer liveServer;
- protected MessagingServer backupService;
+ protected MessagingServer backupServer;
protected final Map<String, Object> backupParams = new HashMap<String, Object>();
@@ -228,7 +232,7 @@
for (int i = 0; i < numSessions; i++)
{
SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
+
s.deleteQueue(subName);
}
}
@@ -243,9 +247,9 @@
protected void stop() throws Exception
{
- backupService.stop();
+ backupServer.stop();
- liveService.stop();
+ liveServer.stop();
assertEquals(0, InVMRegistry.instance.size());
@@ -386,14 +390,14 @@
backupConf.setPagingMaxGlobalSizeBytes(maxGlobalSize);
backupConf.setPagingGlobalWatermarkSize(pageSize);
- backupService = Messaging.newMessagingServer(backupConf);
+ backupServer = Messaging.newMessagingServer(backupConf);
}
else
{
- backupService = Messaging.newMessagingServer(backupConf, false);
+ backupServer = Messaging.newMessagingServer(backupConf, false);
}
- backupService.start();
+ backupServer.start();
Configuration liveConf = new ConfigurationImpl();
liveConf.setSecurityEnabled(false);
@@ -427,22 +431,22 @@
if (fileBased)
{
- liveService = Messaging.newMessagingServer(liveConf);
+ liveServer = Messaging.newMessagingServer(liveConf);
}
else
{
- liveService = Messaging.newMessagingServer(liveConf, false);
+ liveServer = Messaging.newMessagingServer(liveConf, false);
}
AddressSettings settings = new AddressSettings();
settings.setPageSizeBytes(pageSize);
- liveService.getAddressSettingsRepository().addMatch("#", settings);
- backupService.getAddressSettingsRepository().addMatch("#", settings);
+ liveServer.getAddressSettingsRepository().addMatch("#", settings);
+ backupServer.getAddressSettingsRepository().addMatch("#", settings);
clearData(getTestDir() + "/live");
- liveService.start();
+ liveServer.start();
}
// Private -------------------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java 2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java 2009-05-13 15:16:38 UTC (rev 6771)
@@ -57,7 +57,7 @@
private final Logger log = Logger.getLogger(PagingFailoverTest.class);
- final int RECEIVE_TIMEOUT = 25000;
+ final int RECEIVE_TIMEOUT = 50000;
// Attributes ----------------------------------------------------
@@ -81,6 +81,8 @@
int numberOfConsumedMessages = multiThreadConsumer(getNumberOfThreads(), false, false);
assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
+
+ System.out.println("Done!");
}
@@ -130,6 +132,10 @@
try
{
ClientSessionFactory sf1 = createFailoverFactory();
+
+ sf1.setBlockOnAcknowledge(true);
+ sf1.setBlockOnNonPersistentSend(true);
+ sf1.setBlockOnPersistentSend(true);
session = sf1.createSession(null, null, false, true, true, false, 0);
@@ -283,6 +289,10 @@
factory = createFailoverFactory();
store = liveServer.getPostOffice().getPagingManager().getPageStore(ADDRESS);
}
+
+ factory.setBlockOnNonPersistentSend(true);
+ factory.setBlockOnAcknowledge(true);
+ factory.setBlockOnPersistentSend(true);
session = factory.createSession(false, true, true, false);
@@ -436,6 +446,10 @@
final PagingStore store = liveServer.getPostOffice().getPagingManager().getPageStore(ADDRESS);
final ClientSessionFactory factory = createFailoverFactory();
+
+ factory.setBlockOnNonPersistentSend(true);
+ factory.setBlockOnAcknowledge(true);
+ factory.setBlockOnPersistentSend(true);
ClientSession session = factory.createSession(false, true, true, false);
try
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PreserveOrderDuringFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PreserveOrderDuringFailoverTest.java 2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PreserveOrderDuringFailoverTest.java 2009-05-13 15:16:38 UTC (rev 6771)
@@ -68,6 +68,7 @@
{
for (int i = 0; i < 20; i++)
{
+ log.info("testOrdering # " + i);
setUpFailoverServers(false, -1, -1);
failoverOrderTest();
stopServers();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/xa/XaTimeoutTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/xa/XaTimeoutTest.java 2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/xa/XaTimeoutTest.java 2009-05-13 15:16:38 UTC (rev 6771)
@@ -32,6 +32,7 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.server.Messaging;
import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.settings.impl.AddressSettings;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.TransactionOperation;
@@ -43,6 +44,9 @@
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
+
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@@ -545,5 +549,10 @@
{
latch.countDown();
}
+
+ public Collection<Queue> getDistinctQueues()
+ {
+ return Collections.emptySet();
+ }
}
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/BindingImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/BindingImplTest.java 2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/BindingImplTest.java 2009-05-13 15:16:38 UTC (rev 6771)
@@ -23,6 +23,7 @@
package org.jboss.messaging.tests.unit.core.postoffice.impl;
import java.io.InputStream;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -275,6 +276,14 @@
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.transaction.Transaction#getDistinctQueues()
+ */
+ public Set<Queue> getDistinctQueues()
+ {
+ return Collections.emptySet();
+ }
+
}
class FakeMessage implements ServerMessage
@@ -1370,6 +1379,20 @@
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.server.Queue#lock()
+ */
+ public void lockDelivery()
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.server.Queue#unlock()
+ */
+ public void unlockDelivery()
+ {
+ }
+
}
// Package protected ---------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2009-05-13 15:16:38 UTC (rev 6771)
@@ -38,11 +38,6 @@
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.core.postoffice.Bindings;
-import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.postoffice.QueueBinding;
-import org.jboss.messaging.core.postoffice.impl.LocalQueueBinding;
import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
import org.jboss.messaging.core.security.JBMSecurityManager;
@@ -54,7 +49,6 @@
import org.jboss.messaging.integration.transports.netty.NettyConnectorFactory;
import org.jboss.messaging.jms.client.JBossBytesMessage;
import org.jboss.messaging.jms.client.JBossTextMessage;
-import org.jboss.messaging.utils.SimpleString;
/**
*
@@ -366,38 +360,6 @@
message.getBody().writeBytes(b);
return message;
}
-
- protected int getMessageCount(final MessagingServer service, final String address) throws Exception
- {
- return getMessageCount(service.getPostOffice(), address);
- }
-
- /**
- * @param address
- * @param postOffice
- * @return
- * @throws Exception
- */
- protected int getMessageCount(final PostOffice postOffice, final String address) throws Exception
- {
- int messageCount;
- messageCount = 0;
-
- Bindings bindings = postOffice.getBindingsForAddress(new SimpleString(address));
-
- for (Binding binding : bindings.getBindings())
- {
- if ((binding instanceof LocalQueueBinding))
- {
- QueueBinding qBinding = (QueueBinding)binding;
-
- messageCount += qBinding.getQueue().getMessageCount();
-
- }
- }
- return messageCount;
- }
-
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/util/SpawnedVMSupport.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/SpawnedVMSupport.java 2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/tests/src/org/jboss/messaging/tests/util/SpawnedVMSupport.java 2009-05-13 15:16:38 UTC (rev 6771)
@@ -71,8 +71,17 @@
{
return spawnVM(className, vmargs, true, args);
}
+
+ public static Process spawnVM(final String className,
+ final String[] vmargs,
+ final boolean logOutput,
+ final String... args) throws Exception
+ {
+ return spawnVM(className, "-Xms512m -Xmx512m ", vmargs, logOutput, args);
+ }
public static Process spawnVM(final String className,
+ final String memoryArgs,
final String[] vmargs,
final boolean logOutput,
final String... args) throws Exception
@@ -81,7 +90,7 @@
sb.append("java").append(' ');
- sb.append("-Xms512m -Xmx512m ");
+ sb.append(memoryArgs);
for (String vmarg : vmargs)
{
Modified: trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java 2009-05-13 15:14:21 UTC (rev 6770)
+++ trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java 2009-05-13 15:16:38 UTC (rev 6771)
@@ -36,6 +36,7 @@
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -53,8 +54,14 @@
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.postoffice.Bindings;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.postoffice.QueueBinding;
+import org.jboss.messaging.core.postoffice.impl.LocalQueueBinding;
import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
@@ -737,6 +744,51 @@
return new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
}
+
+ protected int getMessageCount(final MessagingServer service, final String address) throws Exception
+ {
+ return getMessageCount(service.getPostOffice(), address);
+ }
+
+ /**
+ * @param address
+ * @param postOffice
+ * @return
+ * @throws Exception
+ */
+ protected int getMessageCount(final PostOffice postOffice, final String address) throws Exception
+ {
+ int messageCount = 0;
+
+ List<QueueBinding> bindings = getLocalQueueBindings(postOffice, address);
+
+ for (QueueBinding qBinding: bindings)
+ {
+ messageCount += qBinding.getQueue().getMessageCount();
+ }
+
+ return messageCount;
+ }
+
+ protected List<QueueBinding> getLocalQueueBindings(final PostOffice postOffice, final String address) throws Exception
+ {
+ ArrayList<QueueBinding> bindingsFound = new ArrayList<QueueBinding>();
+
+ Bindings bindings = postOffice.getBindingsForAddress(new SimpleString(address));
+
+ for (Binding binding : bindings.getBindings())
+ {
+ if ((binding instanceof LocalQueueBinding))
+ {
+ bindingsFound.add((QueueBinding)binding);
+ }
+ }
+ return bindingsFound;
+ }
+
+
+
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
More information about the jboss-cvs-commits
mailing list