[jboss-cvs] JBoss Messaging SVN: r1552 - in branches/Branch_Client_Failover_Experiment/src/main/org/jboss: jms/client jms/client/container jms/client/delegate jms/server/endpoint jms/server/endpoint/advised messaging/core/plugin/postoffice/cluster

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Nov 3 07:41:56 EST 2006


Author: timfox
Date: 2006-11-03 07:41:49 -0500 (Fri, 03 Nov 2006)
New Revision: 1552

Modified:
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossSession.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
Log:
A few changes - tidying up, cosmetic, better separation of concerns in failover handling


Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossConnectionConsumer.java	2006-11-03 06:26:37 UTC (rev 1551)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossConnectionConsumer.java	2006-11-03 12:41:49 UTC (rev 1552)
@@ -141,7 +141,7 @@
          // call pre or postDeliver so messages won't be acked, or stored in session/tx
          sess = conn.createSessionDelegate(false, Session.CLIENT_ACKNOWLEDGE, false);
 
-         cons = sess.createConsumerDelegate(dest, messageSelector, false, subName, true,-1l,-1);
+         cons = sess.createConsumerDelegate(dest, messageSelector, false, subName, true);
       }
       finally
       {

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossSession.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossSession.java	2006-11-03 06:26:37 UTC (rev 1551)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossSession.java	2006-11-03 12:41:49 UTC (rev 1552)
@@ -252,7 +252,7 @@
          tccc.set(getClass().getClassLoader());
 
          ConsumerDelegate consumerDelegate = delegate.
-            createConsumerDelegate((JBossDestination)d, messageSelector, noLocal, null, false,-1l, -1);
+            createConsumerDelegate((JBossDestination)d, messageSelector, noLocal, null, false);
          
          return new JBossMessageConsumer(consumerDelegate);
       }
@@ -305,7 +305,7 @@
          tccc.set(getClass().getClassLoader());
 
          ConsumerDelegate consumerDelegate =
-            delegate.createConsumerDelegate((JBossTopic)topic, null, false, name, false,-1l,-1);
+            delegate.createConsumerDelegate((JBossTopic)topic, null, false, name, false);
 
          return new JBossMessageConsumer(consumerDelegate);
       }
@@ -339,7 +339,7 @@
          messageSelector = null;
       }
       ConsumerDelegate consumerDelegate =
-         delegate.createConsumerDelegate((JBossTopic)topic, messageSelector, noLocal, name, false,-1l,-1);
+         delegate.createConsumerDelegate((JBossTopic)topic, messageSelector, noLocal, name, false);
       return new JBossMessageConsumer(consumerDelegate);
    }
 

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java	2006-11-03 06:26:37 UTC (rev 1551)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/ConnectionAspect.java	2006-11-03 12:41:49 UTC (rev 1552)
@@ -351,11 +351,9 @@
       if (trace) { log.trace("handleFailoverOnConsumer: creating alternate consumer"); }
 
       ClientConsumerDelegate newConsumerDelegate = (ClientConsumerDelegate)failedSessionDelegate.
-         createConsumerDelegate((JBossDestination)failedConsumerState.getDestination(),
+         failOverConsumer((JBossDestination)failedConsumerState.getDestination(),
                                 failedConsumerState.getSelector(),
-                                failedConsumerState.isNoLocal(),
-                                failedConsumerState.getSubscriptionName(),
-                                false,
+                                failedConsumerState.isNoLocal(),                                
                                 failedConsumerDelegate.getChannelId(),
                                 oldServerID);
 

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2006-11-03 06:26:37 UTC (rev 1551)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2006-11-03 12:41:49 UTC (rev 1552)
@@ -186,12 +186,18 @@
     */
    public ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
                                                   boolean noLocal, String subscriptionName,
-                                                  boolean connectionConsumer,
-                                                  long oldChannelID,
-                                                  int nodeId) throws JMSException
+                                                  boolean connectionConsumer) throws JMSException
    {
       throw new IllegalStateException("This invocation should not be handled here!");
    }
+   
+   public ConsumerDelegate failOverConsumer(JBossDestination jmsDestination,
+                                            String selectorString,
+                                            boolean noLocal,
+                                            long oldchannelID, int nodeId) throws JMSException
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
 
    /**
     * This invocation should either be handled by the client-side interceptor chain or by the

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-11-03 06:26:37 UTC (rev 1551)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-11-03 12:41:49 UTC (rev 1552)
@@ -109,8 +109,8 @@
    private PostOffice topicPostOffice;
    private PostOffice queuePostOffice;
    private int nodeId;
+
    
-   
    // Constructors --------------------------------------------------
 
    protected ServerSessionEndpoint(int sessionID, ServerConnectionEndpoint connectionEndpoint)
@@ -138,29 +138,85 @@
    
    // SessionDelegate implementation --------------------------------
    
-	public ConsumerDelegate createConsumerDelegate(JBossDestination jmsDestination,
-                                                  String selectorString,
-                                                  boolean noLocal,
-                                                  String subscriptionName,
-                                                  boolean isCC, long oldchannelID, int nodeId) throws JMSException
+   /*
+    * Separation of concerns.
+    * This code DOES NOT belong in the createConsumerDelegate() method
+    */
+   public ConsumerDelegate failOverConsumer(JBossDestination jmsDestination,
+                                            String selectorString,
+                                            boolean noLocal,
+                                            long oldChannelID, int nodeId) throws JMSException
    {
-      // TODO - Remove this log.info before merging into trunk
-      log.info("createConsumerDelegate nodeId=" + nodeId + " oldChannelId=" + oldchannelID);
+      try
+      {
+         ((ClusteredPostOffice)topicPostOffice).failOver(nodeId);
       
-      if (nodeId < 0) nodeId = this.nodeId;
-
-      if (nodeId != this.nodeId)  // this is temporary
-      {
-          try
+         // fail over channel
+         PostOffice postOfficeToUse = null;
+         if (jmsDestination.isTopic())
+         {
+            postOfficeToUse = topicPostOffice;
+         }
+         else
+         {
+            postOfficeToUse = queuePostOffice;
+         }
+         
+         if (postOfficeToUse.isLocal())
+         {
+            throw new IllegalStateException("Cannot failover on a non clustered post office!");
+         }
+         
+         // this is a Clustered operation... so postOffice here must be Clustered
+         Binding binding = ((ClusteredPostOffice)postOfficeToUse).getBindingforChannelId(oldChannelID);
+         if (binding == null)
+         {
+            throw new IllegalStateException("Can't find failed over channel " + oldChannelID);
+         }      
+                  
+          // TODO - Remove this log.info before merging into trunk
+          if (binding.getQueue() instanceof RemoteQueueStub)
           {
-            ((ClusteredPostOffice)topicPostOffice).failOver(nodeId);
+              log.info("OldChannelId=" + oldChannelID + " while currentChannelId=" + ((RemoteQueueStub)binding.getQueue()).getChannelID());
           }
-          catch (Exception e)
+          else
           {
-              e.printStackTrace();
-          }
+             log.info("OldChannelId=" + oldChannelID + " while currentChannelId=" + ((PagingFilteredQueue)binding.getQueue()).getChannelID());
+          }         
+          
+          int consumerID = connectionEndpoint.getServerPeer().getNextObjectID();
+                    
+          int prefetchSize = connectionEndpoint.getPrefetchSize();
+          
+          ServerConsumerEndpoint ep =
+             new ServerConsumerEndpoint(consumerID, (PagingFilteredQueue)binding.getQueue(), binding.getQueue().getName(),
+                                        this, selectorString, noLocal, jmsDestination, prefetchSize, nodeId);
+           
+          JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
+                      
+          ClientConsumerDelegate stub = new ClientConsumerDelegate(consumerID, binding.getQueue().getChannelID(), prefetchSize);
+                        
+          putConsumerEndpoint(consumerID, ep); // caching consumer locally
+          
+          connectionEndpoint.getServerPeer().putConsumerEndpoint(consumerID, ep); // cachin consumer in server peer
+          
+          return stub;
       }
-      
+      catch (Throwable t)
+      {
+         throw ExceptionUtil.handleJMSInvocation(t, this + " createConsumerDelegate");
+      }            
+   }
+   
+   /*
+    * Please don't put failover logic in here
+    */
+	public ConsumerDelegate createConsumerDelegate(JBossDestination jmsDestination,
+                                                  String selectorString,
+                                                  boolean noLocal,
+                                                  String subscriptionName,
+                                                  boolean isCC) throws JMSException
+   {            
       try
       {
          if (closed)
@@ -205,27 +261,8 @@
             selector = new Selector(selectorString);
          }
 
-         // fail over channel
-         if (oldchannelID >= 0)
+         if (jmsDestination.isTopic())
          {
-             PostOffice postOfficeToUse = null;
-             if (jmsDestination.isTopic())
-             {
-                 postOfficeToUse = topicPostOffice;
-             }
-             else
-             {
-                 postOfficeToUse = queuePostOffice;
-             }
-             // this is a Clustered operation... so postOffice here must be Clustered
-             binding = ((ClusteredPostOffice)postOfficeToUse).getBindingforChannelId(oldchannelID);
-             if (binding==null)
-             {
-                 throw new JMSException("Can't find failed over channel " + oldchannelID);
-             }
-         }
-         else if (jmsDestination.isTopic())
-         {
             if (subscriptionName == null)
             {
                // non-durable subscription
@@ -419,23 +456,10 @@
          }
          
          int prefetchSize = connectionEndpoint.getPrefetchSize();
-
-
-         if (oldchannelID>=0)
-         {
-             // TODO - Remove this log.info before merging into trunk
-             if (binding.getQueue() instanceof RemoteQueueStub)
-             {
-                 log.info("OldChannelId=" + oldchannelID + " while currentChannelId=" + ((RemoteQueueStub)binding.getQueue()).getChannelID());
-             }
-             else
-             {
-                log.info("OldChannelId=" + oldchannelID + " while currentChannelId=" + ((PagingFilteredQueue)binding.getQueue()).getChannelID());
-             }
-         }
+        
          ServerConsumerEndpoint ep =
             new ServerConsumerEndpoint(consumerID, (PagingFilteredQueue)binding.getQueue(), binding.getQueue().getName(),
-                                       this, selectorString, noLocal, jmsDestination, prefetchSize,nodeId);
+                                       this, selectorString, noLocal, jmsDestination, prefetchSize, nodeId);
           
          JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
                      

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java	2006-11-03 06:26:37 UTC (rev 1551)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java	2006-11-03 12:41:49 UTC (rev 1552)
@@ -45,9 +45,14 @@
  */
 public interface SessionEndpoint extends Closeable
 { 
+   ConsumerDelegate failOverConsumer(JBossDestination jmsDestination,
+                                     String selectorString,
+                                     boolean noLocal,
+                                     long oldchannelID, int nodeId) throws JMSException;
+   
    ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
                                            boolean noLocal, String subscriptionName,
-                                           boolean connectionConsumer, long oldchannelID, int serverId) throws JMSException;   
+                                           boolean connectionConsumer) throws JMSException;   
    
    BrowserDelegate createBrowserDelegate(JBossDestination queue, String messageSelector)
       throws JMSException;

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2006-11-03 06:26:37 UTC (rev 1551)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2006-11-03 12:41:49 UTC (rev 1552)
@@ -85,10 +85,18 @@
    
    public ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
                                                   boolean noLocal, String subscriptionName,
-                                                  boolean connectionConsumer,long oldchannelID,int nodeId) throws JMSException
+                                                  boolean connectionConsumer) throws JMSException
    {
-      return endpoint.createConsumerDelegate(destination, selector, noLocal, subscriptionName, connectionConsumer, oldchannelID,nodeId);
+      return endpoint.createConsumerDelegate(destination, selector, noLocal, subscriptionName, connectionConsumer);
    }
+   
+   public ConsumerDelegate failOverConsumer(JBossDestination jmsDestination,
+                                            String selectorString,
+                                            boolean noLocal,
+                                            long oldChannelID, int nodeId) throws JMSException
+   {
+      return endpoint.failOverConsumer(jmsDestination, selectorString, noLocal, oldChannelID, nodeId);
+   }
 
    public BrowserDelegate createBrowserDelegate(JBossDestination queue, String messageSelector)
       throws JMSException

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-11-03 06:26:37 UTC (rev 1551)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-11-03 12:41:49 UTC (rev 1552)
@@ -1099,59 +1099,70 @@
 
    public void failOver(int nodeId) throws Exception
    {
-      log.info("Preparing failover against node " + nodeId);
-      Map subMaps = (Map)nameMaps.get(new Integer(nodeId));
-      ArrayList namesToRemove = new ArrayList();
-      for (Iterator iterNames = subMaps.entrySet().iterator();iterNames.hasNext();)
-      {
-         Map.Entry entry = (Map.Entry)iterNames.next();
-         Binding binding = (Binding )entry.getValue();
-         if (binding.getQueue().isClustered())
+      //Need to lock
+      lock.writeLock().acquire();
+      
+      try
+      {         
+         log.info("Preparing failover against node " + nodeId);
+         Map subMaps = (Map)nameMaps.get(new Integer(nodeId));
+         ArrayList namesToRemove = new ArrayList();
+         for (Iterator iterNames = subMaps.entrySet().iterator();iterNames.hasNext();)
          {
+            Map.Entry entry = (Map.Entry)iterNames.next();
+            Binding binding = (Binding )entry.getValue();
+            if (!binding.getQueue().isClustered())
+            {
+               throw new IllegalStateException("Queue is not clustered!: " + binding.getQueue().getName());
+            }
             ClusteredQueue queue = (ClusteredQueue) binding.getQueue();
-            if (!queue.isLocal())
+            if (queue.isLocal())
             {
-               namesToRemove.add(entry);
+               throw new IllegalStateException("Queue is local!: " + binding.getQueue().getName());
             }
+            namesToRemove.add(entry);                 
          }
-      }
-
-      for (Iterator iterNames = namesToRemove.iterator();iterNames.hasNext();)
-      {
-         Map.Entry entry = (Map.Entry)iterNames.next();
-         Binding binding = (Binding)entry.getValue();
-         RemoteQueueStub stub = (RemoteQueueStub)binding.getQueue();
-         this.removeBinding(nodeId,(String)entry.getKey());
-
-         this.deleteBinding(nodeId,(String)entry.getKey());
-
-         UnbindRequest unbindRequest = new UnbindRequest(nodeId, stub.getName());
-         syncSendRequest(unbindRequest);
-
-         // A failed over queue will have the flag failover, only if there isn't another local queue with the same name
-         // In case this node doesn't have that queue, we will simply assume the queue as nothing else had happened.
-         boolean failed = this.getBindingForQueueName(stub.getName())!=null;
-
-         if (!failed)
+   
+         for (Iterator iterNames = namesToRemove.iterator();iterNames.hasNext();)
          {
-            log.info("The current node didn't have a queue " + stub.getName() + " so it's assuming the queue as a regular queue");
+            Map.Entry entry = (Map.Entry)iterNames.next();
+            Binding binding = (Binding)entry.getValue();
+            RemoteQueueStub stub = (RemoteQueueStub)binding.getQueue();
+            this.removeBinding(nodeId,(String)entry.getKey());
+   
+            this.deleteBinding(nodeId,(String)entry.getKey());
+   
+            UnbindRequest unbindRequest = new UnbindRequest(nodeId, stub.getName());
+            syncSendRequest(unbindRequest);
+   
+            // A failed over queue will have the flag failover set only if there isn't another local queue with the same name
+            // In case this node doesn't have that queue, we will simply assume the queue as nothing else had happened.
+            boolean failed = this.getBindingForQueueName(stub.getName()) != null;
+   
+            if (!failed)
+            {
+               log.info("The current node didn't have a queue " + stub.getName() + " so it's assuming the queue as a regular queue");
+            }
+   
+            Binding newBinding = this.createBinding(this.nodeId, binding.getCondition(),
+               stub.getName(), stub.getChannelID(),
+               stub.getFilter(), stub.isRecoverable(), failed);
+   
+            insertBinding(newBinding);
+   
+            LocalClusteredQueue clusteredQueue = (LocalClusteredQueue )newBinding.getQueue();
+            clusteredQueue.deactivate();
+            clusteredQueue.load();
+            clusteredQueue.activate();
+            addBinding(newBinding);
+            System.out.println("**** sending binding on " + binding.getQueue().getName() + " with condition=" + binding.getCondition());
+            sendBindRequest(binding.getCondition(), clusteredQueue,newBinding);
          }
-
-
-         Binding newBinding = this.createBinding(this.nodeId, binding.getCondition(),
-            stub.getName(), stub.getChannelID(),
-            stub.getFilter(), stub.isRecoverable(), failed);
-
-         insertBinding(newBinding);
-
-         LocalClusteredQueue clusteredQueue = (LocalClusteredQueue )newBinding.getQueue();
-         clusteredQueue.deactivate();
-         clusteredQueue.load();
-         clusteredQueue.activate();
-         addBinding(newBinding);
-         System.out.println("**** sending binding on " + binding.getQueue().getName() + " with condition=" + binding.getCondition());
-         sendBindRequest(binding.getCondition(), clusteredQueue,newBinding);
       }
+      finally
+      {
+         lock.writeLock().release();
+      }
    }
 
    public Binding getBindingforChannelId(long channelId) throws Exception
@@ -1160,38 +1171,33 @@
 
       try
       {
-
          Map channelMap = (Map)failedBindings.get(new Integer(nodeId));
          Binding binding = null;
-         if (channelMap!=null)
+         if (channelMap != null)
          {
             binding = (Binding)channelMap.get(new Long(channelId));
          }
 
-         if (binding==null)
+         if (binding == null)
          {
-
             Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
 
-
             if (nameMap != null)
             {
-               for (Iterator iterbindings = nameMap.values().iterator();iterbindings.hasNext();)
+               for (Iterator iterbindings = nameMap.values().iterator(); iterbindings.hasNext();)
                {
                   Binding itemBinding = (Binding)iterbindings.next();
-                  if (itemBinding.getQueue().getChannelID()==channelId)
+                  if (itemBinding.getQueue().getChannelID() == channelId)
                   {
-                     binding=itemBinding;
+                     binding = itemBinding;
                      break;
                   }
                }
-
             }
             else
             {
                log.info("nameMap is null");
             }
-
          }
          log.info("Returned " + binding);
          return binding;
@@ -1206,12 +1212,10 @@
 
    public String printBindingInformation()
    {
-
       StringWriter buffer = new StringWriter();
       PrintWriter out = new PrintWriter(buffer);
       out.print(super.printBindingInformation());
 
-
       out.println("<table border=1><tr><td>Node</td><td>ChannelID</td><td>Binding</td>");
 
       for (Iterator iter = this.failedBindings.entrySet().iterator(); iter.hasNext();)
@@ -1235,10 +1239,8 @@
          }
       }
 
-
       out.println("</table>");
 
-
       out.println("<br>Router Information");
 
       for (Iterator iterRouter = routerMap.entrySet().iterator();iterRouter.hasNext();)
@@ -1255,11 +1257,8 @@
          }
 
       }
-
-
+      
       return buffer.toString();
-
-
    }
 
 
@@ -1437,7 +1436,7 @@
             
       Message message = new Message(null, null, bytes);      
       
-      controlMessageDispatcher.castMessage(null, message, GroupRequest.GET_ALL, 0);
+      controlMessageDispatcher.castMessage(null, message, GroupRequest.GET_ALL, castTimeout);
 
       if (trace) { log.info(this.nodeId + " sent and executed ok"); }
    }




More information about the jboss-cvs-commits mailing list