[jboss-cvs] JBoss Messaging SVN: r1552 - in branches/Branch_Client_Failover_Experiment/src/main/org/jboss: jms/client jms/client/container jms/client/delegate jms/server/endpoint jms/server/endpoint/advised messaging/core/plugin/postoffice/cluster
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Nov 3 07:41:56 EST 2006
Author: timfox
Date: 2006-11-03 07:41:49 -0500 (Fri, 03 Nov 2006)
New Revision: 1552
Modified:
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossSession.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
Log:
A few changes - tidying up, cosmetic, better separation of concerns in failover handling
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossConnectionConsumer.java 2006-11-03 06:26:37 UTC (rev 1551)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossConnectionConsumer.java 2006-11-03 12:41:49 UTC (rev 1552)
@@ -141,7 +141,7 @@
// call pre or postDeliver so messages won't be acked, or stored in session/tx
sess = conn.createSessionDelegate(false, Session.CLIENT_ACKNOWLEDGE, false);
- cons = sess.createConsumerDelegate(dest, messageSelector, false, subName, true,-1l,-1);
+ cons = sess.createConsumerDelegate(dest, messageSelector, false, subName, true);
}
finally
{
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossSession.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossSession.java 2006-11-03 06:26:37 UTC (rev 1551)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossSession.java 2006-11-03 12:41:49 UTC (rev 1552)
@@ -252,7 +252,7 @@
tccc.set(getClass().getClassLoader());
ConsumerDelegate consumerDelegate = delegate.
- createConsumerDelegate((JBossDestination)d, messageSelector, noLocal, null, false,-1l, -1);
+ createConsumerDelegate((JBossDestination)d, messageSelector, noLocal, null, false);
return new JBossMessageConsumer(consumerDelegate);
}
@@ -305,7 +305,7 @@
tccc.set(getClass().getClassLoader());
ConsumerDelegate consumerDelegate =
- delegate.createConsumerDelegate((JBossTopic)topic, null, false, name, false,-1l,-1);
+ delegate.createConsumerDelegate((JBossTopic)topic, null, false, name, false);
return new JBossMessageConsumer(consumerDelegate);
}
@@ -339,7 +339,7 @@
messageSelector = null;
}
ConsumerDelegate consumerDelegate =
- delegate.createConsumerDelegate((JBossTopic)topic, messageSelector, noLocal, name, false,-1l,-1);
+ delegate.createConsumerDelegate((JBossTopic)topic, messageSelector, noLocal, name, false);
return new JBossMessageConsumer(consumerDelegate);
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java 2006-11-03 06:26:37 UTC (rev 1551)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java 2006-11-03 12:41:49 UTC (rev 1552)
@@ -351,11 +351,9 @@
if (trace) { log.trace("handleFailoverOnConsumer: creating alternate consumer"); }
ClientConsumerDelegate newConsumerDelegate = (ClientConsumerDelegate)failedSessionDelegate.
- createConsumerDelegate((JBossDestination)failedConsumerState.getDestination(),
+ failOverConsumer((JBossDestination)failedConsumerState.getDestination(),
failedConsumerState.getSelector(),
- failedConsumerState.isNoLocal(),
- failedConsumerState.getSubscriptionName(),
- false,
+ failedConsumerState.isNoLocal(),
failedConsumerDelegate.getChannelId(),
oldServerID);
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2006-11-03 06:26:37 UTC (rev 1551)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2006-11-03 12:41:49 UTC (rev 1552)
@@ -186,12 +186,18 @@
*/
public ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
boolean noLocal, String subscriptionName,
- boolean connectionConsumer,
- long oldChannelID,
- int nodeId) throws JMSException
+ boolean connectionConsumer) throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
}
+
+ public ConsumerDelegate failOverConsumer(JBossDestination jmsDestination,
+ String selectorString,
+ boolean noLocal,
+ long oldchannelID, int nodeId) throws JMSException
+ {
+ throw new IllegalStateException("This invocation should not be handled here!");
+ }
/**
* This invocation should either be handled by the client-side interceptor chain or by the
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-11-03 06:26:37 UTC (rev 1551)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-11-03 12:41:49 UTC (rev 1552)
@@ -109,8 +109,8 @@
private PostOffice topicPostOffice;
private PostOffice queuePostOffice;
private int nodeId;
+
-
// Constructors --------------------------------------------------
protected ServerSessionEndpoint(int sessionID, ServerConnectionEndpoint connectionEndpoint)
@@ -138,29 +138,85 @@
// SessionDelegate implementation --------------------------------
- public ConsumerDelegate createConsumerDelegate(JBossDestination jmsDestination,
- String selectorString,
- boolean noLocal,
- String subscriptionName,
- boolean isCC, long oldchannelID, int nodeId) throws JMSException
+ /*
+ * Separation of concerns.
+ * This code DOES NOT belong in the createConsumerDelegate() method
+ */
+ public ConsumerDelegate failOverConsumer(JBossDestination jmsDestination,
+ String selectorString,
+ boolean noLocal,
+ long oldChannelID, int nodeId) throws JMSException
{
- // TODO - Remove this log.info before merging into trunk
- log.info("createConsumerDelegate nodeId=" + nodeId + " oldChannelId=" + oldchannelID);
+ try
+ {
+ ((ClusteredPostOffice)topicPostOffice).failOver(nodeId);
- if (nodeId < 0) nodeId = this.nodeId;
-
- if (nodeId != this.nodeId) // this is temporary
- {
- try
+ // fail over channel
+ PostOffice postOfficeToUse = null;
+ if (jmsDestination.isTopic())
+ {
+ postOfficeToUse = topicPostOffice;
+ }
+ else
+ {
+ postOfficeToUse = queuePostOffice;
+ }
+
+ if (postOfficeToUse.isLocal())
+ {
+ throw new IllegalStateException("Cannot failover on a non clustered post office!");
+ }
+
+ // this is a Clustered operation... so postOffice here must be Clustered
+ Binding binding = ((ClusteredPostOffice)postOfficeToUse).getBindingforChannelId(oldChannelID);
+ if (binding == null)
+ {
+ throw new IllegalStateException("Can't find failed over channel " + oldChannelID);
+ }
+
+ // TODO - Remove this log.info before merging into trunk
+ if (binding.getQueue() instanceof RemoteQueueStub)
{
- ((ClusteredPostOffice)topicPostOffice).failOver(nodeId);
+ log.info("OldChannelId=" + oldChannelID + " while currentChannelId=" + ((RemoteQueueStub)binding.getQueue()).getChannelID());
}
- catch (Exception e)
+ else
{
- e.printStackTrace();
- }
+ log.info("OldChannelId=" + oldChannelID + " while currentChannelId=" + ((PagingFilteredQueue)binding.getQueue()).getChannelID());
+ }
+
+ int consumerID = connectionEndpoint.getServerPeer().getNextObjectID();
+
+ int prefetchSize = connectionEndpoint.getPrefetchSize();
+
+ ServerConsumerEndpoint ep =
+ new ServerConsumerEndpoint(consumerID, (PagingFilteredQueue)binding.getQueue(), binding.getQueue().getName(),
+ this, selectorString, noLocal, jmsDestination, prefetchSize, nodeId);
+
+ JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
+
+ ClientConsumerDelegate stub = new ClientConsumerDelegate(consumerID, binding.getQueue().getChannelID(), prefetchSize);
+
+ putConsumerEndpoint(consumerID, ep); // caching consumer locally
+
+ connectionEndpoint.getServerPeer().putConsumerEndpoint(consumerID, ep); // cachin consumer in server peer
+
+ return stub;
}
-
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMSInvocation(t, this + " createConsumerDelegate");
+ }
+ }
+
+ /*
+ * Please don't put failover logic in here
+ */
+ public ConsumerDelegate createConsumerDelegate(JBossDestination jmsDestination,
+ String selectorString,
+ boolean noLocal,
+ String subscriptionName,
+ boolean isCC) throws JMSException
+ {
try
{
if (closed)
@@ -205,27 +261,8 @@
selector = new Selector(selectorString);
}
- // fail over channel
- if (oldchannelID >= 0)
+ if (jmsDestination.isTopic())
{
- PostOffice postOfficeToUse = null;
- if (jmsDestination.isTopic())
- {
- postOfficeToUse = topicPostOffice;
- }
- else
- {
- postOfficeToUse = queuePostOffice;
- }
- // this is a Clustered operation... so postOffice here must be Clustered
- binding = ((ClusteredPostOffice)postOfficeToUse).getBindingforChannelId(oldchannelID);
- if (binding==null)
- {
- throw new JMSException("Can't find failed over channel " + oldchannelID);
- }
- }
- else if (jmsDestination.isTopic())
- {
if (subscriptionName == null)
{
// non-durable subscription
@@ -419,23 +456,10 @@
}
int prefetchSize = connectionEndpoint.getPrefetchSize();
-
-
- if (oldchannelID>=0)
- {
- // TODO - Remove this log.info before merging into trunk
- if (binding.getQueue() instanceof RemoteQueueStub)
- {
- log.info("OldChannelId=" + oldchannelID + " while currentChannelId=" + ((RemoteQueueStub)binding.getQueue()).getChannelID());
- }
- else
- {
- log.info("OldChannelId=" + oldchannelID + " while currentChannelId=" + ((PagingFilteredQueue)binding.getQueue()).getChannelID());
- }
- }
+
ServerConsumerEndpoint ep =
new ServerConsumerEndpoint(consumerID, (PagingFilteredQueue)binding.getQueue(), binding.getQueue().getName(),
- this, selectorString, noLocal, jmsDestination, prefetchSize,nodeId);
+ this, selectorString, noLocal, jmsDestination, prefetchSize, nodeId);
JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java 2006-11-03 06:26:37 UTC (rev 1551)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java 2006-11-03 12:41:49 UTC (rev 1552)
@@ -45,9 +45,14 @@
*/
public interface SessionEndpoint extends Closeable
{
+ ConsumerDelegate failOverConsumer(JBossDestination jmsDestination,
+ String selectorString,
+ boolean noLocal,
+ long oldchannelID, int nodeId) throws JMSException;
+
ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
boolean noLocal, String subscriptionName,
- boolean connectionConsumer, long oldchannelID, int serverId) throws JMSException;
+ boolean connectionConsumer) throws JMSException;
BrowserDelegate createBrowserDelegate(JBossDestination queue, String messageSelector)
throws JMSException;
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2006-11-03 06:26:37 UTC (rev 1551)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2006-11-03 12:41:49 UTC (rev 1552)
@@ -85,10 +85,18 @@
public ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
boolean noLocal, String subscriptionName,
- boolean connectionConsumer,long oldchannelID,int nodeId) throws JMSException
+ boolean connectionConsumer) throws JMSException
{
- return endpoint.createConsumerDelegate(destination, selector, noLocal, subscriptionName, connectionConsumer, oldchannelID,nodeId);
+ return endpoint.createConsumerDelegate(destination, selector, noLocal, subscriptionName, connectionConsumer);
}
+
+ public ConsumerDelegate failOverConsumer(JBossDestination jmsDestination,
+ String selectorString,
+ boolean noLocal,
+ long oldChannelID, int nodeId) throws JMSException
+ {
+ return endpoint.failOverConsumer(jmsDestination, selectorString, noLocal, oldChannelID, nodeId);
+ }
public BrowserDelegate createBrowserDelegate(JBossDestination queue, String messageSelector)
throws JMSException
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-11-03 06:26:37 UTC (rev 1551)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-11-03 12:41:49 UTC (rev 1552)
@@ -1099,59 +1099,70 @@
public void failOver(int nodeId) throws Exception
{
- log.info("Preparing failover against node " + nodeId);
- Map subMaps = (Map)nameMaps.get(new Integer(nodeId));
- ArrayList namesToRemove = new ArrayList();
- for (Iterator iterNames = subMaps.entrySet().iterator();iterNames.hasNext();)
- {
- Map.Entry entry = (Map.Entry)iterNames.next();
- Binding binding = (Binding )entry.getValue();
- if (binding.getQueue().isClustered())
+ //Need to lock
+ lock.writeLock().acquire();
+
+ try
+ {
+ log.info("Preparing failover against node " + nodeId);
+ Map subMaps = (Map)nameMaps.get(new Integer(nodeId));
+ ArrayList namesToRemove = new ArrayList();
+ for (Iterator iterNames = subMaps.entrySet().iterator();iterNames.hasNext();)
{
+ Map.Entry entry = (Map.Entry)iterNames.next();
+ Binding binding = (Binding )entry.getValue();
+ if (!binding.getQueue().isClustered())
+ {
+ throw new IllegalStateException("Queue is not clustered!: " + binding.getQueue().getName());
+ }
ClusteredQueue queue = (ClusteredQueue) binding.getQueue();
- if (!queue.isLocal())
+ if (queue.isLocal())
{
- namesToRemove.add(entry);
+ throw new IllegalStateException("Queue is local!: " + binding.getQueue().getName());
}
+ namesToRemove.add(entry);
}
- }
-
- for (Iterator iterNames = namesToRemove.iterator();iterNames.hasNext();)
- {
- Map.Entry entry = (Map.Entry)iterNames.next();
- Binding binding = (Binding)entry.getValue();
- RemoteQueueStub stub = (RemoteQueueStub)binding.getQueue();
- this.removeBinding(nodeId,(String)entry.getKey());
-
- this.deleteBinding(nodeId,(String)entry.getKey());
-
- UnbindRequest unbindRequest = new UnbindRequest(nodeId, stub.getName());
- syncSendRequest(unbindRequest);
-
- // A failed over queue will have the flag failover, only if there isn't another local queue with the same name
- // In case this node doesn't have that queue, we will simply assume the queue as nothing else had happened.
- boolean failed = this.getBindingForQueueName(stub.getName())!=null;
-
- if (!failed)
+
+ for (Iterator iterNames = namesToRemove.iterator();iterNames.hasNext();)
{
- log.info("The current node didn't have a queue " + stub.getName() + " so it's assuming the queue as a regular queue");
+ Map.Entry entry = (Map.Entry)iterNames.next();
+ Binding binding = (Binding)entry.getValue();
+ RemoteQueueStub stub = (RemoteQueueStub)binding.getQueue();
+ this.removeBinding(nodeId,(String)entry.getKey());
+
+ this.deleteBinding(nodeId,(String)entry.getKey());
+
+ UnbindRequest unbindRequest = new UnbindRequest(nodeId, stub.getName());
+ syncSendRequest(unbindRequest);
+
+ // A failed over queue will have the flag failover set only if there isn't another local queue with the same name
+ // In case this node doesn't have that queue, we will simply assume the queue as nothing else had happened.
+ boolean failed = this.getBindingForQueueName(stub.getName()) != null;
+
+ if (!failed)
+ {
+ log.info("The current node didn't have a queue " + stub.getName() + " so it's assuming the queue as a regular queue");
+ }
+
+ Binding newBinding = this.createBinding(this.nodeId, binding.getCondition(),
+ stub.getName(), stub.getChannelID(),
+ stub.getFilter(), stub.isRecoverable(), failed);
+
+ insertBinding(newBinding);
+
+ LocalClusteredQueue clusteredQueue = (LocalClusteredQueue )newBinding.getQueue();
+ clusteredQueue.deactivate();
+ clusteredQueue.load();
+ clusteredQueue.activate();
+ addBinding(newBinding);
+ System.out.println("**** sending binding on " + binding.getQueue().getName() + " with condition=" + binding.getCondition());
+ sendBindRequest(binding.getCondition(), clusteredQueue,newBinding);
}
-
-
- Binding newBinding = this.createBinding(this.nodeId, binding.getCondition(),
- stub.getName(), stub.getChannelID(),
- stub.getFilter(), stub.isRecoverable(), failed);
-
- insertBinding(newBinding);
-
- LocalClusteredQueue clusteredQueue = (LocalClusteredQueue )newBinding.getQueue();
- clusteredQueue.deactivate();
- clusteredQueue.load();
- clusteredQueue.activate();
- addBinding(newBinding);
- System.out.println("**** sending binding on " + binding.getQueue().getName() + " with condition=" + binding.getCondition());
- sendBindRequest(binding.getCondition(), clusteredQueue,newBinding);
}
+ finally
+ {
+ lock.writeLock().release();
+ }
}
public Binding getBindingforChannelId(long channelId) throws Exception
@@ -1160,38 +1171,33 @@
try
{
-
Map channelMap = (Map)failedBindings.get(new Integer(nodeId));
Binding binding = null;
- if (channelMap!=null)
+ if (channelMap != null)
{
binding = (Binding)channelMap.get(new Long(channelId));
}
- if (binding==null)
+ if (binding == null)
{
-
Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
-
if (nameMap != null)
{
- for (Iterator iterbindings = nameMap.values().iterator();iterbindings.hasNext();)
+ for (Iterator iterbindings = nameMap.values().iterator(); iterbindings.hasNext();)
{
Binding itemBinding = (Binding)iterbindings.next();
- if (itemBinding.getQueue().getChannelID()==channelId)
+ if (itemBinding.getQueue().getChannelID() == channelId)
{
- binding=itemBinding;
+ binding = itemBinding;
break;
}
}
-
}
else
{
log.info("nameMap is null");
}
-
}
log.info("Returned " + binding);
return binding;
@@ -1206,12 +1212,10 @@
public String printBindingInformation()
{
-
StringWriter buffer = new StringWriter();
PrintWriter out = new PrintWriter(buffer);
out.print(super.printBindingInformation());
-
out.println("<table border=1><tr><td>Node</td><td>ChannelID</td><td>Binding</td>");
for (Iterator iter = this.failedBindings.entrySet().iterator(); iter.hasNext();)
@@ -1235,10 +1239,8 @@
}
}
-
out.println("</table>");
-
out.println("<br>Router Information");
for (Iterator iterRouter = routerMap.entrySet().iterator();iterRouter.hasNext();)
@@ -1255,11 +1257,8 @@
}
}
-
-
+
return buffer.toString();
-
-
}
@@ -1437,7 +1436,7 @@
Message message = new Message(null, null, bytes);
- controlMessageDispatcher.castMessage(null, message, GroupRequest.GET_ALL, 0);
+ controlMessageDispatcher.castMessage(null, message, GroupRequest.GET_ALL, castTimeout);
if (trace) { log.info(this.nodeId + " sent and executed ok"); }
}
More information about the jboss-cvs-commits
mailing list