[jboss-cvs] JBoss Messaging SVN: r7579 - in branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core: server/impl and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jul 16 09:40:42 EDT 2009


Author: timfox
Date: 2009-07-16 09:40:42 -0400 (Thu, 16 Jul 2009)
New Revision: 7579

Modified:
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java
Log:
MT replication

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-07-16 12:55:44 UTC (rev 7578)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-07-16 13:40:42 UTC (rev 7579)
@@ -12,8 +12,6 @@
 
 package org.jboss.messaging.core.remoting.impl;
 
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -28,7 +26,6 @@
 import org.jboss.messaging.core.remoting.Interceptor;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
 import org.jboss.messaging.core.remoting.spi.Connection;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.utils.SimpleIDGenerator;
@@ -323,9 +320,7 @@
    // ----------------------------------------------------
 
    private volatile Thread currentThread;
-
    
-
    public void activate()
    {
       active = true;
@@ -387,99 +382,6 @@
       return currentThread;
    }
    
-//   public void freeze()
-//   {
-//      if (currentThread != null)
-//      {
-//         long start = System.currentTimeMillis();
-//
-//         while (true)
-//         {
-//            Thread thread = currentThread;
-//
-//            if (thread != null)
-//            {              
-//               log.info("waiting for thread to complete");
-//
-//               try
-//               {
-//                  Thread.sleep(1);
-//               }
-//               catch (InterruptedException ignore)
-//               {
-//               }
-//
-//               if (System.currentTimeMillis() - start >= FREEZE_TIMEOUT)
-//               {
-//                  Exception e = new Exception();
-//                  e.setStackTrace(thread.getStackTrace());
-//                  log.info("Waiting for this thread", e);
-//                  
-//                  JBMThread jthread = null;
-//
-//                  if (thread instanceof JBMThread)
-//                  {
-//                     jthread = (JBMThread)thread;
-//
-//                     //jthread.setLastLockNonStrict();
-//                  }
-//
-//                  if (jthread != null)
-//                  {
-//                     SequencedLock lastLock = jthread.getLastLock();
-//
-//                     log.info("Owner of the lock which this thread is waiting for is ");
-//
-//                     if (lastLock != null)
-//                     {
-//                        log.info("Dumping last lock");
-//
-//                        lastLock.dump();
-//                     }                  
-//                  }
-//
-//                  // log.info("Gonna try a nudge");
-//                  // SequencedLock.dumpLocks();
-//
-//                  throw new IllegalStateException("Timed out waiting for thread to complete");
-//               }
-//            }
-//            else
-//            {
-//               break;
-//            }
-//         }
-//
-//      }
-//      //      
-//      //      
-//      //      
-//      // //TODO check this logic
-//      //      
-//      // log.info("freezing connection");
-//      //      
-//      // JBMThread thread = null;
-//      //      
-//      // if (currentThread != null)
-//      // {
-//      // thread = (JBMThread)currentThread;
-//      //         
-//      // thread.jbmInterrupt();
-//      // }
-//      //            
-//      // // Prevent any more packets being handled on this connection
-//      //
-//      // synchronized (freezeLock)
-//      // {
-//      // frozen = true;
-//      // }
-//      //      
-//      // if (thread != null)
-//      // {
-//      // thread.jbmResetInterrupt();
-//      // }
-//   }
-
    // Package protected
    // ----------------------------------------------------------------------------
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-07-16 12:55:44 UTC (rev 7578)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-07-16 13:40:42 UTC (rev 7579)
@@ -208,12 +208,11 @@
    private int managementConnectorID;
 
    private static AtomicInteger managementConnectorSequence = new AtomicInteger(0);
-   
+
    private ConnectionManager pooledReplicatingConnectionManager;
 
    private ConnectionManager nonPooledReplicatingConnectionManager;
 
-
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -887,7 +886,7 @@
                                                                            0,
                                                                            this.threadPool,
                                                                            this.scheduledPool);
-            
+
             pooledReplicatingConnectionManager.setNeverFail();
 
             nonPooledReplicatingConnectionManager = new ConnectionManagerImpl(null,
@@ -1058,7 +1057,7 @@
                queueDeployer.start();
             }
          }
-         
+
          log.info("Backup server is now activated");
       }
 
@@ -1081,153 +1080,108 @@
    // while freezing is in progress
    private final Lock flock = new ReentrantLock();
 
-   private static final long FREEZE_TIMEOUT = 2000;
+   private static final long FREEZE_TIMEOUT = 5000;
 
    private void freezeBackupConnections()
    {
       flock.lock();
-      
-     // log.info("** freezing backup connections");
 
+      log.info("** freezing backup connections");
+
       synchronized (backupConnections)
       {
          frozen = true;
 
          flock.unlock();
 
-         // Needs to be done in two stages - once set locks non strict
-
          freezeConnections();
 
-         // Wait for threads to exit
-         
-         boolean timedOut = false;
+         //We set a latch on each ReplicationAwareMutex
+         ReplicationAwareMutex.setLatchAll();
 
          long start = System.currentTimeMillis();
-                  
-         outer: for (RemotingConnection rc : backupConnections)
-         {                      
+
+         //We wait for all executing threads to end up on this latch. If they instead end up on a SequencedLock we interrupt them
+         //until they fall through to the other latch
+         
+         for (RemotingConnection rc : backupConnections)
+         {
             while (true)
             {
-               Thread executingThread = rc.getExecutingThread();
-               
+               JBMThread executingThread = (JBMThread)rc.getExecutingThread();
+
                if (executingThread == null)
                {
                   break;
                }
-               
+
+               if (executingThread.isWaitingOnMutex())
+               {
+                  executingThread.setNoReplayOrRecord(0);
+                  
+                  break;
+               }
+
+               if (executingThread.isWaitingOnSequencedLock())
+               {
+                  executingThread.setFrozen();
+
+                  executingThread.interrupt();
+               }
+
                try
                {
                   Thread.sleep(1);
                }
                catch (InterruptedException ignore)
                {
-               }   
-               
+               }
+
                if (System.currentTimeMillis() - start >= FREEZE_TIMEOUT)
                {
-                  timedOut = true;
-                  
-                  break outer;
+                  throw new IllegalStateException("Timed out waiting for threads to exit or reach latch");
                }
-            }                       
+            }
          }
+
+         //Now we release the latch and wait for all threads to exit
          
-         if (timedOut)
+         ReplicationAwareMutex.setOwnerLatchAll();
+
+         start = System.currentTimeMillis();
+
+         // Wait for everything to exit
+         for (RemotingConnection rc : backupConnections)
          {
-           // log.info("*** timedout waiting for threads to exit");
-            //Now we assert that all remaining threads are waiting for a SequencedLock
-            
-            for (RemotingConnection rc : backupConnections)
+            while (true)
             {
-               Thread executingThread = rc.getExecutingThread();
-               
-               if (executingThread != null & executingThread instanceof JBMThread)
+               JBMThread executingThread = (JBMThread)rc.getExecutingThread();
+
+               if (executingThread == null)
                {
-                  JBMThread jthread = (JBMThread)executingThread;
-                  
-                  if (!jthread.isWaitingOnSequencedLock())
-                  {
-                     String msg = "Thread is not waiting on SequencedLock";
-                     Exception e = new Exception();
-                     e.setStackTrace(jthread.getStackTrace());
-                     log.error(msg, e);
-                     throw new IllegalStateException(msg);
-                  }
-               }                              
-            }
-            
-           // log.info("** all threads waiting on sequenced locks");
-            
-            //Now we can freeze out all the locks, after setting all threads to replay = false
-            //and frozen to true
-         
-            for (RemotingConnection rc : backupConnections)
-            {
-               Thread executingThread = rc.getExecutingThread();
-               
-               if (executingThread != null & executingThread instanceof JBMThread)
+                  break;
+               }
+
+               try
                {
-                  JBMThread jthread = (JBMThread)executingThread;
-                  
-                  jthread.setNoReplayOrRecord();
-                  
-                  jthread.setFrozen();
-               }                              
-            }
-            
-           // log.info("** set all threads to frozen");
-            
-            ReplicationAwareMutex.freezeOutAll();
-         
-            //Now we interrupt all those threads
-            
-            for (RemotingConnection rc : backupConnections)
-            {
-               Thread executingThread = rc.getExecutingThread();
-               
-               if (executingThread != null & executingThread instanceof JBMThread)
+                  Thread.sleep(1);
+               }
+               catch (InterruptedException ignore)
                {
-                  JBMThread jthread = (JBMThread)executingThread;
-                  
-                  jthread.interrupt();
-               }                              
-            }
-            
-            //Now we wait again for the threads to complete
-            
-            start = System.currentTimeMillis();
-                        
-            for (RemotingConnection rc : backupConnections)
-            {                         
-               while (true)
+               }
+
+               if (System.currentTimeMillis() - start >= FREEZE_TIMEOUT)
                {
-                  Thread executingThread = rc.getExecutingThread();
-                  
-                  if (executingThread == null)
-                  {
-                     break;
-                  }
-                  
-                  try
-                  {
-                     Thread.sleep(1);
-                  }
-                  catch (InterruptedException ignore)
-                  {
-                  }
-                  
-                  if (System.currentTimeMillis() - start >= FREEZE_TIMEOUT)
-                  {
-                     String msg = "Thread won't freeze out";
-                     Exception e = new Exception();
-                     e.setStackTrace(executingThread.getStackTrace());
-                     log.error(msg, e);
-                     throw new IllegalStateException(msg);
-                  }
+                  Exception e = new Exception();
+                  e.setStackTrace(executingThread.getStackTrace());
+                  log.error("Waiting for this thread " + executingThread, e);
+                  log.error("Replay " + executingThread.isReplay());
+                  throw new IllegalStateException("Timed out waiting for threads to exit");
                }
             }
          }
+
+         ReplicationAwareMutex.clearLatchAll();
       }
    }
 
@@ -1498,7 +1452,7 @@
 
          JBMThread thread = JBMThread.currentThread();
 
-         thread.setNoReplayOrRecord();
+         thread.setNoReplayOrRecord(1);
 
          channel1.send(new RegisterQueueReplicationChannelMessage(queueID));
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-07-16 12:55:44 UTC (rev 7578)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-07-16 13:40:42 UTC (rev 7579)
@@ -166,7 +166,7 @@
 
                action.run();
 
-               thread.setNoReplayOrRecord();
+               thread.setNoReplayOrRecord(2);
             }
             else
             {

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java	2009-07-16 12:55:44 UTC (rev 7578)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java	2009-07-16 13:40:42 UTC (rev 7579)
@@ -90,7 +90,7 @@
 
             if (queue == null)
             {
-               JBMThread.currentThread().setNoReplayOrRecord();
+               JBMThread.currentThread().setNoReplayOrRecord(3);
                
                Binding binding = postOffice.getBindingByID(queueID);
 
@@ -108,7 +108,7 @@
             
             channel.send(new ReplicationResponseMessage());
             
-            thread.setNoReplayOrRecord();
+            thread.setNoReplayOrRecord(12);
 
             break;
          }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2009-07-16 12:55:44 UTC (rev 7578)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2009-07-16 13:40:42 UTC (rev 7579)
@@ -173,7 +173,7 @@
             channel.send(new ReplicationResponseMessage());
          }
          
-         thread.setNoReplayOrRecord();
+         thread.setNoReplayOrRecord(4);
          
          checkCloseSessionChannels(packet);         
       }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java	2009-07-16 12:55:44 UTC (rev 7578)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java	2009-07-16 13:40:42 UTC (rev 7579)
@@ -25,7 +25,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.LockSupport;
 
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.server.replication.Replicator;
@@ -47,7 +46,7 @@
       RECORD, REPLAY, NONE;
    }
 
-   private ThreadState state;
+   private volatile ThreadState state;
 
    private List<Triple<Long, Long, Integer>> objectSequences;
 
@@ -57,6 +56,10 @@
    
    private volatile boolean waitingOnSequencedLock;
    
+   private volatile boolean waitingOnMutex;
+   
+   private volatile boolean frozen;
+         
    public boolean isWaitingOnSequencedLock()
    {
       return this.waitingOnSequencedLock;
@@ -66,8 +69,16 @@
    {
       this.waitingOnSequencedLock = waiting;
    }
+      
+   public boolean isWaitingOnMutex()
+   {
+      return this.waitingOnMutex;
+   }
    
-   private volatile boolean frozen;
+   public void setWaitingOnMutex(final boolean waiting)
+   {
+      this.waitingOnMutex = waiting;
+   }
    
    public void setFrozen()
    {
@@ -79,29 +90,6 @@
       return frozen;
    }
 
-   //private volatile SequencedLock lastLock;
-
-//   public void dumpSequences()
-//   {
-//      log.info("Dumping thread sequences, current pos = " + pos + " length is " + objectSequences.size());
-//      int cnt = 0;
-//      for (Triple<Long, Long, Integer> pair : objectSequences)
-//      {
-//         long id = pair.a;
-//         if (id == -1)
-//         {
-//            log.info("[" + cnt++ + ", " + id + "] COUNTER: " + pair.b);
-//         }
-//         else
-//         {
-//            SequencedLock lock = SequencedLock.getLock(id);
-//
-//            log.info("[" + cnt++ + ", " + id + "] " + lock.getName() + ": " + pair.b + ": " + pair.c);
-//         }
-//
-//      }
-//   }
-
    public static JBMThread currentThread()
    {
       return (JBMThread)Thread.currentThread();
@@ -133,6 +121,7 @@
 
    public void setReplay(final List<Triple<Long, Long, Integer>> objectSequences)
    {
+     // log.info(this + " set replay");
       this.objectSequences = objectSequences;
 
       this.state = ThreadState.REPLAY;
@@ -159,10 +148,12 @@
       this.replicator = replicator;
    }
 
-   public void setNoReplayOrRecord()
+   public void setNoReplayOrRecord(int method)
    {
       this.state = ThreadState.NONE;
 
+     // log.info("setting thread " + this + " to no replay " + method);
+      
       //this.strict = true;
    }
 
@@ -201,40 +192,4 @@
       this.replicator = replicator;
    }
 
-//   private boolean strict = true;
-//
-//   public synchronized SequencedLock getLastLock()
-//   {
-//      return this.lastLock;
-//   }
-//
-//   public synchronized void setLastLock(final SequencedLock lock)
-//   {
-//      this.lastLock = lock;
-//
-//      if (!strict)
-//      {
-//         this.lastLock.setNonStrict();
-//      }
-//   }
-//
-//   public synchronized void setLastLockNonStrict()
-//   {
-//      if (lastLock != null)
-//      {
-//         lastLock.setNonStrict();
-//      }
-//
-//      strict = false;
-//   }
-
-   public void jbmPark(final long toWait)
-   {
-      LockSupport.parkNanos(toWait);
-   }
-
-   public synchronized void jbmUnpark()
-   {
-      LockSupport.unpark(this);
-   }
 }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java	2009-07-16 12:55:44 UTC (rev 7578)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java	2009-07-16 13:40:42 UTC (rev 7579)
@@ -54,13 +54,15 @@
    private static final AtomicLong idSequence = new AtomicLong(0);
 
    private final ReentrantLock lock;
-   
+
    private volatile Thread unfreezeOwner;
 
    private volatile CountDownLatch freezeLatch;
-   
+
    private static final Set<ReplicationAwareMutex> allMutexes = new WeakHashSet<ReplicationAwareMutex>();
 
+   private volatile CountDownLatch otherLatch;
+
    public ReplicationAwareMutex(final String name, final int initialCount, final boolean debug)
    {
       this.id = idSequence.getAndIncrement();
@@ -70,25 +72,79 @@
       sequencedLock = new SequencedLock(name, initialCount);
 
       counter = new AtomicInteger(initialCount);
-      
+
       synchronized (allMutexes)
       {
          allMutexes.add(this);
       }
    }
-   
-   public static void freezeOutAll()
+
+   public static void setOwnerLatchAll()
    {
       synchronized (allMutexes)
       {
-         for (ReplicationAwareMutex mutex: allMutexes)
+         for (ReplicationAwareMutex mutex : allMutexes)
          {
-            log.info("*** freezing out mutex");
-            mutex.freezeOut();
+            mutex.setOwnerLatch();
          }
+
+         for (ReplicationAwareMutex mutex : allMutexes)
+         {
+            mutex.releaseLatch();
+         }
       }
    }
+
+   public void setOwnerLatch()
+   {
+      if (sequencedLock.getOwner() != null)
+      {
+         freezeLatch = new CountDownLatch(1);
+
+         unfreezeOwner = sequencedLock.getOwner();
+      }
+   }
+
+   public static void setLatchAll()
+   {
+      synchronized (allMutexes)
+      {
+         for (ReplicationAwareMutex mutex : allMutexes)
+         {            
+            mutex.setLatch();
+         }
+      }
+   }
    
+   public static void clearLatchAll()
+   {
+      synchronized (allMutexes)
+      {
+         for (ReplicationAwareMutex mutex : allMutexes)
+         {            
+            mutex.clearLatches();
+         }
+      }
+   }
+
+   public void setLatch()
+   {
+      otherLatch = new CountDownLatch(1);
+   }
+
+   public void releaseLatch()
+   {
+      if (otherLatch != null)
+      {
+         otherLatch.countDown();
+      }
+   }
+   
+   public void clearLatches()
+   {
+      otherLatch = freezeLatch = null;
+   }
+
    public void lock(final int methodID)
    {
       try
@@ -105,16 +161,6 @@
       this.doUnlock();
    }
 
-   public void freezeOut()
-   {
-      if (sequencedLock.getOwner() != null)
-      {
-         freezeLatch = new CountDownLatch(1);
-         
-         unfreezeOwner = sequencedLock.getOwner();
-      }           
-   }
-
    private boolean doLock(long time, TimeUnit unit, int methodID) throws InterruptedException
    {
       JBMThread thread = JBMThread.currentThread();
@@ -130,8 +176,32 @@
          throw new IllegalStateException("Lock is NOT re-entrant!");
       }
 
+      if (otherLatch != null)
+      {
+         thread.setWaitingOnMutex(true);
+        
+         while (true)
+         {
+            try
+            {
+               otherLatch.await();
+            }
+            catch (InterruptedException e)
+            {       
+               //This might get interrupted by mistake when we interrupt the thread thinking it's on the sequenced lock
+               //in which case we just wait again
+               continue;
+            }
+            
+            break;
+         }
+
+         thread.setWaitingOnMutex(false);
+      }
+
       if (thread.isReplay())
       {
+         //log.info("Thread " + thread + " is replay");
          Triple<Long, Long, Integer> pair = thread.getNextSequence();
 
          // // Sanity check
@@ -158,6 +228,11 @@
 
          try
          {
+            if (!thread.isReplay())
+            {
+               throw new IllegalStateException("How can it be non replay?");
+            }
+            
             if (!sequencedLock.lock(sequence, unit.toNanos(time)))
             {
                // dumpLocksWithName(name);
@@ -167,8 +242,7 @@
          {
             if (thread.isFrozen())
             {
-               log.info("** interrupted and retrying");
-               //We retry and this time it will use the standard mutex - this happens on freezing out
+               // We retry and this time it will use the standard mutex - this happens on freezing out
                return doLock(time, unit, methodID);
             }
          }
@@ -181,11 +255,12 @@
       {
          if (unfreezeOwner != null)
          {
-            //There was an original owner on the SequencedLock when we interrupted so we can't allow any other threads to get the lock
-            //until he has returned
+            // There was an original owner on the SequencedLock when we interrupted so we can't allow any other threads
+            // to get the lock
+            // until he has returned
             freezeLatch.await();
          }
-         
+
          boolean ok = lock.tryLock(time, unit);
 
          if (ok)
@@ -203,7 +278,7 @@
          return ok;
       }
    }
-   
+
    private void doUnlock()
    {
       JBMThread thread = JBMThread.currentThread();
@@ -211,9 +286,9 @@
       if (thread == unfreezeOwner)
       {
          // Don't actually unlock this, since we never had the lock - we had the lock on the original SequencedLock
-         
+
          unfreezeOwner = null;
-         
+
          freezeLatch.countDown();
       }
       else

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java	2009-07-16 12:55:44 UTC (rev 7578)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java	2009-07-16 13:40:42 UTC (rev 7579)
@@ -142,7 +142,7 @@
       
       action.run();
       
-      thread.setNoReplayOrRecord();
+      thread.setNoReplayOrRecord(5);
       
       List<Triple<Long, Long, Integer>> sequences = thread.getSequences();
       

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java	2009-07-16 12:55:44 UTC (rev 7578)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java	2009-07-16 13:40:42 UTC (rev 7579)
@@ -29,6 +29,7 @@
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
 
 import org.jboss.messaging.core.logging.Logger;
 
@@ -177,28 +178,6 @@
       // registerLock(this);
    }
 
-   // public void setNonStrict()
-   // {
-   // //log.info(System.identityHashCode(this) + " setting non strict");
-   // if (strictOrder)
-   // {
-   // strictOrder = false;
-   //
-   // QueueEntry entry = peekEntry();
-   //
-   // if (entry != null)
-   // {
-   // // log.info(System.identityHashCode(this) + " setting non strict unparking " + entry.thread);
-   // //LockSupport.unpark(entry.thread);
-   // entry.thread.jbmUnpark();
-   // }
-   // }
-   // }
-   //   
-   // public boolean isStrict()
-   // {
-   // return strictOrder;
-   // }
 
    // TODO parking with a timeout seems to be a lot slower than parking without timeout
    public boolean lock(final long sequence, final long timeout) throws InterruptedException
@@ -213,58 +192,55 @@
 
       long toWait = timeout;
 
-      while (true)
+      try
       {
-         QueueEntry peeked = peekEntry();
-
-         if (peeked == null || // There are higher priority threads
-             peeked.thread != currentThread ||
-             // Next thread is not this one
-             !locked.compareAndSet(false, true)) // Lock is already locked
+         while (true)
          {
-            //currentThread.setLastLock(this);
-
-            currentThread.setWaitingOnSequencedLock(true);
-                        
-            // LockSupport.parkNanos(toWait);
-            // log.info(System.identityHashCode(this) + " parking " + currentThread);
-            currentThread.jbmPark(toWait);
-            
-            currentThread.setWaitingOnSequencedLock(false);
-            
-            if (Thread.interrupted() && currentThread.isFrozen())
+            QueueEntry peeked = peekEntry();
+   
+            if (peeked == null || // There are higher priority threads
+                peeked.thread != currentThread ||
+                !locked.compareAndSet(false, true)) // Lock is already locked
             {
-               log.info("** It's been interrupted");
-               throw new InterruptedException();
+               currentThread.setWaitingOnSequencedLock(true);
+                      
+               LockSupport.parkNanos(toWait);
+               
+               if (Thread.interrupted() && currentThread.isFrozen())
+               {
+                  throw new InterruptedException();
+               }
+   
+               long now = System.nanoTime();
+   
+               toWait -= now - start;
+   
+               if (toWait <= 0)
+               {
+                  log.warn("Timed out waiting for sequenced lock, current " + currentSequence.get() +
+                           " expected " +
+                           sequence);
+   
+                  return false;
+               }
+   
+               start = now;
             }
-
-            long now = System.nanoTime();
-
-            toWait -= now - start;
-
-            if (toWait <= 0)
+            else
             {
-               log.warn("Timed out waiting for sequenced lock, current " + currentSequence.get() +
-                        " expected " +
-                        sequence);
-
-               return false;
+               break;
             }
-
-            start = now;
          }
-         else
-         {
-            break;
-         }
       }
+      finally
+      {
+         currentThread.setWaitingOnSequencedLock(false);
+      }
 
       queue.remove();
 
       owner = currentThread;
-      
-      
-
+            
       return true;
    }
 
@@ -275,8 +251,6 @@
          throw new IllegalMonitorStateException();
       }
 
-      // log.info(System.identityHashCode(this) + " unlocking");
-
       currentSequence.incrementAndGet();
 
       owner = null;
@@ -287,30 +261,11 @@
 
       if (entry != null)
       {
-         // log.info(System.identityHashCode(this) + " unparking at unlock " + entry.thread);
-         // LockSupport.unpark(entry.thread);
-         entry.thread.jbmUnpark();
+         LockSupport.unpark(entry.thread);
       }
-      // else
-      // {
-      // log.info(System.identityHashCode(this) + " nothing to unpark at unlock");
-      // }
+
    }
 
-   // private QueueEntry peekEntry()
-   // {
-   // QueueEntry entry = queue.peek();
-   //
-   // if (entry != null)
-   // {
-   // if (!strictOrder || entry.sequence == currentSequence.get())
-   // {
-   // return entry;
-   // }
-   // }
-   //
-   // return null;
-   // }
 
    private QueueEntry peekEntry()
    {




More information about the jboss-cvs-commits mailing list