[jboss-cvs] JBoss Messaging SVN: r7579 - in branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core: server/impl and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jul 16 09:40:42 EDT 2009
Author: timfox
Date: 2009-07-16 09:40:42 -0400 (Thu, 16 Jul 2009)
New Revision: 7579
Modified:
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java
Log:
MT replication
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-07-16 12:55:44 UTC (rev 7578)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-07-16 13:40:42 UTC (rev 7579)
@@ -12,8 +12,6 @@
package org.jboss.messaging.core.remoting.impl;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
-
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -28,7 +26,6 @@
import org.jboss.messaging.core.remoting.Interceptor;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.utils.SimpleIDGenerator;
@@ -323,9 +320,7 @@
// ----------------------------------------------------
private volatile Thread currentThread;
-
-
public void activate()
{
active = true;
@@ -387,99 +382,6 @@
return currentThread;
}
-// public void freeze()
-// {
-// if (currentThread != null)
-// {
-// long start = System.currentTimeMillis();
-//
-// while (true)
-// {
-// Thread thread = currentThread;
-//
-// if (thread != null)
-// {
-// log.info("waiting for thread to complete");
-//
-// try
-// {
-// Thread.sleep(1);
-// }
-// catch (InterruptedException ignore)
-// {
-// }
-//
-// if (System.currentTimeMillis() - start >= FREEZE_TIMEOUT)
-// {
-// Exception e = new Exception();
-// e.setStackTrace(thread.getStackTrace());
-// log.info("Waiting for this thread", e);
-//
-// JBMThread jthread = null;
-//
-// if (thread instanceof JBMThread)
-// {
-// jthread = (JBMThread)thread;
-//
-// //jthread.setLastLockNonStrict();
-// }
-//
-// if (jthread != null)
-// {
-// SequencedLock lastLock = jthread.getLastLock();
-//
-// log.info("Owner of the lock which this thread is waiting for is ");
-//
-// if (lastLock != null)
-// {
-// log.info("Dumping last lock");
-//
-// lastLock.dump();
-// }
-// }
-//
-// // log.info("Gonna try a nudge");
-// // SequencedLock.dumpLocks();
-//
-// throw new IllegalStateException("Timed out waiting for thread to complete");
-// }
-// }
-// else
-// {
-// break;
-// }
-// }
-//
-// }
-// //
-// //
-// //
-// // //TODO check this logic
-// //
-// // log.info("freezing connection");
-// //
-// // JBMThread thread = null;
-// //
-// // if (currentThread != null)
-// // {
-// // thread = (JBMThread)currentThread;
-// //
-// // thread.jbmInterrupt();
-// // }
-// //
-// // // Prevent any more packets being handled on this connection
-// //
-// // synchronized (freezeLock)
-// // {
-// // frozen = true;
-// // }
-// //
-// // if (thread != null)
-// // {
-// // thread.jbmResetInterrupt();
-// // }
-// }
-
// Package protected
// ----------------------------------------------------------------------------
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-07-16 12:55:44 UTC (rev 7578)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-07-16 13:40:42 UTC (rev 7579)
@@ -208,12 +208,11 @@
private int managementConnectorID;
private static AtomicInteger managementConnectorSequence = new AtomicInteger(0);
-
+
private ConnectionManager pooledReplicatingConnectionManager;
private ConnectionManager nonPooledReplicatingConnectionManager;
-
// Constructors
// ---------------------------------------------------------------------------------
@@ -887,7 +886,7 @@
0,
this.threadPool,
this.scheduledPool);
-
+
pooledReplicatingConnectionManager.setNeverFail();
nonPooledReplicatingConnectionManager = new ConnectionManagerImpl(null,
@@ -1058,7 +1057,7 @@
queueDeployer.start();
}
}
-
+
log.info("Backup server is now activated");
}
@@ -1081,153 +1080,108 @@
// while freezing is in progress
private final Lock flock = new ReentrantLock();
- private static final long FREEZE_TIMEOUT = 2000;
+ private static final long FREEZE_TIMEOUT = 5000;
private void freezeBackupConnections()
{
flock.lock();
-
- // log.info("** freezing backup connections");
+ log.info("** freezing backup connections");
+
synchronized (backupConnections)
{
frozen = true;
flock.unlock();
- // Needs to be done in two stages - once set locks non strict
-
freezeConnections();
- // Wait for threads to exit
-
- boolean timedOut = false;
+ //We set a latch on each ReplicationAwareMutex
+ ReplicationAwareMutex.setLatchAll();
long start = System.currentTimeMillis();
-
- outer: for (RemotingConnection rc : backupConnections)
- {
+
+ //We wait for all executing threads to end up on this latch. If they instead end up on a SequencedLock we interrupt them
+ //until they fall through to the other latch
+
+ for (RemotingConnection rc : backupConnections)
+ {
while (true)
{
- Thread executingThread = rc.getExecutingThread();
-
+ JBMThread executingThread = (JBMThread)rc.getExecutingThread();
+
if (executingThread == null)
{
break;
}
-
+
+ if (executingThread.isWaitingOnMutex())
+ {
+ executingThread.setNoReplayOrRecord(0);
+
+ break;
+ }
+
+ if (executingThread.isWaitingOnSequencedLock())
+ {
+ executingThread.setFrozen();
+
+ executingThread.interrupt();
+ }
+
try
{
Thread.sleep(1);
}
catch (InterruptedException ignore)
{
- }
-
+ }
+
if (System.currentTimeMillis() - start >= FREEZE_TIMEOUT)
{
- timedOut = true;
-
- break outer;
+ throw new IllegalStateException("Timed out waiting for threads to exit or reach latch");
}
- }
+ }
}
+
+ //Now we release the latch and wait for all threads to exit
- if (timedOut)
+ ReplicationAwareMutex.setOwnerLatchAll();
+
+ start = System.currentTimeMillis();
+
+ // Wait for everything to exit
+ for (RemotingConnection rc : backupConnections)
{
- // log.info("*** timedout waiting for threads to exit");
- //Now we assert that all remaining threads are waiting for a SequencedLock
-
- for (RemotingConnection rc : backupConnections)
+ while (true)
{
- Thread executingThread = rc.getExecutingThread();
-
- if (executingThread != null & executingThread instanceof JBMThread)
+ JBMThread executingThread = (JBMThread)rc.getExecutingThread();
+
+ if (executingThread == null)
{
- JBMThread jthread = (JBMThread)executingThread;
-
- if (!jthread.isWaitingOnSequencedLock())
- {
- String msg = "Thread is not waiting on SequencedLock";
- Exception e = new Exception();
- e.setStackTrace(jthread.getStackTrace());
- log.error(msg, e);
- throw new IllegalStateException(msg);
- }
- }
- }
-
- // log.info("** all threads waiting on sequenced locks");
-
- //Now we can freeze out all the locks, after setting all threads to replay = false
- //and frozen to true
-
- for (RemotingConnection rc : backupConnections)
- {
- Thread executingThread = rc.getExecutingThread();
-
- if (executingThread != null & executingThread instanceof JBMThread)
+ break;
+ }
+
+ try
{
- JBMThread jthread = (JBMThread)executingThread;
-
- jthread.setNoReplayOrRecord();
-
- jthread.setFrozen();
- }
- }
-
- // log.info("** set all threads to frozen");
-
- ReplicationAwareMutex.freezeOutAll();
-
- //Now we interrupt all those threads
-
- for (RemotingConnection rc : backupConnections)
- {
- Thread executingThread = rc.getExecutingThread();
-
- if (executingThread != null & executingThread instanceof JBMThread)
+ Thread.sleep(1);
+ }
+ catch (InterruptedException ignore)
{
- JBMThread jthread = (JBMThread)executingThread;
-
- jthread.interrupt();
- }
- }
-
- //Now we wait again for the threads to complete
-
- start = System.currentTimeMillis();
-
- for (RemotingConnection rc : backupConnections)
- {
- while (true)
+ }
+
+ if (System.currentTimeMillis() - start >= FREEZE_TIMEOUT)
{
- Thread executingThread = rc.getExecutingThread();
-
- if (executingThread == null)
- {
- break;
- }
-
- try
- {
- Thread.sleep(1);
- }
- catch (InterruptedException ignore)
- {
- }
-
- if (System.currentTimeMillis() - start >= FREEZE_TIMEOUT)
- {
- String msg = "Thread won't freeze out";
- Exception e = new Exception();
- e.setStackTrace(executingThread.getStackTrace());
- log.error(msg, e);
- throw new IllegalStateException(msg);
- }
+ Exception e = new Exception();
+ e.setStackTrace(executingThread.getStackTrace());
+ log.error("Waiting for this thread " + executingThread, e);
+ log.error("Replay " + executingThread.isReplay());
+ throw new IllegalStateException("Timed out waiting for threads to exit");
}
}
}
+
+ ReplicationAwareMutex.clearLatchAll();
}
}
@@ -1498,7 +1452,7 @@
JBMThread thread = JBMThread.currentThread();
- thread.setNoReplayOrRecord();
+ thread.setNoReplayOrRecord(1);
channel1.send(new RegisterQueueReplicationChannelMessage(queueID));
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-07-16 12:55:44 UTC (rev 7578)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-07-16 13:40:42 UTC (rev 7579)
@@ -166,7 +166,7 @@
action.run();
- thread.setNoReplayOrRecord();
+ thread.setNoReplayOrRecord(2);
}
else
{
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java 2009-07-16 12:55:44 UTC (rev 7578)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java 2009-07-16 13:40:42 UTC (rev 7579)
@@ -90,7 +90,7 @@
if (queue == null)
{
- JBMThread.currentThread().setNoReplayOrRecord();
+ JBMThread.currentThread().setNoReplayOrRecord(3);
Binding binding = postOffice.getBindingByID(queueID);
@@ -108,7 +108,7 @@
channel.send(new ReplicationResponseMessage());
- thread.setNoReplayOrRecord();
+ thread.setNoReplayOrRecord(12);
break;
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2009-07-16 12:55:44 UTC (rev 7578)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2009-07-16 13:40:42 UTC (rev 7579)
@@ -173,7 +173,7 @@
channel.send(new ReplicationResponseMessage());
}
- thread.setNoReplayOrRecord();
+ thread.setNoReplayOrRecord(4);
checkCloseSessionChannels(packet);
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java 2009-07-16 12:55:44 UTC (rev 7578)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java 2009-07-16 13:40:42 UTC (rev 7579)
@@ -25,7 +25,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.LockSupport;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.server.replication.Replicator;
@@ -47,7 +46,7 @@
RECORD, REPLAY, NONE;
}
- private ThreadState state;
+ private volatile ThreadState state;
private List<Triple<Long, Long, Integer>> objectSequences;
@@ -57,6 +56,10 @@
private volatile boolean waitingOnSequencedLock;
+ private volatile boolean waitingOnMutex;
+
+ private volatile boolean frozen;
+
public boolean isWaitingOnSequencedLock()
{
return this.waitingOnSequencedLock;
@@ -66,8 +69,16 @@
{
this.waitingOnSequencedLock = waiting;
}
+
+ public boolean isWaitingOnMutex()
+ {
+ return this.waitingOnMutex;
+ }
- private volatile boolean frozen;
+ public void setWaitingOnMutex(final boolean waiting)
+ {
+ this.waitingOnMutex = waiting;
+ }
public void setFrozen()
{
@@ -79,29 +90,6 @@
return frozen;
}
- //private volatile SequencedLock lastLock;
-
-// public void dumpSequences()
-// {
-// log.info("Dumping thread sequences, current pos = " + pos + " length is " + objectSequences.size());
-// int cnt = 0;
-// for (Triple<Long, Long, Integer> pair : objectSequences)
-// {
-// long id = pair.a;
-// if (id == -1)
-// {
-// log.info("[" + cnt++ + ", " + id + "] COUNTER: " + pair.b);
-// }
-// else
-// {
-// SequencedLock lock = SequencedLock.getLock(id);
-//
-// log.info("[" + cnt++ + ", " + id + "] " + lock.getName() + ": " + pair.b + ": " + pair.c);
-// }
-//
-// }
-// }
-
public static JBMThread currentThread()
{
return (JBMThread)Thread.currentThread();
@@ -133,6 +121,7 @@
public void setReplay(final List<Triple<Long, Long, Integer>> objectSequences)
{
+ // log.info(this + " set replay");
this.objectSequences = objectSequences;
this.state = ThreadState.REPLAY;
@@ -159,10 +148,12 @@
this.replicator = replicator;
}
- public void setNoReplayOrRecord()
+ public void setNoReplayOrRecord(int method)
{
this.state = ThreadState.NONE;
+ // log.info("setting thread " + this + " to no replay " + method);
+
//this.strict = true;
}
@@ -201,40 +192,4 @@
this.replicator = replicator;
}
-// private boolean strict = true;
-//
-// public synchronized SequencedLock getLastLock()
-// {
-// return this.lastLock;
-// }
-//
-// public synchronized void setLastLock(final SequencedLock lock)
-// {
-// this.lastLock = lock;
-//
-// if (!strict)
-// {
-// this.lastLock.setNonStrict();
-// }
-// }
-//
-// public synchronized void setLastLockNonStrict()
-// {
-// if (lastLock != null)
-// {
-// lastLock.setNonStrict();
-// }
-//
-// strict = false;
-// }
-
- public void jbmPark(final long toWait)
- {
- LockSupport.parkNanos(toWait);
- }
-
- public synchronized void jbmUnpark()
- {
- LockSupport.unpark(this);
- }
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java 2009-07-16 12:55:44 UTC (rev 7578)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java 2009-07-16 13:40:42 UTC (rev 7579)
@@ -54,13 +54,15 @@
private static final AtomicLong idSequence = new AtomicLong(0);
private final ReentrantLock lock;
-
+
private volatile Thread unfreezeOwner;
private volatile CountDownLatch freezeLatch;
-
+
private static final Set<ReplicationAwareMutex> allMutexes = new WeakHashSet<ReplicationAwareMutex>();
+ private volatile CountDownLatch otherLatch;
+
public ReplicationAwareMutex(final String name, final int initialCount, final boolean debug)
{
this.id = idSequence.getAndIncrement();
@@ -70,25 +72,79 @@
sequencedLock = new SequencedLock(name, initialCount);
counter = new AtomicInteger(initialCount);
-
+
synchronized (allMutexes)
{
allMutexes.add(this);
}
}
-
- public static void freezeOutAll()
+
+ public static void setOwnerLatchAll()
{
synchronized (allMutexes)
{
- for (ReplicationAwareMutex mutex: allMutexes)
+ for (ReplicationAwareMutex mutex : allMutexes)
{
- log.info("*** freezing out mutex");
- mutex.freezeOut();
+ mutex.setOwnerLatch();
}
+
+ for (ReplicationAwareMutex mutex : allMutexes)
+ {
+ mutex.releaseLatch();
+ }
}
}
+
+ public void setOwnerLatch()
+ {
+ if (sequencedLock.getOwner() != null)
+ {
+ freezeLatch = new CountDownLatch(1);
+
+ unfreezeOwner = sequencedLock.getOwner();
+ }
+ }
+
+ public static void setLatchAll()
+ {
+ synchronized (allMutexes)
+ {
+ for (ReplicationAwareMutex mutex : allMutexes)
+ {
+ mutex.setLatch();
+ }
+ }
+ }
+ public static void clearLatchAll()
+ {
+ synchronized (allMutexes)
+ {
+ for (ReplicationAwareMutex mutex : allMutexes)
+ {
+ mutex.clearLatches();
+ }
+ }
+ }
+
+ public void setLatch()
+ {
+ otherLatch = new CountDownLatch(1);
+ }
+
+ public void releaseLatch()
+ {
+ if (otherLatch != null)
+ {
+ otherLatch.countDown();
+ }
+ }
+
+ public void clearLatches()
+ {
+ otherLatch = freezeLatch = null;
+ }
+
public void lock(final int methodID)
{
try
@@ -105,16 +161,6 @@
this.doUnlock();
}
- public void freezeOut()
- {
- if (sequencedLock.getOwner() != null)
- {
- freezeLatch = new CountDownLatch(1);
-
- unfreezeOwner = sequencedLock.getOwner();
- }
- }
-
private boolean doLock(long time, TimeUnit unit, int methodID) throws InterruptedException
{
JBMThread thread = JBMThread.currentThread();
@@ -130,8 +176,32 @@
throw new IllegalStateException("Lock is NOT re-entrant!");
}
+ if (otherLatch != null)
+ {
+ thread.setWaitingOnMutex(true);
+
+ while (true)
+ {
+ try
+ {
+ otherLatch.await();
+ }
+ catch (InterruptedException e)
+ {
+ //This might get interrupted by mistake when we interrupt the thread thinking it's on the sequenced lock
+ //in which case we just wait again
+ continue;
+ }
+
+ break;
+ }
+
+ thread.setWaitingOnMutex(false);
+ }
+
if (thread.isReplay())
{
+ //log.info("Thread " + thread + " is replay");
Triple<Long, Long, Integer> pair = thread.getNextSequence();
// // Sanity check
@@ -158,6 +228,11 @@
try
{
+ if (!thread.isReplay())
+ {
+ throw new IllegalStateException("How can it be non replay?");
+ }
+
if (!sequencedLock.lock(sequence, unit.toNanos(time)))
{
// dumpLocksWithName(name);
@@ -167,8 +242,7 @@
{
if (thread.isFrozen())
{
- log.info("** interrupted and retrying");
- //We retry and this time it will use the standard mutex - this happens on freezing out
+ // We retry and this time it will use the standard mutex - this happens on freezing out
return doLock(time, unit, methodID);
}
}
@@ -181,11 +255,12 @@
{
if (unfreezeOwner != null)
{
- //There was an original owner on the SequencedLock when we interrupted so we can't allow any other threads to get the lock
- //until he has returned
+ // There was an original owner on the SequencedLock when we interrupted so we can't allow any other threads
+ // to get the lock
+ // until he has returned
freezeLatch.await();
}
-
+
boolean ok = lock.tryLock(time, unit);
if (ok)
@@ -203,7 +278,7 @@
return ok;
}
}
-
+
private void doUnlock()
{
JBMThread thread = JBMThread.currentThread();
@@ -211,9 +286,9 @@
if (thread == unfreezeOwner)
{
// Don't actually unlock this, since we never had the lock - we had the lock on the original SequencedLock
-
+
unfreezeOwner = null;
-
+
freezeLatch.countDown();
}
else
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java 2009-07-16 12:55:44 UTC (rev 7578)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java 2009-07-16 13:40:42 UTC (rev 7579)
@@ -142,7 +142,7 @@
action.run();
- thread.setNoReplayOrRecord();
+ thread.setNoReplayOrRecord(5);
List<Triple<Long, Long, Integer>> sequences = thread.getSequences();
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java 2009-07-16 12:55:44 UTC (rev 7578)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java 2009-07-16 13:40:42 UTC (rev 7579)
@@ -29,6 +29,7 @@
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
import org.jboss.messaging.core.logging.Logger;
@@ -177,28 +178,6 @@
// registerLock(this);
}
- // public void setNonStrict()
- // {
- // //log.info(System.identityHashCode(this) + " setting non strict");
- // if (strictOrder)
- // {
- // strictOrder = false;
- //
- // QueueEntry entry = peekEntry();
- //
- // if (entry != null)
- // {
- // // log.info(System.identityHashCode(this) + " setting non strict unparking " + entry.thread);
- // //LockSupport.unpark(entry.thread);
- // entry.thread.jbmUnpark();
- // }
- // }
- // }
- //
- // public boolean isStrict()
- // {
- // return strictOrder;
- // }
// TODO parking with a timeout seems to be a lot slower than parking without timeout
public boolean lock(final long sequence, final long timeout) throws InterruptedException
@@ -213,58 +192,55 @@
long toWait = timeout;
- while (true)
+ try
{
- QueueEntry peeked = peekEntry();
-
- if (peeked == null || // There are higher priority threads
- peeked.thread != currentThread ||
- // Next thread is not this one
- !locked.compareAndSet(false, true)) // Lock is already locked
+ while (true)
{
- //currentThread.setLastLock(this);
-
- currentThread.setWaitingOnSequencedLock(true);
-
- // LockSupport.parkNanos(toWait);
- // log.info(System.identityHashCode(this) + " parking " + currentThread);
- currentThread.jbmPark(toWait);
-
- currentThread.setWaitingOnSequencedLock(false);
-
- if (Thread.interrupted() && currentThread.isFrozen())
+ QueueEntry peeked = peekEntry();
+
+ if (peeked == null || // There are higher priority threads
+ peeked.thread != currentThread ||
+ !locked.compareAndSet(false, true)) // Lock is already locked
{
- log.info("** It's been interrupted");
- throw new InterruptedException();
+ currentThread.setWaitingOnSequencedLock(true);
+
+ LockSupport.parkNanos(toWait);
+
+ if (Thread.interrupted() && currentThread.isFrozen())
+ {
+ throw new InterruptedException();
+ }
+
+ long now = System.nanoTime();
+
+ toWait -= now - start;
+
+ if (toWait <= 0)
+ {
+ log.warn("Timed out waiting for sequenced lock, current " + currentSequence.get() +
+ " expected " +
+ sequence);
+
+ return false;
+ }
+
+ start = now;
}
-
- long now = System.nanoTime();
-
- toWait -= now - start;
-
- if (toWait <= 0)
+ else
{
- log.warn("Timed out waiting for sequenced lock, current " + currentSequence.get() +
- " expected " +
- sequence);
-
- return false;
+ break;
}
-
- start = now;
}
- else
- {
- break;
- }
}
+ finally
+ {
+ currentThread.setWaitingOnSequencedLock(false);
+ }
queue.remove();
owner = currentThread;
-
-
-
+
return true;
}
@@ -275,8 +251,6 @@
throw new IllegalMonitorStateException();
}
- // log.info(System.identityHashCode(this) + " unlocking");
-
currentSequence.incrementAndGet();
owner = null;
@@ -287,30 +261,11 @@
if (entry != null)
{
- // log.info(System.identityHashCode(this) + " unparking at unlock " + entry.thread);
- // LockSupport.unpark(entry.thread);
- entry.thread.jbmUnpark();
+ LockSupport.unpark(entry.thread);
}
- // else
- // {
- // log.info(System.identityHashCode(this) + " nothing to unpark at unlock");
- // }
+
}
- // private QueueEntry peekEntry()
- // {
- // QueueEntry entry = queue.peek();
- //
- // if (entry != null)
- // {
- // if (!strictOrder || entry.sequence == currentSequence.get())
- // {
- // return entry;
- // }
- // }
- //
- // return null;
- // }
private QueueEntry peekEntry()
{
More information about the jboss-cvs-commits
mailing list