[jboss-cvs] JBoss Messaging SVN: r7661 - in branches/Branch_MultiThreaded_Replication: src/main/org/jboss/messaging/core/paging and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Aug 4 04:56:07 EDT 2009
Author: timfox
Date: 2009-08-04 04:56:07 -0400 (Tue, 04 Aug 2009)
New Revision: 7661
Modified:
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/PagingManager.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/PagingStore.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/QueuedWriteManager.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/PagingStorePacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java
branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
Log:
MT replication
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-08-04 08:56:07 UTC (rev 7661)
@@ -277,9 +277,7 @@
{
checkClosed();
- // log.info("sending delete queue");
channel.sendBlocking(new SessionDeleteQueueMessage(queueName));
- // log.info("sent delete queue");
}
public void deleteQueue(final String queueName) throws MessagingException
@@ -678,11 +676,7 @@
closedSent = true;
- // log.info(System.identityHashCode(this) + " session sending close message");
-
- channel.sendBlocking(new SessionCloseMessage());
-
- // log.info(System.identityHashCode(this) + " session sent close message");
+ channel.sendBlocking(new SessionCloseMessage());
}
catch (Throwable ignore)
{
@@ -727,14 +721,10 @@
try
{
- // log.info(System.identityHashCode(this) + " session handling failover");
-
channel.getConnection().freeze();
while (true)
{
- // Set<Thread> executingThreads = channel.getConnection().getExecutingThreads();
-
Thread thread = channel.getConnection().getExecutingThread();
if (thread == null)
@@ -777,8 +767,6 @@
channel.waitForAllExecutions();
channel.setConnection(backupConnection);
-
- // log.info("unfreezing");
remotingConnection = backupConnection;
@@ -786,29 +774,20 @@
channel.setFrozen(false);
- // log.info(System.identityHashCode(this) + " last received command id on client side is " + lid);
-
Packet request = new ReattachSessionMessage(name, lid);
Channel channel1 = backupConnection.getChannel(1);
ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
- // log.info(System.identityHashCode(this) + " got response from reattach session");
-
if (!response.isRemoved())
- {
- // log.info(System.identityHashCode(this) + " found session, server last received command id is " +
- // response.getLastConfirmedCommandID());
-
+ {
channel.replayCommands(response.getLastConfirmedCommandID());
ok = true;
}
else
{
- // log.info(System.identityHashCode(this) + " didn't find session, closed sent " + closedSent);
-
if (closedSent)
{
// a session re-attach may fail, if the session close was sent before failover started, hit the server,
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/PagingManager.java 2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/PagingManager.java 2009-08-04 08:56:07 UTC (rev 7661)
@@ -69,6 +69,8 @@
/** To return the PageStore associated with the address */
PagingStore getPageStore(SimpleString address) throws Exception;
+
+ PagingStore getPageStoreNoCreate(SimpleString address) throws Exception;
/** An injection point for the PostOffice to inject itself */
void setPostOffice(PostOffice postOffice);
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/PagingStore.java 2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/PagingStore.java 2009-08-04 08:56:07 UTC (rev 7661)
@@ -81,4 +81,6 @@
* @throws Exception
*/
void addSize(long memoryEstimate) throws Exception;
+
+ long getTotalPaged();
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2009-08-04 08:56:07 UTC (rev 7661)
@@ -179,6 +179,11 @@
return store;
}
+
+ public PagingStore getPageStoreNoCreate(final SimpleString storeName) throws Exception
+ {
+ return stores.get(storeName);
+ }
/** this will be set by the postOffice itself.
* There is no way to set this on the constructor as the PagingManager is constructed before the postOffice.
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-08-04 08:56:07 UTC (rev 7661)
@@ -46,6 +46,7 @@
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.ChannelImpl;
@@ -111,7 +112,9 @@
private final AtomicLong sizeInBytes = new AtomicLong();
private final AtomicBoolean depaging = new AtomicBoolean(false);
-
+
+ private final AtomicLong totalPaged = new AtomicLong(0);
+
private volatile int numberOfPages;
private volatile int firstPageId;
@@ -142,7 +145,7 @@
{
log.trace(message);
}
-
+
// Constructors --------------------------------------------------
public PagingStoreImpl(final PagingManager pagingManager,
@@ -192,13 +195,23 @@
channelManager.putChannel(channel);
}
- RemotingConnection connection = server.getPooledReplicatingConnection();
+ RemotingConnection replicatingConnection = server.getPooledReplicatingConnection();
- if (connection != null)
+ if (replicatingConnection != null)
{
- Channel replicatingChannel = new ChannelImpl(id, connection);
+ Channel replicatingChannel = new ChannelImpl(id, replicatingConnection);
+
+ replicatingConnection.putChannel(replicatingChannel);
replicator = new ReplicatorImpl("paging-store-" + storeName, replicatingChannel);
+
+ replicatingChannel.setHandler(new ChannelHandler()
+ {
+ public void handlePacket(Packet packet)
+ {
+ replicator.replicationResponseReceived();
+ }
+ });
}
else
{
@@ -213,6 +226,11 @@
// Public --------------------------------------------------------
// PagingStore implementation ------------------------------------
+
+ public long getTotalPaged()
+ {
+ return this.totalPaged.get();
+ }
public long getAddressSize()
{
@@ -423,6 +441,8 @@
// openNewPage will set currentPageSize to zero, we need to set it again
currentPageSize.addAndGet(bytesToWrite);
+
+ totalPaged.addAndGet(bytesToWrite);
}
finally
{
@@ -1095,6 +1115,7 @@
// Inner classes -------------------------------------------------
+
private class DepageRunnable implements Runnable
{
private final Executor followingExecutor;
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java 2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java 2009-08-04 08:56:07 UTC (rev 7661)
@@ -43,5 +43,5 @@
*/
Page depage() throws Exception;
- void forceAnotherPage() throws Exception;
+ void forceAnotherPage() throws Exception;
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-08-04 08:56:07 UTC (rev 7661)
@@ -251,6 +251,8 @@
{
long id = idGenerator.generateID();
+ // log.info(System.identityHashCode(this) + " generated " + id, new Exception());
+
return id;
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-08-04 08:56:07 UTC (rev 7661)
@@ -383,6 +383,8 @@
Object duplicateID = message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
DuplicateIDCache cache = null;
+
+ //log.info("duplicateID is " + duplicateID + " on backup " + backup);
if (duplicateID != null)
{
@@ -416,6 +418,8 @@
boolean startedTx = false;
+ // log.info("Cache is " + cache + " on backup " + backup);
+
if (cache != null)
{
if (tx == null)
@@ -435,8 +439,14 @@
{
if (pagingManager.page(message, true))
{
+ //log.info("message " + message.getMessageID() + " was paged on backup " + this.backup);
+
return;
}
+// else
+// {
+// log.info("message " + message.getMessageID() + " was not paged on backup " + this.backup);
+// }
}
else
{
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/QueuedWriteManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/QueuedWriteManager.java 2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/QueuedWriteManager.java 2009-08-04 08:56:07 UTC (rev 7661)
@@ -140,7 +140,6 @@
count++;
}
}
-
}
else if (qw.done && iter == null)
{
@@ -150,7 +149,7 @@
if (send)
{
queuedWrites.remove();
-
+
channel.send(qw.packet);
}
else
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-08-04 08:56:07 UTC (rev 7661)
@@ -710,6 +710,8 @@
{
throw new IllegalArgumentException("node id is null");
}
+
+ log.info("initialising backup with " + liveUniqueID);
synchronized (initialiseLock)
{
@@ -730,7 +732,7 @@
{
initialised = false;
- throw new IllegalStateException("Live and backup unique ids different (" + liveUniqueID +
+ throw new IllegalStateException("Live and backup sequences different (" + liveUniqueID +
":" +
backupID +
"). You're probably trying to restart a live backup pair after a crash");
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/PagingStorePacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/PagingStorePacketHandler.java 2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/PagingStorePacketHandler.java 2009-08-04 08:56:07 UTC (rev 7661)
@@ -90,9 +90,9 @@
{
log.error("Failed to read page", e);
}
-
+
channel.send(new ReplicationResponseMessage());
-
+
thread.setNoReplayOrRecord(12);
break;
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-08-04 08:56:07 UTC (rev 7661)
@@ -528,8 +528,6 @@
Packet response = null;
- // log.info("** handling create queue on backup " + this.server.getConfiguration().isBackup());
-
try
{
if (durable)
@@ -746,8 +744,6 @@
}
catch (Exception e)
{
- log.error("Failed to acknowledge", e);
-
if (packet.isRequiresResponse())
{
if (e instanceof MessagingException)
@@ -761,12 +757,10 @@
}
}
- // //channel.confirm(packet);
-
if (response != null)
{
channel.send(response);
- }
+ }
}
public void handleExpired(final SessionExpiredMessage packet)
@@ -1580,8 +1574,6 @@
channel.setFrozen(false);
- //log.info("telling channel to replay commands up to " + lastConfirmedCommandID);
-
channel.replayCommands(lastConfirmedCommandID);
if (wasStarted)
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2009-08-04 08:56:07 UTC (rev 7661)
@@ -76,6 +76,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
+import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.ServerSession;
import org.jboss.messaging.core.server.replication.ReplicableAction;
import org.jboss.messaging.core.server.replication.Replicator;
@@ -141,10 +142,6 @@
if (config.isBackup())
{
- // log.info(System.identityHashCode(this) + " inv on backup");
-
- //log.info("Received packet " + packet + " on backup");
-
JBMThread thread = JBMThread.currentThread();
if (packet.getType() != PacketImpl.REPLICATE_LOCK_SEQUENCES)
@@ -164,9 +161,7 @@
thread.setNoReplayOrRecord(4);
}
-
-
-
+
checkCloseSessionChannels(packet);
}
else
@@ -235,9 +230,6 @@
sequences = msg.getSequences();
- // log.info("session got sequences");
- // dumpSequences(sequences);
-
break;
}
case SESS_CREATECONSUMER:
@@ -395,8 +387,16 @@
}
case SESS_SEND:
{
- SessionSendMessage message = (SessionSendMessage)packet;
+ //TODO - we need to make a copy so the message we replicate is the exact same one we received
+ //paging would add a dup id header which would make it different on backup
+ //this could otherwise cause locks to be obtained in a different order on backup after replication
+ SessionSendMessage message = (SessionSendMessage)packet;
+
+ ServerMessage msg = message.getServerMessage().copy();
+ packet = new SessionSendMessage(msg, message.isRequiresResponse());
+
session.handleSend(message);
+
break;
}
case SESS_SEND_LARGE:
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java 2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java 2009-08-04 08:56:07 UTC (rev 7661)
@@ -195,5 +195,14 @@
{
this.replicator = replicator;
}
+
+ public void dumpSequences()
+ {
+ log.info("Sequences size is " + objectSequences.size());
+ for (Triple<Long, Long, Integer> sequence: objectSequences)
+ {
+ log.info(sequence.a + ": " + sequence.b);
+ }
+ }
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java 2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java 2009-08-04 08:56:07 UTC (rev 7661)
@@ -402,13 +402,13 @@
// For debug
private Set<Thread> owners = new ConcurrentHashSet<Thread>();
- private void dumpSequences(List<Triple<Long, Long, Integer>> sequences)
- {
- log.info("Sequences size is " + sequences.size());
- for (Triple<Long, Long, Integer> sequence: sequences)
- {
- log.info(sequence.a + ": " + sequence.b);
- }
- }
+// private void dumpSequences(List<Triple<Long, Long, Integer>> sequences)
+// {
+// log.info("Sequences size is " + sequences.size());
+// for (Triple<Long, Long, Integer> sequence: sequences)
+// {
+// log.info(sequence.a + ": " + sequence.b);
+// }
+// }
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java 2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java 2009-08-04 08:56:07 UTC (rev 7661)
@@ -78,6 +78,10 @@
//Sanity check
if (pair.a != -1)
{
+ log.error("Sequences in wrong order, " + pair.a);
+
+ jthread.dumpSequences();
+
throw new IllegalStateException("Sequences in wrong order");
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java 2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java 2009-08-04 08:56:07 UTC (rev 7661)
@@ -147,8 +147,7 @@
// dumpSequences(sequences);
// log.info("replicating packet " + action.getPacket());
-
-
+
// We then send the sequences to the backup
WaitingChannelsHolder holder = new WaitingChannelsHolder();
@@ -157,9 +156,7 @@
holder.sentPacket = action.getPacket();
waitingChannelsQueue.add(holder);
-
-
-
+
Packet packet = new ReplicateLockSequenceMessage(id, sequences);
replicatingChannel.send(packet);
Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java 2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java 2009-08-04 08:56:07 UTC (rev 7661)
@@ -34,6 +34,7 @@
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
@@ -55,8 +56,9 @@
*/
public class FailoverTestBase extends ServiceTestBase
{
-
// Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(FailoverTestBase.class);
// Attributes ----------------------------------------------------
@@ -104,6 +106,8 @@
final long maxGlobalSize,
final int pageSize) throws Exception
{
+ log.info("Deleting dir " + getTestDir());
+
deleteDirectory(new File(getTestDir()));
Configuration backupConf = new ConfigurationImpl();
Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java 2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java 2009-08-04 08:56:07 UTC (rev 7661)
@@ -69,7 +69,6 @@
// Public --------------------------------------------------------
-
public void testMultithreadFailoverReplicationOnly() throws Throwable
{
setUpFileBased(getMaxGlobal(), getPageSize());
@@ -112,7 +111,6 @@
assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
}
-
public void testFailoverOnPaging() throws Exception
{
@@ -180,7 +178,6 @@
for (int i = 0; i < numMessages; i++)
{
-
if (fail && i == numMessages / 2)
{
conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
@@ -258,8 +255,17 @@
conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
}
-
-
+
+ protected void tearDown() throws Exception
+ {
+ //Make sure paging actually occurred!
+
+ assertTrue(liveServer.getPostOffice().getPagingManager().getPageStoreNoCreate(ADDRESS).getTotalPaged() > 0);
+ assertTrue(backupServer.getPostOffice().getPagingManager().getPageStoreNoCreate(ADDRESS).getTotalPaged() > 0);
+
+ super.tearDown();
+ }
+
// Private -------------------------------------------------------
/**
@@ -282,12 +288,12 @@
if (connectedOnBackup)
{
factory = createBackupFactory();
- store = backupServer.getPostOffice().getPagingManager().getPageStore(ADDRESS);
+ store = backupServer.getPostOffice().getPagingManager().getPageStoreNoCreate(ADDRESS);
}
else
{
factory = createFailoverFactory();
- store = liveServer.getPostOffice().getPagingManager().getPageStore(ADDRESS);
+ store = liveServer.getPostOffice().getPagingManager().getPageStoreNoCreate(ADDRESS);
}
factory.setBlockOnNonPersistentSend(true);
@@ -443,8 +449,6 @@
{
final AtomicInteger numberOfMessages = new AtomicInteger(0);
- final PagingStore store = liveServer.getPostOffice().getPagingManager().getPageStore(ADDRESS);
-
final ClientSessionFactory factory = createFailoverFactory();
factory.setBlockOnNonPersistentSend(true);
@@ -485,26 +489,30 @@
started = true;
startFlag.await();
-
- while (!store.isPaging())
+
+ while (true)
{
-
ClientMessage msg = session.createClientMessage(true);
producer.send(msg);
numberOfMessages.incrementAndGet();
+
+ PagingStore store = liveServer.getPostOffice().getPagingManager().getPageStoreNoCreate(ADDRESS);
+
+ if (store != null && store.isPaging())
+ {
+ break;
+ }
}
flagPaging.countDown();
for (int i = 0; i < 100; i++)
{
-
ClientMessage msg = session.createClientMessage(true);
producer.send(msg);
numberOfMessages.incrementAndGet();
-
}
}
More information about the jboss-cvs-commits
mailing list