[jboss-cvs] JBoss Messaging SVN: r1549 - in branches/Branch_Client_Failover_Experiment/src: etc/server/default/deploy main/org/jboss/messaging/core/plugin/postoffice main/org/jboss/messaging/core/plugin/postoffice/cluster

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Nov 2 19:21:29 EST 2006


Author: clebert.suconic at jboss.com
Date: 2006-11-02 19:21:22 -0500 (Thu, 02 Nov 2006)
New Revision: 1549

Modified:
   branches/Branch_Client_Failover_Experiment/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/Binding.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultBinding.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BindRequest.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BindingInfo.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/UnbindRequest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-519 - Implementing field failed into Binding, BindingInfo and clustered related operations

Modified: branches/Branch_Client_Failover_Experiment/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml	2006-11-02 20:20:07 UTC (rev 1548)
+++ branches/Branch_Client_Failover_Experiment/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml	2006-11-03 00:21:22 UTC (rev 1549)
@@ -67,10 +67,10 @@
       <attribute name="DataSource">java:/DefaultDS</attribute>
       <attribute name="CreateTablesOnStartup">true</attribute>
       <attribute name="SqlProperties"><![CDATA[
-CREATE_POSTOFFICE_TABLE=CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID INTEGER, QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT)
-INSERT_BINDING=INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID) VALUES (?, ?, ?, ?, ?, ?)
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID INTEGER, QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT, IS_FAILED_OVER VARCHAR(1))
+INSERT_BINDING=INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, IS_FAILED_OVER) VALUES (?, ?, ?, ?, ?, ?, ?)
 DELETE_BINDING=DELETE FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
-LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME  = ?
+LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, IS_FAILED_OVER FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME  = ?
       ]]></attribute>
       <attribute name="GroupName">Queue</attribute>
       <attribute name="StateTimeout">5000</attribute>
@@ -131,10 +131,10 @@
       <attribute name="DataSource">java:/DefaultDS</attribute>
       <attribute name="CreateTablesOnStartup">true</attribute>
       <attribute name="SqlProperties"><![CDATA[
-CREATE_POSTOFFICE_TABLE=CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID INTEGER, QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT)
-INSERT_BINDING=INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID) VALUES (?, ?, ?, ?, ?, ?)
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID INTEGER, QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT, IS_FAILED_OVER VARCHAR(1))
+INSERT_BINDING=INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, IS_FAILED_OVER) VALUES (?, ?, ?, ?, ?, ?, ?)
 DELETE_BINDING=DELETE FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
-LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME  = ?
+LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, IS_FAILED_OVER FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME  = ?
       ]]></attribute>
       <attribute name="GroupName">Topic</attribute>
       <attribute name="StateTimeout">5000</attribute>

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/Binding.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/Binding.java	2006-11-02 20:20:07 UTC (rev 1548)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/Binding.java	2006-11-03 00:21:22 UTC (rev 1549)
@@ -39,4 +39,9 @@
    public String getCondition();
    
    public Queue getQueue();
+
+   public boolean isFailed();
+
+   public void setFailed(boolean failed);
+
 }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultBinding.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultBinding.java	2006-11-02 20:20:07 UTC (rev 1548)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultBinding.java	2006-11-03 00:21:22 UTC (rev 1549)
@@ -40,35 +40,49 @@
    private String condition;
    
    private Queue queue;
-       
+
+   private boolean failed;
+
    public DefaultBinding()
-   {      
+   {
    }
 
-   public DefaultBinding(int nodeId, String condition, Queue queue)
+   public DefaultBinding(int nodeId, String condition, Queue queue, boolean failed)
    {
       this.nodeId = nodeId;
-      
-      this.condition = condition;     
-      
+
+      this.condition = condition;
+
       this.queue = queue;
+
+      this.failed = failed;
    }
-   
+
    public int getNodeId()
    {
       return nodeId;
    }
-      
+
    public String getCondition()
    {
       return condition;
    }
-   
+
    public Queue getQueue()
    {
       return queue;
    }
 
+   public boolean isFailed()
+   {
+      return failed;
+   }
+
+   public void setFailed(boolean failed)
+   {
+      this.failed = failed;
+   }
+
    public String toString()
    {
        return "Node" + nodeId + " condition=" + condition + " queue=" + queue + " queueClass=" + queue.getClass().getName();

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2006-11-02 20:20:07 UTC (rev 1548)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2006-11-03 00:21:22 UTC (rev 1549)
@@ -188,7 +188,7 @@
             throw new IllegalArgumentException("Binding already exists for name " + queue.getName());
          }
                  
-         binding = new DefaultBinding(nodeId, condition, queue);
+         binding = new DefaultBinding(nodeId, condition, queue, false);
          
          addBinding(binding);
                
@@ -497,9 +497,11 @@
             
             long channelId = rs.getLong(5);
 
+            boolean failed = rs.getString(6).equals("Y");
+
             log.info("PostOffice " + this.officeName + " nodeId=" + nodeId + " condition=" + condition + " queueName=" + queueName + " channelId=" + channelId + " selector=" + selector);
                                              
-            Binding binding = this.createBinding(nodeId, condition, queueName, channelId, selector, true);
+            Binding binding = this.createBinding(nodeId, condition, queueName, channelId, selector, true, failed);
             binding.getQueue().deactivate();
             
             addBinding(binding);
@@ -525,15 +527,15 @@
       }
    }
    
-   protected Binding createBinding(int nodeId, String condition, String queueName, long channelId, String filterString, boolean durable) throws Exception
+   protected Binding createBinding(int nodeId, String condition, String queueName, long channelId, String filterString, boolean durable, boolean failed) throws Exception
    {      
       
       Filter filter = filterFactory.createFilter(filterString);
 
-       return createBinding(nodeId, condition, queueName, channelId, filter, durable);
+       return createBinding(nodeId, condition, queueName, channelId, filter, durable, failed);
    }
 
-    protected Binding createBinding(int nodeId, String condition, String queueName, long channelId, Filter filter, boolean durable)
+    protected Binding createBinding(int nodeId, String condition, String queueName, long channelId, Filter filter, boolean durable, boolean failed)
     {
         Queue queue;
         if (nodeId == this.nodeId)
@@ -548,7 +550,7 @@
            throw new IllegalStateException("This is a non clustered post office - should not have bindings from different nodes!");
         }
 
-        Binding binding = new DefaultBinding(nodeId, condition, queue);
+        Binding binding = new DefaultBinding(nodeId, condition, queue, failed);
 
         return binding;
 
@@ -581,6 +583,7 @@
             ps.setNull(5, Types.VARCHAR);
          }
          ps.setLong(6, binding.getQueue().getChannelID());
+         ps.setString(7,binding.isFailed()?"Y":"N");
 
          ps.executeUpdate();
       }
@@ -820,12 +823,12 @@
    {                
       Map map = new LinkedHashMap();
       map.put("INSERT_BINDING",
-              "INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID) " +
-              "VALUES (?, ?, ?, ?, ?, ?)");
+              "INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID, IS_FAILED_OVER) " +
+              "VALUES (?, ?, ?, ?, ?, ?, ?)");
       map.put("DELETE_BINDING",
               "DELETE FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?");
       map.put("LOAD_BINDINGS",
-              "SELECT NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID FROM JMS_POSTOFFICE " +
+              "SELECT NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID, IS_FAILED_OVER FROM JMS_POSTOFFICE " +
               "WHERE POSTOFFICE_NAME  = ?");
       return map;
    }
@@ -836,7 +839,7 @@
       map.put("CREATE_POSTOFFICE_TABLE",
               "CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID INTEGER," +
               "QUEUE_NAME VARCHAR(1023), CONDITION VARCHAR(1023), " +
-              "SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT)");
+              "SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT, IS_FAILED_OVER VARCHAR(1))");
       return map;
    }
    

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BindRequest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BindRequest.java	2006-11-02 20:20:07 UTC (rev 1548)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BindRequest.java	2006-11-03 00:21:22 UTC (rev 1549)
@@ -44,16 +44,17 @@
    }
    
    BindRequest(int nodeId, String queueName, String condition, String filterString,
-               long channelId, boolean durable)
+               long channelId, boolean durable, boolean failed)
    {
       bindingInfo = new BindingInfo(nodeId, queueName, condition, filterString,
-                                    channelId, durable);
+                                    channelId, durable, failed);
    }
 
    Object execute(PostOfficeInternal office) throws Exception
    {
       office.addBindingFromCluster(bindingInfo.getNodeId(), bindingInfo.getQueueName(), bindingInfo.getCondition(),
-                                   bindingInfo.getFilterString(), bindingInfo.getChannelId(), bindingInfo.isDurable());
+                                   bindingInfo.getFilterString(), bindingInfo.getChannelId(), bindingInfo.isDurable(),
+                                   bindingInfo.isFailed());
       return null;
    }
    

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BindingInfo.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BindingInfo.java	2006-11-02 20:20:07 UTC (rev 1548)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BindingInfo.java	2006-11-03 00:21:22 UTC (rev 1549)
@@ -49,13 +49,15 @@
    private long channelId;   
    
    private boolean durable;
+
+   private boolean failed;
    
    BindingInfo()
    {      
    }
    
    BindingInfo(int nodeId, String queueName, String condition, String filterString,
-               long channelId, boolean durable)
+               long channelId, boolean durable, boolean failed)
    {
       this.nodeId = nodeId;
       
@@ -68,12 +70,14 @@
       this.channelId = channelId;
       
       this.durable = durable;
+
+      this.failed = failed;
    }
 
    public void execute(PostOfficeInternal office) throws Exception
    {
       office.addBindingFromCluster(nodeId, queueName, condition,
-                                   filterString, channelId, durable);
+                                   filterString, channelId, durable, failed);
       
    }
    
@@ -136,4 +140,14 @@
    {
       return queueName;
    }
+
+   public boolean isFailed()
+   {
+      return failed;
+   }
+
+   public void setFailed(boolean failed)
+   {
+      this.failed = failed;
+   }
 }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java	2006-11-02 20:20:07 UTC (rev 1548)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java	2006-11-03 00:21:22 UTC (rev 1549)
@@ -38,5 +38,5 @@
 {
    List getQueues();
    
-   ClusteredQueue[] getLocalQueue();
+   ClusteredQueue getLocalQueue();
 }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-11-02 20:20:07 UTC (rev 1548)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-11-03 00:21:22 UTC (rev 1549)
@@ -332,31 +332,37 @@
       }
       
       Binding binding = (Binding)super.bindQueue(condition, queue);
-      
-      BindRequest request =
-         new BindRequest(this.nodeId, queue.getName(), condition, queue.getFilter() == null ? null : queue.getFilter().getFilterString(),
-                         binding.getQueue().getChannelID(), queue.isRecoverable());
-      
-      syncSendRequest(request);
-        
+
+      sendBindRequest(condition, queue, binding);
+
       return binding;
    }
-   
-    public Binding unbindClusteredQueue(String queueName) throws Throwable
+
+   private void sendBindRequest(String condition, LocalClusteredQueue queue, Binding binding)
+      throws Exception
    {
-      if (trace)
-      {
-         log.trace(this.nodeId + " unbind clustered queue: " + queueName);
-      }
-        
-      Binding binding = (Binding)super.unbindQueue(queueName);
-      
-      UnbindRequest request = new UnbindRequest(this.nodeId, queueName);
-      
+      BindRequest request =
+         new BindRequest(this.nodeId, queue.getName(), condition, queue.getFilter() == null ? null : queue.getFilter().getFilterString(),
+                         binding.getQueue().getChannelID(), queue.isRecoverable(), binding.isFailed());
+
       syncSendRequest(request);
-      
-      return binding;
    }
+
+   public Binding unbindClusteredQueue(String queueName) throws Throwable
+  {
+     if (trace)
+     {
+        log.trace(this.nodeId + " unbind clustered queue: " + queueName);
+     }
+
+     Binding binding = (Binding)super.unbindQueue(queueName);
+
+     UnbindRequest request = new UnbindRequest(this.nodeId, queueName);
+
+     syncSendRequest(request);
+
+     return binding;
+  }
    
    public boolean route(MessageReference ref, String condition, Transaction tx) throws Exception
    {
@@ -547,7 +553,7 @@
     * Called when another node adds a binding
     */
    public void addBindingFromCluster(int nodeId, String queueName, String condition,
-                                     String filterString, long channelID, boolean durable)
+                                     String filterString, long channelID, boolean durable, boolean failed)
       throws Exception
    {
       lock.writeLock().acquire();
@@ -581,7 +587,7 @@
             throw new IllegalArgumentException(this.nodeId + " Binding already exists for node Id " + nodeId + " queue name " + queueName);
          }
             
-         binding = this.createBinding(nodeId, condition, queueName, channelID, filterString, durable);
+         binding = this.createBinding(nodeId, condition, queueName, channelID, filterString, durable, failed);
          
          addBinding(binding);         
       }
@@ -974,12 +980,10 @@
                   
                   //Maybe the local queue now wants to pull message(s) from the remote queue given that the 
                   //stats for the remote queue have changed
-                  LocalClusteredQueue localQueueArray[] = (LocalClusteredQueue[])router.getLocalQueue();
+                  LocalClusteredQueue localQueue = (LocalClusteredQueue)router.getLocalQueue();
 
-                  // TODO: Verify what to do with this array of local queues, since we used to have only one localQueue
-                  if (localQueueArray.length>0)
+                  if (localQueue!=null)
                   {
-                     LocalClusteredQueue localQueue = localQueueArray[0];
                      //TODO - the call to getQueues is too slow since it creates a new list and adds the local queue!!!
                      RemoteQueueStub toQueue = (RemoteQueueStub)messagePullPolicy.chooseQueue(router.getQueues());
                      
@@ -1095,47 +1099,59 @@
 
    public void failOver(int nodeId) throws Exception
    {
-       log.info("Preparing failover against node " + nodeId);
-       Map subMaps = (Map)nameMaps.get(new Integer(nodeId));
-       ArrayList namesToRemove = new ArrayList();
-       for (Iterator iterNames = subMaps.entrySet().iterator();iterNames.hasNext();)
-       {
-           Map.Entry entry = (Map.Entry)iterNames.next();
-           Binding binding = (Binding )entry.getValue();
-           if (binding.getQueue().isClustered())
-           {
-               ClusteredQueue queue = (ClusteredQueue) binding.getQueue();
-               if (!queue.isLocal())
-               {
-                   namesToRemove.add(entry);
-               }
-           }
-       }
+      log.info("Preparing failover against node " + nodeId);
+      Map subMaps = (Map)nameMaps.get(new Integer(nodeId));
+      ArrayList namesToRemove = new ArrayList();
+      for (Iterator iterNames = subMaps.entrySet().iterator();iterNames.hasNext();)
+      {
+         Map.Entry entry = (Map.Entry)iterNames.next();
+         Binding binding = (Binding )entry.getValue();
+         if (binding.getQueue().isClustered())
+         {
+            ClusteredQueue queue = (ClusteredQueue) binding.getQueue();
+            if (!queue.isLocal())
+            {
+               namesToRemove.add(entry);
+            }
+         }
+      }
 
-       for (Iterator iterNames = namesToRemove.iterator();iterNames.hasNext();)
-       {
-           Map.Entry entry = (Map.Entry)iterNames.next();
-           Binding binding = (Binding)entry.getValue();
-           RemoteQueueStub stub = (RemoteQueueStub)binding.getQueue();
-           this.removeBinding(nodeId,(String)entry.getKey());
+      for (Iterator iterNames = namesToRemove.iterator();iterNames.hasNext();)
+      {
+         Map.Entry entry = (Map.Entry)iterNames.next();
+         Binding binding = (Binding)entry.getValue();
+         RemoteQueueStub stub = (RemoteQueueStub)binding.getQueue();
+         this.removeBinding(nodeId,(String)entry.getKey());
 
-           this.deleteBinding(nodeId,(String)entry.getKey());
+         this.deleteBinding(nodeId,(String)entry.getKey());
 
-           // todo: remove Binding from cluster
+         UnbindRequest unbindRequest = new UnbindRequest(nodeId, stub.getName());
+         syncSendRequest(unbindRequest);
 
-           Binding newBinding = this.createBinding(this.nodeId,binding.getCondition(),
-                                     stub.getName(),stub.getChannelID(),
-                                     stub.getFilter(),stub.isRecoverable());
+         // A failed over queue will have the flag failover, only if there isn't another local queue with the same name
+         // In case this node doesn't have that queue, we will simply assume the queue as nothing else had happened.
+         boolean failed = this.getBindingForQueueName(stub.getName())!=null;
 
-           insertBinding(newBinding);
+         if (!failed)
+         {
+            log.info("The current node didn't have a queue " + stub.getName() + " so it's assuming the queue as a regular queue");
+         }
 
-           LocalClusteredQueue clusteredQueue = (LocalClusteredQueue )newBinding.getQueue();
-           clusteredQueue.deactivate();
-           clusteredQueue.load();
-           clusteredQueue.activate();
-           addBinding(newBinding);
-           // todo: send new Binding into Cluster
-       }
+
+         Binding newBinding = this.createBinding(this.nodeId, binding.getCondition(),
+            stub.getName(), stub.getChannelID(),
+            stub.getFilter(), stub.isRecoverable(), failed);
+
+         insertBinding(newBinding);
+
+         LocalClusteredQueue clusteredQueue = (LocalClusteredQueue )newBinding.getQueue();
+         clusteredQueue.deactivate();
+         clusteredQueue.load();
+         clusteredQueue.activate();
+         addBinding(newBinding);
+         System.out.println("**** sending binding on " + binding.getQueue().getName() + " with condition=" + binding.getCondition());
+         sendBindRequest(binding.getCondition(), clusteredQueue,newBinding);
+      }
    }
 
 
@@ -1282,14 +1298,7 @@
       }
    }
 
-   protected Binding createBinding(int nodeId, String condition, String queueName, long channelId, String filterString, boolean durable) throws Exception
-   {            
-      Filter filter = filterFactory.createFilter(filterString);
-
-       return createBinding(nodeId, condition, queueName, channelId, filter, durable);
-   }
-
-    protected Binding createBinding(int nodeId, String condition, String queueName, long channelId, Filter filter, boolean durable)
+    protected Binding createBinding(int nodeId, String condition, String queueName, long channelId, Filter filter, boolean durable, boolean failed)
     {
         Queue queue;
         if (nodeId == this.nodeId)
@@ -1304,7 +1313,7 @@
            queue = new RemoteQueueStub(nodeId, queueName, channelId, durable, pm, filter);
         }
 
-        Binding binding = new DefaultBinding(nodeId, condition, queue);
+        Binding binding = new DefaultBinding(nodeId, condition, queue, failed);
 
         return binding;
     }
@@ -1424,7 +1433,8 @@
                                                binding.getCondition(),
                                                queue.getFilter() == null ? null : queue.getFilter().getFilterString(),
                                                queue.getChannelID(),
-                                               queue.isRecoverable());    
+                                               queue.isRecoverable(),
+                                               binding.isFailed());
             bindings.add(info);
          }
       }
@@ -1458,7 +1468,8 @@
       {
          BindingInfo info = (BindingInfo)iter.next();
          
-         Binding binding = this.createBinding(info.getNodeId(), info.getCondition(), info.getQueueName(), info.getChannelId(), info.getFilterString(), info.isDurable());
+         Binding binding = this.createBinding(info.getNodeId(), info.getCondition(), info.getQueueName(), info.getChannelId(),
+                                              info.getFilterString(), info.isDurable(),info.isFailed());
          
          if (binding.getNodeId() == this.nodeId)
          {

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java	2006-11-02 20:20:07 UTC (rev 1548)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java	2006-11-03 00:21:22 UTC (rev 1549)
@@ -55,88 +55,81 @@
    //MUST be an arraylist for fast index access
    private ArrayList nonLocalQueues;
 
-   private ArrayList localQueues;
+   private ClusteredQueue localQueue;
 
    private int target;
-   
+
    public DefaultRouter()
    {
       nonLocalQueues = new ArrayList();
-      localQueues= new ArrayList();
    }
-   
+
    public int size()
    {
-      return nonLocalQueues.size() + localQueues.size();
+      return nonLocalQueues.size() + (localQueue == null ? 0 : 1);
    }
-   
-   public ClusteredQueue[] getLocalQueue()
+
+   public ClusteredQueue getLocalQueue()
    {
-      return (ClusteredQueue[])localQueues.toArray(new LocalClusteredQueue[localQueues.size()]);
+      return localQueue;
    }
 
    public boolean add(Receiver receiver)
    {
       ClusteredQueue queue = (ClusteredQueue)receiver;
-      
+
       if (queue.isLocal())
       {
-         /**if (localQueue != null)
+         if (localQueue != null)
          {
             //throw new IllegalStateException("Already has local queue");
-            log.warn("Already has LocalQueue " + localQueue);
-         } */
-         localQueues.add(queue);
+            return true; // todo - fix this
+         }
+         localQueue = queue;
       }
       else
       {
-         nonLocalQueues.add(queue); 
+         nonLocalQueues.add(queue);
       }
-      
+
       return true;
    }
 
    public void clear()
    {
       nonLocalQueues.clear();
-      
-      localQueues.clear();
-      
+
+      localQueue = null;
+
       target = 0;
    }
 
    public boolean contains(Receiver queue)
    {
-      return localQueues.contains(queue) || nonLocalQueues.contains(queue);
+      return localQueue == queue || nonLocalQueues.contains(queue);
    }
 
    public Iterator iterator()
    {
       List queues = new ArrayList();
-      
-      queues.addAll(localQueues);
 
+      if (localQueue != null)
+      {
+         queues.add(localQueue);
+      }
+
       queues.addAll(nonLocalQueues);
-      
+
       return queues.iterator();
    }
 
    public boolean remove(Receiver queue)
-   {      
-      if (localQueues.contains(queue))
+   {
+      if (localQueue == queue)
       {
-          if (localQueues.remove(queue))
-          {
-             if (target >= localQueues.size() - 1)
-             {
-                target = localQueues.size() - 1;
-             }
-             return true;
-          }
-          else
-          {
-             return false;
-          }
+         localQueue = null;
+
+         return true;
       }
       else
       {
@@ -152,95 +145,81 @@
          {
             return false;
          }
-      }      
+      }
    }
 
    public Delivery handle(DeliveryObserver observer, MessageReference reference, Transaction tx)
    {
       if (trace) { log.trace(this + " routing ref " + reference); }
-      
+
       //Favour the local queue
-          
-      if (!localQueues.isEmpty())
+
+      if (localQueue != null)
       {
-          checkTargetLocal();
-          ClusteredQueue queue = (ClusteredQueue)localQueues.get(target);
+         //The only time the local queue won't accept is if the selector doesn't
+         //match - in which case it won't match at any other nodes too so no point
+         //in trying them
 
-          queue = (ClusteredQueue)localQueues.get(target);
+         Delivery del = localQueue.handle(observer, reference, tx);
 
-          Delivery del = queue.handle(observer, reference, tx);
+         if (trace) { log.trace(this + " routed to local queue, it returned " + del); }
 
-          if (trace) { log.trace(this + " routed to remote queue, it returned " + del); }
-
-          incTargetLocal();
-
-          //Again, if the selector doesn't match then it won't on any others so no point trying them
-          return del;
+         return del;
       }
       else
       {
-         //There is no local shared queue 
+         //There is no local shared queue
          //We round robin among the rest
-         
+
          if (!nonLocalQueues.isEmpty())
          {
             ClusteredQueue queue = (ClusteredQueue)nonLocalQueues.get(target);
-            
+
             queue = (ClusteredQueue)nonLocalQueues.get(target);
-            
+
             Delivery del = queue.handle(observer, reference, tx);
-             
+
             if (trace) { log.trace(this + " routed to remote queue, it returned " + del); }
-                        
+
             incTarget();
 
             //Again, if the selector doesn't match then it won't on any others so no point trying them
             return del;
-         }                  
+         }
       }
-      
+
       if (trace) { log.trace(this + " no queues to route to so return null"); }
-      
+
       return null;
    }
-   
-    private void incTarget()
-    {
-       target++;
 
-       if (target == nonLocalQueues.size())
-       {
-          target = 0;
-       }
-    }
+   private void incTarget()
+   {
+      target++;
 
-    private void incTargetLocal()
-    {
-       target++;
+      if (target == nonLocalQueues.size())
+      {
+         target = 0;
+      }
+   }
 
-        checkTargetLocal();
-    }
+   public List getQueues()
+   {
+      List queues = new ArrayList();
 
-    private void checkTargetLocal() {
-        if (target == localQueues.size())
-        {
-           target = 0;
-        }
-    }
+      if (localQueue != null)
+      {
+         queues.add(localQueue);
+      }
 
-    public List getQueues()
-    {
-       List queues = new ArrayList();
+      queues.addAll(nonLocalQueues);
 
-       queues.addAll(localQueues);
-       queues.addAll(nonLocalQueues);
+      return queues;
+   }
 
-       return queues;
-    }
-
    public int numberOfReceivers()
    {
-      return nonLocalQueues.size() + localQueues.size();
+      return nonLocalQueues.size() + (localQueue != null ? 1 : 0);
    }
 }
 

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java	2006-11-02 20:20:07 UTC (rev 1548)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java	2006-11-03 00:21:22 UTC (rev 1549)
@@ -43,7 +43,7 @@
 interface PostOfficeInternal extends ClusteredPostOffice
 {
    void addBindingFromCluster(int nodeId, String queueName, String condition,
-                              String filterString, long channelId, boolean durable)
+                              String filterString, long channelId, boolean durable, boolean failed)
       throws Exception;
    
    void removeBindingFromCluster(int nodeId, String queueName)

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java	2006-11-02 20:20:07 UTC (rev 1548)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java	2006-11-03 00:21:22 UTC (rev 1549)
@@ -263,5 +263,9 @@
    {
       return true;
    }
-   
+
+   public String toString()
+   {
+      return "RemoteQueueStub(node=" + this.nodeId + " name=" + this.name + " channelId=" + this.id + ")";
+   }
 }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/UnbindRequest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/UnbindRequest.java	2006-11-02 20:20:07 UTC (rev 1548)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/UnbindRequest.java	2006-11-03 00:21:22 UTC (rev 1549)
@@ -54,6 +54,8 @@
 
    Object execute(PostOfficeInternal office) throws Exception
    {
+      // TODO: Due to failoever we can't guarantee unique key by nodeId and queueName any more.
+      // We might need to revist this
       office.removeBindingFromCluster(nodeId, queueName);
       
       return null;




More information about the jboss-cvs-commits mailing list