[jboss-cvs] JBoss Messaging SVN: r7661 - in branches/Branch_MultiThreaded_Replication: src/main/org/jboss/messaging/core/paging and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Aug 4 04:56:07 EDT 2009


Author: timfox
Date: 2009-08-04 04:56:07 -0400 (Tue, 04 Aug 2009)
New Revision: 7661

Modified:
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/PagingManager.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/PagingStore.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/QueuedWriteManager.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/PagingStorePacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
   branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java
   branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
Log:
MT replication

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-08-04 08:56:07 UTC (rev 7661)
@@ -277,9 +277,7 @@
    {
       checkClosed();
 
-     // log.info("sending delete queue");
       channel.sendBlocking(new SessionDeleteQueueMessage(queueName));
-     // log.info("sent delete queue");
    }
 
    public void deleteQueue(final String queueName) throws MessagingException
@@ -678,11 +676,7 @@
 
          closedSent = true;
 
-         // log.info(System.identityHashCode(this) + " session sending close message");
-
-         channel.sendBlocking(new SessionCloseMessage());
-
-         // log.info(System.identityHashCode(this) + " session sent close message");
+         channel.sendBlocking(new SessionCloseMessage());        
       }
       catch (Throwable ignore)
       {
@@ -727,14 +721,10 @@
 
       try
       {
-         // log.info(System.identityHashCode(this) + " session handling failover");
-
           channel.getConnection().freeze();
           
           while (true)
           {
-             // Set<Thread> executingThreads = channel.getConnection().getExecutingThreads();
-
              Thread thread = channel.getConnection().getExecutingThread();
 
              if (thread == null)
@@ -777,8 +767,6 @@
          channel.waitForAllExecutions();
 
          channel.setConnection(backupConnection);
-         
-     //    log.info("unfreezing");
 
          remotingConnection = backupConnection;
 
@@ -786,29 +774,20 @@
          
          channel.setFrozen(false);
          
-         // log.info(System.identityHashCode(this) + " last received command id on client side is " + lid);
-
          Packet request = new ReattachSessionMessage(name, lid);
 
          Channel channel1 = backupConnection.getChannel(1);
 
          ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
 
-         // log.info(System.identityHashCode(this) + " got response from reattach session");
-
          if (!response.isRemoved())
-         {
-            // log.info(System.identityHashCode(this) + " found session, server last received command id is " +
-            // response.getLastConfirmedCommandID());
-
+         {            
             channel.replayCommands(response.getLastConfirmedCommandID());
 
             ok = true;
          }
          else
          {
-            // log.info(System.identityHashCode(this) + " didn't find session, closed sent " + closedSent);
-
             if (closedSent)
             {
                // a session re-attach may fail, if the session close was sent before failover started, hit the server,

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/PagingManager.java	2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/PagingManager.java	2009-08-04 08:56:07 UTC (rev 7661)
@@ -69,6 +69,8 @@
 
    /** To return the PageStore associated with the address */
    PagingStore getPageStore(SimpleString address) throws Exception;
+   
+   PagingStore getPageStoreNoCreate(SimpleString address) throws Exception;
 
    /** An injection point for the PostOffice to inject itself */
    void setPostOffice(PostOffice postOffice);

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/PagingStore.java	2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/PagingStore.java	2009-08-04 08:56:07 UTC (rev 7661)
@@ -81,4 +81,6 @@
     * @throws Exception 
     */
    void addSize(long memoryEstimate) throws Exception;
+   
+   long getTotalPaged();
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2009-08-04 08:56:07 UTC (rev 7661)
@@ -179,6 +179,11 @@
 
       return store;
    }
+   
+   public PagingStore getPageStoreNoCreate(final SimpleString storeName) throws Exception
+   {
+      return stores.get(storeName);
+   }
 
    /** this will be set by the postOffice itself.
     *  There is no way to set this on the constructor as the PagingManager is constructed before the postOffice.

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-08-04 08:56:07 UTC (rev 7661)
@@ -46,6 +46,7 @@
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.ChannelImpl;
@@ -111,7 +112,9 @@
    private final AtomicLong sizeInBytes = new AtomicLong();
 
    private final AtomicBoolean depaging = new AtomicBoolean(false);
-
+   
+   private final AtomicLong totalPaged = new AtomicLong(0);
+   
    private volatile int numberOfPages;
 
    private volatile int firstPageId;
@@ -142,7 +145,7 @@
    {
       log.trace(message);
    }
-
+   
    // Constructors --------------------------------------------------
 
    public PagingStoreImpl(final PagingManager pagingManager,
@@ -192,13 +195,23 @@
          channelManager.putChannel(channel);
       }
 
-      RemotingConnection connection = server.getPooledReplicatingConnection();
+      RemotingConnection replicatingConnection = server.getPooledReplicatingConnection();
 
-      if (connection != null)
+      if (replicatingConnection != null)
       {
-         Channel replicatingChannel = new ChannelImpl(id, connection);
+         Channel replicatingChannel = new ChannelImpl(id, replicatingConnection);
+         
+         replicatingConnection.putChannel(replicatingChannel);
 
          replicator = new ReplicatorImpl("paging-store-" + storeName, replicatingChannel);
+         
+         replicatingChannel.setHandler(new ChannelHandler()
+         {
+            public void handlePacket(Packet packet)
+            {
+               replicator.replicationResponseReceived();
+            }
+         });
       }
       else
       {
@@ -213,6 +226,11 @@
    // Public --------------------------------------------------------
 
    // PagingStore implementation ------------------------------------
+   
+   public long getTotalPaged()
+   {
+      return this.totalPaged.get();
+   }
 
    public long getAddressSize()
    {
@@ -423,6 +441,8 @@
 
                // openNewPage will set currentPageSize to zero, we need to set it again
                currentPageSize.addAndGet(bytesToWrite);
+               
+               totalPaged.addAndGet(bytesToWrite);
             }
             finally
             {
@@ -1095,6 +1115,7 @@
 
    // Inner classes -------------------------------------------------
 
+   
    private class DepageRunnable implements Runnable
    {
       private final Executor followingExecutor;

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java	2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java	2009-08-04 08:56:07 UTC (rev 7661)
@@ -43,5 +43,5 @@
     */
    Page depage() throws Exception;
 
-   void forceAnotherPage() throws Exception;
+   void forceAnotherPage() throws Exception;     
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-08-04 08:56:07 UTC (rev 7661)
@@ -251,6 +251,8 @@
    {            
       long id = idGenerator.generateID();
       
+     // log.info(System.identityHashCode(this) + " generated " + id, new Exception());
+      
       return id;
    }
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-08-04 08:56:07 UTC (rev 7661)
@@ -383,6 +383,8 @@
          Object duplicateID = message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
 
          DuplicateIDCache cache = null;
+         
+         //log.info("duplicateID is " + duplicateID + " on backup " + backup);
 
          if (duplicateID != null)
          {
@@ -416,6 +418,8 @@
 
          boolean startedTx = false;
 
+        // log.info("Cache is " + cache + " on backup " + backup);
+         
          if (cache != null)
          {
             if (tx == null)
@@ -435,8 +439,14 @@
          {
             if (pagingManager.page(message, true))
             {
+               //log.info("message " + message.getMessageID() + " was paged on backup " + this.backup);
+               
                return;
             }
+//            else
+//            {
+//               log.info("message " + message.getMessageID() + " was not paged on backup " + this.backup);
+//            }
          }
          else
          {

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/QueuedWriteManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/QueuedWriteManager.java	2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/QueuedWriteManager.java	2009-08-04 08:56:07 UTC (rev 7661)
@@ -140,7 +140,6 @@
                   count++;
                }
             }
-
          }
          else if (qw.done && iter == null)
          {
@@ -150,7 +149,7 @@
          if (send)
          {
             queuedWrites.remove();
-
+  
             channel.send(qw.packet);
          }
          else

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-08-04 08:56:07 UTC (rev 7661)
@@ -710,6 +710,8 @@
       {
          throw new IllegalArgumentException("node id is null");
       }
+      
+      log.info("initialising backup with " + liveUniqueID);
 
       synchronized (initialiseLock)
       {
@@ -730,7 +732,7 @@
          {
             initialised = false;
 
-            throw new IllegalStateException("Live and backup unique ids different (" + liveUniqueID +
+            throw new IllegalStateException("Live and backup sequences different (" + liveUniqueID +
                                             ":" +
                                             backupID +
                                             "). You're probably trying to restart a live backup pair after a crash");

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/PagingStorePacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/PagingStorePacketHandler.java	2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/PagingStorePacketHandler.java	2009-08-04 08:56:07 UTC (rev 7661)
@@ -90,9 +90,9 @@
             {
                log.error("Failed to read page", e);
             }
-            
+                        
             channel.send(new ReplicationResponseMessage());
-            
+                        
             thread.setNoReplayOrRecord(12);
 
             break;

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-08-04 08:56:07 UTC (rev 7661)
@@ -528,8 +528,6 @@
 
       Packet response = null;
       
-     // log.info("** handling create queue on backup " + this.server.getConfiguration().isBackup());
-
       try
       {
          if (durable)
@@ -746,8 +744,6 @@
       }
       catch (Exception e)
       {
-         log.error("Failed to acknowledge", e);
-
          if (packet.isRequiresResponse())
          {
             if (e instanceof MessagingException)
@@ -761,12 +757,10 @@
          }
       }
 
-      // //channel.confirm(packet);
-
       if (response != null)
       {
          channel.send(response);
-      }
+      }            
    }
 
    public void handleExpired(final SessionExpiredMessage packet)
@@ -1580,8 +1574,6 @@
       
       channel.setFrozen(false);
 
-      //log.info("telling channel to replay commands up to " + lastConfirmedCommandID);
-      
       channel.replayCommands(lastConfirmedCommandID);
 
       if (wasStarted)

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2009-08-04 08:56:07 UTC (rev 7661)
@@ -76,6 +76,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
+import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.ServerSession;
 import org.jboss.messaging.core.server.replication.ReplicableAction;
 import org.jboss.messaging.core.server.replication.Replicator;
@@ -141,10 +142,6 @@
 
       if (config.isBackup())
       {
-         // log.info(System.identityHashCode(this) + " inv on backup");
-         
-         //log.info("Received packet " + packet + " on backup");
-
          JBMThread thread = JBMThread.currentThread();
          
          if (packet.getType() != PacketImpl.REPLICATE_LOCK_SEQUENCES)
@@ -164,9 +161,7 @@
             
             thread.setNoReplayOrRecord(4);
          }
-
-         
-
+        
          checkCloseSessionChannels(packet);
       }
       else
@@ -235,9 +230,6 @@
              
                sequences = msg.getSequences();
 
-             //  log.info("session got sequences");
-             //  dumpSequences(sequences);
-
                break;
             }
             case SESS_CREATECONSUMER:
@@ -395,8 +387,16 @@
             }
             case SESS_SEND:
             {
-               SessionSendMessage message = (SessionSendMessage)packet;
+               //TODO - we need to make a copy so the message we replicate is the exact same one we received
+               //paging would add a dup id header which would make it different on backup
+               //this could otherwise cause locks to be obtained in a different order on backup after replication
+               SessionSendMessage message = (SessionSendMessage)packet;  
+               
+               ServerMessage msg = message.getServerMessage().copy();
+               packet = new SessionSendMessage(msg, message.isRequiresResponse());
+               
                session.handleSend(message);
+
                break;
             }
             case SESS_SEND_LARGE:

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java	2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java	2009-08-04 08:56:07 UTC (rev 7661)
@@ -195,5 +195,14 @@
    {
       this.replicator = replicator;
    }
+   
+   public void dumpSequences()
+   {
+      log.info("Sequences size is " + objectSequences.size());
+      for (Triple<Long, Long, Integer> sequence: objectSequences)
+      {
+         log.info(sequence.a + ": " + sequence.b);
+      }
+   }
 
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java	2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java	2009-08-04 08:56:07 UTC (rev 7661)
@@ -402,13 +402,13 @@
    // For debug
    private Set<Thread> owners = new ConcurrentHashSet<Thread>();
    
-   private void dumpSequences(List<Triple<Long, Long, Integer>> sequences)
-   {
-      log.info("Sequences size is " + sequences.size());
-      for (Triple<Long, Long, Integer> sequence: sequences)
-      {
-         log.info(sequence.a + ": " + sequence.b);
-      }
-   }
+//   private void dumpSequences(List<Triple<Long, Long, Integer>> sequences)
+//   {
+//      log.info("Sequences size is " + sequences.size());
+//      for (Triple<Long, Long, Integer> sequence: sequences)
+//      {
+//         log.info(sequence.a + ": " + sequence.b);
+//      }
+//   }
 
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java	2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java	2009-08-04 08:56:07 UTC (rev 7661)
@@ -78,6 +78,10 @@
          //Sanity check
          if (pair.a != -1)
          {
+            log.error("Sequences in wrong order, " + pair.a);
+            
+            jthread.dumpSequences();
+                        
             throw new IllegalStateException("Sequences in wrong order");
          }
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java	2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java	2009-08-04 08:56:07 UTC (rev 7661)
@@ -147,8 +147,7 @@
 //      dumpSequences(sequences);
       
      // log.info("replicating packet " + action.getPacket());
-      
-      
+            
       // We then send the sequences to the backup
       
       WaitingChannelsHolder holder = new WaitingChannelsHolder();
@@ -157,9 +156,7 @@
       holder.sentPacket = action.getPacket();
       
       waitingChannelsQueue.add(holder);      
-      
-      
-
+            
       Packet packet = new ReplicateLockSequenceMessage(id, sequences);
 
       replicatingChannel.send(packet);

Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java	2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java	2009-08-04 08:56:07 UTC (rev 7661)
@@ -34,6 +34,7 @@
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
 import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
 import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
@@ -55,8 +56,9 @@
  */
 public class FailoverTestBase extends ServiceTestBase
 {
-
    // Constants -----------------------------------------------------
+   
+   private static final Logger log = Logger.getLogger(FailoverTestBase.class);
 
    // Attributes ----------------------------------------------------
 
@@ -104,6 +106,8 @@
                                        final long maxGlobalSize,
                                        final int pageSize) throws Exception
    {
+      log.info("Deleting dir " + getTestDir());
+      
       deleteDirectory(new File(getTestDir()));
 
       Configuration backupConf = new ConfigurationImpl();

Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java	2009-08-03 19:11:48 UTC (rev 7660)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java	2009-08-04 08:56:07 UTC (rev 7661)
@@ -69,7 +69,6 @@
 
    // Public --------------------------------------------------------
 
-   
    public void testMultithreadFailoverReplicationOnly() throws Throwable
    {
       setUpFileBased(getMaxGlobal(), getPageSize());
@@ -112,7 +111,6 @@
       assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
 
    }
-
    
    public void testFailoverOnPaging() throws Exception
    {
@@ -180,7 +178,6 @@
 
          for (int i = 0; i < numMessages; i++)
          {
-
             if (fail && i == numMessages / 2)
             {
                conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
@@ -258,8 +255,17 @@
       conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
 
    }
-
-
+   
+   protected void tearDown() throws Exception
+   {
+      //Make sure paging actually occurred!
+      
+      assertTrue(liveServer.getPostOffice().getPagingManager().getPageStoreNoCreate(ADDRESS).getTotalPaged() > 0);
+      assertTrue(backupServer.getPostOffice().getPagingManager().getPageStoreNoCreate(ADDRESS).getTotalPaged() > 0);
+      
+      super.tearDown();
+   }
+   
    // Private -------------------------------------------------------
    
    /**
@@ -282,12 +288,12 @@
          if (connectedOnBackup)
          {
             factory = createBackupFactory();
-            store = backupServer.getPostOffice().getPagingManager().getPageStore(ADDRESS);
+            store = backupServer.getPostOffice().getPagingManager().getPageStoreNoCreate(ADDRESS);
          }
          else
          {
             factory = createFailoverFactory();
-            store = liveServer.getPostOffice().getPagingManager().getPageStore(ADDRESS);
+            store = liveServer.getPostOffice().getPagingManager().getPageStoreNoCreate(ADDRESS);
          }
          
          factory.setBlockOnNonPersistentSend(true);
@@ -443,8 +449,6 @@
    {
 
       final AtomicInteger numberOfMessages = new AtomicInteger(0);
-      final PagingStore store = liveServer.getPostOffice().getPagingManager().getPageStore(ADDRESS);
-
       final ClientSessionFactory factory = createFailoverFactory();
       
       factory.setBlockOnNonPersistentSend(true);
@@ -485,26 +489,30 @@
 
                      started = true;
                      startFlag.await();
-
-                     while (!store.isPaging())
+                                                              
+                     while (true)
                      {
-
                         ClientMessage msg = session.createClientMessage(true);
 
                         producer.send(msg);
                         numberOfMessages.incrementAndGet();
+                        
+                        PagingStore store = liveServer.getPostOffice().getPagingManager().getPageStoreNoCreate(ADDRESS);
+                        
+                        if (store != null && store.isPaging())
+                        {
+                           break;
+                        }
                      }
 
                      flagPaging.countDown();
 
                      for (int i = 0; i < 100; i++)
                      {
-
                         ClientMessage msg = session.createClientMessage(true);
 
                         producer.send(msg);
                         numberOfMessages.incrementAndGet();
-
                      }
 
                   }




More information about the jboss-cvs-commits mailing list