[jboss-cvs] JBoss Messaging SVN: r1550 - in branches/Branch_Client_Failover_Experiment/src/main/org/jboss: jms/server/endpoint messaging/core/plugin/contract messaging/core/plugin/postoffice messaging/core/plugin/postoffice/cluster

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Nov 2 20:05:29 EST 2006


Author: clebert.suconic at jboss.com
Date: 2006-11-02 20:05:25 -0500 (Thu, 02 Nov 2006)
New Revision: 1550

Modified:
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-519 - Implementing field failedOver HashMap

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-11-03 00:21:22 UTC (rev 1549)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-11-03 01:05:25 UTC (rev 1550)
@@ -217,7 +217,8 @@
              {
                  postOfficeToUse = queuePostOffice;
              }
-             binding = postOfficeToUse.getBindingforChannelId(-1,oldchannelID);
+             // this is a Clustered operation... so postOffice here must be Clustered
+             binding = ((ClusteredPostOffice)postOfficeToUse).getBindingforChannelId(oldchannelID);
              if (binding==null)
              {
                  throw new JMSException("Can't find failed over channel " + oldchannelID);

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java	2006-11-03 00:21:22 UTC (rev 1549)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java	2006-11-03 01:05:25 UTC (rev 1550)
@@ -59,4 +59,6 @@
    Collection listAllBindingsForCondition(String condition) throws Exception;
 
    public void failOver(int nodeId) throws Exception;
+
+   Binding getBindingforChannelId(long channelId) throws Exception;
 }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java	2006-11-03 00:21:22 UTC (rev 1549)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java	2006-11-03 01:05:25 UTC (rev 1550)
@@ -70,8 +70,6 @@
     */
    Binding getBindingForQueueName(String queueName) throws Exception;
 
-   Binding getBindingforChannelId(int parameterNodeId, long channelId) throws Exception;    
-
    /**
     * Route a reference.
     * @param ref

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2006-11-03 00:21:22 UTC (rev 1549)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2006-11-03 01:05:25 UTC (rev 1550)
@@ -280,47 +280,6 @@
       }
    }
 
-   public Binding getBindingforChannelId(int parameterNodeId, long channelId) throws Exception
-   {
-       if (parameterNodeId<0) parameterNodeId=this.nodeId;
-      log.info("DefaultPostOffice::getBindingforChannelId(" + parameterNodeId + " ," + channelId + ")");
-
-       lock.readLock().acquire();
-
-       try
-       {
-          Map nameMap = (Map)nameMaps.get(new Integer(parameterNodeId));
-
-          Binding binding = null;
-
-          if (nameMap != null)
-          {
-             for (Iterator iterbindings = nameMap.values().iterator();iterbindings.hasNext();)
-             {
-                 Binding itemBinding = (Binding)iterbindings.next();
-                 if (itemBinding.getQueue().getChannelID()==channelId)
-                 {
-                     binding=itemBinding;
-                     break;
-                 }
-             }
-
-          }
-          else
-          {
-              log.info("nameMap is null");
-          }
-
-
-          log.info("Returned " + binding);
-          return binding;
-       }
-       finally
-       {
-          lock.readLock().release();
-       }
-   }
-   
    public void recover() throws Exception
    {
       //NOOP
@@ -736,15 +695,15 @@
    protected void addToNameMap(Binding binding)
    {
       Map nameMap = (Map)nameMaps.get(new Integer(binding.getNodeId()));
-      
+
       if (nameMap == null)
       {
          nameMap = new LinkedHashMap();
-         
+
          nameMaps.put(new Integer(binding.getNodeId()), nameMap);
       }
-      
-      nameMap.put(binding.getQueue().getName(), binding);      
+
+      nameMap.put(binding.getQueue().getName(), binding);
    }
    
    protected void addToConditionMap(Binding binding)

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java	2006-11-03 00:21:22 UTC (rev 1549)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java	2006-11-03 01:05:25 UTC (rev 1550)
@@ -24,6 +24,7 @@
 import java.util.List;
 
 import org.jboss.messaging.core.Router;
+import org.jboss.messaging.core.Receiver;
 
 /**
  * A ClusterRouter
@@ -37,6 +38,9 @@
 public interface ClusterRouter extends Router
 {
    List getQueues();
-   
+
    ClusteredQueue getLocalQueue();
+
+   boolean add(Receiver receiver, boolean failedOver);
+
 }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-11-03 00:21:22 UTC (rev 1549)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-11-03 01:05:25 UTC (rev 1550)
@@ -22,13 +22,7 @@
 package org.jboss.messaging.core.plugin.postoffice.cluster;
 
 import java.io.*;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 
 import javax.sql.DataSource;
 import javax.transaction.TransactionManager;
@@ -129,7 +123,11 @@
    private ClusterRouterFactory routerFactory;
    
    private Map routerMap;
-   
+
+   /** Failed-over bindings, grouped by node id and then indexed by channel id.
+    *  Map<Integer nodeId, Map<Long channelId, Binding>> */
+   private Map failedBindings;
+
    private StatsSender statsSender;
    
    private boolean started;
@@ -227,6 +225,8 @@
       this.routerFactory = rf;
       
       routerMap = new HashMap();
+
+      failedBindings = new LinkedHashMap();
       
       statsSender = new StatsSender(this, statsSendPeriod);
       
@@ -582,7 +582,7 @@
             binding = (Binding)nameMap.get(queueName);
          }
          
-         if (binding != null)
+         if (binding != null && failed)
          {
             throw new IllegalArgumentException(this.nodeId + " Binding already exists for node Id " + nodeId + " queue name " + queueName);
          }
@@ -1154,8 +1154,56 @@
       }
    }
 
+   public Binding getBindingforChannelId(long channelId) throws Exception
+   { // Finds the binding for channelId: failedBindings is checked first, then the name map for nodeId.
+      lock.readLock().acquire();
 
+      try
+      {
 
+         Map channelMap = (Map)failedBindings.get(new Integer(nodeId));
+         Binding binding = null;
+         if (channelMap!=null)
+         {
+            binding = (Binding)channelMap.get(new Long(channelId));
+         }
+         // Fallback when no failed-over binding matched: search the regular name map.
+         if (binding==null)
+         {
+            // nameMap is keyed by queue name, so matching on channel id requires a linear scan of its values.
+            Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
+
+
+            if (nameMap != null)
+            {
+               for (Iterator iterbindings = nameMap.values().iterator();iterbindings.hasNext();)
+               {
+                  Binding itemBinding = (Binding)iterbindings.next();
+                  if (itemBinding.getQueue().getChannelID()==channelId)
+                  {
+                     binding=itemBinding;
+                     break;
+                  }
+               }
+
+            }
+            else
+            {
+               log.info("nameMap is null");
+            }
+            // binding is still null here if neither lookup matched; null is then returned to the caller.
+         }
+         log.info("Returned " + binding);
+         return binding;
+      }
+      finally
+      {
+         lock.readLock().release();
+      }
+   }
+
+
+
    public String printBindingInformation()
    {
 
@@ -1190,36 +1238,65 @@
 
      
    // Protected ---------------------------------------------------------------------------------------
-        
+
+
+   protected void addToNameMap(Binding binding)
+   {
+      // Failed bindings are tracked through addIntoFailedMaps instead of the inherited name maps.
+      if (binding.isFailed())
+      {
+         addIntoFailedMaps(binding);
+         return;
+      }
+
+      super.addToNameMap(binding);
+   }
+
+   /** Records a failed binding in failedBindings, keyed by its node id and then by its queue's channel id. */
+   private void addIntoFailedMaps(Binding binding)
+   {
+      Integer nodeKey = new Integer(binding.getNodeId());
+      Map channelMap = (Map)failedBindings.get(nodeKey);
+      if (channelMap == null)
+      {
+         channelMap = new LinkedHashMap();
+         // Register the new per-node map (not failedBindings itself) so the entry added below is kept.
+         failedBindings.put(nodeKey, channelMap);
+      }
+      channelMap.put(new Long(binding.getQueue().getChannelID()), binding);
+   }
+
+
    protected void addToConditionMap(Binding binding)
    {
       String condition = binding.getCondition();
-      
+
       ClusteredBindings bindings = (ClusteredBindings)conditionMap.get(condition);
-      
+
       if (bindings == null)
       {
          bindings = new DefaultClusteredBindings(nodeId);
-         
+
          conditionMap.put(condition, bindings);
       }
-      
+
       bindings.addBinding(binding);
-      
+
       String queueName = binding.getQueue().getName();
-      
+
       ClusterRouter router = (ClusterRouter)routerMap.get(queueName);
-      
+
       if (router == null)
       {
          router = routerFactory.createRouter();
-         
+
          routerMap.put(queueName, router);
-         
+
          bindings.addRouter(queueName, router);
       }
-      
-      router.add(binding.getQueue());                
+
+      // todo: Maybe we should have isFailed as a property of Queue instead of Binding, so we won't need to change this signature.
+      router.add(binding.getQueue(),binding.isFailed());
    }
 
    protected void removeFromConditionMap(Binding binding)

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java	2006-11-03 00:21:22 UTC (rev 1549)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java	2006-11-03 01:05:25 UTC (rev 1550)
@@ -55,6 +55,8 @@
    //MUST be an arraylist for fast index access
    private ArrayList nonLocalQueues;
 
+   private ArrayList failedOverQueues;
+
    private ClusteredQueue localQueue;
 
    private int target;
@@ -62,6 +64,7 @@
    public DefaultRouter()
    {
       nonLocalQueues = new ArrayList();
+      failedOverQueues = new ArrayList();
    }
 
    public int size()
@@ -76,16 +79,27 @@
 
    public boolean add(Receiver receiver)
    {
+      return add(receiver,false);
+   }
+
+   public boolean add(Receiver receiver, boolean failedOver)
+   {
       ClusteredQueue queue = (ClusteredQueue)receiver;
 
       if (queue.isLocal())
       {
-         if (localQueue != null)
+         if (failedOver)
          {
-            //throw new IllegalStateException("Already has local queue");
-            return true; // todo - fix this
+            failedOverQueues.add(receiver);
          }
-         localQueue = queue;
+         else
+         {
+            if (localQueue != null)
+            {
+               throw new IllegalStateException("Already has local queue");
+            }
+            localQueue = queue;
+         }
       }
       else
       {




More information about the jboss-cvs-commits mailing list