[jboss-cvs] JBoss Messaging SVN: r7573 - in branches/Branch_MultiThreaded_Replication: src/main/org/jboss/messaging/core/paging/impl and 11 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jul 15 08:37:25 EDT 2009
Author: timfox
Date: 2009-07-15 08:37:24 -0400 (Wed, 15 Jul 2009)
New Revision: 7573
Added:
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java
Removed:
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java
Modified:
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/ClusterQueueStateManagerImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnector.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/spi/Connector.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java
branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/server/replication/impl/SequencedLockTest.java
Log:
MT (multi-threaded) replication
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -725,11 +725,22 @@
try
{
- // log.info(System.identityHashCode(this) + " session handling failover");
+ // log.info(System.identityHashCode(this) + " session handling failover");
//Prevent any more packets being handled on the old connection
channel.getConnection().freeze();
+ while (channel.getConnection().getExecutingThread() != null)
+ {
+ try
+ {
+ Thread.sleep(1);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+ }
+
channel.transferConnection(backupConnection);
backupConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
@@ -738,7 +749,7 @@
int lid = channel.getLastConfirmedCommandID();
- // log.info(System.identityHashCode(this) + " last received command id on client side is " + lid);
+ // log.info(System.identityHashCode(this) + " last received command id on client side is " + lid);
Packet request = new ReattachSessionMessage(name, lid);
@@ -746,11 +757,11 @@
ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
- // log.info(System.identityHashCode(this) + " got response from reattach session");
+ // log.info(System.identityHashCode(this) + " got response from reattach session");
if (!response.isRemoved())
{
- // log.info(System.identityHashCode(this) + " found session, server last received command id is " + response.getLastConfirmedCommandID());
+ //log.info(System.identityHashCode(this) + " found session, server last received command id is " + response.getLastConfirmedCommandID());
channel.replayCommands(response.getLastConfirmedCommandID());
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -23,6 +23,7 @@
package org.jboss.messaging.core.client.impl;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_CONTINUATION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
@@ -33,6 +34,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
import org.jboss.messaging.utils.SimpleString;
/**
@@ -97,7 +99,7 @@
log.error("Received exception asynchronously from server", mem.getException());
break;
- }
+ }
default:
{
throw new IllegalStateException("Invalid packet: " + type);
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -22,8 +22,6 @@
package org.jboss.messaging.core.client.impl;
-import java.util.Set;
-
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.remoting.RemotingConnection;
@@ -69,4 +67,6 @@
void returnConnection(RemotingConnection conn);
void close();
+
+ void setNeverFail();
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -151,6 +151,8 @@
private Connector connector;
private Map<Object, Pinger> pingers = new ConcurrentHashMap<Object, Pinger>();
+
+ private volatile boolean neverFail;
// debug
@@ -207,8 +209,10 @@
{
backupConnectorFactory = null;
- backupTransportParams = null;
+ backupTransportParams = null;
}
+
+ // log.info(System.identityHashCode(this) + " created cm with bcf " + this.backupConnectorFactory);
this.maxConnections = maxConnections;
@@ -498,6 +502,11 @@
}
}
}
+
+ public void setNeverFail()
+ {
+ neverFail = true;
+ }
// Public
// ---------------------------------------------------------------------------------------
@@ -534,12 +543,16 @@
private boolean failoverOrReconnect(final MessagingException me, final Object connectionID)
{
+ // log.info(System.identityHashCode(this) + " connection manager failover or reconnect");
// To prevent recursion
if (inFailoverOrReconnect)
{
+ // log.info("Already in it");
return false;
}
+ // log.info("Waiting on failover lock");
+
synchronized (failoverLock)
{
if (connectionID != null && !connections.containsKey(connectionID))
@@ -548,9 +561,13 @@
// over then a async connection exception or disconnect
// came in for one of the already closed connections, so we return true - we don't want to call the
// listeners again
+
+ // log.info("ALready failed over that connection");
return true;
}
+
+ // log.info("Got failover lock");
// Now get locks on all channel 1s, whilst holding the failoverLock - this makes sure
// There are either no threads executing in createSession, or one is blocking on a createSession
@@ -579,11 +596,16 @@
boolean attemptFailover = (backupConnectorFactory) != null && (failoverOnServerShutdown || me.getCode() != MessagingException.DISCONNECTED);
+ // log.info("Attempt failover is " + attemptFailover);
+ // log.info("bcf " + backupConnectorFactory + " fss: " + failoverOnServerShutdown + " me code " + me.getCode());
+
boolean done = false;
if (attemptFailover || reconnectAttempts != 0)
{
+ // log.info("Locking all channels");
lockAllChannel1s();
+ // log.info("Locked all channels");
final boolean needToInterrupt;
@@ -594,6 +616,8 @@
unlockAllChannel1s();
+ // log.info("Need to interrupt is " + needToInterrupt);
+
if (needToInterrupt)
{
// Forcing return all channels won't guarantee that any blocked thread will return immediately
@@ -616,8 +640,12 @@
}
}
}
+
+ // log.info("waited for create session to exit");
}
+ //log.info("continuing");
+
// Now we absolutely know that no threads are executing in or blocked in createSession, and no
// more will execute it until failover is complete
@@ -656,6 +684,8 @@
transportParams = backupTransportParams;
+ //log.info(System.identityHashCode(this) + " set bcf to null");
+
backupConnectorFactory = null;
backupTransportParams = null;
@@ -779,6 +809,8 @@
}
}
+ // log.info("ok is " + ok);
+
if (ok)
{
// If all connections got ok, then handle failover
@@ -812,7 +844,7 @@
return null;
}
- RemotingConnection connection = getConnection(initialRefCount);
+ RemotingConnection connection = internalGetConnection(initialRefCount);
if (connection == null)
{
@@ -888,11 +920,23 @@
connector = null;
}
}
-
+
public RemotingConnection getConnection(final int initialRefCount)
{
+ synchronized (createSessionLock)
+ {
+ synchronized (failoverLock)
+ {
+ return internalGetConnection(initialRefCount);
+ }
+ }
+ }
+
+ private RemotingConnection internalGetConnection(final int initialRefCount)
+ {
RemotingConnection conn;
+
if (connections.size() < maxConnections)
{
// Create a new one
@@ -906,6 +950,12 @@
DelegatingBufferHandler handler = new DelegatingBufferHandler();
connector = connectorFactory.createConnector(transportParams, handler, this, threadPool);
+
+ //For testing only - this makes sure that invm connector failures don't happen for backup connections
+ if (neverFail)
+ {
+ connector.setNeverFail();
+ }
if (connector != null)
{
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -48,7 +48,7 @@
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.replication.impl.ReplicationAwareReadWriteLock;
+import org.jboss.messaging.core.server.replication.impl.ReplicationAwareMutex;
import org.jboss.messaging.core.settings.impl.AddressSettings;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
@@ -117,7 +117,7 @@
* */
private final ReadWriteLock currentPageLock = new ReentrantReadWriteLock();
- //private final Lock lock = ReplicationAwareReadWriteLock.createLock().writeLock();
+ //private final Lock lock = ReplicationAwareMutex.createLock().writeLock();
private volatile boolean running = false;
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/ClusterQueueStateManagerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/ClusterQueueStateManagerImpl.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/ClusterQueueStateManagerImpl.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -44,7 +44,7 @@
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
-import org.jboss.messaging.core.server.replication.impl.ReplicationAwareReadWriteLock;
+import org.jboss.messaging.core.server.replication.impl.ReplicationAwareMutex;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.AddressSettings;
import org.jboss.messaging.utils.ExecutorFactory;
@@ -73,7 +73,7 @@
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
- private final Lock lock;
+ private final ReplicationAwareMutex lock;
private boolean started;
@@ -91,9 +91,7 @@
this.server = server;
- ReadWriteLock rwLock = new ReplicationAwareReadWriteLock("clusterqueuestatemanager", 0, true);
-
- lock = rwLock.writeLock();
+ lock = new ReplicationAwareMutex("clusterqueuestatemanager", 0, true);
}
public synchronized void start()
@@ -121,16 +119,11 @@
return started;
}
- public Lock getNotificationLock()
- {
- return lock;
- }
-
public void sendNotification(final Notification notification, final SimpleString dest) throws Exception
{
ServerMessage notificationMessage;
- lock.lock();
+ lock.lock(0);
try
{
@@ -198,7 +191,7 @@
Queue queue = (Queue)binding.getBindable();
// Need to lock to make sure all queue info and notifications are in the correct order with no gaps
- lock.lock();
+ lock.lock(1);
try
{
// First send a reset message
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -59,7 +59,7 @@
import org.jboss.messaging.core.server.QueueFactory;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
-import org.jboss.messaging.core.server.replication.impl.ReplicationAwareReadWriteLock;
+import org.jboss.messaging.core.server.replication.impl.ReplicationAwareMutex;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.AddressSettings;
import org.jboss.messaging.core.transaction.Transaction;
@@ -111,7 +111,7 @@
private final boolean persistIDCache;
- private final ReadWriteLock lock;
+ private final ReplicationAwareMutex lock;
public PostOfficeImpl(final StorageManager storageManager,
final PagingManager pagingManager,
@@ -151,7 +151,7 @@
this.persistIDCache = persistIDCache;
- lock = new ReplicationAwareReadWriteLock("postoffice", 0, true);
+ lock = new ReplicationAwareMutex("postoffice", 0, true);
}
// MessagingComponent implementation ---------------------------------------
@@ -175,7 +175,7 @@
}
public synchronized void stop() throws Exception
- {
+ {
if (reaper != null)
{
@@ -194,7 +194,6 @@
return started;
}
-
// PostOffice implementation -----------------------------------------------
// TODO - needs to be locked to prevent happening concurrently with activate().
@@ -205,39 +204,37 @@
public void addBinding(final Binding binding) throws Exception
{
boolean addressExists;
-
- lock.writeLock().lock();
-
+
+ lock.lock(0);
+
try
{
addressExists = addressManager.getBindingsForRoutingAddress(binding.getAddress()) != null;
addressManager.addBinding(binding);
-
- }
- finally
- {
- lock.writeLock().unlock();
- }
- if (binding.getType() == BindingType.LOCAL_QUEUE)
- {
- Queue queue = (Queue)binding.getBindable();
-
- if (backup)
+ if (binding.getType() == BindingType.LOCAL_QUEUE)
{
- queue.setBackup();
- }
+ Queue queue = (Queue)binding.getBindable();
- managementService.registerQueue(queue, binding.getAddress(), storageManager);
+ if (backup)
+ {
+ queue.setBackup();
+ }
- if (!addressExists)
- {
- managementService.registerAddress(binding.getAddress());
+ managementService.registerQueue(queue, binding.getAddress(), storageManager);
+
+ if (!addressExists)
+ {
+ managementService.registerAddress(binding.getAddress());
+ }
}
}
-
-
+ finally
+ {
+ lock.unlock();
+ }
+
TypedProperties props = new TypedProperties();
props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, binding.getType().toInt());
@@ -257,16 +254,16 @@
if (filter != null)
{
props.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filter.getFilterString());
- }
-
+ }
+
String uid = UUIDGenerator.getInstance().generateStringUUID();
-
+
managementService.sendNotification(new Notification(uid, NotificationType.BINDING_ADDED, props));
}
public Binding removeBinding(final SimpleString uniqueName) throws Exception
{
- lock.writeLock().lock();
+ lock.lock(1);
Binding binding;
try
{
@@ -274,7 +271,7 @@
}
finally
{
- lock.writeLock().unlock();
+ lock.unlock();
}
if (binding == null)
{
@@ -298,8 +295,8 @@
{
managementService.unregisterAddress(binding.getAddress());
}
- }
-
+ }
+
TypedProperties props = new TypedProperties();
props.putStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
@@ -311,13 +308,13 @@
props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
managementService.sendNotification(new Notification(null, NotificationType.BINDING_REMOVED, props));
-
+
return binding;
}
public Bindings getBindingsForAddress(final SimpleString address)
{
- lock.readLock().lock();
+ lock.lock(2);
try
{
Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
@@ -331,108 +328,109 @@
}
finally
{
- lock.readLock().unlock();
+ lock.unlock();
}
}
public Binding getBinding(final SimpleString name)
{
- lock.readLock().lock();
+ lock.lock(3);
try
{
return addressManager.getBinding(name);
}
finally
{
- lock.readLock().unlock();
+ lock.unlock();
}
}
-
+
public Binding getBindingByID(final long id)
{
- lock.readLock().lock();
+ lock.lock(4);
try
{
return addressManager.getBindingByID(id);
}
finally
{
- lock.readLock().unlock();
+ lock.unlock();
}
}
public Bindings getMatchingBindings(final SimpleString address)
{
- lock.readLock().lock();
+ lock.lock(5);
try
{
return addressManager.getMatchingBindings(address);
}
finally
{
- lock.readLock().unlock();
+ lock.unlock();
}
}
public void route(final ServerMessage message, Transaction tx) throws Exception
{
- SimpleString address = message.getDestination();
+ lock.lock(6);
+ try
+ {
+ SimpleString address = message.getDestination();
- byte[] duplicateIDBytes = null;
+ byte[] duplicateIDBytes = null;
- Object duplicateID = message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
+ Object duplicateID = message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
- DuplicateIDCache cache = null;
+ DuplicateIDCache cache = null;
- if (duplicateID != null)
- {
- cache = getDuplicateIDCache(message.getDestination());
-
- if (duplicateID instanceof SimpleString)
+ if (duplicateID != null)
{
- duplicateIDBytes = ((SimpleString)duplicateID).getData();
- }
- else
- {
- duplicateIDBytes = (byte[])duplicateID;
- }
+ cache = getDuplicateIDCache(message.getDestination());
- if (cache.contains(duplicateIDBytes))
- {
- if (tx == null)
+ if (duplicateID instanceof SimpleString)
{
- log.trace("Duplicate message detected - message will not be routed");
+ duplicateIDBytes = ((SimpleString)duplicateID).getData();
}
else
{
- log.trace("Duplicate message detected - transaction will be rejected");
-
- tx.markAsRollbackOnly(null);
+ duplicateIDBytes = (byte[])duplicateID;
}
- return;
+ if (cache.contains(duplicateIDBytes))
+ {
+ if (tx == null)
+ {
+ log.trace("Duplicate message detected - message will not be routed");
+ }
+ else
+ {
+ log.trace("Duplicate message detected - transaction will be rejected");
+
+ tx.markAsRollbackOnly(null);
+ }
+
+ return;
+ }
}
- }
- boolean startedTx = false;
+ boolean startedTx = false;
- if (cache != null)
- {
- if (tx == null)
+ if (cache != null)
{
- // We need to store the duplicate id atomically with the message storage, so we need to create a tx for this
+ if (tx == null)
+ {
+ // We need to store the duplicate id atomically with the message storage, so we need to create a tx for
+ // this
- tx = new TransactionImpl(storageManager);
+ tx = new TransactionImpl(storageManager);
- startedTx = true;
+ startedTx = true;
+ }
+
+ cache.addToCache(duplicateIDBytes, tx);
}
- cache.addToCache(duplicateIDBytes, tx);
- }
-
- lock.readLock().lock();
- try
- {
if (tx == null)
{
if (pagingManager.page(message, true))
@@ -468,7 +466,7 @@
}
finally
{
- lock.readLock().unlock();
+ lock.unlock();
}
}
@@ -479,7 +477,7 @@
public boolean redistribute(final ServerMessage message, final Queue originatingQueue, final Transaction tx) throws Exception
{
- lock.readLock().lock();
+ lock.lock(7);
try
{
Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getDestination());
@@ -495,7 +493,7 @@
}
finally
{
- lock.readLock().unlock();
+ lock.unlock();
}
}
@@ -553,25 +551,23 @@
return cache;
}
-
// Private -----------------------------------------------------------------
private synchronized void startExpiryScanner()
{
- //TODO disabled for now
-// if (reaperPeriod > 0)
-// {
-// reaper = new Reaper();
-//
-// expiryReaper = new Thread(reaper, "JBM-expiry-reaper");
-//
-// expiryReaper.setPriority(reaperPriority);
-//
-// expiryReaper.start();
-// }
+ // TODO disabled for now
+ // if (reaperPeriod > 0)
+ // {
+ // reaper = new Reaper();
+ //
+ // expiryReaper = new Thread(reaper, "JBM-expiry-reaper");
+ //
+ // expiryReaper.setPriority(reaperPriority);
+ //
+ // expiryReaper.start();
+ // }
}
-
private final PageMessageOperation getPageOperation(final Transaction tx)
{
// you could have races on the case two sessions using the same XID
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -64,7 +64,9 @@
void activate();
void freeze();
-
+
+ //void freeze();
+
Connection getTransportConnection();
boolean isActive();
@@ -76,4 +78,6 @@
long getBlockingCallTimeout();
RemotingConnection getReplicatingConnection();
+
+ Thread getExecutingThread();
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -402,30 +402,17 @@
public void replayCommands(final int otherLastConfirmedCommandID)
{
- // log.info("replaying, other last command id " + otherLastConfirmedCommandID);
+ // log.info(connection.isClient() + " replaying, other last command id " + otherLastConfirmedCommandID);
if (otherLastConfirmedCommandID != -1)
{
clearUpTo(otherLastConfirmedCommandID);
}
- //log.info("Resend cache size is " + resendCache.size());
+ // log.info("Resend cache size is " + resendCache.size());
for (final Packet packet : resendCache)
{
- //log.info("Replaying command " + packet);
-
-// if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG)
-// {
-// SessionReceiveMessage sm = (SessionReceiveMessage)packet;
-//
-// ServerMessage msg = sm.getServerMessage();
-//
-// int cnt = (Integer)msg.getProperty(new SimpleString("count"));
-//
-// log.info("resending message " + cnt);
-// }
-
doWrite(packet);
}
}
@@ -463,7 +450,7 @@
{
receivedBytes = 0;
- //log.info("Sending packets confirmed from flush");
+ // log.info("Sending packets confirmed from flush");
sendConfirmation(lcid);
}
@@ -478,6 +465,8 @@
//We need to queue packet confirmations too
if (!queuedWriteManager.tryQueue(confirmed))
{
+ // log.info(connection.isClient() + " writing packets confirmed " + lastConfirmedID + " " + System.identityHashCode(this));
+
doWrite(confirmed);
}
}
@@ -488,7 +477,7 @@
{
lastConfirmedCommandID++;
- //log.info("last confirmed id is now " + lastConfirmedCommandID);
+ // log.info("sending confirm from confirm");
receivedBytes += packet.getPacketSize();
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -12,12 +12,13 @@
package org.jboss.messaging.core.remoting.impl;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
@@ -27,12 +28,9 @@
import org.jboss.messaging.core.remoting.Interceptor;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
import org.jboss.messaging.core.remoting.spi.Connection;
-import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
-import org.jboss.messaging.core.remoting.spi.Connector;
-import org.jboss.messaging.core.remoting.spi.ConnectorFactory;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.core.server.replication.impl.JBMThread;
import org.jboss.messaging.utils.SimpleIDGenerator;
/**
@@ -47,40 +45,6 @@
private static final Logger log = Logger.getLogger(RemotingConnectionImpl.class);
- // Static
- // ---------------------------------------------------------------------------------------
-
- public static RemotingConnection createConnection(final ConnectorFactory connectorFactory,
- final Map<String, Object> params,
- final long callTimeout,
- final Executor threadPool,
- final ConnectionLifeCycleListener listener)
- {
- DelegatingBufferHandler handler = new DelegatingBufferHandler();
-
- Connector connector = connectorFactory.createConnector(params, handler, listener, threadPool);
-
- if (connector == null)
- {
- return null;
- }
-
- connector.start();
-
- Connection tc = connector.createConnection();
-
- if (tc == null)
- {
- return null;
- }
-
- RemotingConnection connection = new RemotingConnectionImpl(tc, callTimeout, null);
-
- handler.conn = connection;
-
- return connection;
- }
-
// Attributes
// -----------------------------------------------------------------------------------
@@ -112,8 +76,6 @@
private boolean idGeneratorSynced = false;
- // private final Object freezeLock = new Object();
-
private volatile boolean frozen;
private final Object failLock = new Object();
@@ -163,7 +125,7 @@
this.active = active;
- this.client = client;
+ this.client = client;
}
// RemotingConnection implementation
@@ -362,6 +324,13 @@
private volatile Thread currentThread;
+
+
+ public void activate()
+ {
+ active = true;
+ }
+
public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
{
final Packet packet = decoder.decode(buffer);
@@ -400,14 +369,6 @@
{
channel.handlePacket(packet);
}
- else
- {
- log.info("cannot find handler for packet " + packet +
- " client " +
- this.isClient() +
- " active " +
- this.isActive());
- }
}
}
finally
@@ -415,93 +376,109 @@
currentThread = null;
}
}
-
- public void activate()
- {
- active = true;
- }
-
- private static final long FREEZE_TIMEOUT = 5000;
-
+
public void freeze()
{
- if (frozen)
- {
- return;
- }
-
- frozen = true;
-
- if (currentThread != null)
- {
- // long count = 0;
- long start = System.currentTimeMillis();
- while (true)
- {
- Thread thread = currentThread;
-
- if (thread != null)
- {
- if (thread instanceof JBMThread)
- {
- JBMThread jthread = (JBMThread)thread;
-
- jthread.setLastLockNonStrict();
- }
-
- log.info("waiting for thread to complete");
+ frozen = true;
+ }
- Thread.yield();
-
- if (System.currentTimeMillis() - start >= FREEZE_TIMEOUT)
- {
- throw new IllegalStateException("Timed out waiting for thread to complete");
- }
-
- // count++;
- //
- // if (count > 1)
- // {
- // Exception e = new Exception();
- // e.setStackTrace(thread.getStackTrace());
- // log.info("Waiting for this thread", e);
- // }
- }
- else
- {
- break;
- }
- }
-
- }
- //
- //
- //
- // //TODO check this logic
- //
- // log.info("freezing connection");
- //
- // JBMThread thread = null;
- //
- // if (currentThread != null)
- // {
- // thread = (JBMThread)currentThread;
- //
- // thread.jbmInterrupt();
- // }
- //
- // // Prevent any more packets being handled on this connection
- //
- // synchronized (freezeLock)
- // {
- // frozen = true;
- // }
- //
- // if (thread != null)
- // {
- // thread.jbmResetInterrupt();
- // }
+ public Thread getExecutingThread()
+ {
+ return currentThread;
}
+
+// public void freeze()
+// {
+// if (currentThread != null)
+// {
+// long start = System.currentTimeMillis();
+//
+// while (true)
+// {
+// Thread thread = currentThread;
+//
+// if (thread != null)
+// {
+// log.info("waiting for thread to complete");
+//
+// try
+// {
+// Thread.sleep(1);
+// }
+// catch (InterruptedException ignore)
+// {
+// }
+//
+// if (System.currentTimeMillis() - start >= FREEZE_TIMEOUT)
+// {
+// Exception e = new Exception();
+// e.setStackTrace(thread.getStackTrace());
+// log.info("Waiting for this thread", e);
+//
+// JBMThread jthread = null;
+//
+// if (thread instanceof JBMThread)
+// {
+// jthread = (JBMThread)thread;
+//
+// //jthread.setLastLockNonStrict();
+// }
+//
+// if (jthread != null)
+// {
+// SequencedLock lastLock = jthread.getLastLock();
+//
+// log.info("Owner of the lock which this thread is waiting for is ");
+//
+// if (lastLock != null)
+// {
+// log.info("Dumping last lock");
+//
+// lastLock.dump();
+// }
+// }
+//
+// // log.info("Gonna try a nudge");
+// // SequencedLock.dumpLocks();
+//
+// throw new IllegalStateException("Timed out waiting for thread to complete");
+// }
+// }
+// else
+// {
+// break;
+// }
+// }
+//
+// }
+// //
+// //
+// //
+// // //TODO check this logic
+// //
+// // log.info("freezing connection");
+// //
+// // JBMThread thread = null;
+// //
+// // if (currentThread != null)
+// // {
+// // thread = (JBMThread)currentThread;
+// //
+// // thread.jbmInterrupt();
+// // }
+// //
+// // // Prevent any more packets being handled on this connection
+// //
+// // synchronized (freezeLock)
+// // {
+// // frozen = true;
+// // }
+// //
+// // if (thread != null)
+// // {
+// // thread.jbmResetInterrupt();
+// // }
+// }
// Package protected
// ----------------------------------------------------------------------------
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnector.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnector.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnector.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -52,6 +52,8 @@
public static volatile int numberOfFailures = -1;
private static volatile int failures;
+
+ private volatile boolean neverFail;
public static synchronized void resetFailures()
{
@@ -125,10 +127,10 @@
{
return started;
}
-
+
public Connection createConnection()
{
- if (failOnCreateConnection)
+ if (failOnCreateConnection && !neverFail)
{
incFailures();
// For testing only
@@ -166,7 +168,13 @@
conn.close();
}
}
+
+ public void setNeverFail()
+ {
+ this.neverFail = true;
+ }
+
// This may be an injection point for mocks on tests
protected Connection internalCreateConnection(final BufferHandler handler,
final ConnectionLifeCycleListener listener,
@@ -218,5 +226,7 @@
}
}
+
+
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -18,6 +18,8 @@
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.utils.DataConstants;
+import org.jboss.messaging.utils.Pair;
+import org.jboss.messaging.utils.Triple;
/**
*
@@ -33,13 +35,13 @@
// Attributes ----------------------------------------------------
- private List<Long> sequences;
+ private List<Triple<Long, Long, Integer>> sequences;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public ReplicateLockSequenceMessage(final List<Long> sequences)
+ public ReplicateLockSequenceMessage(final List<Triple<Long, Long, Integer>> sequences)
{
super(REPLICATE_LOCK_SEQUENCES);
@@ -55,16 +57,21 @@
public int getRequiredBufferSize()
{
- return BASIC_PACKET_SIZE + DataConstants.SIZE_INT + sequences.size() * DataConstants.SIZE_LONG;
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_INT +
+ sequences.size() *
+ (2 * DataConstants.SIZE_LONG +
+ DataConstants.SIZE_INT);
}
@Override
public void encodeBody(final MessagingBuffer buffer)
{
buffer.writeInt(sequences.size());
- for (long sequence : sequences)
+ for (Triple<Long, Long, Integer> sequence : sequences)
{
- buffer.writeLong(sequence);
+ buffer.writeLong(sequence.a);
+ buffer.writeLong(sequence.b);
+ buffer.writeInt(sequence.c);
}
}
@@ -72,14 +79,17 @@
public void decodeBody(final MessagingBuffer buffer)
{
int len = buffer.readInt();
- sequences = new ArrayList<Long>(len);
+ sequences = new ArrayList<Triple<Long, Long, Integer>>(len);
for (int i = 0; i < len; i++)
{
- sequences.add(buffer.readLong());
+ Triple<Long, Long, Integer> pair = new Triple<Long, Long, Integer>(buffer.readLong(),
+ buffer.readLong(),
+ buffer.readInt());
+ sequences.add(pair);
}
}
- public List<Long> getSequences()
+ public List<Triple<Long, Long, Integer>> getSequences()
{
return sequences;
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/spi/Connector.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/spi/Connector.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/spi/Connector.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -45,4 +45,6 @@
* @return The connection, or null if unable to create a connection (e.g. network is unavailable)
*/
Connection createConnection();
+
+ void setNeverFail();
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -28,6 +28,8 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import javax.management.MBeanServer;
@@ -99,7 +101,9 @@
import org.jboss.messaging.core.server.cluster.impl.ClusterManagerImpl;
import org.jboss.messaging.core.server.replication.Replicator;
import org.jboss.messaging.core.server.replication.impl.JBMThread;
+import org.jboss.messaging.core.server.replication.impl.ReplicationAwareMutex;
import org.jboss.messaging.core.server.replication.impl.ReplicatorImpl;
+import org.jboss.messaging.core.server.replication.impl.SequencedLock;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.AddressSettings;
import org.jboss.messaging.core.settings.impl.HierarchicalObjectRepository;
@@ -204,7 +208,12 @@
private int managementConnectorID;
private static AtomicInteger managementConnectorSequence = new AtomicInteger(0);
+
+ private ConnectionManager pooledReplicatingConnectionManager;
+ private ConnectionManager nonPooledReplicatingConnectionManager;
+
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -539,7 +548,7 @@
" not compatible with version: " +
version.getFullVersion());
}
-
+
if (!direct)
{
if (!registerBackupConnection(connection))
@@ -642,7 +651,7 @@
session.setHandler(handler);
channel.setHandler(handler);
-
+
return new CreateSessionResponseMessage(version.getIncrementingVersion());
}
@@ -716,7 +725,7 @@
"). You're probably trying to restart a live backup pair after a crash");
}
- log.info("Backup server is now operational");
+ log.info("Backup server is now ready");
}
}
@@ -852,14 +861,6 @@
activateCallbacks.remove(callback);
}
- // private ConnectorFactory backupConnectorFactory;
- //
- // private Map<String, Object> backupConnectorParams;
-
- private volatile ConnectionManager pooledReplicatingConnectionManager;
-
- private volatile ConnectionManager nonPooledReplicatingConnectionManager;
-
private void setupConnectionManagers()
{
String backupConnectorName = configuration.getBackupConnectorName();
@@ -886,6 +887,8 @@
0,
this.threadPool,
this.scheduledPool);
+
+ pooledReplicatingConnectionManager.setNeverFail();
nonPooledReplicatingConnectionManager = new ConnectionManagerImpl(null,
backupConnector,
@@ -900,6 +903,7 @@
0,
this.threadPool,
this.scheduledPool);
+ nonPooledReplicatingConnectionManager.setNeverFail();
}
}
}
@@ -965,8 +969,6 @@
private Set<RemotingConnection> backupConnections = new WeakHashSet<RemotingConnection>();
-
-
private static class NoCacheConnectionLifeCycleListener implements ConnectionLifeCycleListener
{
private RemotingConnection conn;
@@ -1056,7 +1058,8 @@
queueDeployer.start();
}
}
- log.info("Backup server is now ACTIVATED");
+
+ log.info("Backup server is now activated");
}
connection.activate();
@@ -1071,30 +1074,183 @@
// connection 1 delivery
// connection 1 delivery gets replicated
// can't find message in queue since active was delivered immediately
-
+
private boolean frozen;
-
+
+ // We have to do a bit of locking jiggery-pokery to ensure we don't get connections being registered and blocking
+ // while freezing is in progress
+ private final Lock flock = new ReentrantLock();
+
+ private static final long FREEZE_TIMEOUT = 2000;
+
private void freezeBackupConnections()
{
+ flock.lock();
+
+ // log.info("** freezing backup connections");
+
synchronized (backupConnections)
{
frozen = true;
+
+ flock.unlock();
+
+ // Needs to be done in two stages - once set locks non strict
+
+ freezeConnections();
+
+ // Wait for threads to exit
- for (RemotingConnection rc : backupConnections)
+ boolean timedOut = false;
+
+ long start = System.currentTimeMillis();
+
+ outer: for (RemotingConnection rc : backupConnections)
+ {
+ while (true)
+ {
+ Thread executingThread = rc.getExecutingThread();
+
+ if (executingThread == null)
+ {
+ break;
+ }
+
+ try
+ {
+ Thread.sleep(1);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+
+ if (System.currentTimeMillis() - start >= FREEZE_TIMEOUT)
+ {
+ timedOut = true;
+
+ break outer;
+ }
+ }
+ }
+
+ if (timedOut)
{
- rc.freeze();
+ // log.info("*** timedout waiting for threads to exit");
+ //Now we assert that all remaining threads are waiting for a SequencedLock
+
+ for (RemotingConnection rc : backupConnections)
+ {
+ Thread executingThread = rc.getExecutingThread();
+
+ if (executingThread != null & executingThread instanceof JBMThread)
+ {
+ JBMThread jthread = (JBMThread)executingThread;
+
+ if (!jthread.isWaitingOnSequencedLock())
+ {
+ String msg = "Thread is not waiting on SequencedLock";
+ Exception e = new Exception();
+ e.setStackTrace(jthread.getStackTrace());
+ log.error(msg, e);
+ throw new IllegalStateException(msg);
+ }
+ }
+ }
+
+ // log.info("** all threads waiting on sequenced locks");
+
+ //Now we can freeze out all the locks, after setting all threads to replay = false
+ //and frozen to true
+
+ for (RemotingConnection rc : backupConnections)
+ {
+ Thread executingThread = rc.getExecutingThread();
+
+ if (executingThread != null & executingThread instanceof JBMThread)
+ {
+ JBMThread jthread = (JBMThread)executingThread;
+
+ jthread.setNoReplayOrRecord();
+
+ jthread.setFrozen();
+ }
+ }
+
+ // log.info("** set all threads to frozen");
+
+ ReplicationAwareMutex.freezeOutAll();
+
+ //Now we interrupt all those threads
+
+ for (RemotingConnection rc : backupConnections)
+ {
+ Thread executingThread = rc.getExecutingThread();
+
+ if (executingThread != null & executingThread instanceof JBMThread)
+ {
+ JBMThread jthread = (JBMThread)executingThread;
+
+ jthread.interrupt();
+ }
+ }
+
+ //Now we wait again for the threads to complete
+
+ start = System.currentTimeMillis();
+
+ for (RemotingConnection rc : backupConnections)
+ {
+ while (true)
+ {
+ Thread executingThread = rc.getExecutingThread();
+
+ if (executingThread == null)
+ {
+ break;
+ }
+
+ try
+ {
+ Thread.sleep(1);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+
+ if (System.currentTimeMillis() - start >= FREEZE_TIMEOUT)
+ {
+ String msg = "Thread won't freeze out";
+ Exception e = new Exception();
+ e.setStackTrace(executingThread.getStackTrace());
+ log.error(msg, e);
+ throw new IllegalStateException(msg);
+ }
+ }
+ }
}
}
}
-
+
+ private void freezeConnections()
+ {
+ for (RemotingConnection rc : backupConnections)
+ {
+ rc.freeze();
+ }
+ }
+
public boolean registerBackupConnection(final RemotingConnection connection)
{
- synchronized (backupConnections)
+ flock.lock();
+
+ try
{
if (!frozen)
{
- backupConnections.add(connection);
-
+ synchronized (backupConnections)
+ {
+ backupConnections.add(connection);
+ }
return true;
}
else
@@ -1102,6 +1258,10 @@
return false;
}
}
+ finally
+ {
+ flock.unlock();
+ }
}
private void initialisePart1() throws Exception
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -41,6 +41,7 @@
import org.jboss.messaging.core.server.replication.ReplicableAction;
import org.jboss.messaging.core.server.replication.Replicator;
import org.jboss.messaging.core.server.replication.impl.JBMThread;
+import org.jboss.messaging.utils.Triple;
/**
* A packet handler for all packets that need to be handled at the server level
@@ -61,7 +62,7 @@
private Replicator replicator;
- private volatile List<Long> sequences;
+ private volatile List<Triple<Long, Long, Integer>> sequences;
public MessagingServerPacketHandler(final MessagingServer server,
final Channel channel1,
@@ -116,19 +117,20 @@
RegisterQueueReplicationChannelMessage msg = (RegisterQueueReplicationChannelMessage)packet;
Channel channel = connection.getChannel(msg.getBindingID(), -1, false);
-
- server.registerBackupConnection(channel.getConnection());
- channel.setHandler(new QueueReplicationPacketHandler(msg.getBindingID(), server.getPostOffice(), channel));
+ if (server.registerBackupConnection(channel.getConnection()))
+ {
+ channel.setHandler(new QueueReplicationPacketHandler(msg.getBindingID(), server.getPostOffice(), channel));
+ }
break;
}
case UNREGISTER_QUEUE_REPLICATION_CHANNEL:
- {
+ {
UnregisterQueueReplicationChannelMessage msg = (UnregisterQueueReplicationChannelMessage)packet;
Channel channel = connection.getChannel(msg.getBindingID(), -1, false);
-
+
channel.setHandler(null);
channel.close();
@@ -199,7 +201,7 @@
if (server.getConfiguration().isBackup() || type == REPLICATE_STARTUP_INFO)
{
- channel1.send(new ReplicationResponseMessage());
+ channel1.send(new ReplicationResponseMessage());
}
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -56,7 +56,7 @@
import org.jboss.messaging.core.server.cluster.impl.Redistributor;
import org.jboss.messaging.core.server.replication.ReplicableCall;
import org.jboss.messaging.core.server.replication.Replicator;
-import org.jboss.messaging.core.server.replication.impl.ReplicationAwareReadWriteLock;
+import org.jboss.messaging.core.server.replication.impl.ReplicationAwareMutex;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.AddressSettings;
import org.jboss.messaging.core.transaction.Transaction;
@@ -143,7 +143,7 @@
private final Set<Consumer> consumers = new HashSet<Consumer>();
- private final Lock lock;
+ private final ReplicationAwareMutex lock;
private final Executor executor;
@@ -193,9 +193,9 @@
scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor);
- ReplicationAwareReadWriteLock rwLock = new ReplicationAwareReadWriteLock(name.toString(), 0, true);
+ lock = new ReplicationAwareMutex(name.toString(), 0, true);
- lock = rwLock.writeLock();
+ // lock = rwLock.writeLock();
this.replicator = replicator;
@@ -406,15 +406,15 @@
return;
}
- if (waitingToDeliver.compareAndSet(false, true))
- {
+ // if (waitingToDeliver.compareAndSet(false, true))
+ // {
executor.execute(deliverRunner);
- }
+ // }
}
public void addConsumer(final Consumer consumer) throws Exception
{
- lock.lock();
+ lock.lock(0);
try
{
cancelRedistributor();
@@ -431,7 +431,7 @@
public boolean removeConsumer(final Consumer consumer) throws Exception
{
- lock.lock();
+ lock.lock(1);
try
{
boolean removed = distributionPolicy.removeConsumer(consumer);
@@ -453,7 +453,7 @@
public void addRedistributor(final long delay, final Executor executor)
{
- lock.lock();
+ lock.lock(2);
try
{
@@ -511,7 +511,7 @@
public int getConsumerCount()
{
- lock.lock();
+ lock.lock(3);
try
{
@@ -530,7 +530,7 @@
public List<MessageReference> list(final Filter filter)
{
- lock.lock();
+ lock.lock(4);
try
{
@@ -561,7 +561,7 @@
public MessageReference removeReferenceWithID(final long id) throws Exception
{
- lock.lock();
+ lock.lock(5);
try
{
@@ -601,7 +601,7 @@
public MessageReference removeFirstReference(final long id) throws Exception
{
- lock.lock();
+ lock.lock(6);
try
{
@@ -628,7 +628,7 @@
public MessageReference getReference(final long id)
{
- lock.lock();
+ lock.lock(7);
try
{
@@ -654,7 +654,7 @@
public int getMessageCount()
{
- lock.lock();
+ lock.lock(8);
try
{
@@ -680,7 +680,7 @@
public int getScheduledCount()
{
- lock.lock();
+ lock.lock(9);
try
{
@@ -694,7 +694,7 @@
public List<MessageReference> getScheduledMessages()
{
- lock.lock();
+ lock.lock(10);
try
{
@@ -761,6 +761,8 @@
if (oper == null)
{
+ //log.info("Creating new refs operation");
+
oper = new RefsOperation();
tx.putProperty(TransactionPropertyIndexes.REFS_OPERATION, oper);
@@ -779,7 +781,7 @@
public void cancel(final MessageReference reference) throws Exception
{
- lock.lock();
+ lock.lock(11);
try
{
@@ -853,7 +855,7 @@
Transaction tx = new TransactionImpl(storageManager);
- lock.lock();
+ lock.lock(12);
try
{
@@ -899,7 +901,7 @@
Transaction tx = new TransactionImpl(storageManager);
- lock.lock();
+ lock.lock(13);
try
{
@@ -930,7 +932,7 @@
public boolean expireReference(final long messageID) throws Exception
{
- lock.lock();
+ lock.lock(14);
try
{
@@ -961,7 +963,7 @@
int count = 0;
- lock.lock();
+ lock.lock(15);
try
{
@@ -991,7 +993,7 @@
public void expireReferences() throws Exception
{
- lock.lock();
+ lock.lock(16);
try
{
@@ -1011,7 +1013,7 @@
public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception
{
- lock.lock();
+ lock.lock(17);
try
{
@@ -1038,7 +1040,7 @@
public boolean moveReference(final long messageID, final SimpleString toAddress) throws Exception
{
- lock.lock();
+ lock.lock(18);
try
{
@@ -1069,7 +1071,7 @@
int count = 0;
- lock.lock();
+ lock.lock(19);
try
{
@@ -1111,7 +1113,7 @@
public boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception
{
- lock.lock();
+ lock.lock(20);
try
{
List<MessageReference> refs = list(null);
@@ -1204,7 +1206,7 @@
public void lock()
{
- lock.lock();
+ lock.lock(21);
}
public void unlock()
@@ -1417,7 +1419,7 @@
public HandleStatus deliverOne()
{
- lock.lock();
+ lock.lock(22);
direct = false;
@@ -1518,7 +1520,7 @@
private void add(final MessageReference ref, final boolean first)
{
- lock.lock();
+ lock.lock(23);
try
{
if (!first)
@@ -1667,7 +1669,7 @@
protected void postRollback(LinkedList<MessageReference> refs) throws Exception
{
- lock.lock();
+ lock.lock(24);
try
{
for (MessageReference ref : refs)
@@ -1696,7 +1698,7 @@
public void run()
{
// Must be set to false *before* executing to avoid race
- waitingToDeliver.set(false);
+ //waitingToDeliver.set(false);
deliverAll();
}
@@ -1762,6 +1764,8 @@
synchronized void addRef(final MessageReference ref)
{
refsToAdd.add(ref);
+
+ //log.info("adding ref, now there are " + refsToAdd.size());
}
synchronized void addAck(final MessageReference ref)
@@ -1825,6 +1829,8 @@
public void afterCommit(final Transaction tx) throws Exception
{
+ //log.info("after commit , to add " + refsToAdd.size() + " to ack " + refsToAck.size());
+
for (MessageReference ref : refsToAdd)
{
ref.getQueue().addLast(ref);
@@ -1883,7 +1889,7 @@
public void run()
{
- lock.lock();
+ lock.lock(25);
try
{
internalAddRedistributor(executor);
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -35,9 +35,10 @@
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
-import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.replication.impl.JBMThread;
+import org.jboss.messaging.utils.Pair;
+import org.jboss.messaging.utils.Triple;
/**
* A QueueReplicationPacketHandler
@@ -54,7 +55,7 @@
private volatile Queue queue;
- private volatile List<Long> sequences;
+ private volatile List<Triple<Long, Long, Integer>> sequences;
private final PostOffice postOffice;
@@ -105,7 +106,7 @@
queue.deliverOne();
- channel.send(new ReplicationResponseMessage());
+ channel.send(new ReplicationResponseMessage());
thread.setNoReplayOrRecord();
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -51,7 +51,7 @@
import org.jboss.messaging.core.server.ServerConsumer;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.ServerSession;
-import org.jboss.messaging.core.server.replication.impl.ReplicationAwareReadWriteLock;
+import org.jboss.messaging.core.server.replication.impl.ReplicationAwareMutex;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
import org.jboss.messaging.utils.TypedProperties;
@@ -87,7 +87,7 @@
private final Executor executor;
- private final Lock lock;
+ private final ReplicationAwareMutex lock;
private int availableCredits;
@@ -112,8 +112,6 @@
private final Channel channel;
- private volatile boolean closed;
-
private final boolean preAcknowledge;
private final ManagementService managementService;
@@ -134,8 +132,7 @@
final PagingManager pagingManager,
final Channel channel,
final boolean preAcknowledge,
- final boolean updateDeliveries,
- // final Executor executor,
+ final boolean updateDeliveries,
final ManagementService managementService) throws Exception
{
this.id = id;
@@ -168,9 +165,7 @@
this.updateDeliveries = updateDeliveries;
- ReplicationAwareReadWriteLock rwLock = new ReplicationAwareReadWriteLock("consumer " + id, 0, true);
-
- lock = rwLock.writeLock();
+ lock = new ReplicationAwareMutex("consumer " + session.getName() + "-" + id, 0, true);
binding.getQueue().addConsumer(this);
}
@@ -210,8 +205,6 @@
Iterator<MessageReference> iter = refs.iterator();
- closed = true;
-
Transaction tx = new TransactionImpl(storageManager);
while (iter.hasNext())
@@ -282,7 +275,7 @@
public void setStarted(final boolean started)
{
- lock.lock();
+ lock.lock(0);
try
{
@@ -304,7 +297,7 @@
{
boolean promptDelivery = false;
- lock.lock();
+ lock.lock(1);
try
{
@@ -364,9 +357,7 @@
" backup = " +
messageQueue.isBackup() +
" queue = " +
- messageQueue.getName() +
- " closed = " +
- closed);
+ messageQueue.getName());
}
if (autoCommitAcks)
@@ -412,9 +403,7 @@
{
throw new IllegalStateException("Could not find reference with id " + messageID +
" backup " +
- messageQueue.isBackup() +
- " closed " +
- closed);
+ messageQueue.isBackup());
}
return ref;
@@ -436,7 +425,7 @@
/** To be used on tests only */
public int getAvailableCredits()
{
- lock.lock();
+ lock.lock(2);
try
{
return availableCredits;
@@ -485,7 +474,7 @@
private HandleStatus doHandle(final MessageReference ref) throws Exception
{
- lock.lock();
+ lock.lock(3);
try
{
@@ -591,7 +580,7 @@
{
public void run()
{
- lock.lock();
+ lock.lock(4);
try
{
if (largeMessageDeliverer == null || largeMessageDeliverer.deliver())
@@ -637,7 +626,7 @@
public boolean deliver()
{
- lock.lock();
+ lock.lock(5);
try
{
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -76,12 +76,12 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
-import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.ServerSession;
import org.jboss.messaging.core.server.replication.ReplicableAction;
import org.jboss.messaging.core.server.replication.Replicator;
import org.jboss.messaging.core.server.replication.impl.JBMThread;
-import org.jboss.messaging.utils.SimpleString;
+import org.jboss.messaging.utils.Pair;
+import org.jboss.messaging.utils.Triple;
/**
* A ServerSessionPacketHandler
@@ -104,7 +104,7 @@
//TODO the sequences and repl response can be encapsulated in a super class
- private volatile List<Long> sequences;
+ private volatile List<Triple<Long, Long, Integer>> sequences;
private final Channel channel;
@@ -156,6 +156,7 @@
{
//checkConfirm(packet);
+ // log.info("sending confirm on sess handle packet");
channel.confirm(packet);
// if (packet.getType() == PacketImpl.SESS_SEND)
@@ -169,7 +170,6 @@
// log.info("confirmed send " + cnt);
// }
-
channel.send(new ReplicationResponseMessage());
}
@@ -210,6 +210,7 @@
//TODO this is a bit hacky
if (packet.getType() != PacketImpl.SESS_CLOSE)
{
+ // log.info("sending confirm from sess close");
channel.confirm(packet);
}
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -24,9 +24,12 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.server.replication.Replicator;
+import org.jboss.messaging.utils.Triple;
/**
* A JBMThread
@@ -43,27 +46,76 @@
{
RECORD, REPLAY, NONE;
}
-
+
private ThreadState state;
- private List<Long> objectSequences;
+ private List<Triple<Long, Long, Integer>> objectSequences;
- private int pos;
+ private final AtomicInteger pos = new AtomicInteger(0);
+
+ private Replicator replicator;
- private Replicator replicator;
+ private volatile boolean waitingOnSequencedLock;
- // private volatile boolean jbmInterrupted;
-
+ public boolean isWaitingOnSequencedLock()
+ {
+ return this.waitingOnSequencedLock;
+ }
+
+ public void setWaitingOnSequencedLock(final boolean waiting)
+ {
+ this.waitingOnSequencedLock = waiting;
+ }
+
+ private volatile boolean frozen;
+
+ public void setFrozen()
+ {
+ this.frozen = true;
+ }
+
+ public boolean isFrozen()
+ {
+ return frozen;
+ }
+
+ //private volatile SequencedLock lastLock;
+
+// public void dumpSequences()
+// {
+// log.info("Dumping thread sequences, current pos = " + pos + " length is " + objectSequences.size());
+// int cnt = 0;
+// for (Triple<Long, Long, Integer> pair : objectSequences)
+// {
+// long id = pair.a;
+// if (id == -1)
+// {
+// log.info("[" + cnt++ + ", " + id + "] COUNTER: " + pair.b);
+// }
+// else
+// {
+// SequencedLock lock = SequencedLock.getLock(id);
+//
+// log.info("[" + cnt++ + ", " + id + "] " + lock.getName() + ": " + pair.b + ": " + pair.c);
+// }
+//
+// }
+// }
+
public static JBMThread currentThread()
{
return (JBMThread)Thread.currentThread();
}
-
+
+ public JBMThread()
+ {
+ }
+
public JBMThread(final ThreadGroup threadGroup, final String name)
{
super(threadGroup, name);
}
-
+
public JBMThread(final ThreadGroup threadGroup, final Runnable target, final String name)
{
super(threadGroup, target, name);
@@ -73,115 +125,116 @@
{
return state == ThreadState.REPLAY;
}
-
+
public boolean isRecording()
{
return state == ThreadState.RECORD;
}
-
- public void setReplay(final List<Long> objectSequences)
+
+ public void setReplay(final List<Triple<Long, Long, Integer>> objectSequences)
{
this.objectSequences = objectSequences;
-
+
this.state = ThreadState.REPLAY;
-
- this.pos = 0;
+
+ this.pos.set(0);
}
-
-// private volatile long sequence;
-//
-// private AtomicLong seqCounter = new AtomicLong(0);
-
+
public void setRecord(final Replicator replicator)
{
- //this.sequence = seqCounter.getAndIncrement();
+ // this.sequence = seqCounter.getAndIncrement();
if (this.objectSequences == null)
{
- this.objectSequences = new ArrayList<Long>();
+ this.objectSequences = new ArrayList<Triple<Long, Long, Integer>>();
}
else
{
this.objectSequences.clear();
}
-
+
this.state = ThreadState.RECORD;
-
- this.pos = 0;
-
+
+ this.pos.set(0);
+
this.replicator = replicator;
}
-
+
public void setNoReplayOrRecord()
{
this.state = ThreadState.NONE;
+
+ //this.strict = true;
}
public void resumeRecording()
{
this.state = ThreadState.RECORD;
}
-
+
public void resumeReplay()
{
this.state = ThreadState.REPLAY;
}
-
- public long getNextSequence()
+
+ public Triple<Long, Long, Integer> getNextSequence()
{
- return objectSequences.get(pos++);
+ return objectSequences.get(pos.getAndIncrement());
}
- public void addSequence(final long currentSequence)
+ public void addSequence(final Triple<Long, Long, Integer> currentSequence)
{
objectSequences.add(currentSequence);
}
-
- public List<Long> getSequences()
+
+ public List<Triple<Long, Long, Integer>> getSequences()
{
return objectSequences;
}
-
+
public Replicator getReplicator()
{
return replicator;
}
-
+
public void setReplicator(final Replicator replicator)
{
this.replicator = replicator;
}
-
-// public boolean isJBMInterrupted()
+
+// private boolean strict = true;
+//
+// public synchronized SequencedLock getLastLock()
// {
-// return jbmInterrupted;
+// return this.lastLock;
// }
-
- public void setLastLock(final SequencedLock lock)
+//
+// public synchronized void setLastLock(final SequencedLock lock)
+// {
+// this.lastLock = lock;
+//
+// if (!strict)
+// {
+// this.lastLock.setNonStrict();
+// }
+// }
+//
+// public synchronized void setLastLockNonStrict()
+// {
+// if (lastLock != null)
+// {
+// lastLock.setNonStrict();
+// }
+//
+// strict = false;
+// }
+
+ public void jbmPark(final long toWait)
{
- this.lastLock = lock;
+ LockSupport.parkNanos(toWait);
}
-
- private volatile SequencedLock lastLock;
-
- public void setLastLockNonStrict()
+
+ public synchronized void jbmUnpark()
{
- if (lastLock != null)
- {
- lastLock.setStrict(false);
- }
+ LockSupport.unpark(this);
}
-
-// public void jbmInterrupt()
-// {
-// log.info("Interrupting jbm thread");
-//
-// jbmInterrupted = true;
-//
-// interrupt();
-// }
-//
-// public void jbmResetInterrupt()
-// {
-// jbmInterrupted = false;
-// }
}
Copied: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java (from rev 7559, branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java)
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java (rev 0)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -0,0 +1,249 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.replication.impl;
+
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.utils.ConcurrentHashSet;
+import org.jboss.messaging.utils.Triple;
+import org.jboss.messaging.utils.WeakHashSet;
+
+/**
+ * A ReplicationAwareMutex
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class ReplicationAwareMutex
+{
+ private static final Logger log = Logger.getLogger(ReplicationAwareMutex.class);
+
+ private final AtomicInteger counter;
+
+ private final SequencedLock sequencedLock;
+
+ private final long id;
+
+ private static final AtomicLong idSequence = new AtomicLong(0);
+
+ private final ReentrantLock lock;
+
+ private volatile Thread unfreezeOwner;
+
+ private volatile CountDownLatch freezeLatch;
+
+ private static final Set<ReplicationAwareMutex> allMutexes = new WeakHashSet<ReplicationAwareMutex>();
+
+ public ReplicationAwareMutex(final String name, final int initialCount, final boolean debug)
+ {
+ this.id = idSequence.getAndIncrement();
+
+ lock = new ReentrantLock();
+
+ sequencedLock = new SequencedLock(name, initialCount);
+
+ counter = new AtomicInteger(initialCount);
+
+ synchronized (allMutexes)
+ {
+ allMutexes.add(this);
+ }
+ }
+
+ public static void freezeOutAll()
+ {
+ synchronized (allMutexes)
+ {
+ for (ReplicationAwareMutex mutex: allMutexes)
+ {
+ log.info("*** freezing out mutex");
+ mutex.freezeOut();
+ }
+ }
+ }
+
+ public void lock(final int methodID)
+ {
+ try
+ {
+ doLock(10000, TimeUnit.MILLISECONDS, methodID);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+
+ public void unlock()
+ {
+ this.doUnlock();
+ }
+
+ public void freezeOut()
+ {
+ if (sequencedLock.getOwner() != null)
+ {
+ freezeLatch = new CountDownLatch(1);
+
+ unfreezeOwner = sequencedLock.getOwner();
+ }
+ }
+
+ private boolean doLock(long time, TimeUnit unit, int methodID) throws InterruptedException
+ {
+ JBMThread thread = JBMThread.currentThread();
+
+ // debug only
+ if (owners.contains(thread))
+ {
+ // Exception e = new Exception();
+ // e.setStackTrace(stackTraces.get(thread));
+ // log.error("Stateful staticLock is not re-entrant, first obtained here", e);
+ // Exception e2 = new Exception();
+ // log.info("Second attempt to obtain here", e2);
+ throw new IllegalStateException("Lock is NOT re-entrant!");
+ }
+
+ if (thread.isReplay())
+ {
+ Triple<Long, Long, Integer> pair = thread.getNextSequence();
+
+ // // Sanity check
+ // String otherName = SequencedLock.getLock(pair.a).getName();
+ //
+ // //If sequencedLock
+ //
+ // if ((!otherName.equals(name) || methodID != pair.c))
+ // {
+ // String msg = "Invalid object id, expecting " + name + ": " + methodID + " got " + otherName + ": " + pair.c
+ // +
+ // " lock id is " + pair.a;
+ //
+ // log.error(msg);
+ //
+ // thread.dumpSequences();
+ //
+ // SequencedLock.dumpLockMap();
+ //
+ // throw new IllegalStateException(msg);
+ // }
+
+ long sequence = pair.b;
+
+ try
+ {
+ if (!sequencedLock.lock(sequence, unit.toNanos(time)))
+ {
+ // dumpLocksWithName(name);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ if (thread.isFrozen())
+ {
+ log.info("** interrupted and retrying");
+ //We retry and this time it will use the standard mutex - this happens on freezing out
+ return doLock(time, unit, methodID);
+ }
+ }
+
+ addOwner(thread);
+
+ return true;
+ }
+ else
+ {
+ if (unfreezeOwner != null)
+ {
+ //The SequencedLock still had an owner when this mutex was frozen out, so we can't allow any other thread to get the lock
+ //until that original owner has called unlock
+ freezeLatch.await();
+ }
+
+ boolean ok = lock.tryLock(time, unit);
+
+ if (ok)
+ {
+ if (thread.isRecording())
+ {
+ long sequence = counter.getAndIncrement();
+
+ thread.addSequence(new Triple<Long, Long, Integer>(id, sequence, methodID));
+ }
+
+ addOwner(thread);
+ }
+
+ return ok;
+ }
+ }
+
+ private void doUnlock()
+ {
+ JBMThread thread = JBMThread.currentThread();
+
+ if (thread == unfreezeOwner)
+ {
+ // Don't actually unlock this, since we never had the lock - we had the lock on the original SequencedLock
+
+ unfreezeOwner = null;
+
+ freezeLatch.countDown();
+ }
+ else
+ {
+ if (thread.isReplay())
+ {
+ sequencedLock.unlock();
+ }
+ else
+ {
+ lock.unlock();
+ }
+ }
+
+ removeOwner(thread);
+ }
+
+ // debug only
+ private void addOwner(final JBMThread thread)
+ {
+ owners.add(thread);
+ }
+
+ // debug only
+ private void removeOwner(final JBMThread thread)
+ {
+ owners.remove(thread);
+ }
+
+ // For debug
+ private Set<Thread> owners = new ConcurrentHashSet<Thread>();
+
+}
Deleted: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -1,333 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.server.replication.impl;
-
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.utils.ConcurrentHashSet;
-
-/**
- * A ReplicationAwareReadWriteLock
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- *
- */
-public class ReplicationAwareReadWriteLock implements ReadWriteLock
-{
- private static final Logger log = Logger.getLogger(ReplicationAwareReadWriteLock.class);
-
- private final Lock writeLock = new StatefulObjectWriteLock();
-
- private final ReadWriteLock rwLock;
-
- private final AtomicInteger counter;
-
- private final SequencedLock sequencedLock;
-
- private String name;
-
- private volatile boolean debug;
-
- // private Map<Long, StackTraceElement[]> acquirers;
-
- public ReplicationAwareReadWriteLock(final String name, final int initialCount, final boolean debug)
- {
- this.name = name;
-
- rwLock = new ReentrantReadWriteLock();
-
- sequencedLock = new SequencedLock(initialCount);
-
- counter = new AtomicInteger(initialCount);
-
- this.debug = debug;
-
-// if (debug)
-// {
-// acquirers = new HashMap<Long, StackTraceElement[]>();
-//
-// addLock(this);
-// }
- }
-
- public Lock readLock()
- {
- return writeLock;
- }
-
- public Lock writeLock()
- {
- return writeLock;
- }
-
- // private Map<JBMThread, StackTraceElement[]> stackTraces = new ConcurrentHashMap<JBMThread, StackTraceElement[]>();
-
- // debug only
- private void addOwner(final JBMThread thread)
- {
- owners.add(thread);
-
- //stackTraces.put(thread, thread.getStackTrace());
- }
-
- // debug only
- private void removeOwner(final JBMThread thread)
- {
- owners.remove(thread);
-
- // stackTraces.remove(thread);
- }
-
- // For debug
- private Set<Thread> owners = new ConcurrentHashSet<Thread>();
-
- // private static Map<String, List<ReplicationAwareReadWriteLock>> staticMap = new HashMap<String, List<ReplicationAwareReadWriteLock>>();
-
-// private static synchronized void addLock(final ReplicationAwareReadWriteLock lock)
-// {
-// List<ReplicationAwareReadWriteLock> locks = staticMap.get(lock.name);
-//
-// if (locks == null)
-// {
-// locks = new ArrayList<ReplicationAwareReadWriteLock>();
-//
-// staticMap.put(lock.name, locks);
-// }
-//
-// locks.add(lock);
-// }
-
-// private synchronized static void dumpLocksWithName(final String lockName)
-// {
-// List<ReplicationAwareReadWriteLock> locks = staticMap.get(lockName);
-//
-// log.info("******* DUMPING LOCKS WITH NAME " + lockName);
-// log.info("There are " + locks.size() + " locks with name " + lockName);
-//
-// int count = 0;
-// for (ReplicationAwareReadWriteLock lock: locks)
-// {
-// log.info("**** DUMPING LOCK " + count++);
-// for (Map.Entry<Long, StackTraceElement[]> entry : lock.acquirers.entrySet())
-// {
-// Exception e = new Exception();
-// e.setStackTrace(entry.getValue());
-// log.info("sequence: " + entry.getKey(), e);
-// }
-// }
-// }
-
- private AtomicLong lastSequence = new AtomicLong(0);
-
- private boolean doLock(long time, TimeUnit unit, boolean read) throws InterruptedException
- {
- JBMThread thread = JBMThread.currentThread();
-
- // debug only
- if (owners.contains(thread))
- {
-// Exception e = new Exception();
-// e.setStackTrace(stackTraces.get(thread));
-// log.error("Stateful staticLock is not re-entrant, first obtained here", e);
-// Exception e2 = new Exception();
-// log.info("Second attempt to obtain here", e2);
- throw new IllegalStateException("Lock is NOT re-entrant!");
- }
-
- if (thread.isReplay())
- {
- long sequence;
-
- try
- {
- sequence = thread.getNextSequence();
-
- lastSequence.set(sequence);
- }
- catch (IndexOutOfBoundsException e)
- {
- //This can occur if we are waiting for a thread to complete on failover and we've allowed threads to execute in a different
- //order due to setting non-strict, so we just set the sequence to the last sequence + 1
- sequence = lastSequence.incrementAndGet();
- }
-
-// if (debug)
-// {
-// this.acquirers.put(sequence, thread.getStackTrace());
-// }
-
- if (!sequencedLock.lock(sequence, unit.toNanos(time)))
- {
- // dumpLocksWithName(name);
- }
-
- addOwner(thread);
-
- return true;
- }
- else
- {
- boolean ok;
-
- if (read)
- {
- ok = rwLock.readLock().tryLock(time, unit);
- }
- else
- {
- ok = rwLock.writeLock().tryLock(time, unit);
- }
-
- if (ok)
- {
- if (thread.isRecording())
- {
- long sequence = counter.getAndIncrement();
-
- thread.addSequence(sequence);
-
-// if (debug)
-// {
-// this.acquirers.put(sequence, thread.getStackTrace());
-// }
- }
-
- addOwner(thread);
- }
-
- return ok;
- }
- }
-
- private void doUnlock(final boolean read)
- {
- JBMThread thread = JBMThread.currentThread();
-
- if (thread.isReplay())
- {
- sequencedLock.unlock();
- }
- else
- {
- if (read)
- {
- rwLock.readLock().unlock();
- }
- else
- {
- rwLock.writeLock().unlock();
- }
- }
-
- removeOwner(thread);
- }
-
- private class StatefulObjectReadLock implements Lock
- {
- public void lock()
- {
- // throw new UnsupportedOperationException();
- try
- {
- doLock(10000, TimeUnit.MILLISECONDS, true);
- }
- catch (InterruptedException e)
- {
- }
- }
-
- public void lockInterruptibly() throws InterruptedException
- {
- throw new UnsupportedOperationException();
- }
-
- public Condition newCondition()
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean tryLock()
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
- {
- return doLock(time, unit, true);
- }
-
- public void unlock()
- {
- doUnlock(true);
- }
- }
-
- private class StatefulObjectWriteLock implements Lock
- {
- public void lock()
- {
- // throw new UnsupportedOperationException();
- try
- {
- doLock(10000, TimeUnit.MILLISECONDS, false);
- }
- catch (InterruptedException e)
- {
- }
- }
-
- public void lockInterruptibly() throws InterruptedException
- {
- throw new UnsupportedOperationException();
- }
-
- public Condition newCondition()
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean tryLock()
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
- {
- return doLock(time, unit, false);
- }
-
- public void unlock()
- {
- doUnlock(false);
- }
- }
-}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -26,6 +26,7 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.utils.Pair;
+import org.jboss.messaging.utils.Triple;
/**
* A ReplicationAwareSharedCounter
@@ -61,7 +62,15 @@
throw new IllegalStateException("Thread should not be recording");
}
- long sequence = thread.getNextSequence();
+ Triple<Long, Long, Integer> pair = thread.getNextSequence();
+
+ //Sanity check
+ if (pair.a != -1)
+ {
+ throw new IllegalStateException("Sequences in wrong order");
+ }
+
+ long sequence = pair.b;
//FIXME - this will be slow - too much contention with many threads
synchronized (this)
@@ -80,11 +89,9 @@
if (thread.isRecording())
{
- thread.addSequence(sequence);
+ thread.addSequence(new Triple<Long, Long, Integer>(-1L, sequence, -1));
}
- // log.info(System.identityHashCode(this) + " got sequence non replicated " + sequence);
-
return sequence;
}
}
@@ -93,15 +100,4 @@
{
return al.get();
}
-
- // public long getAndIncrement(long expected)
- // {
- // while (!al.compareAndSet(expected, expected + 1))
- // {
- // Thread.yield();
- // }
- //
- // return expected;
- // }
-
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -32,10 +32,11 @@
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.impl.QueuedWriteManager;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
import org.jboss.messaging.core.server.replication.ReplicableAction;
import org.jboss.messaging.core.server.replication.Replicator;
+import org.jboss.messaging.utils.Pair;
+import org.jboss.messaging.utils.Triple;
/**
* A ReplicatorImpl
@@ -143,8 +144,13 @@
thread.setNoReplayOrRecord();
- List<Long> sequences = JBMThread.currentThread().getSequences();
+ List<Triple<Long, Long, Integer>> sequences = thread.getSequences();
+ // log.info("Replicating:");
+
+ // thread.dumpSequences();
+
+
// dumpSequences(sequences);
// We then send the sequences to the backup
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -23,11 +23,12 @@
package org.jboss.messaging.core.server.replication.impl;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.LockSupport;
import org.jboss.messaging.core.logging.Logger;
@@ -50,31 +51,158 @@
private Thread owner;
private final AtomicBoolean locked = new AtomicBoolean(false);
-
- private volatile boolean strictOrder = true;
- public SequencedLock(final long sequence)
+ // private volatile boolean strictOrder = true;
+
+ private static Map<Long, SequencedLock> locks = new HashMap<Long, SequencedLock>();
+
+ // for debug only
+ // private final long id;
+
+ private final String name;
+
+ public Thread getOwner()
{
- queue = new PriorityBlockingQueue<QueueEntry>();
+ return owner;
+ }
- this.currentSequence = new AtomicLong(sequence);
+ // private static synchronized void registerLock(final SequencedLock lock)
+ // {
+ // locks.put(lock.id, lock);
+ // }
+ //
+ // public static synchronized SequencedLock getLock(final long id)
+ // {
+ // return locks.get(id);
+ // }
+
+ public String getName()
+ {
+ return name;
}
-
- public void setStrict(boolean strict)
+
+ public static synchronized void dumpLockMap()
{
- this.strictOrder = strict;
-
- QueueEntry entry = peekEntry();
+ log.info("Dumping lock name map");
+ for (Map.Entry<Long, SequencedLock> entry : locks.entrySet())
+ {
+ log.info(entry.getKey() + ": " + entry.getValue().getName());
+ }
+ }
- if (entry != null)
+ public static synchronized void dumpLocks()
+ {
+ for (SequencedLock lock : locks.values())
{
- LockSupport.unpark(entry.thread);
+ if (lock.owner != null)
+ {
+ log.info("************ Found lock with owner");
+ lock.dump();
+
+ // //lock.setNonStrict();
+ //
+ // log.info("Nudging lock");
+ //
+ // QueueEntry entry = lock.peekEntry();
+ //
+ // if (entry != null)
+ // {
+ // LockSupport.unpark(entry.thread);
+ // }
+ }
}
}
-
- //TODO parking with a timeout seems to be a lot slower than parking without timeout
- public boolean lock(final long sequence, final long timeout)
+
+ public synchronized void dump()
{
+ log.info("Dumping lock " + System.identityHashCode(this));
+ // log.info("*** This lock is strict " + strictOrder);
+
+ log.info("owner is " + owner);
+
+ if (owner != null)
+ {
+
+ Exception e = new Exception();
+ e.setStackTrace(owner.getStackTrace());
+ log.info("Owner trace", e);
+
+ JBMThread thr = (JBMThread)owner;
+
+// SequencedLock waitingFor = thr.getLastLock();
+//
+// if (waitingFor != null)
+// {
+// while (true)
+// {
+// log.info("waiting for...");
+// waitingFor.dump();
+//
+// thr = (JBMThread)waitingFor.owner;
+//
+// if (thr != null)
+// {
+// waitingFor = thr.getLastLock();
+//
+// if (waitingFor != null)
+// {
+// continue;
+// }
+// }
+//
+// break;
+// }
+// }
+ }
+
+ // log.info("Waiting threads: " + queue.size());
+ // for (QueueEntry entry: queue)
+ // {
+ // e = new Exception();
+ // e.setStackTrace(owner.getStackTrace());
+ // log.info("Waiting trace, sequence " + entry.sequence, e);
+ // }
+ }
+
+ public SequencedLock(final String name, final long sequence)
+ {
+ // this.id = id;
+
+ this.name = name;
+
+ queue = new PriorityBlockingQueue<QueueEntry>();
+
+ this.currentSequence = new AtomicLong(sequence);
+
+ // registerLock(this);
+ }
+
+ // public void setNonStrict()
+ // {
+ // //log.info(System.identityHashCode(this) + " setting non strict");
+ // if (strictOrder)
+ // {
+ // strictOrder = false;
+ //
+ // QueueEntry entry = peekEntry();
+ //
+ // if (entry != null)
+ // {
+ // // log.info(System.identityHashCode(this) + " setting non strict unparking " + entry.thread);
+ // //LockSupport.unpark(entry.thread);
+ // entry.thread.jbmUnpark();
+ // }
+ // }
+ // }
+ //
+ // public boolean isStrict()
+ // {
+ // return strictOrder;
+ // }
+
+ // TODO parking with a timeout seems to be a lot slower than parking without timeout
+ public boolean lock(final long sequence, final long timeout) throws InterruptedException
+ {
JBMThread currentThread = JBMThread.currentThread();
QueueEntry entry = new QueueEntry(sequence, currentThread);
@@ -89,19 +217,26 @@
{
QueueEntry peeked = peekEntry();
- if (peeked == null || peeked.thread != currentThread || !locked.compareAndSet(false, true))
- {
- currentThread.setLastLock(this);
+ if (peeked == null || // There are higher priority threads
+ peeked.thread != currentThread ||
+ // Next thread is not this one
+ !locked.compareAndSet(false, true)) // Lock is already locked
+ {
+ //currentThread.setLastLock(this);
+
+ currentThread.setWaitingOnSequencedLock(true);
+
+ // LockSupport.parkNanos(toWait);
+ // log.info(System.identityHashCode(this) + " parking " + currentThread);
+ currentThread.jbmPark(toWait);
- LockSupport.parkNanos(toWait);
+ currentThread.setWaitingOnSequencedLock(false);
-// if (currentThread.isJBMInterrupted())
-// {
-// log.info("**** setting strict order to false");
-// strictOrder = false;
-//
-// //Thread.interrupted();
-// }
+ if (Thread.interrupted() && currentThread.isFrozen())
+ {
+ log.info("** It's been interrupted");
+ throw new InterruptedException();
+ }
long now = System.nanoTime();
@@ -109,8 +244,10 @@
if (toWait <= 0)
{
- log.warn("Timed out waiting for sequenced lock, current " + currentSequence.get() + " expected " + sequence);
-
+ log.warn("Timed out waiting for sequenced lock, current " + currentSequence.get() +
+ " expected " +
+ sequence);
+
return false;
}
@@ -125,6 +262,8 @@
queue.remove();
owner = currentThread;
+
+
return true;
}
@@ -136,40 +275,64 @@
throw new IllegalMonitorStateException();
}
+ // log.info(System.identityHashCode(this) + " unlocking");
+
currentSequence.incrementAndGet();
+ owner = null;
+
locked.set(false);
QueueEntry entry = peekEntry();
if (entry != null)
{
- LockSupport.unpark(entry.thread);
+ // log.info(System.identityHashCode(this) + " unparking at unlock " + entry.thread);
+ // LockSupport.unpark(entry.thread);
+ entry.thread.jbmUnpark();
}
+ // else
+ // {
+ // log.info(System.identityHashCode(this) + " nothing to unpark at unlock");
+ // }
}
+ // private QueueEntry peekEntry()
+ // {
+ // QueueEntry entry = queue.peek();
+ //
+ // if (entry != null)
+ // {
+ // if (!strictOrder || entry.sequence == currentSequence.get())
+ // {
+ // return entry;
+ // }
+ // }
+ //
+ // return null;
+ // }
+
private QueueEntry peekEntry()
{
QueueEntry entry = queue.peek();
- if (entry != null)
+ if (entry != null && entry.sequence == currentSequence.get())
{
- if (!strictOrder ||entry.sequence == currentSequence.get())
- {
- return entry;
- }
+ return entry;
}
-
- return null;
+ else
+ {
+ return null;
+ }
}
private static final class QueueEntry implements Comparable<QueueEntry>
{
private final long sequence;
- private final Thread thread;
+ private final JBMThread thread;
- private QueueEntry(final long sequence, final Thread thread)
+ private QueueEntry(final long sequence, final JBMThread thread)
{
this.sequence = sequence;
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -410,6 +410,10 @@
return null;
}
}
+
+ public void setNeverFail()
+ {
+ }
// Public --------------------------------------------------------
Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -20,7 +20,6 @@
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-
package org.jboss.messaging.tests.integration.cluster.failover;
import java.util.ArrayList;
@@ -52,7 +51,7 @@
{
// Constants -----------------------------------------------------
-
+
private final Logger log = Logger.getLogger(this.getClass());
// Attributes ----------------------------------------------------
@@ -72,41 +71,40 @@
protected abstract void start() throws Exception;
protected abstract void stop() throws Exception;
-
+
protected abstract ClientSessionFactoryInternal createSessionFactory();
-
+
protected void setUp() throws Exception
{
super.setUp();
timer = new Timer();
}
-
+
protected void tearDown() throws Exception
{
timer.cancel();
super.tearDown();
}
-
+
protected boolean shouldFail()
{
return true;
}
-
-
protected void runMultipleThreadsFailoverTest(final RunnableT runnable,
- final int numThreads,
- final int numIts,
- final boolean failOnCreateConnection,
- final long failDelay) throws Exception
+ final int numThreads,
+ final int numIts,
+ final boolean failOnCreateConnection,
+ final long failDelay) throws Exception
{
for (int its = 0; its < numIts; its++)
{
log.info("Beginning iteration " + its);
-
+
start();
final ClientSessionFactoryInternal sf = createSessionFactory();
+ log.info("Created client session factory");
final ClientSession session = sf.createSession(false, true, true);
@@ -123,7 +121,7 @@
Runner(final String name, final RunnableT test, final int threadNum)
{
super(name);
-
+
this.test = test;
this.threadNum = threadNum;
@@ -144,7 +142,8 @@
// In case a failure happened here, it should print the thread dump
// Sending it to System.out, as it would then show up in the test report
- //System.out.println(threadDump(" - fired by MultiThreadRandomFailoverTestBase::runTestMultipleThreads (" + t.getLocalizedMessage() + ")"));
+ // System.out.println(threadDump(" - fired by
+ // MultiThreadRandomFailoverTestBase::runTestMultipleThreads (" + t.getLocalizedMessage() + ")"));
}
}
}
@@ -184,14 +183,13 @@
assertEquals(0, sf.numSessions());
assertEquals(0, sf.numConnections());
-
+
sf.close();
stop();
}
}
-
// Private -------------------------------------------------------
private Failer startFailer(final long time, final ClientSession session, final boolean failOnCreateConnection)
@@ -207,10 +205,8 @@
return failer;
}
-
// Inner classes -------------------------------------------------
-
protected abstract class RunnableT extends Thread
{
private volatile String failReason;
@@ -238,8 +234,6 @@
public abstract void run(final ClientSessionFactory sf, final int threadNum) throws Exception;
}
-
-
private class Failer extends TimerTask
{
private final ClientSession session;
@@ -285,6 +279,4 @@
}
}
-
-
}
Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -90,7 +90,7 @@
{
doTestA(sf, threadNum);
}
- }, 1, false);
+ }, NUM_THREADS, false);
}
public void testB() throws Exception
@@ -117,7 +117,7 @@
}, NUM_THREADS, false);
}
- public void testD() throws Exception
+ public void testD() throws Exception
{
runTestMultipleThreads(new RunnableT()
{
@@ -1109,7 +1109,7 @@
*/
protected void doTestL(final ClientSessionFactory sf) throws Exception
{
- final int numSessions = 10;
+ final int numSessions = 100;
for (int i = 0; i < numSessions; i++)
{
Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/server/replication/impl/SequencedLockTest.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/server/replication/impl/SequencedLockTest.java 2009-07-15 12:28:58 UTC (rev 7572)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/server/replication/impl/SequencedLockTest.java 2009-07-15 12:37:24 UTC (rev 7573)
@@ -27,6 +27,7 @@
import junit.framework.TestCase;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.replication.impl.JBMThread;
import org.jboss.messaging.core.server.replication.impl.SequencedLock;
/**
@@ -39,64 +40,144 @@
public class SequencedLockTest extends TestCase
{
private static final Logger log = Logger.getLogger(SequencedLockTest.class);
-
+
private static final long TIMEOUT = 5000000000L;
- public void testAscending() throws Exception
+ public void testAscendingStrict() throws Exception
{
for (int i = 0; i < 1000; i++)
{
- //log.info("iter " + i);
- this.doTestSequences(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+ // log.info("iter " + i);
+ this.doTestSequences(true, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
}
}
- public void testDescending() throws Exception
+ public void testDescendingStrict() throws Exception
{
for (int i = 0; i < 1000; i++)
{
- //log.info("iter " + i);
- this.doTestSequences(9, 8, 7, 6, 5, 4, 3, 2, 1, 0);
+ // log.info("iter " + i);
+ this.doTestSequences(true, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0);
}
}
- public void testAlternate() throws Exception
+ public void testAlternateStrict() throws Exception
{
for (int i = 0; i < 1000; i++)
{
- // log.info("iter " + i);
- this.doTestSequences(9, 0, 8, 1, 7, 2, 6, 3, 5, 4);
+ // log.info("iter " + i);
+ this.doTestSequences(true, 9, 0, 8, 1, 7, 2, 6, 3, 5, 4);
}
}
- public void testRandom() throws Exception
+ public void testRandomStrict() throws Exception
{
for (int i = 0; i < 1000; i++)
{
- // log.info("iter " + i);
- this.doTestSequences(3, 9, 5, 7, 1, 2, 0, 8, 6, 4);
+ // log.info("iter " + i);
+ this.doTestSequences(true, 3, 9, 5, 7, 1, 2, 0, 8, 6, 4);
}
}
+ public void testAscendingNonStrict() throws Exception
+ {
+ for (int i = 0; i < 1000; i++)
+ {
+ // log.info("iter " + i);
+ this.doTestSequences(false, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+ }
+ }
+
+ public void testDescendingNonStrict() throws Exception
+ {
+ for (int i = 0; i < 1000; i++)
+ {
+ // log.info("iter " + i);
+ this.doTestSequences(false, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0);
+ }
+ }
+
+ public void testAlternateNonStrict() throws Exception
+ {
+ for (int i = 0; i < 1000; i++)
+ {
+ // log.info("iter " + i);
+ this.doTestSequences(false, 9, 0, 8, 1, 7, 2, 6, 3, 5, 4);
+ }
+ }
+
+ public void testRandomNonStrict() throws Exception
+ {
+ for (int i = 0; i < 1000; i++)
+ {
+ // log.info("iter " + i);
+ this.doTestSequences(false, 3, 9, 5, 7, 1, 2, 0, 8, 6, 4);
+ }
+ }
+
public void testSpeed() throws Exception
{
SequencedLock lock = new SequencedLock(0);
AtomicInteger counter = new AtomicInteger(0);
-
+
AtomicInteger obcounter = new AtomicInteger(0);
-
+
final int numThreads = 10;
-
+
final int acquires = 100000;
-
+
MyThread2[] threads = new MyThread2[numThreads];
+
+ for (int i = 0; i < numThreads; i++)
+ {
+ threads[i] = new MyThread2(lock, acquires, counter, obcounter, true);
+ }
+
+ long start = System.currentTimeMillis();
+
+ for (int i = 0; i < numThreads; i++)
+ {
+ threads[i].start();
+ }
+
+ for (int i = 0; i < numThreads; i++)
+ {
+ threads[i].join();
+ }
for (int i = 0; i < numThreads; i++)
{
- threads[i] = new MyThread2(lock, acquires, counter, obcounter);
+ assertNull(threads[i].exception);
}
-
+
+ long end = System.currentTimeMillis();
+
+ double rate = 1000 * ((double)acquires * numThreads) / (end - start);
+
+ System.out.println("Rate " + rate);
+
+ }
+
+ public void testSpeedChangeNonStrict() throws Exception
+ {
+ SequencedLock lock = new SequencedLock(0);
+
+ AtomicInteger counter = new AtomicInteger(0);
+
+ AtomicInteger obcounter = new AtomicInteger(0);
+
+ final int numThreads = 10;
+
+ final int acquires = 10000;
+
+ MyThread2[] threads = new MyThread2[numThreads];
+
+ for (int i = 0; i < numThreads; i++)
+ {
+ threads[i] = new MyThread2(lock, acquires, counter, obcounter, false);
+ }
+
long start = System.currentTimeMillis();
for (int i = 0; i < numThreads; i++)
@@ -104,68 +185,104 @@
threads[i].start();
}
+ // Thread.sleep(500);
+
+ log.info("Setting non strict");
+
+ lock.setNonStrict();
+
for (int i = 0; i < numThreads; i++)
{
threads[i].join();
}
+
+ for (int i = 0; i < numThreads; i++)
+ {
+ assertNull(threads[i].exception);
+ }
+ log.info("done");
+
long end = System.currentTimeMillis();
-
+
double rate = 1000 * ((double)acquires * numThreads) / (end - start);
System.out.println("Rate " + rate);
-
}
- class MyThread2 extends Thread
+ class MyThread2 extends JBMThread
{
private int acquireCount;
-
+
private final AtomicInteger counter;
-
+
private final AtomicInteger observedCounter;
-
+
private final SequencedLock lock;
- MyThread2(final SequencedLock lock, final int acquires, final AtomicInteger counter, final AtomicInteger observedCounter)
+ private final boolean strict;
+
+ private volatile Exception exception;
+
+ MyThread2(final SequencedLock lock,
+ final int acquires,
+ final AtomicInteger counter,
+ final AtomicInteger observedCounter,
+ final boolean strict)
{
- this.lock= lock;
-
+ this.lock = lock;
+
this.acquireCount = acquires;
-
+
this.counter = counter;
-
+
this.observedCounter = observedCounter;
+
+ this.strict = strict;
}
-
+
public void run()
{
- for (int i = 0; i < acquireCount; i++)
+ try
{
- int cnt = counter.getAndIncrement();
-
- // log.info("Trying lock " + cnt);
-
- lock.lock(cnt, TIMEOUT);
-
- int ob = observedCounter.getAndIncrement();
-
- // log.info("Got lock " + ob);
-
- if (cnt != ob)
+ for (int i = 0; i < acquireCount; i++)
{
- System.out.println("Out of order, cnt " + cnt + " ob " + ob);
+ int cnt = counter.getAndIncrement();
+
+ // log.info("Trying lock " + cnt);
+
+ lock.lock(cnt, TIMEOUT);
+
+ int ob = observedCounter.getAndIncrement();
+
+ // log.info("Got lock " + ob);
+
+ if (strict && cnt != ob)
+ {
+ System.out.println("Out of order, cnt " + cnt + " ob " + ob);
+ }
+
+ lock.unlock();
}
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to run", e);
- lock.unlock();
+ this.exception = e;
}
}
}
- private void doTestSequences(final int... sequences) throws Exception
+ private void doTestSequences(final boolean strict, final int... sequences) throws Exception
{
SequencedLock lock = new SequencedLock(0);
+ if (!strict)
+ {
+ lock.setNonStrict();
+ }
+
AtomicInteger counter = new AtomicInteger(0);
MyThread[] threads = new MyThread[sequences.length];
@@ -184,14 +301,22 @@
{
threads[i].join();
}
-
+
for (int i = 0; i < sequences.length; i++)
{
- assertEquals(sequences[i], threads[i].getObtainedSeq());
+ assertNull(threads[i].exception);
}
+
+ if (strict)
+ {
+ for (int i = 0; i < sequences.length; i++)
+ {
+ assertEquals(sequences[i], threads[i].getObtainedSeq());
+ }
+ }
}
- class MyThread extends Thread
+ class MyThread extends JBMThread
{
private AtomicInteger counter;
@@ -200,6 +325,8 @@
private final int seq;
private volatile int obtainedSeq;
+
+ private volatile Exception exception;
MyThread(final int seq, final AtomicInteger counter, final SequencedLock lock)
{
@@ -212,19 +339,27 @@
public void run()
{
- // log.info(System.identityHashCode(this) + " Will attempt to obtain lock with sequence " + seq);
-
- lock.lock(seq, TIMEOUT);
-
- // log.info(System.identityHashCode(this) + " Obtained lock with sequence " + seq);
-
try
{
- obtainedSeq = counter.getAndIncrement();
+ // log.info(System.identityHashCode(this) + " Will attempt to obtain lock with sequence " + seq);
+
+ lock.lock(seq, TIMEOUT);
+
+ // log.info(System.identityHashCode(this) + " Obtained lock with sequence " + seq);
+
+ try
+ {
+ obtainedSeq = counter.getAndIncrement();
+ }
+ finally
+ {
+ lock.unlock();
+ }
}
- finally
+ catch (Exception e)
{
- lock.unlock();
+ log.error("Failed to run", e);
+ this.exception = e;
}
}
More information about the jboss-cvs-commits
mailing list