[jboss-cvs] JBoss Messaging SVN: r7513 - in branches/Branch_MultiThreaded_Replication: src/main/org/jboss/messaging/core/management/impl and 17 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jul 2 07:26:08 EDT 2009
Author: timfox
Date: 2009-07-02 07:26:07 -0400 (Thu, 02 Jul 2009)
New Revision: 7513
Added:
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/ReplicableCall.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java
Removed:
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/ReplicableAction.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/StatefulObjectReadWriteLock.java
Modified:
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/AddressManager.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/Binding.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/ClusterQueueStateManagerImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicationResponseMessage.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/Queue.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/QueueFactory.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/LastValueQueue.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/Replicator.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/PriorityLock.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/SimpleAutomaticFailoverTest.java
branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/BindingImplTest.java
Log:
MT replication continued
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -67,6 +67,7 @@
case SESS_RECEIVE_CONTINUATION:
{
SessionReceiveContinuationMessage continuation = (SessionReceiveContinuationMessage)packet;
+
clientSession.handleReceiveContinuation(continuation.getConsumerID(), continuation);
break;
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -613,8 +613,6 @@
public void sendNotification(final Notification notification) throws Exception
{
- log.info("messagingservercontrol " + this.messagingServerControl + " ne " + this.notificationsEnabled);
-
if (messagingServerControl != null && notificationsEnabled)
{
// This needs to be synchronized since we need to ensure notifications are processed in strict sequence
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -48,7 +48,7 @@
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.replication.impl.StatefulObjectReadWriteLock;
+import org.jboss.messaging.core.server.replication.impl.ReplicationAwareReadWriteLock;
import org.jboss.messaging.core.settings.impl.AddressSettings;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
@@ -117,7 +117,7 @@
* */
private final ReadWriteLock currentPageLock = new ReentrantReadWriteLock();
- //private final Lock lock = StatefulObjectReadWriteLock.createLock().writeLock();
+ //private final Lock lock = ReplicationAwareReadWriteLock.createLock().writeLock();
private volatile boolean running = false;
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -68,6 +68,7 @@
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+import org.jboss.messaging.core.server.replication.impl.ReplicationAwareAtomicLong;
import org.jboss.messaging.core.transaction.ResourceManager;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.TransactionOperation;
@@ -250,8 +251,6 @@
{
long id = idGenerator.generateID();
- log.info("Generating unique id on backup " + backup + " id " + id, new Exception());
-
return id;
}
@@ -1115,7 +1114,7 @@
private class BatchingIDGenerator implements IDGenerator
{
- private final AtomicLong counter;
+ private final ReplicationAwareAtomicLong counter;
private final long checkpointSize;
@@ -1123,7 +1122,7 @@
public BatchingIDGenerator(final long start, final long checkpointSize)
{
- this.counter = new AtomicLong(start);
+ this.counter = new ReplicationAwareAtomicLong(start);
this.checkpointSize = checkpointSize;
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -24,7 +24,6 @@
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.xa.Xid;
@@ -38,6 +37,7 @@
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.replication.impl.ReplicationAwareAtomicLong;
import org.jboss.messaging.core.transaction.ResourceManager;
import org.jboss.messaging.utils.Pair;
import org.jboss.messaging.utils.SimpleString;
@@ -55,7 +55,7 @@
{
private static final Logger log = Logger.getLogger(NullStorageManager.class);
- private final AtomicLong idSequence = new AtomicLong(0);
+ private final ReplicationAwareAtomicLong idSequence = new ReplicationAwareAtomicLong(0);
private UUID id;
@@ -193,8 +193,6 @@
{
long id = idSequence.getAndIncrement();
- // log.info("Generating unique id on backup " + backup + " id " + id, new Exception());
-
return id;
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/AddressManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/AddressManager.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/AddressManager.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -45,6 +45,8 @@
void clear();
Binding getBinding(SimpleString queueName);
+
+ Binding getBindingByID(long id);
Map<SimpleString, Binding> getBindings();
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/Binding.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/Binding.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/Binding.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -57,9 +57,7 @@
boolean isExclusive();
- int getID();
+ long getID();
- void setID(int id);
-
int getDistance();
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -59,6 +59,8 @@
Binding getBinding(SimpleString uniqueName);
+ Binding getBindingByID(long id);
+
Bindings getMatchingBindings(SimpleString address);
void route(ServerMessage message) throws Exception;
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -49,7 +49,7 @@
private final SimpleString filterString;
- private final int id;
+ private final long id;
private List<SimpleString> filterStrings;
@@ -57,7 +57,7 @@
private final int distance;
- public QueueInfo(final SimpleString routingName, final SimpleString clusterName, final SimpleString address, final SimpleString filterString, final int id,
+ public QueueInfo(final SimpleString routingName, final SimpleString clusterName, final SimpleString address, final SimpleString filterString, final long id,
final Integer distance)
{
if (routingName == null)
@@ -109,7 +109,7 @@
return distance;
}
- public int getID()
+ public long getID()
{
return id;
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -60,7 +60,7 @@
private final Map<SimpleString, Integer> routingNamePositions = new ConcurrentHashMap<SimpleString, Integer>();
- private final Map<Integer, Binding> bindingsMap = new ConcurrentHashMap<Integer, Binding>();
+ private final Map<Long, Binding> bindingsMap = new ConcurrentHashMap<Long, Binding>();
private final List<Binding> exclusiveBindings = new CopyOnWriteArrayList<Binding>();
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/ClusterQueueStateManagerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/ClusterQueueStateManagerImpl.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/ClusterQueueStateManagerImpl.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -154,13 +154,13 @@
SimpleString address = (SimpleString)props.getProperty(ManagementHelper.HDR_ADDRESS);
- Integer transientID = (Integer)props.getProperty(ManagementHelper.HDR_BINDING_ID);
+ Long bindingID = (Long)props.getProperty(ManagementHelper.HDR_BINDING_ID);
SimpleString filterString = (SimpleString)props.getProperty(ManagementHelper.HDR_FILTERSTRING);
Integer distance = (Integer)props.getProperty(ManagementHelper.HDR_DISTANCE);
- QueueInfo info = new QueueInfo(routingName, clusterName, address, filterString, transientID, distance);
+ QueueInfo info = new QueueInfo(routingName, clusterName, address, filterString, bindingID, distance);
queueInfos.put(clusterName, info);
@@ -373,7 +373,7 @@
message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName());
message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, info.getRoutingName());
- message.putIntProperty(ManagementHelper.HDR_BINDING_ID, info.getID());
+ message.putLongProperty(ManagementHelper.HDR_BINDING_ID, info.getID());
message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, info.getFilterString());
message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -54,10 +54,12 @@
private final boolean exclusive;
- private int id;
+ private final long id;
- public DivertBinding(final SimpleString address, final Divert divert)
+ public DivertBinding(final long id, final SimpleString address, final Divert divert)
{
+ this.id = id;
+
this.address = address;
this.divert = divert;
@@ -71,16 +73,11 @@
this.exclusive = divert.isExclusive();
}
- public int getID()
+ public long getID()
{
return id;
}
- public void setID(final int id)
- {
- this.id = id;
- }
-
public Filter getFilter()
{
return filter;
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -57,7 +57,7 @@
private final SimpleString name;
- private int id;
+ private final long id;
private SimpleString clusterName;
@@ -72,18 +72,15 @@
this.name = queue.getName();
this.clusterName = name.concat(nodeID);
+
+ this.id = queue.getID();
}
- public int getID()
+ public long getID()
{
return id;
}
- public void setID(final int id)
- {
- this.id = id;
- }
-
public Filter getFilter()
{
return filter;
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -59,7 +59,7 @@
import org.jboss.messaging.core.server.QueueFactory;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
-import org.jboss.messaging.core.server.replication.impl.StatefulObjectReadWriteLock;
+import org.jboss.messaging.core.server.replication.impl.ReplicationAwareReadWriteLock;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.AddressSettings;
import org.jboss.messaging.core.transaction.Transaction;
@@ -119,9 +119,9 @@
// on a particular node, and all would
// have to be specified in the message. Specify 10000 ints takes up a lot less space than 10000 arbitrary queue names
// The drawback of this approach is we only allow up to 2^32 queues in memory at any one time
- private int transientIDSequence;
+ // private int transientIDSequence;
- private Set<Integer> transientIDs = new HashSet<Integer>();
+ // private Set<Integer> transientIDs = new HashSet<Integer>();
// private Map<SimpleString, QueueInfo> queueInfos = new HashMap<SimpleString, QueueInfo>();
@@ -178,7 +178,7 @@
// this.addressSettingsRepository = addressSettingsRepository;
- lock = new StatefulObjectReadWriteLock("postoffice", storageManager.generateUniqueID(), 0);
+ lock = new ReplicationAwareReadWriteLock("postoffice", 0);
}
// MessagingComponent implementation ---------------------------------------
@@ -218,7 +218,7 @@
// queueInfos.clear();
- transientIDs.clear();
+ // transientIDs.clear();
started = false;
}
@@ -460,7 +460,7 @@
lock.writeLock().lock();
try
{
- binding.setID(generateTransientID());
+ //binding.setID(generateTransientID());
boolean addressExists = addressManager.getBindingsForRoutingAddress(binding.getAddress()) != null;
@@ -498,7 +498,7 @@
props.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
- props.putIntProperty(ManagementHelper.HDR_BINDING_ID, binding.getID());
+ props.putLongProperty(ManagementHelper.HDR_BINDING_ID, binding.getID());
props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
@@ -510,8 +510,7 @@
}
String uid = UUIDGenerator.getInstance().generateStringUUID();
-
- log.info("*** sending notification");
+
managementService.sendNotification(new Notification(uid, NotificationType.BINDING_ADDED, props));
}
@@ -546,9 +545,7 @@
}
}
- releaseTransientID(binding.getID());
-
-
+ //releaseTransientID(binding.getID());
}
finally
{
@@ -602,6 +599,19 @@
lock.readLock().unlock();
}
}
+
+ public Binding getBindingByID(final long id)
+ {
+ lock.readLock().lock();
+ try
+ {
+ return addressManager.getBindingByID(id);
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+ }
public Bindings getMatchingBindings(final SimpleString address)
{
@@ -880,16 +890,17 @@
private synchronized void startExpiryScanner()
{
- if (reaperPeriod > 0)
- {
- reaper = new Reaper();
-
- expiryReaper = new Thread(reaper, "JBM-expiry-reaper");
-
- expiryReaper.setPriority(reaperPriority);
-
- expiryReaper.start();
- }
+ //TODO disabled for now
+// if (reaperPeriod > 0)
+// {
+// reaper = new Reaper();
+//
+// expiryReaper = new Thread(reaper, "JBM-expiry-reaper");
+//
+// expiryReaper.setPriority(reaperPriority);
+//
+// expiryReaper.start();
+// }
}
// private void routeDirect(final Queue queue, final ServerMessage message) throws Exception
@@ -918,30 +929,30 @@
// return message;
// }
- private int generateTransientID()
- {
- int start = transientIDSequence;
- do
- {
- int id = transientIDSequence++;
+// private int generateTransientID()
+// {
+// int start = transientIDSequence;
+// do
+// {
+// int id = transientIDSequence++;
+//
+// if (!transientIDs.contains(id))
+// {
+// transientIDs.add(id);
+//
+// return id;
+// }
+// }
+// while (transientIDSequence != start);
+//
+// throw new IllegalStateException("Run out of queue ids!");
+// }
+//
+// private void releaseTransientID(final int id)
+// {
+// transientIDs.remove(id);
+// }
- if (!transientIDs.contains(id))
- {
- transientIDs.add(id);
-
- return id;
- }
- }
- while (transientIDSequence != start);
-
- throw new IllegalStateException("Run out of queue ids!");
- }
-
- private void releaseTransientID(final int id)
- {
- transientIDs.remove(id);
- }
-
private final PageMessageOperation getPageOperation(final Transaction tx)
{
// you could have races on the case two sessions using the same XID
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -45,6 +45,8 @@
private final Map<SimpleString, Bindings> mappings = new HashMap<SimpleString, Bindings>();
private final Map<SimpleString, Binding> nameMap = new HashMap<SimpleString, Binding>();
+
+ private final Map<Long, Binding> idMap = new HashMap<Long, Binding>();
public void addBinding(final Binding binding)
{
@@ -55,9 +57,9 @@
nameMap.put(binding.getUniqueName(), binding);
- addMappingInternal(binding.getAddress(), binding);
+ idMap.put(binding.getID(), binding);
- log.info(System.identityHashCode(this) + " adding binding " + binding.getUniqueName());
+ addMappingInternal(binding.getAddress(), binding);
}
public Binding removeBinding(final SimpleString uniqueName)
@@ -68,6 +70,8 @@
{
return null;
}
+
+ idMap.remove(binding.getID());
removeBindingInternal(binding.getAddress(), uniqueName);
@@ -81,20 +85,13 @@
public Binding getBinding(final SimpleString bindableName)
{
- log.info(System.identityHashCode(this) + " dumping bindings");
-
- for (SimpleString name: nameMap.keySet())
- {
- log.info("binding name: " + name);
- }
-
- for (SimpleString address: mappings.keySet())
- {
- log.info("address name: " + address);
- }
-
return nameMap.get(bindableName);
}
+
+ public Binding getBindingByID(final long id)
+ {
+ return idMap.get(id);
+ }
public Map<SimpleString, Binding> getBindings()
{
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -94,8 +94,7 @@
* @param binding the binding to add
*/
public void addBinding(final Binding binding)
- {
- log.info("Adding binding " + binding.getAddress() + " name " + binding.getUniqueName());
+ {
super.addBinding(binding);
Address add = addAndUpdateAddressMap(binding.getAddress());
if (add.containsWildCard())
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -24,11 +24,8 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EARLY_RESPONSE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PACKETS_CONFIRMED;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Queue;
+import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -41,7 +38,6 @@
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.CommandConfirmationHandler;
-import org.jboss.messaging.core.remoting.Interceptor;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
@@ -50,7 +46,6 @@
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.server.replication.Replicator;
import org.jboss.messaging.core.server.replication.impl.JBMThread;
-import org.jboss.messaging.utils.Pair;
/**
* A ChannelImpl
@@ -180,26 +175,29 @@
{
send(packet, false);
}
-
+
// This must never called by more than one thread concurrently
public void send(final Packet packet, final boolean flush)
{
- //FIXME - this is a bit hacky
-
+ // FIXME - this is a bit hacky
+
Thread t = Thread.currentThread();
-
+
if (t instanceof JBMThread)
- {
+ {
JBMThread thread = (JBMThread)t;
-
+
if (thread.isRecording())
{
thread.getReplicator().registerWaitingChannel(this);
-
- log.info("Queueing write");
-
- queuedWrites.add(new Pair<Replicator, Packet>(thread.getReplicator(), packet));
-
+
+ QueuedWrite qw = new QueuedWrite();
+ qw.replicator = thread.getReplicator();
+ qw.packet = packet;
+ //qw.sequence = qw.replicator.getReplicateSequence();
+
+ queuedWrites.add(qw);
+
return;
}
}
@@ -249,7 +247,6 @@
if (connection.isActive() || packet.isWriteAlways())
{
- log.info("actually writing packet " + packet.getType());
connection.getTransportConnection().write(buffer, flush);
}
}
@@ -318,7 +315,7 @@
{
resendCache.add(packet);
}
-
+
connection.getTransportConnection().write(buffer);
long toWait = connection.getBlockingCallTimeout();
@@ -498,15 +495,15 @@
{
connection.removeChannel(id);
-// if (replicatingChannel != null)
-// {
-// // If we're reconnecting to a live node which is replicated then there will be a replicating channel
-// // too. We need to then make sure that all replication responses come back since packets aren't
-// // considered confirmed until response comes back and is processed. Otherwise responses to previous
-// // message sends could come back after reconnection resulting in clients resending same message
-// // since it wasn't confirmed yet.
-// replicatingChannel.waitForAllReplicationResponse();
-// }
+ // if (replicatingChannel != null)
+ // {
+ // // If we're reconnecting to a live node which is replicated then there will be a replicating channel
+ // // too. We need to then make sure that all replication responses come back since packets aren't
+ // // considered confirmed until response comes back and is processed. Otherwise responses to previous
+ // // message sends could come back after reconnection resulting in clients resending same message
+ // // since it wasn't confirmed yet.
+ // replicatingChannel.waitForAllReplicationResponse();
+ // }
// And switch it
@@ -524,6 +521,7 @@
for (final Packet packet : resendCache)
{
+ log.info("Replaying command " + packet);
doWrite(packet);
}
}
@@ -612,7 +610,7 @@
else
{
if (packet.isResponse())
- {
+ {
response = packet;
confirm(packet);
@@ -775,28 +773,76 @@
sendSemaphore.release(sizeToFree);
}
}
-
-
- private java.util.Queue<Pair<Replicator, Packet>> queuedWrites = new ConcurrentLinkedQueue<Pair<Replicator, Packet>>();
+ private java.util.Queue<QueuedWrite> queuedWrites = new ConcurrentLinkedQueue<QueuedWrite>();
+
+ private final Object replicationLock = new Object();
+
+ // we only include sequence for debug
+ private static class QueuedWrite
+ {
+ Replicator replicator;
+
+ Packet packet;
+
+ // long sequence;
+
+ boolean done;
+ }
+
public void replicationResponseReceived(final Replicator replicator)
{
- Pair<Replicator, Packet> pair = queuedWrites.peek();
-
- if (pair != null && pair.a == replicator)
- {
- do
+ synchronized (replicationLock)
+ {
+ QueuedWrite qw = queuedWrites.peek();
+
+ //We assume max of only one queued write per channel per replicator for now - this is true for
+ //all actions
+
+ //And we only send a replication response if we're waiting for a queued write TODO!
+
+ if (qw.replicator == replicator)
{
queuedWrites.remove();
-
- log.info("replication response received, sending packet " + pair.b);
- send(pair.b);
-
- pair = queuedWrites.peek();
+
+ send(qw.packet);
+
+ qw = queuedWrites.peek();
+
+ while (qw != null)
+ {
+ if (qw.done)
+ {
+ queuedWrites.remove();
+
+ send(qw.packet);
+
+ qw = queuedWrites.peek();
+ }
+ else
+ {
+ break;
+ }
+ }
}
- while (pair != null && pair.a.isResponseReceived());
+ else
+ {
+ Iterator<QueuedWrite> iter = queuedWrites.iterator();
+
+ iter.next();
+
+ while (iter.hasNext())
+ {
+ qw = iter.next();
+
+ if (qw.replicator == replicator)
+ {
+ qw.done = true;
+
+ break;
+ }
+ }
+ }
}
}
-
-
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -33,10 +33,12 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PING;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REGISTER_QUEUE_REPLICATION_CHANNEL;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ACKNOWLEDGE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ADD_REMOTE_CONSUMER;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ADD_REMOTE_QUEUE_BINDING;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_LOCK_SEQUENCES;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_QUEUE_DELIVERY;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_REDISTRIBUTION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_REMOVE_REMOTE_CONSUMER;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_REMOVE_REMOTE_QUEUE_BINDING;
@@ -122,6 +124,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.RegisterQueueReplicationChannelMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateAcknowledgeMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRedistributionMessage;
@@ -148,8 +151,6 @@
{
final byte packetType = in.readByte();
- log.info("decoding packet " + packetType);
-
Packet packet;
switch (packetType)
@@ -434,6 +435,16 @@
packet = new ReplicateLockSequenceMessage();
break;
}
+ case REPLICATE_QUEUE_DELIVERY:
+ {
+ packet = new PacketImpl(REPLICATE_QUEUE_DELIVERY);
+ break;
+ }
+ case REGISTER_QUEUE_REPLICATION_CHANNEL:
+ {
+ packet = new RegisterQueueReplicationChannelMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -27,6 +27,7 @@
import org.jboss.messaging.core.remoting.Interceptor;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
import org.jboss.messaging.core.remoting.spi.Connector;
@@ -368,8 +369,6 @@
{
final Packet packet = decoder.decode(buffer);
- log.info("packet received " + packet.getType());
-
synchronized (transferLock)
{
if (!frozen)
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -122,25 +122,19 @@
public void write(final MessagingBuffer buffer, final boolean flush)
{
try
- {
- log.info("writing on invm connection");
+ {
executor.execute(new Runnable()
{
public void run()
{
try
- {
- log.info("runnable running");
+ {
if (!closed)
{
buffer.readInt(); // read and discard
handler.bufferReceived(id, buffer);
}
- else
- {
- log.info("it's closed");
- }
}
catch (Exception e)
{
@@ -154,7 +148,6 @@
catch (RejectedExecutionException e)
{
// Ignore - this can happen if server/client is shutdown and another request comes in
- log.error("shutdown", e);
}
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -160,6 +160,12 @@
public static final byte REPLICATE_LOCK_SEQUENCES = 98;
+ public static final byte REPLICATE_QUEUE_DELIVERY = 99;
+
+ public static final byte REGISTER_QUEUE_REPLICATION_CHANNEL = 100;
+
+ public static final byte REGISTER_POST_OFFICE_REPLICATION_CHANNEL = 101;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
@@ -187,11 +193,6 @@
public int encode(final MessagingBuffer buffer)
{
-// if (this.type == PacketImpl.EXCEPTION)
-// {
-// log.info("encoding exception", new Exception());
-// }
-
// The standard header fields
buffer.writeInt(0); // The length gets filled in at the end
buffer.writeByte(type);
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -18,7 +18,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.utils.DataConstants;
-import org.jboss.messaging.utils.Pair;
/**
*
@@ -34,17 +33,21 @@
// Attributes ----------------------------------------------------
- private List<Pair<Long, Integer>> sequences;
+ private List<Long> sequences;
+ private boolean requiresResponse;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public ReplicateLockSequenceMessage(final List<Pair<Long, Integer>> sequences)
+ public ReplicateLockSequenceMessage(final List<Long> sequences, final boolean requiresResponse)
{
super(REPLICATE_LOCK_SEQUENCES);
this.sequences = sequences;
+
+ this.requiresResponse = requiresResponse;
}
// Public --------------------------------------------------------
@@ -56,35 +59,44 @@
public int getRequiredBufferSize()
{
- return BASIC_PACKET_SIZE + DataConstants.SIZE_INT + sequences.size() * (DataConstants.SIZE_LONG + DataConstants.SIZE_INT);
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_INT +
+ sequences.size() *
+ DataConstants.SIZE_LONG +
+ DataConstants.SIZE_BOOLEAN;
}
@Override
public void encodeBody(final MessagingBuffer buffer)
{
buffer.writeInt(sequences.size());
- for (Pair<Long, Integer> sequence : sequences)
+ for (long sequence : sequences)
{
- buffer.writeLong(sequence.a);
- buffer.writeInt(sequence.b);
+ buffer.writeLong(sequence);
}
+ buffer.writeBoolean(requiresResponse);
}
@Override
public void decodeBody(final MessagingBuffer buffer)
{
int len = buffer.readInt();
- sequences = new ArrayList<Pair<Long, Integer>>(len);
+ sequences = new ArrayList<Long>(len);
for (int i = 0; i < len; i++)
{
- sequences.add(new Pair<Long, Integer>(buffer.readLong(), buffer.readInt()));
+ sequences.add(buffer.readLong());
}
+ requiresResponse = buffer.readBoolean();
}
- public List<Pair<Long, Integer>> getSequences()
+ public List<Long> getSequences()
{
return sequences;
}
+
+ public boolean isRequiresResponse()
+ {
+ return requiresResponse;
+ }
// Package protected ---------------------------------------------
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicationResponseMessage.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicationResponseMessage.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicationResponseMessage.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -22,8 +22,6 @@
package org.jboss.messaging.core.remoting.impl.wireformat.replication;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
-
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
/**
@@ -56,6 +54,11 @@
return true;
}
+ public boolean isRequiresConfirmations()
+ {
+ return false;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -64,4 +67,3 @@
// Inner classes -------------------------------------------------
}
-
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -44,6 +44,7 @@
import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
import org.jboss.messaging.core.remoting.server.RemotingService;
import org.jboss.messaging.core.remoting.spi.Acceptor;
import org.jboss.messaging.core.remoting.spi.AcceptorFactory;
@@ -286,9 +287,6 @@
RemotingConnection rc = new RemotingConnectionImpl(connection, replicatingConnection, interceptors, !config.isBackup());
- log.info("** creating connection " + System.identityHashCode(rc) + " replicating connection is " + replicatingConnection +
- " backup is " + config.isBackup());
-
Channel channel1 = rc.getChannel(1, -1, false);
final Replicator replicator;
@@ -305,7 +303,7 @@
{
if (packet.getType() == PacketImpl.REPLICATION_RESPONSE)
{
- log.info("*** got replication response from create session");
+ ReplicationResponseMessage msg = (ReplicationResponseMessage)packet;
replicator.replicationResponseReceived();
}
else
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/Queue.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/Queue.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -76,6 +76,10 @@
void cancel(MessageReference reference) throws Exception;
void deliverAsync(Executor executor);
+
+ void deliverAll();
+
+ HandleStatus deliverOne();
List<MessageReference> list(Filter filter);
@@ -139,7 +143,7 @@
void addRedistributor(long delay, Executor executor);
// Only used in testing
- void deliverNow();
+ // void deliverNow();
boolean checkDLQ(MessageReference ref) throws Exception;
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/QueueFactory.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/QueueFactory.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/QueueFactory.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -24,6 +24,7 @@
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.replication.Replicator;
import org.jboss.messaging.utils.SimpleString;
/**
@@ -38,7 +39,8 @@
*/
public interface QueueFactory
{
- Queue createQueue(long persistenceID, final SimpleString address, SimpleString name, Filter filter, boolean durable, boolean temporary);
+ Queue createQueue(long persistenceID, final SimpleString address, SimpleString name, Filter filter, boolean durable,
+ boolean temporary, Replicator replicator);
/**
* This is required for delete-all-reference to work correctly with paging
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -106,7 +106,7 @@
private volatile boolean started;
- private int replicationCount;
+ // private int replicationCount;
/*
* Constructor using static list of connectors
@@ -452,51 +452,51 @@
clearBindings();
- waitForReplicationsToComplete(3000);
+ // waitForReplicationsToComplete(3000);
}
- private synchronized void waitForReplicationsToComplete(long timeout)
- {
- long toWait = timeout;
+// private synchronized void waitForReplicationsToComplete(long timeout)
+// {
+// long toWait = timeout;
+//
+// long start = System.currentTimeMillis();
+//
+// while (replicationCount > 0 && toWait > 0)
+// {
+// try
+// {
+// wait(toWait);
+// }
+// catch (InterruptedException e)
+// {
+// }
+//
+// long now = System.currentTimeMillis();
+//
+// toWait -= now - start;
+//
+// start = now;
+// }
+//
+// if (toWait <= 0)
+// {
+// log.warn("Timed out waiting for replication responses to return");
+// }
+//
+// }
+//
+// private synchronized void replicationComplete()
+// {
+// replicationCount--;
+//
+// notify();
+// }
+//
+// private synchronized void beforeReplicate()
+// {
+// replicationCount++;
+// }
- long start = System.currentTimeMillis();
-
- while (replicationCount > 0 && toWait > 0)
- {
- try
- {
- wait(toWait);
- }
- catch (InterruptedException e)
- {
- }
-
- long now = System.currentTimeMillis();
-
- toWait -= now - start;
-
- start = now;
- }
-
- if (toWait <= 0)
- {
- log.warn("Timed out waiting for replication responses to return");
- }
-
- }
-
- private synchronized void replicationComplete()
- {
- replicationCount--;
-
- notify();
- }
-
- private synchronized void beforeReplicate()
- {
- replicationCount++;
- }
-
public void activate(final Queue queue) throws Exception
{
this.queue = queue;
@@ -631,7 +631,7 @@
throw new IllegalStateException("queueID is null");
}
- RemoteQueueBinding binding = new RemoteQueueBindingImpl(queueAddress,
+ RemoteQueueBinding binding = new RemoteQueueBindingImpl(server.getStorageManager().generateUniqueID(), queueAddress,
clusterName,
routingName,
queueID,
@@ -782,7 +782,7 @@
Queue queue = (Queue)queueBinding.getBindable();
- RemoteQueueBinding binding = new RemoteQueueBindingImpl(address,
+ RemoteQueueBinding binding = new RemoteQueueBindingImpl(server.getStorageManager().generateUniqueID(), address,
uniqueName,
routingName,
queueID,
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -59,7 +59,7 @@
private final SimpleString uniqueName;
private final SimpleString routingName;
-
+
private final int remoteQueueID;
private final Filter queueFilter;
@@ -71,20 +71,23 @@
private int consumerCount;
private final SimpleString idsHeaderName;
-
- private int id;
-
+
+ private final long id;
+
private final int distance;
-
- public RemoteQueueBindingImpl(final SimpleString address,
+
+ public RemoteQueueBindingImpl(final long id,
+ final SimpleString address,
final SimpleString uniqueName,
final SimpleString routingName,
final int remoteQueueID,
final SimpleString filterString,
- final Queue storeAndForwardQueue,
+ final Queue storeAndForwardQueue,
final SimpleString bridgeName,
final int distance) throws Exception
{
+ this.id = id;
+
this.address = address;
this.storeAndForwardQueue = storeAndForwardQueue;
@@ -92,7 +95,7 @@
this.uniqueName = uniqueName;
this.routingName = routingName;
-
+
this.remoteQueueID = remoteQueueID;
if (filterString != null)
@@ -103,22 +106,17 @@
{
queueFilter = null;
}
-
+
this.idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(bridgeName);
-
+
this.distance = distance;
}
-
- public int getID()
+
+ public long getID()
{
return id;
}
-
- public void setID(final int id)
- {
- this.id = id;
- }
-
+
public SimpleString getAddress()
{
return address;
@@ -128,7 +126,7 @@
{
return storeAndForwardQueue;
}
-
+
public Queue getQueue()
{
return storeAndForwardQueue;
@@ -143,7 +141,7 @@
{
return uniqueName;
}
-
+
public SimpleString getClusterName()
{
return uniqueName;
@@ -153,24 +151,24 @@
{
return false;
}
-
+
public BindingType getType()
{
return BindingType.REMOTE_QUEUE;
}
-
+
public Filter getFilter()
{
return queueFilter;
}
-
+
public int getDistance()
{
return distance;
}
public synchronized boolean isHighAcceptPriority(final ServerMessage message)
- {
+ {
if (consumerCount == 0)
{
return false;
@@ -193,15 +191,15 @@
return false;
}
-
+
public void willRoute(final ServerMessage message)
- {
- //We add a header with the name of the queue, holding a list of the transient ids of the queues to route to
-
- //TODO - this can be optimised
-
+ {
+ // We add a header with the name of the queue, holding a list of the transient ids of the queues to route to
+
+ // TODO - this can be optimised
+
byte[] ids = (byte[])message.getProperty(idsHeaderName);
-
+
if (ids == null)
{
ids = new byte[4];
@@ -209,17 +207,17 @@
else
{
byte[] newIds = new byte[ids.length + 4];
-
+
System.arraycopy(ids, 0, newIds, 4, ids.length);
-
+
ids = newIds;
}
-
+
ByteBuffer buff = ByteBuffer.wrap(ids);
-
+
buff.putInt(remoteQueueID);
-
- message.putBytesProperty(idsHeaderName, ids);
+
+ message.putBytesProperty(idsHeaderName, ids);
}
public synchronized void addConsumer(final SimpleString filterString) throws Exception
@@ -271,7 +269,7 @@
consumerCount--;
}
-
+
public synchronized int consumerCount()
{
return consumerCount;
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/LastValueQueue.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/LastValueQueue.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/LastValueQueue.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -38,6 +38,7 @@
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.replication.Replicator;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.AddressSettings;
import org.jboss.messaging.core.transaction.Transaction;
@@ -69,7 +70,8 @@
final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice,
final StorageManager storageManager,
- final HierarchicalRepository<AddressSettings> addressSettingsRepository)
+ final HierarchicalRepository<AddressSettings> addressSettingsRepository,
+ final Replicator replicator)
{
super(id,
address,
@@ -80,7 +82,8 @@
scheduledExecutor,
postOffice,
storageManager,
- addressSettingsRepository);
+ addressSettingsRepository,
+ replicator);
this.pagingManager = postOffice.getPagingManager();
this.storageManager = storageManager;
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -32,6 +32,8 @@
import javax.management.MBeanServer;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ConnectionManager;
+import org.jboss.messaging.core.client.impl.ConnectionManagerImpl;
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.cluster.DivertConfiguration;
@@ -74,7 +76,9 @@
import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.RegisterQueueReplicationChannelMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateStartupInfoMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
import org.jboss.messaging.core.remoting.server.RemotingService;
import org.jboss.messaging.core.remoting.server.impl.RemotingServiceImpl;
import org.jboss.messaging.core.remoting.spi.Connection;
@@ -464,6 +468,8 @@
// Need to activate the connection even if session can't be found - since otherwise response
// will never get back
+
+ log.info("** reattaching session");
checkActivate(connection);
@@ -476,6 +482,8 @@
// Reconnect the channel to the new connection
int serverLastReceivedCommandID = session.transferConnection(connection, lastReceivedCommandID);
+ log.info("Reattached session ok");
+
return new ReattachSessionResponseMessage(serverLastReceivedCommandID, false);
}
}
@@ -713,6 +721,8 @@
private Map<String, Object> backupConnectorParams;
+ private ConnectionManager replicatingConnectionManager;
+
private void setupBackupConnectorFactory()
{
String backupConnectorName = configuration.getBackupConnectorName();
@@ -742,11 +752,37 @@
}
backupConnectorParams = backupConnector.getParams();
+
+ replicatingConnectionManager = new ConnectionManagerImpl(null,
+ backupConnector,
+ null,
+ false,
+ 10, // TODO don't hardcode this
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
+ 0,
+ 0d,
+ 0,
+ this.threadPool,
+ this.scheduledPool);
}
}
}
private boolean activatedBackup;
+
+ public synchronized RemotingConnection getPooledReplicatingConnection()
+ {
+ RemotingConnection conn = null;
+
+ if (replicatingConnectionManager != null)
+ {
+ conn = replicatingConnectionManager.getConnection(1);
+ }
+
+ return conn;
+ }
public synchronized RemotingConnection getReplicatingConnection()
{
@@ -779,37 +815,8 @@
ChannelHandler prevHandler = channel1.getHandler();
- log.info("Prev handler is " + prevHandler);
+ sendOnReplicatingAndWaitForResponse(packet, channel1);
- final Future future = new Future();
-
- channel1.setHandler(new ChannelHandler()
- {
- public void handlePacket(final Packet packet)
- {
- if (packet.getType() == PacketImpl.REPLICATION_RESPONSE)
- {
- log.info("&&&&&&&&&&&&&&&& got replication response");
- future.run();
- }
- else
- {
- throw new IllegalArgumentException("Invalid packet " + packet.getType());
- }
- }
- });
-
- channel1.send(packet);
-
- boolean ok = future.await(10000);
-
- if (!ok)
- {
- throw new IllegalStateException("Timed out waiting for response from backup for initialisation");
- }
-
- log.info("got response");
-
channel1.setHandler(prevHandler);
activatedBackup = true;
@@ -902,6 +909,8 @@
}
configuration.setBackup(false);
+
+ log.info("set backup to false");
if (clusterManager != null)
{
@@ -1250,7 +1259,72 @@
//
// return true;
// }
+
+ private void sendOnReplicatingAndWaitForResponse(final Packet packet, final Channel channel)
+ {
+ final Future future = new Future();
+
+ channel.setHandler(new ChannelHandler()
+ {
+ public void handlePacket(final Packet packet)
+ {
+ future.run();
+ }
+ });
+
+ channel.send(packet);
+
+ boolean ok = future.await(10000);
+ if (!ok)
+ {
+ throw new IllegalStateException("Timed out waiting for response from backup");
+ }
+ }
+
+ private Replicator getReplicatorForQueue(final long queueID)
+ {
+ RemotingConnection replicatingConnection = this.getPooledReplicatingConnection();
+
+ final Replicator replicator;
+
+ if (replicatingConnection != null)
+ {
+ Channel channel1 = replicatingConnection.getChannel(1, -1, false);
+
+ JBMThread thread = JBMThread.currentThread();
+
+ thread.setNoReplayOrRecord();
+
+ //sendOnReplicatingAndWaitForResponse(new RegisterQueueReplicationChannelMessage(queueID), channel1);
+
+ //Actually no need to wait for response
+
+ channel1.send(new RegisterQueueReplicationChannelMessage(queueID));
+
+ thread.resumeRecording();
+
+ Channel replChannel = replicatingConnection.getChannel(queueID, -1, false);
+
+
+ replicator = new ReplicatorImpl(replChannel);
+
+ replChannel.setHandler(new ChannelHandler()
+ {
+ public void handlePacket(final Packet packet)
+ {
+ replicator.replicationResponseReceived();
+ }
+ });
+ }
+ else
+ {
+ replicator = null;
+ }
+
+ return replicator;
+ }
+
private void loadJournal() throws Exception
{
List<QueueBindingInfo> queueBindingInfos = new ArrayList<QueueBindingInfo>();
@@ -1270,13 +1344,16 @@
{
filter = new FilterImpl(queueBindingInfo.getFilterString());
}
+
+ Replicator replicator = getReplicatorForQueue(queueBindingInfo.getPersistenceID());
Queue queue = queueFactory.createQueue(queueBindingInfo.getPersistenceID(),
queueBindingInfo.getAddress(),
queueBindingInfo.getQueueName(),
filter,
true,
- false);
+ false,
+ replicator);
Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeID);
@@ -1344,8 +1421,7 @@
final boolean durable,
final boolean temporary,
final boolean ignoreIfExists) throws Exception
- {
- log.info("** creating queue " + queueName);
+ {
Binding binding = postOffice.getBinding(queueName);
if (binding != null)
@@ -1366,8 +1442,18 @@
{
filter = new FilterImpl(filterString);
}
+
+ long queueID;
+
+ do
+ {
+ queueID = storageManager.generateUniqueID();
+ }
+ while (queueID == 0 || queueID == 1); //0 and 1 are reserved channels
+
+ Replicator replicator = getReplicatorForQueue(queueID);
- final Queue queue = queueFactory.createQueue(storageManager.generateUniqueID(), address, queueName, filter, durable, temporary);
+ Queue queue = queueFactory.createQueue(queueID, address, queueName, filter, durable, temporary, replicator);
binding = new LocalQueueBinding(address, queue, nodeID);
@@ -1436,7 +1522,7 @@
pagingManager,
storageManager);
- Binding binding = new DivertBinding(sAddress, divert);
+ Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress, divert);
postOffice.addBinding(binding);
@@ -1483,6 +1569,8 @@
{
// This session may well be on a different connection and different channel id, so we must get rid
// of it and create another
+
+ //TODO - is this true any more??
currentSession.getChannel().close();
}
@@ -1490,10 +1578,6 @@
RemotingConnection replicatingConnection = connection.getReplicatingConnection();
- log.info("getting repl conenction from " + System.identityHashCode(connection) + " it is " + replicatingConnection);
-
- log.info("Creating session, replicating connection is " + replicatingConnection);
-
final Replicator replicator;
if (replicatingConnection != null)
@@ -1507,7 +1591,7 @@
public void handlePacket(final Packet packet)
{
if (packet.getType() == PacketImpl.REPLICATION_RESPONSE)
- {
+ {
replicator.replicationResponseReceived();
}
else
@@ -1547,8 +1631,6 @@
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, replicator, configuration);
- log.info(System.identityHashCode(handler)+ " ** creating serversessionpackethandler on backup "+ configuration.isBackup() + " replicator is " + replicator);
-
session.setHandler(handler);
channel.setHandler(handler);
@@ -1619,7 +1701,5 @@
t.setPriority(threadPriority);
return t;
}
-
}
-
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -12,11 +12,10 @@
package org.jboss.messaging.core.server.impl;
-import static org.jboss.messaging.core.management.NotificationType.CONSUMER_CLOSED;
-import static org.jboss.messaging.core.management.NotificationType.CONSUMER_CREATED;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REGISTER_POST_OFFICE_REPLICATION_CHANNEL;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REGISTER_QUEUE_REPLICATION_CHANNEL;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_LOCK_SEQUENCES;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_STARTUP_INFO;
@@ -24,35 +23,22 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.management.Notification;
-import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateAcknowledgeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.RegisterQueueReplicationChannelMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRedistributionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingAddedMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingRemovedMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerAddedMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerRemovedMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateStartupInfoMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
-import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.MessagingServer;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.server.cluster.ClusterConnection;
-import org.jboss.messaging.core.server.cluster.RemoteQueueBinding;
import org.jboss.messaging.core.server.replication.ReplicableAction;
import org.jboss.messaging.core.server.replication.Replicator;
import org.jboss.messaging.core.server.replication.impl.JBMThread;
-import org.jboss.messaging.utils.Pair;
/**
* A packet handler for all packets that need to be handled at the server level
@@ -61,7 +47,7 @@
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
*/
-public class MessagingServerPacketHandler implements ChannelHandler, ReplicableAction
+public class MessagingServerPacketHandler implements ChannelHandler
{
private static final Logger log = Logger.getLogger(MessagingServerPacketHandler.class);
@@ -70,13 +56,13 @@
private final Channel channel1;
private final RemotingConnection connection;
-
+
private Replicator replicator;
+
+ private volatile List<Long> sequences;
- private Packet packet;
-
- private List<Pair<Long, Integer>> sequences;
-
+ private volatile boolean requiresReplicationResponse;
+
public MessagingServerPacketHandler(final MessagingServer server,
final Channel channel1,
final RemotingConnection connection,
@@ -87,86 +73,37 @@
this.channel1 = channel1;
this.connection = connection;
-
+
this.replicator = replicator;
}
-
- public void run()
- {
- handlePacket();
- }
-
- public Packet getPacket()
- {
- return packet;
- }
-
+
+
public void handlePacket(final Packet packet)
{
- this.packet = packet;
-
- log.info("Handling packet " + packet.getType() + " on backup " + server.getConfiguration().isBackup());
-
- if (server.getConfiguration().isBackup())
- {
- log.info("getting current thread");
-
- JBMThread thread = JBMThread.currentThread();
-
- thread.setReplay(sequences);
-
- //thread.setReplay(true);
-
- log.info("about to call handle packet");
-
- handlePacket();
-
- //send the response message
-
- log.info("sending back replication response");
-
- if (packet.getType() != PacketImpl.REPLICATE_LOCK_SEQUENCES)
- {
- channel1.send(new ReplicationResponseMessage());
- }
- }
- else
- {
- log.info("not backup");
- if (replicator != null)
- {
- replicator.execute(this);
- }
- else
- {
- handlePacket();
- }
- }
- }
-
- private void handlePacket()
- {
byte type = packet.getType();
-
+
if (!server.isInitialised() && type != PacketImpl.REPLICATE_STARTUP_INFO)
{
- throw new IllegalStateException("First packet must be startup info for backup " + type);
+ throw new IllegalStateException("First packet must be startup info for backup " + type);
}
- // All these operations need to be idempotent since they are outside of the session
- // reliability replay functionality
switch (type)
{
case REPLICATE_LOCK_SEQUENCES:
{
ReplicateLockSequenceMessage msg = (ReplicateLockSequenceMessage)packet;
- sequences = msg.getSequences();
- log.info("Got sequences " + sequences.size());
+
+ sequences = msg.getSequences();
+
+ requiresReplicationResponse = msg.isRequiresResponse();
+
return;
}
case REPLICATE_STARTUP_INFO:
- {
+ {
ReplicateStartupInfoMessage msg = (ReplicateStartupInfoMessage)packet;
+
+ // log.info("** got replicate startup info");
try
{
@@ -176,99 +113,94 @@
{
log.error("Failed to initialise", e);
}
-
- break;
- }
- case CREATESESSION:
- {
- CreateSessionMessage request = (CreateSessionMessage)packet;
- log.info("sequences is " + sequences);
- handleCreateSession(request, sequences == null);
-
break;
}
- case REATTACH_SESSION:
- {
- ReattachSessionMessage request = (ReattachSessionMessage)packet;
-
- handleReattachSession(request);
-
- break;
- }
- case CREATE_QUEUE:
- {
- // Create queue can also be fielded here in the case of a replicated store and forward queue creation
-
- CreateQueueMessage request = (CreateQueueMessage)packet;
+ case REGISTER_QUEUE_REPLICATION_CHANNEL:
+ {
+ RegisterQueueReplicationChannelMessage msg = (RegisterQueueReplicationChannelMessage)packet;
+
+ Channel channel = connection.getChannel(msg.getBindingID(), -1, false);
- handleCreateQueue(request);
-
+ channel.setHandler(new QueueReplicationPacketHandler(msg.getBindingID(), server.getPostOffice(), channel));
+
break;
}
- case PacketImpl.REPLICATE_ADD_REMOTE_QUEUE_BINDING:
+ case REGISTER_POST_OFFICE_REPLICATION_CHANNEL:
{
- ReplicateRemoteBindingAddedMessage request = (ReplicateRemoteBindingAddedMessage)packet;
-
- handleAddRemoteQueueBinding(request);
-
break;
}
- case PacketImpl.REPLICATE_REMOVE_REMOTE_QUEUE_BINDING:
+ case CREATESESSION:
{
- ReplicateRemoteBindingRemovedMessage request = (ReplicateRemoteBindingRemovedMessage)packet;
+ final CreateSessionMessage request = (CreateSessionMessage)packet;
- handleRemoveRemoteQueueBinding(request);
+ ReplicableAction action = new ReplicableAction()
+ {
+ public void run()
+ {
+ handleCreateSession(request, sequences == null);
+ }
+
+ public Packet getPacket()
+ {
+ return packet;
+ }
+ };
+
+ if (server.getConfiguration().isBackup())
+ {
+ JBMThread thread = JBMThread.currentThread();
- break;
- }
- case PacketImpl.REPLICATE_ADD_REMOTE_CONSUMER:
- {
- ReplicateRemoteConsumerAddedMessage request = (ReplicateRemoteConsumerAddedMessage)packet;
+ thread.setReplay(sequences);
- handleAddRemoteConsumer(request);
+ action.run();
+
+ thread.setNoReplayOrRecord();
+ }
+ else
+ {
+ if (replicator != null)
+ {
+ replicator.execute(action);
+ }
+ else
+ {
+ action.run();
+ }
+ }
break;
}
- case PacketImpl.REPLICATE_REMOVE_REMOTE_CONSUMER:
+ case REATTACH_SESSION:
{
- ReplicateRemoteConsumerRemovedMessage request = (ReplicateRemoteConsumerRemovedMessage)packet;
+ ReattachSessionMessage request = (ReattachSessionMessage)packet;
- handleRemoveRemoteConsumer(request);
+ handleReattachSession(request);
break;
}
- case PacketImpl.REPLICATE_ACKNOWLEDGE:
- {
- ReplicateAcknowledgeMessage request = (ReplicateAcknowledgeMessage)packet;
-
- handleReplicateAcknowledge(request);
-
- break;
- }
- case PacketImpl.REPLICATE_REDISTRIBUTION:
- {
- ReplicateRedistributionMessage message = (ReplicateRedistributionMessage)packet;
-
- handleReplicateRedistribution(message);
-
- break;
- }
default:
{
log.error("Invalid packet " + packet);
}
}
sequences = null;
- }
+
+ // send the response message
+ if (server.getConfiguration().isBackup() && requiresReplicationResponse || type == REPLICATE_STARTUP_INFO)
+ {
+ channel1.send(new ReplicationResponseMessage());
+ }
+ }
+
private void handleCreateSession(final CreateSessionMessage request, final boolean activate)
{
Packet response;
try
- {
+ {
response = server.createSession(request.getName(),
- request.getSessionChannelID(),
+ request.getSessionChannelID(),
request.getUsername(),
request.getPassword(),
request.getMinLargeMessageSize(),
@@ -294,10 +226,10 @@
response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
}
}
-
- channel1.send(response);
+
+ channel1.send(response);
}
-
+
private void handleReattachSession(final ReattachSessionMessage request)
{
Packet response;
@@ -323,163 +255,4 @@
channel1.send(response);
}
- private void handleCreateQueue(final CreateQueueMessage request)
- {
- try
- {
- server.createQueue(request.getAddress(), request.getQueueName(), request.getFilterString(), request.isDurable(), request.isTemporary());
- }
- catch (Exception e)
- {
- log.error("Failed to handle create queue", e);
- }
- }
-
- private void handleAddRemoteQueueBinding(final ReplicateRemoteBindingAddedMessage request)
- {
- ClusterConnection cc = server.getClusterManager().getClusterConnection(request.getClusterConnectionName());
-
- if (cc == null)
- {
- throw new IllegalStateException("No cluster connection found with name " + request.getClusterConnectionName());
- }
-
- try
- {
- cc.handleReplicatedAddBinding(request.getAddress(),
- request.getUniqueName(),
- request.getRoutingName(),
- request.getRemoteQueueID(),
- request.getFilterString(),
- request.getSfQueueName(),
- request.getDistance());
- }
- catch (Exception e)
- {
- log.error("Failed to handle add remote queue binding", e);
- }
- }
-
- private void handleRemoveRemoteQueueBinding(final ReplicateRemoteBindingRemovedMessage request)
- {
- try
- {
- Binding binding = server.getPostOffice().removeBinding(request.getUniqueName());
-
- if (binding == null)
- {
- throw new IllegalStateException("Cannot find binding to remove " + request.getUniqueName());
- }
- }
- catch (Exception e)
- {
- log.error("Failed to handle remove remote queue binding", e);
- }
- }
-
- private void handleAddRemoteConsumer(final ReplicateRemoteConsumerAddedMessage request)
- {
- RemoteQueueBinding binding = (RemoteQueueBinding)server.getPostOffice()
- .getBinding(request.getUniqueBindingName());
-
- if (binding == null)
- {
- throw new IllegalStateException("Cannot find binding to remove " + request.getUniqueBindingName());
- }
-
- try
- {
- binding.addConsumer(request.getFilterString());
- }
- catch (Exception e)
- {
- log.error("Failed to handle add remote consumer", e);
- }
-
- // Need to propagate the consumer add
- Notification notification = new Notification(null, CONSUMER_CREATED, request.getProperties());
-
- try
- {
- server.getManagementService().sendNotification(notification);
- }
- catch (Exception e)
- {
- log.error("Failed to handle add remote consumer", e);
- }
- }
-
- private void handleRemoveRemoteConsumer(final ReplicateRemoteConsumerRemovedMessage request)
- {
- RemoteQueueBinding binding = (RemoteQueueBinding)server.getPostOffice()
- .getBinding(request.getUniqueBindingName());
-
- if (binding == null)
- {
- throw new IllegalStateException("Cannot find binding to remove " + request.getUniqueBindingName());
- }
-
- try
- {
- binding.removeConsumer(request.getFilterString());
- }
- catch (Exception e)
- {
- log.error("Failed to handle remove remote consumer", e);
- }
-
- // Need to propagate the consumer close
- Notification notification = new Notification(null, CONSUMER_CLOSED, request.getProperties());
-
- try
- {
- server.getManagementService().sendNotification(notification);
- }
- catch (Exception e)
- {
- log.error("Failed to handle remove remote consumer", e);
- }
- }
-
- private void handleReplicateAcknowledge(final ReplicateAcknowledgeMessage request)
- {
- Binding binding = server.getPostOffice().getBinding(request.getUniqueName());
-
- if (binding == null)
- {
- throw new IllegalStateException("Cannot find binding " + request.getUniqueName());
- }
-
- try
- {
- Queue queue = (Queue)binding.getBindable();
-
- MessageReference ref = queue.removeFirstReference(request.getMessageID());
-
- queue.acknowledge(ref);
- }
- catch (Exception e)
- {
- log.error("Failed to handle remove remote consumer", e);
- }
- }
-
- private void handleReplicateRedistribution(final ReplicateRedistributionMessage request)
- {
- Binding binding = server.getPostOffice().getBinding(request.getQueueName());
-
- if (binding == null)
- {
- throw new IllegalStateException("Cannot find binding " + request.getQueueName());
- }
-
- try
- {
- server.handleReplicateRedistribution(request.getQueueName(), request.getMessageID());
- }
- catch (Exception e)
- {
- log.error("Failed to handle remove remote consumer", e);
- }
- }
}
\ No newline at end of file
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -29,6 +29,7 @@
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.QueueFactory;
+import org.jboss.messaging.core.server.replication.Replicator;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.AddressSettings;
import org.jboss.messaging.utils.SimpleString;
@@ -73,7 +74,8 @@
final SimpleString name,
final Filter filter,
final boolean durable,
- final boolean temporary)
+ final boolean temporary,
+ final Replicator replicator)
{
AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
@@ -89,7 +91,8 @@
scheduledExecutor,
postOffice,
storageManager,
- addressSettingsRepository);
+ addressSettingsRepository,
+ replicator);
}
else
{
@@ -102,7 +105,8 @@
scheduledExecutor,
postOffice,
storageManager,
- addressSettingsRepository);
+ addressSettingsRepository,
+ replicator);
}
queue.setDistributionPolicy(addressSettings.getDistributionPolicy());
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -15,6 +15,7 @@
import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ACTUAL_EXPIRY_TIME;
import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ORIGINAL_DESTINATION;
import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ORIG_MESSAGE_ID;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_QUEUE_DELIVERY;
import java.util.ArrayList;
import java.util.Collection;
@@ -43,6 +44,8 @@
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Bindings;
import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.server.Consumer;
import org.jboss.messaging.core.server.Distributor;
import org.jboss.messaging.core.server.HandleStatus;
@@ -51,7 +54,9 @@
import org.jboss.messaging.core.server.ScheduledDeliveryHandler;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.cluster.impl.Redistributor;
-import org.jboss.messaging.core.server.replication.impl.StatefulObjectReadWriteLock;
+import org.jboss.messaging.core.server.replication.ReplicableCall;
+import org.jboss.messaging.core.server.replication.Replicator;
+import org.jboss.messaging.core.server.replication.impl.ReplicationAwareReadWriteLock;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.AddressSettings;
import org.jboss.messaging.core.transaction.Transaction;
@@ -78,6 +83,8 @@
public static final int NUM_PRIORITIES = 10;
+ private final Replicator replicator;
+
private final long id;
private final SimpleString name;
@@ -147,7 +154,8 @@
final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice,
final StorageManager storageManager,
- final HierarchicalRepository<AddressSettings> addressSettingsRepository)
+ final HierarchicalRepository<AddressSettings> addressSettingsRepository,
+ final Replicator replicator)
{
this.id = id;
@@ -181,8 +189,10 @@
direct = true;
scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor);
-
- lock = new StatefulObjectReadWriteLock(name.toString(), id, 0).writeLock();
+
+ lock = new ReplicationAwareReadWriteLock(name.toString(), 0).writeLock();
+
+ this.replicator = replicator;
}
// Bindable implementation -------------------------------------------------------------------------------------
@@ -274,9 +284,7 @@
tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
- storageManager.storeReferenceTransactional(tx.getID(),
- ref.getQueue().getID(),
- message.getMessageID());
+ storageManager.storeReferenceTransactional(tx.getID(), ref.getQueue().getID(), message.getMessageID());
}
if (scheduledDeliveryTime != null && durableRef)
@@ -372,26 +380,33 @@
{
// Prevent too many executors running at once
- if (waitingToDeliver.compareAndSet(false, true))
+ if (backup)
{
- executor.execute(deliverRunner);
+ // We don't deliver async directly on the backup
+ return;
}
- }
- // Only used in testing - do not call directly!
- public void deliverNow()
- {
- lock.lock();
- try
+ if (waitingToDeliver.compareAndSet(false, true))
{
- deliver();
+ // log.info("delivering async on backup " + backup, new Exception());
+ executor.execute(deliverRunner);
}
- finally
- {
- lock.unlock();
- }
}
+ // // Only used in testing - do not call directly!
+ // public void deliverNow()
+ // {
+ // lock.lock();
+ // try
+ // {
+ // deliver();
+ // }
+ // finally
+ // {
+ // lock.unlock();
+ // }
+ // }
+
public void addConsumer(final Consumer consumer) throws Exception
{
lock.lock();
@@ -1126,81 +1141,57 @@
{
backup = true;
- direct = false;
+ // direct = false;
}
- public boolean activate()
+ public synchronized boolean activate()
{
- lock.lock();
- try
+ consumersToFailover = consumers.size();
+
+ if (consumersToFailover == 0)
{
- consumersToFailover = consumers.size();
+ backup = false;
- if (consumersToFailover == 0)
- {
- backup = false;
-
- return true;
- }
- else
- {
- return false;
- }
+ return true;
}
- finally
+ else
{
- lock.unlock();
+ return false;
}
}
- public void activateNow(final Executor executor)
+ public synchronized void activateNow(final Executor executor)
{
- lock.lock();
- try
+ if (backup)
{
- if (backup)
- {
- log.info("Timed out waiting for all consumers to reconnect to queue " + name +
- " so queue will be activated now");
+ log.info("Timed out waiting for all consumers to reconnect to queue " + name +
+ " so queue will be activated now");
- backup = false;
+ backup = false;
- scheduledDeliveryHandler.reSchedule();
+ scheduledDeliveryHandler.reSchedule();
- deliverAsync(executor);
- }
+ deliverAsync(executor);
}
- finally
- {
- lock.unlock();
- }
}
- public boolean consumerFailedOver()
+ public synchronized boolean consumerFailedOver()
{
- lock.lock();
- try
+ consumersToFailover--;
+
+ if (consumersToFailover == 0)
{
- consumersToFailover--;
+ // All consumers for the queue have failed over, can re-activate it now
- if (consumersToFailover == 0)
- {
- // All consumers for the queue have failed over, can re-activate it now
+ backup = false;
- backup = false;
+ scheduledDeliveryHandler.reSchedule();
- scheduledDeliveryHandler.reSchedule();
-
- return true;
- }
- else
- {
- return false;
- }
+ return true;
}
- finally
+ else
{
- lock.unlock();
+ return false;
}
}
@@ -1417,33 +1408,21 @@
tx.commit();
}
- /*
- * Attempt to deliver all the messages in the queue
- */
- private void deliver()
+ public HandleStatus deliverOne()
{
lock.lock();
+ direct = false;
+
try
{
- // We don't do actual delivery if the queue is on a backup node - this is
- // because it's async and could get out of step
- // with the live node. Instead, when we replicate the delivery we remove
- // the ref from the queue
+ Iterator<MessageReference> iterator = null;
- if (backup)
+ HandleStatus status;
+ do
{
- return;
- }
+ MessageReference reference;
- direct = false;
-
- MessageReference reference;
-
- Iterator<MessageReference> iterator = null;
-
- while (true)
- {
if (iterator == null)
{
reference = messageReferences.peekFirst();
@@ -1484,49 +1463,45 @@
promptDelivery = false;
}
- return;
+
+ status = HandleStatus.BUSY;
}
-
- // PagingManager would be null only on testcases
- if (pagingStore == null && pagingManager != null)
+ else
{
- // TODO: It would be better if we could initialize the pagingStore during the construction
- try
+ // PagingManager would be null only on testcases
+ if (pagingStore == null && pagingManager != null)
{
- pagingStore = pagingManager.getPageStore(reference.getMessage().getDestination());
+ // TODO: It would be better if we could initialize the pagingStore during the construction
+ try
+ {
+ pagingStore = pagingManager.getPageStore(reference.getMessage().getDestination());
+ }
+ catch (Exception e)
+ {
+ // This shouldn't happen, and if it happens, this shouldn't abort the route
+
+ log.error("Failed to get page store", e);
+ }
}
- catch (Exception e)
- {
- // This shouldn't happen, and if it happens, this shouldn't abort the route
- }
- }
- HandleStatus status = deliver(reference);
+ status = deliverReference(reference);
- if (status == HandleStatus.HANDLED)
- {
- if (iterator == null)
+ if (status == HandleStatus.HANDLED)
{
- messageReferences.removeFirst();
+ if (iterator == null)
+ {
+ messageReferences.removeFirst();
+ }
+ else
+ {
+ iterator.remove();
+ }
}
- else
- {
- iterator.remove();
- }
}
- else if (status == HandleStatus.BUSY)
- {
- // All consumers busy - give up
- break;
- }
- else if (status == HandleStatus.NO_MATCH && iterator == null)
- {
- // Consumers not all busy - but filter not accepting - iterate
- // back
- // through the queue
- iterator = messageReferences.iterator();
- }
}
+ while (status == HandleStatus.NO_MATCH);
+
+ return status;
}
finally
{
@@ -1551,11 +1526,12 @@
boolean add = false;
- if (direct && !backup)
+ if (direct)
{
// Deliver directly
- HandleStatus status = deliver(ref);
+ // log.info("delivering direct on backup " + backup);
+ HandleStatus status = deliverReference(ref);
if (status == HandleStatus.HANDLED)
{
@@ -1577,6 +1553,7 @@
}
else
{
+ // log.info("Not delivering direct on backup " + backup);
add = true;
}
@@ -1604,7 +1581,7 @@
// filters with queues - in most cases
// it's an ant-pattern since it would cause a queue scan on each
// message
- deliver();
+ deliverAll();
}
}
}
@@ -1614,7 +1591,7 @@
}
}
- private HandleStatus deliver(final MessageReference reference)
+ private HandleStatus deliverReference(final MessageReference reference)
{
HandleStatus status = distributionPolicy.distribute(reference);
@@ -1693,17 +1670,17 @@
ServerMessage msg = ref.getMessage();
if (!scheduledDeliveryHandler.checkAndSchedule(ref, backup))
- {
+ {
messageReferences.addFirst(ref, msg.getPriority());
}
}
-
- deliver();
}
finally
{
lock.unlock();
}
+
+ deliverAll();
}
// Inner classes
@@ -1716,16 +1693,69 @@
// Must be set to false *before* executing to avoid race
waitingToDeliver.set(false);
- deliver();
+ // log.info("** calling deliver runner " + backup, new Exception());
+
+ deliverAll();
}
}
- final class RefsOperation implements TransactionOperation
+ /*
+ * Attempt to deliver all the messages in the queue
+ */
+ public void deliverAll()
{
- List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
+ // direct = false;
- List<MessageReference> refsToAck = new ArrayList<MessageReference>();
+ HandleStatus handled;
+ if (replicator != null)
+ {
+ ReplicableCall<HandleStatus> action = new DeliverAction();
+
+ do
+ {
+ replicator.execute(action);
+
+ handled = action.getResult();
+ }
+ while (handled != HandleStatus.BUSY);
+ }
+ else
+ {
+ do
+ {
+ handled = deliverOne();
+ }
+ while (handled != HandleStatus.BUSY);
+ }
+ }
+
+ private class DeliverAction implements ReplicableCall<HandleStatus>
+ {
+ public Packet getPacket()
+ {
+ return new PacketImpl(REPLICATE_QUEUE_DELIVERY);
+ }
+
+ private HandleStatus status;
+
+ public void run()
+ {
+ status = deliverOne();
+ }
+
+ public HandleStatus getResult()
+ {
+ return status;
+ }
+ }
+
+ protected final class RefsOperation implements TransactionOperation
+ {
+ protected final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
+
+ protected final List<MessageReference> refsToAck = new ArrayList<MessageReference>();
+
synchronized void addRef(final MessageReference ref)
{
refsToAdd.add(ref);
@@ -1771,15 +1801,7 @@
QueueImpl queue = entry.getKey();
- queue.lock.lock();
- try
- {
- queue.postRollback(refs);
- }
- finally
- {
- queue.lock.unlock();
- }
+ queue.postRollback(refs);
}
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -51,7 +51,7 @@
import org.jboss.messaging.core.server.ServerConsumer;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.ServerSession;
-import org.jboss.messaging.core.server.replication.impl.StatefulObjectReadWriteLock;
+import org.jboss.messaging.core.server.replication.impl.ReplicationAwareReadWriteLock;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
import org.jboss.messaging.utils.TypedProperties;
@@ -168,7 +168,7 @@
this.updateDeliveries = updateDeliveries;
- lock = new StatefulObjectReadWriteLock("consumer " + id, storageManager.generateUniqueID(), 0).writeLock();
+ lock = new ReplicationAwareReadWriteLock("consumer " + id, 0).writeLock();
binding.getQueue().addConsumer(this);
}
@@ -487,8 +487,11 @@
try
{
+ //log.info("handling message");
+
if ((flowControl && availableCredits <= 0) || !started)
{
+ log.info("busy");
return HandleStatus.BUSY;
}
@@ -578,7 +581,7 @@
{
availableCredits -= packet.getRequiredBufferSize();
}
-
+
channel.send(packet);
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -132,8 +132,6 @@
private RemotingConnection remotingConnection;
- // private Channel replicatingChannel;
-
private final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<Long, ServerConsumer>();
private final Executor executor;
@@ -451,8 +449,6 @@
{
Binding binding = postOffice.getBinding(name);
- log.info("binding is " + binding);
-
if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE)
{
throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST, "Binding " + name + " does not exist");
@@ -479,7 +475,8 @@
name,
filter,
false,
- true);
+ true,
+ null);
// There's no need for any special locking since the list method is synchronized
List<MessageReference> refs = ((Queue)binding.getBindable()).list(filter);
@@ -496,8 +493,6 @@
theQueue = (Queue)binding.getBindable();
}
- log.info("*********** creating consumer");
-
ServerConsumer consumer = new ServerConsumerImpl(idGenerator.generateID(),
this,
(QueueBinding)binding,
@@ -560,8 +555,7 @@
}
public void handleCreateQueue(final CreateQueueMessage packet)
- {
- log.info("processing create queue");
+ {
SimpleString address = packet.getAddress();
final SimpleString name = packet.getQueueName();
@@ -634,8 +628,6 @@
channel.confirm(packet);
channel.send(response);
-
- log.info("processed create queue");
}
public void handleDeleteQueue(final SessionDeleteQueueMessage packet)
@@ -1561,6 +1553,8 @@
{
this.setStarted(false);
}
+
+ log.info("Transferring connection");
// backup = false;
@@ -1585,6 +1579,7 @@
int serverLastReceivedCommandID = channel.getLastReceivedCommandID();
+ log.info("replaying commands");
channel.replayCommands(lastReceivedCommandID);
if (wasStarted)
@@ -1592,6 +1587,8 @@
this.setStarted(true);
}
+ log.info("Transferred connection");
+
return serverLastReceivedCommandID;
}
@@ -1715,13 +1712,9 @@
private void doReceiveCredits(final SessionConsumerFlowCreditMessage packet)
{
try
- {
- log.info("packet consumer id is "+ packet.getConsumerID());
-
+ {
ServerConsumer consumer = consumers.get(packet.getConsumerID());
- log.info("consumer is " + consumer);
-
consumer.receiveCredits(packet.getCredits());
}
catch (Exception e)
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -80,7 +80,6 @@
import org.jboss.messaging.core.server.replication.ReplicableAction;
import org.jboss.messaging.core.server.replication.Replicator;
import org.jboss.messaging.core.server.replication.impl.JBMThread;
-import org.jboss.messaging.utils.Pair;
/**
* A ServerSessionPacketHandler
@@ -94,27 +93,31 @@
private static final Logger log = Logger.getLogger(ServerSessionPacketHandler.class);
private final ServerSession session;
-
+
private final Replicator replicator;
-
+
private Packet packet;
-
- //private boolean backup;
-
+
private Configuration config;
+
+ //TODO the sequences and repl response can be encapsulated in a super class
- private List<Pair<Long, Integer>> sequences;
+ private volatile List<Long> sequences;
+ private volatile boolean requiresReplicationResponse;
+
private final Channel channel;
-
- public ServerSessionPacketHandler(final ServerSession session, final Replicator replicator, final Configuration config)
+
+ public ServerSessionPacketHandler(final ServerSession session,
+ final Replicator replicator,
+ final Configuration config)
{
this.session = session;
-
+
this.replicator = replicator;
-
+
this.channel = session.getChannel();
-
+
this.config = config;
}
@@ -122,60 +125,59 @@
{
return session.getID();
}
-
+
public void run()
{
handlePacket();
}
-
+
public Packet getPacket()
- {
+ {
return packet;
}
-
+
public void handlePacket(final Packet packet)
{
this.packet = packet;
- log.info(System.identityHashCode(this)+ " Handling packet " + packet.getType() + " on backup " + config.isBackup());
-
+ // log.info("Got packet " + packet + " at server session packet handler backup is " + config.isBackup());
+
if (config.isBackup())
- {
+ {
JBMThread thread = JBMThread.currentThread();
-
+
thread.setReplay(sequences);
-
- // thread.setReplay(true);
-
- handlePacket();
-
- //send the response message
-
- if (packet.getType() != PacketImpl.REPLICATE_LOCK_SEQUENCES)
+
+ handlePacket();
+
+ // send the response message
+
+ if (packet.getType() != PacketImpl.REPLICATE_LOCK_SEQUENCES && this.requiresReplicationResponse)
{
+ log.info("sending back replication response");
channel.send(new ReplicationResponseMessage());
- }
+ }
+
+ thread.setNoReplayOrRecord();
}
else
- {
+ {
if (replicator != null)
{
replicator.execute(this);
}
else
{
- log.info("replicator is null");
handlePacket();
}
}
}
-
- private void dumpSequences(List<Pair<Long, Integer>> sequences)
+
+ private void dumpSequences(List<Long> sequences)
{
- log.info("Sequences size is " + sequences.size());
- for (Pair<Long, Integer> pair: sequences)
+ for (long sequence : sequences)
{
- log.info(pair.a + ": " + pair.b);
+ log.info(sequence);
}
}
@@ -190,11 +192,13 @@
case REPLICATE_LOCK_SEQUENCES:
{
ReplicateLockSequenceMessage msg = (ReplicateLockSequenceMessage)packet;
+
sequences = msg.getSequences();
- log.info("Session, set sequences");
- dumpSequences(sequences);
-
+ this.requiresReplicationResponse = msg.isRequiresResponse();
+
+ // dumpSequences(sequences);
+
break;
}
case SESS_CREATECONSUMER:
@@ -204,16 +208,15 @@
break;
}
case CREATE_QUEUE:
- {
- log.info("Got create queue message");
- CreateQueueMessage request = (CreateQueueMessage)packet;
- session.handleCreateQueue(request);
+ {
+ CreateQueueMessage request = (CreateQueueMessage)packet;
+ session.handleCreateQueue(request);
break;
}
case DELETE_QUEUE:
{
SessionDeleteQueueMessage request = (SessionDeleteQueueMessage)packet;
- session.handleDeleteQueue(request);
+ session.handleDeleteQueue(request);
break;
}
case SESS_QUEUEQUERY:
@@ -354,20 +357,20 @@
case SESS_SEND:
{
SessionSendMessage message = (SessionSendMessage)packet;
- session.handleSend(message);
- break;
+ session.handleSend(message);
+ break;
}
case SESS_SEND_LARGE:
{
SessionSendLargeMessage message = (SessionSendLargeMessage)packet;
- session.handleSendLargeMessage(message);
- break;
+ session.handleSendLargeMessage(message);
+ break;
}
case SESS_SEND_CONTINUATION:
{
SessionSendContinuationMessage message = (SessionSendContinuationMessage)packet;
session.handleSendContinuations(message);
- break;
+ break;
}
}
}
Deleted: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/ReplicableAction.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/ReplicableAction.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/ReplicableAction.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -1,38 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.server.replication;
-
-import org.jboss.messaging.core.remoting.Packet;
-
-/**
- * A ReplicableAction
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- *
- */
-public interface ReplicableAction extends Runnable
-{
- Packet getPacket();
-}
Copied: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/ReplicableCall.java (from rev 7478, branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/ReplicableAction.java)
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/ReplicableCall.java (rev 0)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/ReplicableCall.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -0,0 +1,37 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.replication;
+
+
+/**
+ * A ReplicableCall
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public interface ReplicableCall<R> extends ReplicableAction
+{
+ R getResult();
+}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/Replicator.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/Replicator.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/Replicator.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -39,7 +39,9 @@
void registerWaitingChannel(Channel channel);
- boolean isResponseReceived();
+ // boolean isResponseReceived();
void replicationResponseReceived();
+
+ // long getReplicateSequence();
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -24,9 +24,9 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
import org.jboss.messaging.core.server.replication.Replicator;
-import org.jboss.messaging.utils.Pair;
/**
* A JBMThread
@@ -39,12 +39,17 @@
{
private static enum ThreadState
{
- RECORD, REPLAY, END_RECORD;
+ RECORD, REPLAY, NONE;
}
+// public long getSequence()
+// {
+// return sequence;
+// }
+//
private ThreadState state;
- private List<Pair<Long, Integer>> objectSequences;
+ private List<Long> objectSequences;
private int pos;
@@ -75,7 +80,7 @@
return state == ThreadState.RECORD;
}
- public void setReplay(final List<Pair<Long, Integer>> objectSequences)
+ public void setReplay(final List<Long> objectSequences)
{
this.objectSequences = objectSequences;
@@ -84,11 +89,16 @@
this.pos = 0;
}
+// private volatile long sequence;
+//
+// private AtomicLong seqCounter = new AtomicLong(0);
+
public void setRecord(final Replicator replicator)
{
+ //this.sequence = seqCounter.getAndIncrement();
if (this.objectSequences == null)
{
- this.objectSequences = new ArrayList<Pair<Long, Integer>>();
+ this.objectSequences = new ArrayList<Long>();
}
else
{
@@ -102,22 +112,32 @@
this.replicator = replicator;
}
- public void endRecord()
+ public void setNoReplayOrRecord()
{
- this.state = ThreadState.END_RECORD;
+ this.state = ThreadState.NONE;
}
- public Pair<Long, Integer> getNextSequence()
+ public void resumeRecording()
{
+ this.state = ThreadState.RECORD;
+ }
+
+ public void resumeReplay()
+ {
+ this.state = ThreadState.REPLAY;
+ }
+
+ public long getNextSequence()
+ {
return objectSequences.get(pos++);
}
- public void addSequence(final Pair<Long, Integer> currentSequence)
+ public void addSequence(final long currentSequence)
{
objectSequences.add(currentSequence);
}
- public List<Pair<Long, Integer>> getSequences()
+ public List<Long> getSequences()
{
return objectSequences;
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/PriorityLock.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/PriorityLock.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/PriorityLock.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -41,21 +41,19 @@
private final Queue<QueueEntry> waiting;
- private volatile int currentSequence;
+ private volatile long currentSequence;
private Thread owner;
-
- public PriorityLock(final int sequence)
+
+ public PriorityLock(final long sequence)
{
waiting = new PriorityQueue<QueueEntry>();
-
+
this.currentSequence = sequence;
}
- public void lock(final int sequence)
+ public void lock(final long sequence)
{
- //log.info(this + " trying to get lock on backup " + sequence);
-
Thread currentThread = Thread.currentThread();
if (sequence != currentSequence)
@@ -68,15 +66,12 @@
}
while (sequence != currentSequence)
- {
- log.info("parking lock, expected " + sequence + " current " + currentSequence);
+ {
LockSupport.park();
}
}
owner = currentThread;
-
- //log.info(this + " got lock om backup " + sequence, new Exception());
}
public void unlock()
@@ -93,15 +88,13 @@
synchronized (waiting)
{
entry = waiting.peek();
-
+
if (entry != null && entry.thread == owner)
{
waiting.poll();
}
-
+
entry = waiting.peek();
-
- log.info("size " + waiting.size());
}
if (entry != null)
@@ -112,11 +105,11 @@
private static final class QueueEntry implements Comparable<QueueEntry>
{
- private final int sequence;
+ private final long sequence;
private final Thread thread;
- private QueueEntry(final int sequence, final Thread thread)
+ private QueueEntry(final long sequence, final Thread thread)
{
this.sequence = sequence;
@@ -130,9 +123,9 @@
public int compareTo(final QueueEntry entry)
{
- int i = entry.sequence;
+ long l = entry.sequence;
- return sequence < i ? -1 : (sequence == i ? 0 : 1);
+ return sequence < l ? -1 : (sequence == l ? 0 : 1);
}
}
}
Copied: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java (from rev 7478, branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/StatefulObjectReadWriteLock.java)
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java (rev 0)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -0,0 +1,258 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.replication.impl;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.utils.ConcurrentHashSet;
+
+/**
+ * A ReplicationAwareReadWriteLock
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class ReplicationAwareReadWriteLock implements ReadWriteLock
+{
+ private static final Logger log = Logger.getLogger(ReplicationAwareReadWriteLock.class);
+
+ private final Lock readLock = new StatefulObjectReadLock();
+
+ private final Lock writeLock = new StatefulObjectWriteLock();
+
+ // private long objectID;
+
+ private final ReadWriteLock rwLock;
+
+ private final AtomicInteger counter;
+
+ private final PriorityLock sequencedLock;
+
+ private String name;
+
+ public ReplicationAwareReadWriteLock(final String name, final int initialCount)
+ {
+ this.name = name;
+
+ // this.objectID = objectID;
+
+ rwLock = new ReentrantReadWriteLock();
+
+ sequencedLock = new PriorityLock(initialCount);
+
+ counter = new AtomicInteger(initialCount);
+ }
+
+ public Lock readLock()
+ {
+ return readLock;
+ }
+
+ public Lock writeLock()
+ {
+ return writeLock;
+ }
+
+ private Map<JBMThread, StackTraceElement[]> stackTraces = new ConcurrentHashMap<JBMThread, StackTraceElement[]>();
+
+ //debug only
+ private void addOwner(final JBMThread thread)
+ {
+ owners.add(thread);
+
+ stackTraces.put(thread, thread.getStackTrace());
+ }
+
+ //debug only
+ private void removeOwner(final JBMThread thread)
+ {
+ owners.remove(thread);
+
+ stackTraces.remove(thread);
+ }
+
+ // For debug
+ private Set<Thread> owners = new ConcurrentHashSet<Thread>();
+
+ private boolean doLock(long time, TimeUnit unit, boolean read) throws InterruptedException
+ {
+ JBMThread thread = JBMThread.currentThread();
+
+ // debug only
+ if (owners.contains(thread))
+ {
+ Exception e = new Exception();
+ e.setStackTrace(stackTraces.get(thread));
+ log.error("Stateful lock is not re-entrant, first obtained here", e);
+ Exception e2 = new Exception();
+ log.info("Second attempt to obtain here", e2);
+ throw new IllegalStateException("Stateful lock is NOT re-entrant!");
+ }
+
+ if (thread.isReplay())
+ {
+ long sequence = thread.getNextSequence();
+
+ sequencedLock.lock(sequence);
+
+ addOwner(thread);
+
+ return true;
+ }
+ else
+ {
+ boolean ok;
+
+ if (read)
+ {
+ ok = rwLock.readLock().tryLock(time, unit);
+ }
+ else
+ {
+ ok = rwLock.writeLock().tryLock(time, unit);
+ }
+
+ if (ok)
+ {
+ thread.addSequence(counter.getAndIncrement());
+
+ addOwner(thread);
+ }
+
+ return ok;
+ }
+ }
+
+ private void doUnlock(final boolean read)
+ {
+ JBMThread thread = JBMThread.currentThread();
+
+ if (thread.isReplay())
+ {
+ sequencedLock.unlock();
+ }
+ else
+ {
+ if (read)
+ {
+ rwLock.readLock().unlock();
+ }
+ else
+ {
+ rwLock.writeLock().unlock();
+ }
+ }
+
+ removeOwner(thread);
+ }
+
+ private class StatefulObjectReadLock implements Lock
+ {
+ public void lock()
+ {
+ // throw new UnsupportedOperationException();
+ try
+ {
+ doLock(10000, TimeUnit.MILLISECONDS, true);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+
+ public void lockInterruptibly() throws InterruptedException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public Condition newCondition()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean tryLock()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
+ {
+ return doLock(time, unit, true);
+ }
+
+ public void unlock()
+ {
+ doUnlock(true);
+ }
+ }
+
+ private class StatefulObjectWriteLock implements Lock
+ {
+ public void lock()
+ {
+ // throw new UnsupportedOperationException();
+ try
+ {
+ doLock(10000, TimeUnit.MILLISECONDS, false);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+
+ public void lockInterruptibly() throws InterruptedException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public Condition newCondition()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean tryLock()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
+ {
+ return doLock(time, unit, false);
+ }
+
+ public void unlock()
+ {
+ doUnlock(false);
+ }
+ }
+}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -22,10 +22,11 @@
package org.jboss.messaging.core.server.replication.impl;
-import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.Channel;
@@ -33,7 +34,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
import org.jboss.messaging.core.server.replication.ReplicableAction;
import org.jboss.messaging.core.server.replication.Replicator;
-import org.jboss.messaging.utils.Pair;
/**
* A ReplicatorImpl
@@ -46,12 +46,16 @@
{
private static final Logger log = Logger.getLogger(ReplicatorImpl.class);
- private Channel replicatingChannel;
+ private final Channel replicatingChannel;
+
+ private final Queue<Set<Channel>> waitingChannelsQueue = new ConcurrentLinkedQueue<Set<Channel>>();
- private Set<Channel> waitingChannels = new HashSet<Channel>();
+ private Set<Channel> currentChannels;
- private boolean responseReceived;
-
+ // private long responseSequence;
+
+ // private long replicateSequence;
+
public ReplicatorImpl(final Channel replicatingChannel)
{
this.replicatingChannel = replicatingChannel;
@@ -59,69 +63,61 @@
public void registerWaitingChannel(final Channel channel)
{
- this.waitingChannels.add(channel);
+ currentChannels.add(channel);
}
- public synchronized void replicationResponseReceived()
+ public void replicationResponseReceived()
{
- log.info("** got replication response in replicator");
+ //long sequence = responseSequence++;
+ Set<Channel> waitingChannels = waitingChannelsQueue.remove();
+
for (Channel channel : waitingChannels)
- {
+ {
channel.replicationResponseReceived(this);
}
-
- responseReceived = true;
}
- public synchronized boolean isResponseReceived()
- {
- return responseReceived;
- }
-
public void execute(final ReplicableAction action)
{
// First we execute the action
-
- log.info("Running action locally");
-
+
JBMThread thread = JBMThread.currentThread();
- //List<Pair<Long, Integer>> sequences = new ArrayList<Pair<Long, Integer>>();
+ this.currentChannels = new HashSet<Channel>();
thread.setRecord(this);
-
+
action.run();
- thread.endRecord();
+ thread.setNoReplayOrRecord();
- log.info("Ran action locally");
+ List<Long> sequences = JBMThread.currentThread().getSequences();
- List<Pair<Long, Integer>> sequences = JBMThread.currentThread().getSequences();
-
- dumpSequences(sequences);
+ // dumpSequences(sequences);
// We then send the sequences to the backup
-
- log.info("Replicated sequences");
- Packet packet = new ReplicateLockSequenceMessage(sequences);
+ if (!currentChannels.isEmpty())
+ {
+ waitingChannelsQueue.add(currentChannels);
+ }
+ Packet packet = new ReplicateLockSequenceMessage(sequences, !currentChannels.isEmpty());
+
replicatingChannel.send(packet);
// Next we replicate the actual action
- replicatingChannel.send(action.getPacket());
-
- log.info("replicated packet " + action.getPacket().getType());
+ replicatingChannel.send(action.getPacket());
}
- private void dumpSequences(List<Pair<Long, Integer>> sequences)
+ private void dumpSequences(List<Long> sequences)
{
log.info("Sequences size is " + sequences.size());
- for (Pair<Long, Integer> pair: sequences)
+ for (long sequence: sequences)
{
- log.info(pair.a + ": " + pair.b);
+ log.info(sequence);
}
}
Deleted: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/StatefulObjectReadWriteLock.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/StatefulObjectReadWriteLock.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/StatefulObjectReadWriteLock.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -1,243 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.server.replication.impl;
-
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.utils.ConcurrentHashSet;
-import org.jboss.messaging.utils.Pair;
-
-/**
- * A StatefulObjectReadWriteLock
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- *
- */
-public class StatefulObjectReadWriteLock implements ReadWriteLock
-{
- private static final Logger log = Logger.getLogger(StatefulObjectReadWriteLock.class);
-
- private final Lock readLock = new StatefulObjectReadLock();
-
- private final Lock writeLock = new StatefulObjectWriteLock();
-
- private long objectID;
-
- private final ReadWriteLock rwLock;
-
- private final AtomicInteger counter;
-
- private final PriorityLock sequencedLock;
-
- private String name;
-
- public StatefulObjectReadWriteLock(final String name, final long objectID, final int initialCount)
- {
- this.name = name;
-
- this.objectID = objectID;
-
- rwLock = new ReentrantReadWriteLock();
-
- sequencedLock = new PriorityLock(initialCount);
-
- counter = new AtomicInteger(initialCount);
- }
-
- public Lock readLock()
- {
- return readLock;
- }
-
- public Lock writeLock()
- {
- return writeLock;
- }
-
- //For debug
- private Set<Thread> owners = new ConcurrentHashSet<Thread>();
-
- private boolean doLock(long time, TimeUnit unit, boolean read) throws InterruptedException
- {
- JBMThread thread = JBMThread.currentThread();
-
- //debug only
- if (owners.contains(thread))
- {
- throw new IllegalStateException("Stateful lock is NOT re-entrant!");
- }
-
- if (thread.isReplay())
- {
- Pair<Long, Integer> sequence = thread.getNextSequence();
-
- log.info(name + " Attempting to get lock on backup " + sequence.b, new Exception());
-
- if (sequence.a != objectID)
- {
- throw new IllegalStateException("Invalid object id " + sequence.a + " expected " + objectID);
- }
-
- sequencedLock.lock(sequence.b);
-
- owners.add(thread);
-
- return true;
- }
- else
- {
- boolean ok;
-
- if (read)
- {
- ok = rwLock.readLock().tryLock(time, unit);
- }
- else
- {
- ok = rwLock.writeLock().tryLock(time, unit);
- }
-
- if (ok)
- {
- log.info(name + " added sequence on live " + counter.get(), new Exception());
-
- thread.addSequence(new Pair<Long, Integer>(objectID, counter.getAndIncrement()));
-
- owners.add(thread);
- }
-
- return ok;
- }
- }
-
- private void doUnlock(final boolean read)
- {
- JBMThread thread = JBMThread.currentThread();
-
- if (thread.isReplay())
- {
- sequencedLock.unlock();
- }
- else
- {
- if (read)
- {
- rwLock.readLock().unlock();
- }
- else
- {
- rwLock.writeLock().unlock();
- }
- }
-
- owners.remove(thread);
- }
-
- private class StatefulObjectReadLock implements Lock
- {
- public void lock()
- {
- //throw new UnsupportedOperationException();
- try
- {
- doLock(10000, TimeUnit.MILLISECONDS, true);
- }
- catch (InterruptedException e)
- {
- }
- }
-
- public void lockInterruptibly() throws InterruptedException
- {
- throw new UnsupportedOperationException();
- }
-
- public Condition newCondition()
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean tryLock()
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
- {
- return doLock(time, unit, true);
- }
-
- public void unlock()
- {
- doUnlock(true);
- }
- }
-
- private class StatefulObjectWriteLock implements Lock
- {
- public void lock()
- {
- //throw new UnsupportedOperationException();
- try
- {
- doLock(10000, TimeUnit.MILLISECONDS, false);
- }
- catch (InterruptedException e)
- {
- }
- }
-
- public void lockInterruptibly() throws InterruptedException
- {
- throw new UnsupportedOperationException();
- }
-
- public Condition newCondition()
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean tryLock()
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
- {
- return doLock(time, unit, false);
- }
-
- public void unlock()
- {
- doUnlock(false);
- }
- }
-}
Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/SimpleAutomaticFailoverTest.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/SimpleAutomaticFailoverTest.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/SimpleAutomaticFailoverTest.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -83,67 +83,142 @@
// Public --------------------------------------------------------
- public void testReplication() throws Exception
+ public void testReplication1() throws Exception
{
- ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
-
- sf.setProducerWindowSize(32 * 1024);
-
- log.info("creating session");
-
- ClientSession session = sf.createSession(false, true, true);
-
- log.info("created session");
-
- session.createQueue(ADDRESS, ADDRESS, null, false);
-
- log.info("created queue");
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- log.info("created producer");
-
- final int numMessages = 1;
-
- for (int i = 0; i < numMessages; i++)
+ for (int j = 0; j < 5000; j++)
{
- ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
- message.putIntProperty(new SimpleString("count"), i);
- message.getBody().writeString("aardvarks");
- producer.send(message);
+ log.info("Iteration " + j);
+
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ sf.setProducerWindowSize(32 * 1024);
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(ADDRESS, ADDRESS, null, false);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ final int numMessages = 100;
+
+ long start = System.currentTimeMillis();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ message.getBody().writeString("aardvarks");
+ producer.send(message);
+ }
+
+ //Thread.sleep(500);
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ log.info("sent messages");
+
+ session.start();
+
+ log.info("Started session");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive();
+
+ assertEquals("aardvarks", message2.getBody().readString());
+ assertEquals(i, message2.getProperty(new SimpleString("count")));
+
+ message2.acknowledge();
+ }
+
+ long end = System.currentTimeMillis();
+
+ log.info("That took " + (end - start));
+
+ ClientMessage message3 = consumer.receive(250);
+
+ assertNull(message3);
+
+ session.close();
+
+ tearDown();
+
+ setUp();
}
-
- // Thread.sleep(30000);
-
- ClientConsumer consumer = session.createConsumer(ADDRESS);
-
- log.info("created consumer");
-
- session.start();
-
- log.info("started session");
-
- for (int i = 0; i < numMessages; i++)
+ }
+
+ public void testReplication2() throws Exception
+ {
+ for (int j = 0; j < 5000; j++)
{
- ClientMessage message2 = consumer.receive();
-
- assertEquals("aardvarks", message2.getBody().readString());
- assertEquals(i, message2.getProperty(new SimpleString("count")));
-
- message2.acknowledge();
+ log.info("Iteration " + j);
+
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ sf.setProducerWindowSize(32 * 1024);
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(ADDRESS, ADDRESS, null, false);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ log.info("sent messages");
+
+ session.start();
+
+ //Thread.sleep(500);
+
+ final int numMessages = 1000;
+
+ long start = System.currentTimeMillis();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ message.getBody().writeString("aardvarks");
+ producer.send(message);
+ }
+
+ //Thread.sleep(500);
+
+ log.info("Started session");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive();
+
+ assertEquals("aardvarks", message2.getBody().readString());
+ assertEquals(i, message2.getProperty(new SimpleString("count")));
+
+ message2.acknowledge();
+ }
+
+ long end = System.currentTimeMillis();
+
+ log.info("That took " + (end - start));
+
+ ClientMessage message3 = consumer.receive(250);
+
+ assertNull(message3);
+
+ session.close();
+
+ tearDown();
+
+ setUp();
}
-
- log.info("consumed messages");
-
- ClientMessage message3 = consumer.receive(250);
-
- assertNull(message3);
-
- session.close();
}
public void testFailoverSameConnectionFactory() throws Exception
@@ -173,7 +248,7 @@
message.getBody().writeString("aardvarks");
producer.send(message);
}
-
+
RemotingConnection conn1 = ((ClientSessionImpl)session).getConnection();
// Simulate failure on connection
Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/BindingImplTest.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/BindingImplTest.java 2009-07-02 04:35:48 UTC (rev 7512)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/BindingImplTest.java 2009-07-02 11:26:07 UTC (rev 7513)
@@ -954,7 +954,7 @@
/* (non-Javadoc)
* @see org.jboss.messaging.core.postoffice.Binding#getID()
*/
- public int getID()
+ public long getID()
{
return 0;
More information about the jboss-cvs-commits
mailing list