[jboss-cvs] JBoss Messaging SVN: r1554 - in branches/Branch_Client_Failover_Experiment/src/main/org/jboss: jms/client/container jms/client/delegate jms/server/endpoint jms/server/endpoint/advised messaging/core/plugin/postoffice messaging/core/plugin/postoffice/cluster
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Nov 3 18:26:59 EST 2006
Author: clebert.suconic at jboss.com
Date: 2006-11-03 18:26:52 -0500 (Fri, 03 Nov 2006)
New Revision: 1554
Modified:
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-519 - binding AOP to failOver method and fixing locking issue between failOver and getBinding method (both locking the same object)
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java 2006-11-03 23:25:20 UTC (rev 1553)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java 2006-11-03 23:26:52 UTC (rev 1554)
@@ -354,6 +354,8 @@
failOverConsumer((JBossDestination)failedConsumerState.getDestination(),
failedConsumerState.getSelector(),
failedConsumerState.isNoLocal(),
+ failedConsumerState.getSubscriptionName(),
+ failedConsumerState.isConnectionConsumer(),
failedConsumerDelegate.getChannelId(),
oldServerID);
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2006-11-03 23:25:20 UTC (rev 1553)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2006-11-03 23:26:52 UTC (rev 1554)
@@ -51,6 +51,7 @@
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
*
* @version <tt>$Revision$</tt>
*
@@ -190,11 +191,16 @@
{
throw new IllegalStateException("This invocation should not be handled here!");
}
-
+
+ /**
+ * @see org.jboss.jms.server.endpoint.ServerSessionEndpoint#failOverConsumer(org.jboss.jms.destination.JBossDestination, String, boolean, String, boolean, long, int)
+ * @see org.jboss.jms.client.container.StateCreationAspect#handleCreateConsumerDelegate(org.jboss.aop.joinpoint.Invocation)
+ * */
public ConsumerDelegate failOverConsumer(JBossDestination jmsDestination,
String selectorString,
- boolean noLocal,
- long oldchannelID, int nodeId) throws JMSException
+ boolean noLocal, String subscriptionName,
+ boolean connectionConsumer,
+ long oldChannelID, int nodeId) throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-11-03 23:25:20 UTC (rev 1553)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-11-03 23:26:52 UTC (rev 1554)
@@ -88,11 +88,11 @@
// Static --------------------------------------------------------
// Attributes ----------------------------------------------------
-
+
private boolean trace = log.isTraceEnabled();
private int sessionID;
-
+
private boolean closed;
private ServerConnectionEndpoint connectionEndpoint;
@@ -110,14 +110,14 @@
private PostOffice queuePostOffice;
private int nodeId;
-
+
// Constructors --------------------------------------------------
protected ServerSessionEndpoint(int sessionID, ServerConnectionEndpoint connectionEndpoint)
throws Exception
{
this.sessionID = sessionID;
-
+
this.connectionEndpoint = connectionEndpoint;
ServerPeer sp = connectionEndpoint.getServerPeer();
@@ -133,24 +133,25 @@
tr = sp.getTxRepository();
consumers = new HashMap();
- browsers = new HashMap();
+ browsers = new HashMap();
}
-
+
// SessionDelegate implementation --------------------------------
-
+
/*
* Separation of concerns.
* This code DOES NOT belong in the createConsumerDelegate() method
*/
public ConsumerDelegate failOverConsumer(JBossDestination jmsDestination,
String selectorString,
- boolean noLocal,
+ boolean noLocal, String subscriptionName,
+ boolean connectionConsumer,
long oldChannelID, int nodeId) throws JMSException
{
try
{
((ClusteredPostOffice)topicPostOffice).failOver(nodeId);
-
+
// fail over channel
PostOffice postOfficeToUse = null;
if (jmsDestination.isTopic())
@@ -161,19 +162,19 @@
{
postOfficeToUse = queuePostOffice;
}
-
+
if (postOfficeToUse.isLocal())
{
throw new IllegalStateException("Cannot failover on a non clustered post office!");
}
-
+
// this is a Clustered operation... so postOffice here must be Clustered
Binding binding = ((ClusteredPostOffice)postOfficeToUse).getBindingforChannelId(oldChannelID);
if (binding == null)
{
throw new IllegalStateException("Can't find failed over channel " + oldChannelID);
- }
-
+ }
+
// TODO - Remove this log.info before merging into trunk
if (binding.getQueue() instanceof RemoteQueueStub)
{
@@ -182,32 +183,32 @@
else
{
log.info("OldChannelId=" + oldChannelID + " while currentChannelId=" + ((PagingFilteredQueue)binding.getQueue()).getChannelID());
- }
-
+ }
+
int consumerID = connectionEndpoint.getServerPeer().getNextObjectID();
-
+
int prefetchSize = connectionEndpoint.getPrefetchSize();
-
+
ServerConsumerEndpoint ep =
new ServerConsumerEndpoint(consumerID, (PagingFilteredQueue)binding.getQueue(), binding.getQueue().getName(),
this, selectorString, noLocal, jmsDestination, prefetchSize, nodeId);
-
+
JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
-
+
ClientConsumerDelegate stub = new ClientConsumerDelegate(consumerID, binding.getQueue().getChannelID(), prefetchSize);
-
+
putConsumerEndpoint(consumerID, ep); // caching consumer locally
-
+
connectionEndpoint.getServerPeer().putConsumerEndpoint(consumerID, ep); // cachin consumer in server peer
-
+
return stub;
}
catch (Throwable t)
{
throw ExceptionUtil.handleJMSInvocation(t, this + " createConsumerDelegate");
- }
+ }
}
-
+
/*
* Please don't put failover logic in here
*/
@@ -216,28 +217,28 @@
boolean noLocal,
String subscriptionName,
boolean isCC) throws JMSException
- {
+ {
try
{
if (closed)
{
throw new IllegalStateException("Session is closed");
}
-
+
if ("".equals(selectorString))
{
selectorString = null;
}
-
+
log.debug("creating consumer for " + jmsDestination + ", selector " + selectorString + ", " + (noLocal ? "noLocal, " : "") + "subscription " + subscriptionName);
-
+
ManagedDestination mDest = dm.getDestination(jmsDestination.getName(), jmsDestination.isQueue());
-
+
if (mDest == null)
{
throw new InvalidDestinationException("No such destination: " + jmsDestination);
}
-
+
if (jmsDestination.isTemporary())
{
// Can only create a consumer for a temporary destination on the same connection
@@ -248,12 +249,12 @@
"to that which created the temporary destination";
throw new IllegalStateException(msg);
}
- }
-
+ }
+
int consumerID = connectionEndpoint.getServerPeer().getNextObjectID();
-
+
Binding binding = null;
-
+
// Always validate the selector first
Selector selector = null;
if (selectorString != null)
@@ -267,41 +268,41 @@
{
// non-durable subscription
if (log.isTraceEnabled()) { log.trace("creating new non-durable subscription on " + jmsDestination); }
-
+
//Create the non durable sub
QueuedExecutor executor = (QueuedExecutor)pool.get();
-
+
PagingFilteredQueue q;
-
+
if (topicPostOffice.isLocal())
{
- q = new PagingFilteredQueue(new GUID().toString(), idm.getId(), ms, pm, true, false,
+ q = new PagingFilteredQueue(new GUID().toString(), idm.getId(), ms, pm, true, false,
executor, selector,
mDest.getFullSize(),
mDest.getPageSize(),
mDest.getDownCacheSize());
-
+
binding = topicPostOffice.bindQueue(jmsDestination.getName(), q);
}
else
{
- q = new LocalClusteredQueue(topicPostOffice, nodeId, new GUID().toString(), idm.getId(), ms, pm, true, false,
+ q = new LocalClusteredQueue(topicPostOffice, nodeId, new GUID().toString(), idm.getId(), ms, pm, true, false,
executor, selector, tr,
mDest.getFullSize(),
mDest.getPageSize(),
mDest.getDownCacheSize());
-
+
ClusteredPostOffice cpo = (ClusteredPostOffice)topicPostOffice;
-
+
if (mDest.isClustered())
- {
+ {
binding = cpo.bindClusteredQueue(jmsDestination.getName(), (LocalClusteredQueue)q);
}
else
{
binding = cpo.bindQueue(jmsDestination.getName(), q);
}
- }
+ }
}
else
{
@@ -309,110 +310,110 @@
{
throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic");
}
-
+
// we have a durable subscription, look it up
String clientID = connectionEndpoint.getClientID();
if (clientID == null)
{
throw new JMSException("Cannot create durable subscriber without a valid client ID");
}
-
+
// See if there any bindings with the same client_id.subscription_name name
-
+
String name = MessageQueueNameHelper.createSubscriptionName(clientID, subscriptionName);
-
+
binding = topicPostOffice.getBindingForQueueName(name);
-
+
if (binding == null)
{
//Does not already exist
-
+
if (trace) { log.trace("creating new durable subscription on " + jmsDestination); }
-
+
QueuedExecutor executor = (QueuedExecutor)pool.get();
PagingFilteredQueue q;
-
+
if (topicPostOffice.isLocal())
{
- q = new PagingFilteredQueue(name, idm.getId(), ms, pm, true, true,
+ q = new PagingFilteredQueue(name, idm.getId(), ms, pm, true, true,
executor, selector,
mDest.getFullSize(),
mDest.getPageSize(),
mDest.getDownCacheSize());
-
+
binding = topicPostOffice.bindQueue(jmsDestination.getName(), q);
}
else
{
- q = new LocalClusteredQueue(topicPostOffice, nodeId, name, idm.getId(), ms, pm, true, true,
+ q = new LocalClusteredQueue(topicPostOffice, nodeId, name, idm.getId(), ms, pm, true, true,
executor, selector, tr,
mDest.getFullSize(),
mDest.getPageSize(),
mDest.getDownCacheSize());
-
+
ClusteredPostOffice cpo = (ClusteredPostOffice)topicPostOffice;
-
+
if (mDest.isClustered())
- {
+ {
binding = cpo.bindClusteredQueue(jmsDestination.getName(), (LocalClusteredQueue)q);
}
else
{
binding = cpo.bindQueue(jmsDestination.getName(), q);
}
- }
+ }
}
else
{
//Durable sub already exists
-
+
if (trace) { log.trace("subscription " + subscriptionName + " already exists"); }
-
+
// From javax.jms.Session Javadoc (and also JMS 1.1 6.11.1):
// A client can change an existing durable subscription by creating a durable
// TopicSubscriber with the same name and a new topic and/or message selector.
// Changing a durable subscriber is equivalent to unsubscribing (deleting) the old
// one and creating a new one.
-
+
String filterString = binding.getQueue().getFilter() != null ? binding.getQueue().getFilter().getFilterString() : null;
-
+
boolean selectorChanged =
(selectorString == null && filterString != null) ||
(filterString == null && selectorString != null) ||
(filterString != null && selectorString != null &&
!filterString.equals(selectorString));
-
+
if (trace) { log.trace("selector " + (selectorChanged ? "has" : "has NOT") + " changed"); }
-
+
boolean topicChanged = !binding.getCondition().equals(jmsDestination.getName());
-
+
if (log.isTraceEnabled()) { log.trace("topic " + (topicChanged ? "has" : "has NOT") + " changed"); }
-
+
if (selectorChanged || topicChanged)
- {
+ {
if (trace) { log.trace("topic or selector changed so deleting old subscription"); }
-
+
// Unbind the durable subscription
-
+
if (mDest.isClustered() && !topicPostOffice.isLocal())
{
ClusteredPostOffice cpo = (ClusteredPostOffice)topicPostOffice;
-
+
cpo.unbindClusteredQueue(name);
}
else
- {
+ {
topicPostOffice.unbindQueue(name);
}
-
+
// create a fresh new subscription
-
+
QueuedExecutor executor = (QueuedExecutor)pool.get();
PagingFilteredQueue q;
-
+
if (topicPostOffice.isLocal())
{
- q = new PagingFilteredQueue(name, idm.getId(), ms, pm, true, true,
+ q = new PagingFilteredQueue(name, idm.getId(), ms, pm, true, true,
executor, selector,
mDest.getFullSize(),
mDest.getPageSize(),
@@ -421,42 +422,42 @@
}
else
{
- q = new LocalClusteredQueue(topicPostOffice, nodeId, name, idm.getId(), ms, pm, true, true,
+ q = new LocalClusteredQueue(topicPostOffice, nodeId, name, idm.getId(), ms, pm, true, true,
executor, selector, tr,
mDest.getFullSize(),
mDest.getPageSize(),
mDest.getDownCacheSize());
-
+
ClusteredPostOffice cpo = (ClusteredPostOffice)topicPostOffice;
-
+
if (mDest.isClustered())
- {
+ {
binding = cpo.bindClusteredQueue(jmsDestination.getName(), (LocalClusteredQueue)q);
}
else
{
binding = cpo.bindQueue(jmsDestination.getName(), (LocalClusteredQueue)q);
}
- }
- }
+ }
+ }
}
}
}
else
{
//Consumer on a jms queue
-
+
//Let's find the binding
binding = queuePostOffice.getBindingForQueueName(jmsDestination.getName());
-
+
if (binding == null)
{
throw new IllegalStateException("Cannot find binding for jms queue: " + jmsDestination.getName());
}
}
-
+
int prefetchSize = connectionEndpoint.getPrefetchSize();
-
+
ServerConsumerEndpoint ep =
new ServerConsumerEndpoint(consumerID, (PagingFilteredQueue)binding.getQueue(), binding.getQueue().getName(),
this, selectorString, noLocal, jmsDestination, prefetchSize, nodeId);
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java 2006-11-03 23:25:20 UTC (rev 1553)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java 2006-11-03 23:26:52 UTC (rev 1554)
@@ -38,21 +38,27 @@
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
- *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
* @version <tt>$Revision$</tt>
*
* $Id$
*/
public interface SessionEndpoint extends Closeable
-{
+{
+
+ /** I'm using basically the same siganture as @link{createConsumerDelegate) as some of the aspects like StateCreation
+ * will need these parameters on the right order. We would need to create another aspect method and I prefered reuse the
+ * same aspect for this similar feature. */
ConsumerDelegate failOverConsumer(JBossDestination jmsDestination,
String selectorString,
- boolean noLocal,
+ boolean noLocal, String subscriptionName,
+ boolean connectionConsumer,
long oldchannelID, int nodeId) throws JMSException;
-
+
ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
boolean noLocal, String subscriptionName,
- boolean connectionConsumer) throws JMSException;
+ boolean connectionConsumer) throws JMSException;
BrowserDelegate createBrowserDelegate(JBossDestination queue, String messageSelector)
throws JMSException;
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2006-11-03 23:25:20 UTC (rev 1553)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2006-11-03 23:26:52 UTC (rev 1554)
@@ -40,6 +40,7 @@
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
* @version <tt>$Revision$</tt>
*
* $Id$
@@ -92,10 +93,13 @@
public ConsumerDelegate failOverConsumer(JBossDestination jmsDestination,
String selectorString,
- boolean noLocal,
+ boolean noLocal, String subscriptionName,
+ boolean connectionConsumer,
long oldChannelID, int nodeId) throws JMSException
{
- return endpoint.failOverConsumer(jmsDestination, selectorString, noLocal, oldChannelID, nodeId);
+ return endpoint.failOverConsumer(jmsDestination, selectorString, noLocal,
+ subscriptionName, connectionConsumer,
+ oldChannelID, nodeId);
}
public BrowserDelegate createBrowserDelegate(JBossDestination queue, String messageSelector)
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2006-11-03 23:25:20 UTC (rev 1553)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2006-11-03 23:26:52 UTC (rev 1554)
@@ -257,22 +257,7 @@
try
{
- Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
-
- Binding binding = null;
-
- if (nameMap != null)
- {
- binding = (Binding)nameMap.get(queueName);
- }
- else
- {
- log.info("nameMap is null");
- }
-
-
- log.info("Returned " + binding);
- return binding;
+ return internalGetBindingForQueueName(queueName);
}
finally
{
@@ -280,6 +265,28 @@
}
}
+ /** Internal methods (e.g. failOver) will already hold a lock and will need to call getBindingForQueueNames without a lock.
+ * (Also... I dind't move this method to the protected section of the code as this is related to getBindingForQueueNames */
+ protected Binding internalGetBindingForQueueName(String queueName)
+ {
+ Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
+
+ Binding binding = null;
+
+ if (nameMap != null)
+ {
+ binding = (Binding)nameMap.get(queueName);
+ }
+ else
+ {
+ log.info("nameMap is null");
+ }
+
+
+ log.info("Returned " + binding);
+ return binding;
+ }
+
public void recover() throws Exception
{
//NOOP
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-11-03 23:25:20 UTC (rev 1553)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-11-03 23:26:52 UTC (rev 1554)
@@ -1100,10 +1100,12 @@
public void failOver(int nodeId) throws Exception
{
//Need to lock
+ log.info("Preparing failover against node " + nodeId);
lock.writeLock().acquire();
-
+ log.info("Acquired the lock");
+
try
- {
+ {
log.info("Preparing failover against node " + nodeId);
Map subMaps = (Map)nameMaps.get(new Integer(nodeId));
ArrayList namesToRemove = new ArrayList();
@@ -1120,36 +1122,36 @@
{
throw new IllegalStateException("Queue is local!: " + binding.getQueue().getName());
}
- namesToRemove.add(entry);
+ namesToRemove.add(entry);
}
-
+
for (Iterator iterNames = namesToRemove.iterator();iterNames.hasNext();)
{
Map.Entry entry = (Map.Entry)iterNames.next();
Binding binding = (Binding)entry.getValue();
RemoteQueueStub stub = (RemoteQueueStub)binding.getQueue();
this.removeBinding(nodeId,(String)entry.getKey());
-
+
this.deleteBinding(nodeId,(String)entry.getKey());
-
+
UnbindRequest unbindRequest = new UnbindRequest(nodeId, stub.getName());
syncSendRequest(unbindRequest);
-
+
// A failed over queue will have the flag failover set only if there isn't another local queue with the same name
// In case this node doesn't have that queue, we will simply assume the queue as nothing else had happened.
- boolean failed = this.getBindingForQueueName(stub.getName()) != null;
-
+ boolean failed = this.internalGetBindingForQueueName(stub.getName()) != null;
+
if (!failed)
{
log.info("The current node didn't have a queue " + stub.getName() + " so it's assuming the queue as a regular queue");
}
-
+
Binding newBinding = this.createBinding(this.nodeId, binding.getCondition(),
stub.getName(), stub.getChannelID(),
stub.getFilter(), stub.isRecoverable(), failed);
-
+
insertBinding(newBinding);
-
+
LocalClusteredQueue clusteredQueue = (LocalClusteredQueue )newBinding.getQueue();
clusteredQueue.deactivate();
clusteredQueue.load();
@@ -1257,12 +1259,12 @@
}
}
-
+
return buffer.toString();
}
-
+
// Protected ---------------------------------------------------------------------------------------
@@ -1328,75 +1330,75 @@
protected void removeFromConditionMap(Binding binding)
{
ClusteredBindings bindings = (ClusteredBindings)conditionMap.get(binding.getCondition());
-
+
if (bindings == null)
{
throw new IllegalStateException("Cannot find condition bindings for " + binding.getCondition());
}
-
+
boolean removed = bindings.removeBinding(binding);
-
+
if (!removed)
{
throw new IllegalStateException("Cannot find binding in condition binding list");
- }
-
+ }
+
if (bindings.isEmpty())
{
conditionMap.remove(binding.getCondition());
- }
-
+ }
+
String queueName = binding.getQueue().getName();
-
+
ClusterRouter router = (ClusterRouter)routerMap.get(queueName);
-
+
if (router == null)
{
throw new IllegalStateException("Cannot find router with name " + queueName);
}
-
+
removed = router.remove(binding.getQueue());
-
+
if (!removed)
{
throw new IllegalStateException("Cannot find router in map");
}
-
+
if (router.getQueues().isEmpty())
{
routerMap.remove(queueName);
- }
+ }
}
protected void loadBindings() throws Exception
{
if (trace) { log.trace(this.nodeId + " loading bindings"); }
-
+
boolean isState = syncChannel.getState(null, stateTimeout);
-
+
if (!isState)
- {
+ {
//Must be first member in group or non clustered- we load the state ourself from the database
-
+
if (trace) { log.trace(this.nodeId + " First member of group- so loading bindings from db"); }
-
- super.loadBindings();
+
+ super.loadBindings();
}
else
{
//The state will be set in due course via the MessageListener - we must wait until this happens
-
+
if (trace) { log.trace(this.nodeId + " Not first member of group- so waiting for state to arrive...."); }
-
+
synchronized (setStateLock)
{
//TODO we should implement a timeout on this
while (!stateSet)
{
setStateLock.wait();
- }
+ }
}
-
+
if (trace) { log.trace(this.nodeId + " Received state"); }
}
}
@@ -1422,20 +1424,20 @@
}
// Private ------------------------------------------------------------------------------------------
-
+
/*
* Multicast a sync request
*/
private void syncSendRequest(ClusterRequest request) throws Exception
- {
+ {
if (trace) { log.info(this.nodeId + " sending synch request to group, request: " + request); }
System.out.println("***************Request Sent **************");
byte[] bytes = writeRequest(request);
-
- Message message = new Message(null, null, bytes);
-
+
+ Message message = new Message(null, null, bytes);
+
controlMessageDispatcher.castMessage(null, message, GroupRequest.GET_ALL, castTimeout);
if (trace) { log.info(this.nodeId + " sent and executed ok"); }
More information about the jboss-cvs-commits
mailing list