[jboss-cvs] JBoss Messaging SVN: r1911 - in trunk: src/main/org/jboss/jms/client/state src/main/org/jboss/jms/server/destination src/main/org/jboss/jms/server/endpoint src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice tests/src/org/jboss/test/messaging/core/plugin/postoffice

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Jan 6 01:35:14 EST 2007


Author: ovidiu.feodorov at jboss.com
Date: 2007-01-06 01:35:08 -0500 (Sat, 06 Jan 2007)
New Revision: 1911

Modified:
   trunk/src/main/org/jboss/jms/client/state/SessionState.java
   trunk/src/main/org/jboss/jms/server/destination/ManagedQueue.java
   trunk/src/main/org/jboss/jms/server/destination/ManagedTopic.java
   trunk/src/main/org/jboss/jms/server/destination/TopicService.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java
Log:
Failed-over queues are preferred when creating consumers/browsers
over a failed-over session. 

Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java	2007-01-06 06:27:32 UTC (rev 1910)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java	2007-01-06 06:35:08 UTC (rev 1911)
@@ -136,7 +136,6 @@
       callbackHandlers = new HashMap();
    }
 
-
    // HierarchicalState implementation -------------------------------------------------------------
 
    public DelegateSupport getDelegate()

Modified: trunk/src/main/org/jboss/jms/server/destination/ManagedQueue.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/ManagedQueue.java	2007-01-06 06:27:32 UTC (rev 1910)
+++ trunk/src/main/org/jboss/jms/server/destination/ManagedQueue.java	2007-01-06 06:35:08 UTC (rev 1911)
@@ -79,7 +79,7 @@
    {
       JMSCondition queueCond = new JMSCondition(true, name);
 
-      Binding binding = (Binding)postOffice.listBindingsForCondition(queueCond).iterator().next();
+      Binding binding = (Binding)postOffice.getBindingForCondition(queueCond).iterator().next();
 
       if (binding == null)
       {
@@ -99,7 +99,7 @@
    {
       JMSCondition queueCond = new JMSCondition(true, name);
 
-      Binding binding = (Binding)postOffice.listBindingsForCondition(queueCond).iterator().next();
+      Binding binding = (Binding)postOffice.getBindingForCondition(queueCond).iterator().next();
 
       if (binding == null)
       {
@@ -124,7 +124,7 @@
 
       JMSCondition queueCond = new JMSCondition(true, name);
 
-      Binding binding = (Binding)postOffice.listBindingsForCondition(queueCond).iterator().next();
+      Binding binding = (Binding)postOffice.getBindingForCondition(queueCond).iterator().next();
 
       if (binding == null)
       {

Modified: trunk/src/main/org/jboss/jms/server/destination/ManagedTopic.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/ManagedTopic.java	2007-01-06 06:27:32 UTC (rev 1910)
+++ trunk/src/main/org/jboss/jms/server/destination/ManagedTopic.java	2007-01-06 06:35:08 UTC (rev 1911)
@@ -60,7 +60,7 @@
    {
       JMSCondition topicCond = new JMSCondition(false, name);
       
-      Collection subs = postOffice.listBindingsForCondition(topicCond);
+      Collection subs = postOffice.getBindingForCondition(topicCond);
       
       //XXX How to lock down all subscriptions?
       Iterator iter = subs.iterator();
@@ -76,7 +76,7 @@
    {
       JMSCondition topicCond = new JMSCondition(false, name);
       
-      Collection subs = postOffice.listBindingsForCondition(topicCond);
+      Collection subs = postOffice.getBindingForCondition(topicCond);
       
       return subs.size();         
    }
@@ -85,7 +85,7 @@
    {
       JMSCondition topicCond = new JMSCondition(false, name);
       
-      Collection subs = postOffice.listBindingsForCondition(topicCond);
+      Collection subs = postOffice.getBindingForCondition(topicCond);
       
       Iterator iter = subs.iterator();
       
@@ -108,7 +108,7 @@
    {
       JMSCondition topicCond = new JMSCondition(false, name);
       
-      Collection subs = postOffice.listBindingsForCondition(topicCond);
+      Collection subs = postOffice.getBindingForCondition(topicCond);
       
       return getSubscriptionsAsText(subs, true) + getSubscriptionsAsText(subs, false);
    }
@@ -117,7 +117,7 @@
    {
       JMSCondition topicCond = new JMSCondition(false, name);
       
-      Collection subs = postOffice.listBindingsForCondition(topicCond);
+      Collection subs = postOffice.getBindingForCondition(topicCond);
       
       return getSubscriptionsAsText(subs, durable);
    }
@@ -127,7 +127,7 @@
    {
       JMSCondition topicCond = new JMSCondition(false, name);
       
-      Collection subs = postOffice.listBindingsForCondition(topicCond);
+      Collection subs = postOffice.getBindingForCondition(topicCond);
       
       return getMessagesFromDurableSub(subs, subName, clientID, trimSelector(selector));
    }
@@ -137,7 +137,7 @@
    {
       JMSCondition topicCond = new JMSCondition(false, name);
       
-      Collection subs = postOffice.listBindingsForCondition(topicCond);
+      Collection subs = postOffice.getBindingForCondition(topicCond);
       
       return getMessagesFromNonDurableSub(subs, channelID, trimSelector(selector));
    }

Modified: trunk/src/main/org/jboss/jms/server/destination/TopicService.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/TopicService.java	2007-01-06 06:27:32 UTC (rev 1910)
+++ trunk/src/main/org/jboss/jms/server/destination/TopicService.java	2007-01-06 06:35:08 UTC (rev 1911)
@@ -73,7 +73,7 @@
          JMSCondition topicCond = new JMSCondition(false, destination.getName());
                     
          // We deploy any queues corresponding to pre-existing durable subscriptions
-         Collection bindings = postOffice.listBindingsForCondition(topicCond);
+         Collection bindings = postOffice.getBindingForCondition(topicCond);
          Iterator iter = bindings.iterator();
          while (iter.hasNext())
          {
@@ -116,7 +116,7 @@
          
          JMSCondition topicCond = new JMSCondition(false, destination.getName());         
          
-         Collection bindings = postOffice.listBindingsForCondition(topicCond);
+         Collection bindings = postOffice.getBindingForCondition(topicCond);
          
          Iterator iter = bindings.iterator();
          while (iter.hasNext())            

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-01-06 06:27:32 UTC (rev 1910)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-01-06 06:35:08 UTC (rev 1911)
@@ -632,6 +632,24 @@
    }
    
    // Protected ------------------------------------------------------------------------------------
+
+   /**
+    * Give access to children enpoints to the failed node ID, in case this is a failover connection.
+    * Return null if the connection is regular (not failover).
+    */
+   Integer getFailedNodeID()
+   {
+      return failedNodeID;
+   }
+
+   /**
+    * Tell children enpoints (and anybody from this package, for that matter) whether this
+    * connection is a regular or failover connection.
+    */
+   boolean isFailoverConnection()
+   {
+      return failedNodeID != null;
+   }
      
    // Private --------------------------------------------------------------------------------------
    

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-01-06 06:27:32 UTC (rev 1910)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-01-06 06:35:08 UTC (rev 1911)
@@ -181,7 +181,7 @@
    // SessionDelegate implementation ---------------------------------------------------------------
        
    public ConsumerDelegate createConsumerDelegate(JBossDestination jmsDestination,
-                                                  String selectorString,
+                                                  String selector,
                                                   boolean noLocal,
                                                   String subscriptionName,
                                                   boolean isCC,
@@ -189,19 +189,18 @@
    {
       try
       {
-         if (failoverChannelID == null)
+         if (!connectionEndpoint.isFailoverConnection())
          {
             // regular consumer
-            return createConsumerDelegateInternal(jmsDestination, selectorString,
+            return createConsumerDelegateInternal(jmsDestination, selector,
                                                   noLocal, subscriptionName);
          }
-         else
-         {
-            // failover consumer
-            return createFailoverConsumerDelegateInternal(jmsDestination, selectorString,
-                                                          noLocal, subscriptionName,
-                                                          failoverChannelID);
-         }         
+
+         // we're child of a failover connection. Favor failover channels when creating new
+         // consumers
+         return createFailoverConsumerDelegateInternal(jmsDestination, selector,
+                                                       noLocal, subscriptionName,
+                                                       failoverChannelID);
       }
       catch (Throwable t)
       {
@@ -210,22 +209,20 @@
    }
       
 	public BrowserDelegate createBrowserDelegate(JBossDestination jmsDestination,
-                                                String messageSelector,
-                                                Long failoverChannelID) throws JMSException
+                                                String selector, Long failoverChannelID)
+      throws JMSException
 	{
       try
       {
-         if (failoverChannelID == null)
+         if (!connectionEndpoint.isFailoverConnection())
          {
             // regular browser
-            return createBrowserDelegateInternal(jmsDestination, messageSelector);
+            return createBrowserDelegateInternal(jmsDestination, selector);
          }
-         else
-         {
-            // failover browser
-            return createFailoverBrowserDelegateInternal(jmsDestination, messageSelector,
-                                                          failoverChannelID);
-         }
+
+         // we're child of a failover connection. Favor failover channels when creating new
+         // browsers
+         return createFailoverBrowserDelegateInternal(jmsDestination, selector, failoverChannelID);
       }
       catch (Throwable t)
       {
@@ -567,7 +564,7 @@
          else
          {
             //Topic            
-            Collection bindings = postOffice.listBindingsForCondition(new JMSCondition(false, dest.getName()));
+            Collection bindings = postOffice.getBindingForCondition(new JMSCondition(false, dest.getName()));
             
             if (!bindings.isEmpty())
             {
@@ -1019,17 +1016,23 @@
       }
       
       rec.del.acknowledge(null);    
-   } 
+   }
 
+   /**
+    * @param oldChannelID - old channel ID. It may be null when creating consumer subsequently to
+    *        the actual failover, so it's our responsibility to look the proper channel.
+    * @return
+    * @throws Exception
+    */
    private ConsumerDelegate createFailoverConsumerDelegateInternal(JBossDestination jmsDestination,
                                                                    String selectorString,
                                                                    boolean noLocal,
                                                                    String subscriptionName,
-                                                                   Long ocid)
+                                                                   Long oldChannelID)
       throws Exception
    {
       log.debug(this + " creating FAILOVER consumer for failed channel " +
-         ocid + " for " + jmsDestination +
+         oldChannelID + " for " + jmsDestination +
          (selectorString == null ? "" : ", selector '" + selectorString + "'") +
          (subscriptionName == null ? "" : ", subscription '" + subscriptionName + "'") +
          (noLocal ? ", noLocal" : ""));
@@ -1040,19 +1043,39 @@
          throw new IllegalStateException("Cannot failover on a non-clustered post office!");
       }
 
-      long oldChannelID = ocid.longValue();
-      Binding binding = ((ClusteredPostOffice)postOffice).getBindingforChannelId(oldChannelID);
+      Binding binding = null;
 
+      if (oldChannelID != null)
+      {
+         binding = postOffice.getBindingforChannelId(oldChannelID.longValue());
+      }
+      else
+      {
+         // locate the binding based on destination name and the failed node ID
+         Collection c = postOffice.
+            getBindingForCondition(new JMSCondition(true, jmsDestination.getName()));
+
+         for(Iterator i = c.iterator(); i.hasNext(); )
+         {
+            Binding b = (Binding)i.next();
+            if (connectionEndpoint.getFailedNodeID().equals(b.getFailedNodeID()))
+            {
+               binding = b;
+               break;
+            }
+         }
+      }
+
       if (binding == null)
       {
-         throw new IllegalStateException("Can't find failed over channel " + oldChannelID);
+         throw new IllegalStateException("Can't find failed over " +
+            (oldChannelID != null ?
+               "channel " +  oldChannelID : "queue " + jmsDestination.getName()));
       }
 
       Queue newQueue = binding.getQueue();
       long newChannelID = newQueue.getChannelID();
 
-      if (trace) { log.trace(this + " failing over from channel " + oldChannelID + " to channel " + newChannelID); }
-
       int consumerID = connectionEndpoint.getServerPeer().getNextObjectID();
       int prefetchSize = connectionEndpoint.getPrefetchSize();
       
@@ -1369,9 +1392,9 @@
 
    private BrowserDelegate createFailoverBrowserDelegateInternal(JBossDestination jmsDestination,
                                                                  String selector,
-                                                                 Long ocid) throws Throwable
+                                                                 Long oldChannelID) throws Throwable
    {
-      log.debug(this + " creating FAILOVER browser for failed channel " + ocid + " for " +
+      log.debug(this + " creating FAILOVER browser for failed channel " + oldChannelID + " for " +
          jmsDestination + (selector == null ? "" : ", selector '" + selector + "'"));
 
       if (postOffice.isLocal())
@@ -1379,19 +1402,38 @@
          throw new IllegalStateException("Cannot failover on a non-clustered post office!");
       }
 
-      long oldChannelID = ocid.longValue();
+      Binding binding = null;
 
-      Binding binding = ((ClusteredPostOffice)postOffice).getBindingforChannelId(oldChannelID);
+      if (oldChannelID != null)
+      {
+         binding = postOffice.getBindingforChannelId(oldChannelID.longValue());
+      }
+      else
+      {
+         // locate the binding based on destination name and the failed node ID
+         Collection c = postOffice.
+            getBindingForCondition(new JMSCondition(true, jmsDestination.getName()));
 
+         for(Iterator i = c.iterator(); i.hasNext(); )
+         {
+            Binding b = (Binding)i.next();
+            if (connectionEndpoint.getFailedNodeID().equals(b.getFailedNodeID()))
+            {
+               binding = b;
+               break;
+            }
+         }
+      }
+
       if (binding == null)
       {
-         throw new IllegalStateException("Can't find failed over channel " + oldChannelID);
+         throw new IllegalStateException("Can't find failed over " +
+            (oldChannelID != null ?
+               "channel " +  oldChannelID : "queue " + jmsDestination.getName()));
       }
 
       Channel newChannel = binding.getQueue();
 
-      if (trace) { log.trace(this + " failing over from channel " + oldChannelID + " to channel " + newChannel.getChannelID()); }
-
       int browserID = connectionEndpoint.getServerPeer().getNextObjectID();
 
       ServerBrowserEndpoint ep = new ServerBrowserEndpoint(this, browserID, newChannel, selector);

Modified: trunk/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java	2007-01-06 06:27:32 UTC (rev 1910)
+++ trunk/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java	2007-01-06 06:35:08 UTC (rev 1911)
@@ -32,14 +32,13 @@
  * 
  * A PostOffice
  * 
- * A post office holds bindings of queues to conditions.
+ * A post office holds bindings of queues to conditions. When routing a reference, the post office
+ * routes the reference to any binding whose condition matches the condition specified in the call
+ * to route(...)
  * 
- * When routing a reference, the post office routes the reference to any binding whose
- * condition matches the condition specified in the call to route(...)
+ * Currently we only support conditions where the condition is an exact text match, and there is a
+ * single binding per queue.
  * 
- * Currently we only support conditions where the condition is an exact text match, and
- * there is a single binding per queue.
- * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
@@ -58,17 +57,12 @@
 
    /**
     * List the bindings that match the specified condition
-    * @param condition
-    * @return
-    * @throws Exception
     */
-   Collection listBindingsForCondition(Condition condition) throws Exception;
+   Collection getBindingForCondition(Condition condition) throws Exception;
+
    
    /**
-    * Get the binding for the specified queue name
-    * @param queueName
-    * @return
-    * @throws Exception
+    * Get the binding for the specified queue name.
     */
    Binding getBindingForQueueName(String queueName) throws Exception;
 

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2007-01-06 06:27:32 UTC (rev 1910)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2007-01-06 06:35:08 UTC (rev 1911)
@@ -257,7 +257,7 @@
       }
    }   
    
-   public Collection listBindingsForCondition(Condition condition) throws Exception
+   public Collection getBindingForCondition(Condition condition) throws Exception
    {
       return listBindingsForConditionInternal(condition, true);
    }  

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java	2007-01-06 06:27:32 UTC (rev 1910)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java	2007-01-06 06:35:08 UTC (rev 1911)
@@ -247,22 +247,22 @@
             office.bindQueue(new SimpleCondition("condition2"), queue8);
          
          
-         Collection bindings = office.listBindingsForCondition(new SimpleCondition("dummy"));
+         Collection bindings = office.getBindingForCondition(new SimpleCondition("dummy"));
          assertNotNull(bindings);
          assertTrue(bindings.isEmpty());
          
          //We don't match on substrings
-         bindings = office.listBindingsForCondition(new SimpleCondition("condition123"));
+         bindings = office.getBindingForCondition(new SimpleCondition("condition123"));
          assertNotNull(bindings);
          assertTrue(bindings.isEmpty());
          
          //We don't currently support hierarchies
-         bindings = office.listBindingsForCondition(new SimpleCondition("condition1.subcondition"));
+         bindings = office.getBindingForCondition(new SimpleCondition("condition1.subcondition"));
          assertNotNull(bindings);
          assertTrue(bindings.isEmpty());
          
          //We currently just do an exact match
-         bindings = office.listBindingsForCondition(new SimpleCondition("condition1"));
+         bindings = office.getBindingForCondition(new SimpleCondition("condition1"));
          assertNotNull(bindings);
          assertEquals(4, bindings.size());
          
@@ -272,7 +272,7 @@
          assertEquivalent((Binding)iter.next(), binding3);
          assertEquivalent((Binding)iter.next(), binding4);
          
-         bindings = office.listBindingsForCondition(new SimpleCondition("condition2"));
+         bindings = office.getBindingForCondition(new SimpleCondition("condition2"));
          assertNotNull(bindings);
          assertEquals(4, bindings.size());
          
@@ -1079,9 +1079,16 @@
    protected void assertEquivalent(Binding binding1, Binding binding2)
    {
       assertEquals(binding1.getNodeID(), binding2.getNodeID());
-      assertEquals(binding1.getQueue().getName(), binding2.getQueue().getName()); 
-      String selector1 = binding1.getQueue().getFilter() != null ? binding1.getQueue().getFilter().getFilterString() : null;
-      String selector2 = binding2.getQueue().getFilter() != null ? binding2.getQueue().getFilter().getFilterString() : null;
+      assertEquals(binding1.getQueue().getName(), binding2.getQueue().getName());
+
+      String selector1 =
+         binding1.getQueue().getFilter() != null ?
+            binding1.getQueue().getFilter().getFilterString() : null;
+
+      String selector2 =
+         binding2.getQueue().getFilter() != null ?
+            binding2.getQueue().getFilter().getFilterString() : null;
+
       assertEquals(selector1, selector2);
       assertEquals(binding1.getQueue().getChannelID(), binding2.getQueue().getChannelID());
       assertEquals(binding1.getQueue().isRecoverable(), binding2.getQueue().isRecoverable());




More information about the jboss-cvs-commits mailing list