[jboss-cvs] JBoss Messaging SVN: r1981 - in trunk: src/etc/xmdesc and 10 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jan 18 11:12:55 EST 2007


Author: clebert.suconic at jboss.com
Date: 2007-01-18 11:12:55 -0500 (Thu, 18 Jan 2007)
New Revision: 1981

Added:
   trunk/src/main/org/jboss/jms/client/plugin/LoadBalancingFactory.java
   trunk/src/main/org/jboss/jms/client/plugin/RoundRobbinLoadBalancingFactory.java
   trunk/src/main/org/jboss/jms/client/remoting/CallbackHandler.java
   trunk/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java
   trunk/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryUpdateMessage.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringAspectInternalTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/ConnectionFactoryUpdateTest.java
Modified:
   trunk/src/etc/server/default/deploy/connection-factories-service.xml
   trunk/src/etc/xmdesc/ConnectionFactory-xmbean.xml
   trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
   trunk/src/main/org/jboss/jms/client/plugin/LoadBalancingPolicy.java
   trunk/src/main/org/jboss/jms/client/plugin/RoundRobinLoadBalancingPolicy.java
   trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
   trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
   trunk/src/main/org/jboss/jms/server/ConnectionFactoryManager.java
   trunk/src/main/org/jboss/jms/server/ConnectionManager.java
   trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
   trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
   trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
   trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-674

Modified: trunk/src/etc/server/default/deploy/connection-factories-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/connection-factories-service.xml	2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/etc/server/default/deploy/connection-factories-service.xml	2007-01-18 16:12:55 UTC (rev 1981)
@@ -25,7 +25,7 @@
       </attribute>
       
       <attribute name="Clustered">true</attribute>
-      <attribute name="LoadBalancingPolicy">org.jboss.jms.client.plugin.RoundRobinLoadBalancingPolicy</attribute>
+      <attribute name="LoadBalancingFactory">org.jboss.jms.client.plugin.RoundRobbinLoadBalancingFactory</attribute>
    </mbean>
 
 </server>
\ No newline at end of file

Modified: trunk/src/etc/xmdesc/ConnectionFactory-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/ConnectionFactory-xmbean.xml	2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/etc/xmdesc/ConnectionFactory-xmbean.xml	2007-01-18 16:12:55 UTC (rev 1981)
@@ -88,10 +88,10 @@
       <type>boolean</type>
    </attribute>
 
-   <attribute access="read-write" getMethod="getLoadBalancingPolicy" setMethod="setLoadBalancingPolicy">
-      <description>The pluggable load balancing policy that is used to decide the next cluster node to create a clustered connection to</description>
-      <name>LoadBalancingPolicy</name>
-      <type>org.jboss.jms.client.plugin.LoadBalancingPolicy</type>
+   <attribute access="read-write" getMethod="getLoadBalancingFactory" setMethod="setLoadBalancingFactory">
+      <description>The pluggable load balancing policy factory that is used to decide the next cluster node to create a clustered connection to</description>
+      <name>LoadBalancingFactory</name>
+      <type>org.jboss.jms.client.plugin.LoadBalancingFactory</type>
    </attribute>
 
    <!-- Managed operations -->

Modified: trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java	2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/client/container/ClusteringAspect.java	2007-01-18 16:12:55 UTC (rev 1981)
@@ -71,14 +71,6 @@
       if (clusteredDelegate == null)
       {
          clusteredDelegate = (ClientClusteredConnectionFactoryDelegate)invocation.getTargetObject();
-
-         // TODO JBMESSAGING-674 - for the time being we assume that the cluster view is static, so 
-         //      we only initialize the load balancing policy here. This is obviously not true,
-         //      we'll need to review this when working on
-         //      http://jira.jboss.org/jira/browse/JBMESSAGING-674
-
-         clusteredDelegate.getLoadBalancingPolicy().
-            updateView(Arrays.asList(clusteredDelegate.getDelegates()));
       }
 
       // the method handles both the case of a first connection creation attempt and a retry during
@@ -215,6 +207,15 @@
       Map failoverMap = clusteredDelegate.getFailoverMap();
       Integer failoverNodeID = (Integer)failoverMap.get(nodeID);
 
+      // FailoverNodeID is not on the map, that means the ConnectionFactory was updated
+      // by another connection in another server.. So we will have to guess the failoverID
+      // by numeric order. Case we guessed the new server wrongly we will have to rely on
+      // redirect from failover
+      if (failoverNodeID == null)
+      {
+         failoverNodeID = guessFailoverID(failoverMap, nodeID);
+      }
+
       for (int i = 0; i < delegates.length; i++)
       {
          if (delegates[i].getServerID() == failoverNodeID.intValue())
@@ -226,6 +227,37 @@
       return null;
    }
 
+   /** FailoverNodeID is not on the map, that means the ConnectionFactory was updated
+     * by another connection in another server.. So we will have to guess the failoverID
+     * by numeric order. Case we guessed the new server wrongly we will have to rely on
+     * redirect from failover.
+     * (NOTE:
+     *  There is a testcase that uses reflection to validate this method at
+     *  org.jboss.test.messaging.jms.clustering.ClusteringAspectInternalTest
+     *  Modify that testcase case you decide to refactor this method)
+     */
+   private static Integer guessFailoverID(Map failoverMap, Integer nodeID)
+   {
+      Integer failoverNodeID = null;
+      Integer[] nodes = (Integer[]) failoverMap.keySet().toArray(new Integer[failoverMap.size()]);
+      // We need to sort the array first
+      Arrays.sort(nodes);
+      for (int i = 0; i < nodes.length; i++)
+      {
+         if (nodeID.intValue() < nodes[i].intValue())
+         {
+            failoverNodeID = nodes[i];
+            break;
+         }
+      }
+      // if still null use the first node...
+      if (failoverNodeID == null)
+      {
+         failoverNodeID = nodes[0];
+      }
+      return failoverNodeID;
+   }
+
    private synchronized ClientConnectionFactoryDelegate getDelegateForNode(int nodeID)
    {
       ClientConnectionFactoryDelegate[] delegates = clusteredDelegate.getDelegates();

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java	2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java	2007-01-18 16:12:55 UTC (rev 1981)
@@ -132,6 +132,7 @@
    public void setDelegates(ClientConnectionFactoryDelegate[] dels)
    {
       this.delegates = dels;
+      getLoadBalancingPolicy().updateView(dels);
    }
 
    public Map getFailoverMap()
@@ -149,6 +150,20 @@
       return loadBalancingPolicy;
    }
 
+   /** Method used to update the delegate and failoverMap during viewChange */
+   public synchronized void updateFailoverInfo(ClientConnectionFactoryDelegate[] delegates,
+                                               Map failoverMap)
+   {
+      for (int i = 0; i < delegates.length; i++)
+      {
+         delegates[i].init();
+      }
+      this.delegates = delegates;
+      this.failoverMap = failoverMap;
+      getLoadBalancingPolicy().updateView(delegates);
+   }
+
+
    public String toString()
    {
       StringBuffer sb = new StringBuffer("ClusteredConnectionFactoryDelegate[");

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2007-01-18 16:12:55 UTC (rev 1981)
@@ -69,7 +69,7 @@
 
    //This data is needed in order to create a connection
    private String serverLocatorURI;
- 
+
    private Version serverVersion;
  
    private int serverID;
@@ -284,6 +284,8 @@
             connectionDelegate.setRemotingConnection(remotingConnection);
             
             connectionDelegate.setVersionToUse(version);
+
+            remotingConnection.getCallbackManager().setConnectionDelegate(connectionDelegate);
          }
          else
          {
@@ -326,7 +328,7 @@
    {
       return serverVersion;
    }
-   
+
    public void synchronizeWith(DelegateSupport newDelegate) throws Exception
    {
       super.synchronizeWith(newDelegate);

Added: trunk/src/main/org/jboss/jms/client/plugin/LoadBalancingFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/plugin/LoadBalancingFactory.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/client/plugin/LoadBalancingFactory.java	2007-01-18 16:12:55 UTC (rev 1981)
@@ -0,0 +1,69 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.jms.client.plugin;
+
+import org.jboss.jms.delegate.ConnectionFactoryDelegate;
+import java.io.Serializable;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ *          $Id$
+ */
+public abstract class LoadBalancingFactory implements Serializable
+{
+
+   // Constants ------------------------------------------------------------------------------------
+
+   private static final long serialVersionUID = -22502384102786964L;
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   // Static ---------------------------------------------------------------------------------------
+
+   static LoadBalancingFactory theInstance = new RoundRobbinLoadBalancingFactory();
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   // Public ---------------------------------------------------------------------------------------
+
+   /**
+    * A LoadBalancingPolicy will work on top of ConnectionFactoryDelegate
+    */
+   public abstract LoadBalancingPolicy createLoadBalancingPolicy(ConnectionFactoryDelegate[] view);
+
+   public static LoadBalancingFactory getDefaultFactory()
+   {
+      return theInstance;
+   }
+
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   // Private --------------------------------------------------------------------------------------
+
+   // Inner classes --------------------------------------------------------------------------------
+
+}


Property changes on: trunk/src/main/org/jboss/jms/client/plugin/LoadBalancingFactory.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Modified: trunk/src/main/org/jboss/jms/client/plugin/LoadBalancingPolicy.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/plugin/LoadBalancingPolicy.java	2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/client/plugin/LoadBalancingPolicy.java	2007-01-18 16:12:55 UTC (rev 1981)
@@ -9,7 +9,6 @@
 import org.jboss.jms.delegate.ConnectionFactoryDelegate;
 
 import java.io.Serializable;
-import java.util.List;
 
 /**
  * The interface that must be implemented by any load balancing policy plugin.
@@ -23,11 +22,13 @@
    static final long serialVersionUID = 328573973957394573L;
 
    ConnectionFactoryDelegate getNext();
-
+   
    /**
+    * This method should be called when updating the LoadBalancingFactory
     * @param delegates - a List<ConnectionFactoryDelegate> representing the lastest cluster view
     *        to chose delegates from
     */
-   void updateView(List delegates);
+   void updateView(ConnectionFactoryDelegate[] delegates);
 
+
 }

Added: trunk/src/main/org/jboss/jms/client/plugin/RoundRobbinLoadBalancingFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/plugin/RoundRobbinLoadBalancingFactory.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/client/plugin/RoundRobbinLoadBalancingFactory.java	2007-01-18 16:12:55 UTC (rev 1981)
@@ -0,0 +1,61 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.jms.client.plugin;
+
+import org.jboss.jms.delegate.ConnectionFactoryDelegate;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ *          <p/>
+ *          $Id$
+ */
+public class RoundRobbinLoadBalancingFactory extends LoadBalancingFactory
+{
+
+   // Constants ------------------------------------------------------------------------------------
+
+   private static final long serialVersionUID = -6304457304470491564L;
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   // Public ---------------------------------------------------------------------------------------
+
+   public LoadBalancingPolicy createLoadBalancingPolicy(ConnectionFactoryDelegate[] view)
+   {
+      return new RoundRobinLoadBalancingPolicy(view);
+   }
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   // Private --------------------------------------------------------------------------------------
+
+   // Inner classes --------------------------------------------------------------------------------
+
+}


Property changes on: trunk/src/main/org/jboss/jms/client/plugin/RoundRobbinLoadBalancingFactory.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Modified: trunk/src/main/org/jboss/jms/client/plugin/RoundRobinLoadBalancingPolicy.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/plugin/RoundRobinLoadBalancingPolicy.java	2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/client/plugin/RoundRobinLoadBalancingPolicy.java	2007-01-18 16:12:55 UTC (rev 1981)
@@ -21,6 +21,8 @@
 {
    // Constants ------------------------------------------------------------------------------------
 
+   private static final long serialVersionUID = 5215940403016586462L;
+   
    // Static ---------------------------------------------------------------------------------------
 
    // Attributes -----------------------------------------------------------------------------------
@@ -28,27 +30,31 @@
    // The index of the next delegate to be used
    private int next;
 
-   // List<ConnectionFactoryDelegate>
-   private List view;
+   private ConnectionFactoryDelegate[] delegates;
 
    // Constructors ---------------------------------------------------------------------------------
 
+   public RoundRobinLoadBalancingPolicy(ConnectionFactoryDelegate[] delegates)
+   {
+      this.delegates = delegates;
+   }
+
    // LoadBalancingPolicy implementation -----------------------------------------------------------
 
    public synchronized ConnectionFactoryDelegate getNext()
    {
-      if (next >= view.size())
+      if (next >= delegates.length)
       {
          next = 0;
       }
 
-      return (ConnectionFactoryDelegate)view.get(next++);
+      return delegates[next++];
    }
 
-   public synchronized void updateView(List delegates)
+   public synchronized void updateView(ConnectionFactoryDelegate[] delegates)
    {
-      view = new ArrayList(delegates);
       next = 0;
+      this.delegates = delegates;
    }
 
    // Public ---------------------------------------------------------------------------------------

Added: trunk/src/main/org/jboss/jms/client/remoting/CallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackHandler.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackHandler.java	2007-01-18 16:12:55 UTC (rev 1981)
@@ -0,0 +1,38 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.jms.client.remoting;
+
+import org.jboss.jms.message.MessageProxy;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ *
+ *          $Id$
+ */
+public interface CallbackHandler
+{
+
+   void handleMessage(Object message);
+   
+}


Property changes on: trunk/src/main/org/jboss/jms/client/remoting/CallbackHandler.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Modified: trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java	2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java	2007-01-18 16:12:55 UTC (rev 1981)
@@ -26,6 +26,7 @@
 import org.jboss.jms.message.MessageProxy;
 import org.jboss.jms.server.endpoint.ClientDelivery;
 import org.jboss.jms.server.remoting.MessagingMarshallable;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
 import org.jboss.logging.Logger;
 import org.jboss.remoting.callback.Callback;
 import org.jboss.remoting.callback.HandleCallbackException;
@@ -73,6 +74,7 @@
 
    // Map<Long(lookup)-MessageCallbackHandler>
    protected Map callbackHandlers;
+   protected ConnectionFactoryCallbackHandler connectionfactoryCallbackHandler;
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -85,23 +87,33 @@
 
    public void handleCallback(Callback callback) throws HandleCallbackException
    {
-      MessagingMarshallable mm = (MessagingMarshallable)callback.getParameter();
-      ClientDelivery dr = (ClientDelivery)mm.getLoad();
-      MessageProxy msg = dr.getMessage();
+      if (callback.getParameter() instanceof MessagingMarshallable)
+      {
+         MessagingMarshallable mm = (MessagingMarshallable)callback.getParameter();
+         ClientDelivery dr = (ClientDelivery)mm.getLoad();
+         MessageProxy msg = dr.getMessage();
 
-      MessageCallbackHandler handler =
-         (MessageCallbackHandler)callbackHandlers.get(new Integer(dr.getConsumerId()));
+         MessageCallbackHandler handler =
+            (MessageCallbackHandler)callbackHandlers.get(new Integer(dr.getConsumerId()));
 
-      if (handler == null)
+         if (handler == null)
+         {
+            //This is OK and can happen if the callback handler is deregistered on consumer close,
+            //but there are messages still in transit which arrive later.
+            //In this case it is just safe to ignore the message
+            if (trace) { log.trace(this + " callback handler not found, message arrived after consumer is closed"); }
+            return;
+         }
+
+         handler.handleMessage(msg);
+      } else
       {
-         //This is OK and can happen if the callback handler is deregistered on consumer close,
-         //but there are messages still in transit which arrive later.
-         //In this case it is just safe to ignore the message
-         if (trace) { log.trace(this + " callback handler not found, message arrived after consumer is closed"); }
-         return;
+         log.trace("Receiving connectionFactoryUpdateMessage - " + callback.getParameter());
+         if (connectionfactoryCallbackHandler != null)
+         {
+            connectionfactoryCallbackHandler.handleMessage(callback.getParameter());
+         }
       }
-
-      handler.handleMessage(msg);
    }
 
    // Public ---------------------------------------------------------------------------------------
@@ -111,6 +123,12 @@
       callbackHandlers.put(new Integer(consumerID), handler);
    }
 
+   public void setConnectionDelegate (ClientConnectionDelegate connectionDelegate)
+   {
+      this.connectionfactoryCallbackHandler =
+         new ConnectionFactoryCallbackHandler(connectionDelegate);
+   }
+
    public MessageCallbackHandler unregisterHandler(int consumerID)
    { 
       return (MessageCallbackHandler)callbackHandlers.remove(new Integer(consumerID));

Added: trunk/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java	2007-01-18 16:12:55 UTC (rev 1981)
@@ -0,0 +1,99 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.jms.client.remoting;
+
+import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.server.endpoint.ConnectionFactoryUpdateMessage;
+import org.jboss.logging.Logger;
+
+/**
+ * This class will manage ConnectionFactory messages updates
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ *          <p/>
+ *          $Id$
+ */
+public class ConnectionFactoryCallbackHandler implements CallbackHandler
+{
+
+   // Constants ------------------------------------------------------------------------------------
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   ClientConnectionDelegate connectionDelegate;
+   ConnectionState state;
+
+   // Static ---------------------------------------------------------------------------------------
+
+   protected static final Logger log = Logger.getLogger(ConnectionFactoryCallbackHandler.class);
+   private static boolean trace = log.isTraceEnabled();
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   public ConnectionFactoryCallbackHandler(ClientConnectionDelegate connectionDelegate)
+   {
+      this.connectionDelegate = connectionDelegate;
+      this.state = (ConnectionState)connectionDelegate.getState();
+   }
+
+
+   // Implementation of CallbackHandler ------------------------------------------------------------
+   public void handleMessage(Object message)
+   {
+      ConnectionFactoryUpdateMessage updateMessage = (ConnectionFactoryUpdateMessage) message;
+
+
+      if (getState().getClusteredConnectionFactoryDelegate() != null &&
+          getState().getClusteredConnectionFactoryDelegate()
+             instanceof ClientClusteredConnectionFactoryDelegate)
+      {
+         ClientClusteredConnectionFactoryDelegate delegate =
+            (ClientClusteredConnectionFactoryDelegate) getState().
+               getClusteredConnectionFactoryDelegate();
+
+         delegate.updateFailoverInfo(updateMessage.getDelegates(), updateMessage.getFailoverMap());
+      }
+
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   protected ConnectionState getState()
+   {
+      if (state==null)
+      {
+         this.state = (ConnectionState)connectionDelegate.getState();
+      }
+      return this.state;
+   }
+
+   // Private --------------------------------------------------------------------------------------
+
+   // Inner classes --------------------------------------------------------------------------------
+}


Property changes on: trunk/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2007-01-18 16:12:55 UTC (rev 1981)
@@ -51,7 +51,7 @@
  *
  * $Id$
  */
-public class MessageCallbackHandler
+public class MessageCallbackHandler implements CallbackHandler
 {
    // Constants -----------------------------------------------------
    
@@ -250,10 +250,12 @@
 
    /**
     * Handles a message sent from the server
-    * @param msg The message
+    * @param message The message
     */
-   public void handleMessage(MessageProxy msg)
-   {                      
+   public void handleMessage(Object message)
+   {
+      MessageProxy msg = (MessageProxy) message;
+
       if (trace) { log.trace("Receiving message " + msg + " from the remoting layer"); }
 
       synchronized (mainLock)

Modified: trunk/src/main/org/jboss/jms/server/ConnectionFactoryManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ConnectionFactoryManager.java	2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/server/ConnectionFactoryManager.java	2007-01-18 16:12:55 UTC (rev 1981)
@@ -22,7 +22,7 @@
 package org.jboss.jms.server;
 
 import org.jboss.jms.server.connectionfactory.JNDIBindings;
-import org.jboss.jms.client.plugin.LoadBalancingPolicy;
+import org.jboss.jms.client.plugin.LoadBalancingFactory;
 import org.jboss.messaging.core.plugin.contract.MessagingComponent;
 
 /**
@@ -45,7 +45,7 @@
                                  int defaultTempQueuePageSize,
                                  int defaultTempQueueDownCacheSize,
                                  boolean clustered,
-                                 LoadBalancingPolicy loadBalancingPolicy) throws Exception;
+                                 LoadBalancingFactory loadBalancingPolicy) throws Exception;
 
    void unregisterConnectionFactory(String uniqueName, boolean clustered) throws Exception;
 }

Modified: trunk/src/main/org/jboss/jms/server/ConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ConnectionManager.java	2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/server/ConnectionManager.java	2007-01-18 16:12:55 UTC (rev 1981)
@@ -23,6 +23,7 @@
 
 import org.jboss.jms.server.endpoint.ConnectionEndpoint;
 import org.jboss.messaging.core.plugin.contract.MessagingComponent;
+import java.util.List;
 
 
 /**
@@ -44,4 +45,11 @@
    void handleClientFailure(String remotingSessionID);
    
    boolean containsSession(String remotingClientSessionID);
+
+   /**
+    * Returns a list of active connections on this Manager.
+    * The implementation should make a copy of the list to avoid ConcurrentModificationException
+    * @return
+    */
+   List getActiveConnectionsList();
 }

Modified: trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java	2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java	2007-01-18 16:12:55 UTC (rev 1981)
@@ -12,8 +12,7 @@
 import org.jboss.jms.server.ConnectorManager;
 import org.jboss.jms.server.ServerPeer;
 import org.jboss.jms.util.ExceptionUtil;
-import org.jboss.jms.client.plugin.LoadBalancingPolicy;
-import org.jboss.jms.client.plugin.RoundRobinLoadBalancingPolicy;
+import org.jboss.jms.client.plugin.LoadBalancingFactory;
 import org.jboss.remoting.InvokerLocator;
 import org.jboss.system.ServiceMBeanSupport;
 import org.w3c.dom.Element;
@@ -39,7 +38,7 @@
    protected JNDIBindings jndiBindings;
    protected int prefetchSize = 150;
    protected boolean clustered;
-   protected LoadBalancingPolicy loadBalancingPolicy;
+   protected LoadBalancingFactory loadBalancingFactory;
    
    protected int defaultTempQueueFullSize = 75000;
    protected int defaultTempQueuePageSize = 2000;
@@ -67,7 +66,7 @@
       this.clientID = clientID;
 
       // by default, a clustered connection uses a round-robin load balancing policy
-      this.loadBalancingPolicy = new RoundRobinLoadBalancingPolicy();
+      this.loadBalancingFactory = LoadBalancingFactory.getDefaultFactory();
    }
 
    // ServiceMBeanSupport overrides ---------------------------------
@@ -123,7 +122,7 @@
                                       locatorURI, enablePing, prefetchSize,
                                       defaultTempQueueFullSize, defaultTempQueuePageSize,
                                       defaultTempQueueDownCacheSize, clustered,
-                                      loadBalancingPolicy);
+                                      loadBalancingFactory);
       
          InvokerLocator locator = new InvokerLocator(locatorURI);
 
@@ -276,19 +275,19 @@
       this.clustered = clustered;
    }
 
-   public LoadBalancingPolicy getLoadBalancingPolicy()
+   public LoadBalancingFactory getLoadBalancingFactory()
    {
-      return loadBalancingPolicy;
+      return loadBalancingFactory;
    }
 
-   public void setLoadBalancingPolicy(LoadBalancingPolicy loadBalancingPolicy)
+   public void setLoadBalancingFactory(LoadBalancingFactory loadBalancingFactory)
    {
       if (started)
       {
          log.warn("Load balancing policy can only be changed when connection factory is stopped");
          return;
       }
-      this.loadBalancingPolicy = loadBalancingPolicy;
+      this.loadBalancingFactory = loadBalancingFactory;
    }
 
    // JMX managed operations ----------------------------------------

Modified: trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2007-01-18 16:12:55 UTC (rev 1981)
@@ -38,12 +38,15 @@
 
 import org.jboss.jms.client.JBossConnectionFactory;
 import org.jboss.jms.client.plugin.LoadBalancingPolicy;
+import org.jboss.jms.client.plugin.LoadBalancingFactory;
 import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
 import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
 import org.jboss.jms.server.ConnectionFactoryManager;
 import org.jboss.jms.server.ServerPeer;
 import org.jboss.jms.server.Version;
 import org.jboss.jms.server.endpoint.ServerConnectionFactoryEndpoint;
+import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
+import org.jboss.jms.server.endpoint.ConnectionFactoryUpdateMessage;
 import org.jboss.jms.server.endpoint.advised.ConnectionFactoryAdvised;
 import org.jboss.jms.server.remoting.JMSDispatcher;
 import org.jboss.jms.util.JNDIUtil;
@@ -53,6 +56,7 @@
 import org.jboss.messaging.core.plugin.contract.Replicator;
 import org.jboss.messaging.core.plugin.contract.FailoverMapper;
 import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
+import org.jboss.remoting.callback.Callback;
 
 /**
  * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
@@ -103,7 +107,7 @@
    // ConnectionFactoryManager implementation -----------------------
 
    /**
-    * @param loadBalancingPolicy - ignored for non-clustered connection factories.
+    * @param loadBalancingFactory - ignored for non-clustered connection factories.
     */
    public synchronized void registerConnectionFactory(String uniqueName,
                                                       String clientID,
@@ -115,7 +119,7 @@
                                                       int defaultTempQueuePageSize,
                                                       int defaultTempQueueDownCacheSize,
                                                       boolean clustered,
-                                                      LoadBalancingPolicy loadBalancingPolicy)
+                                                      LoadBalancingFactory loadBalancingFactory)
       throws Exception
    {
       log.debug(this + " registering connection factory '" + uniqueName +
@@ -170,7 +174,7 @@
          // Create a clustered delegate
 
          Map localDelegates = replicator.get(CF_PREFIX + uniqueName);
-         delegate = createClusteredDelegate(localDelegates.values(), loadBalancingPolicy);
+         delegate = createClusteredDelegate(localDelegates.values(), loadBalancingFactory);
 
          log.debug(this + " created clustered delegate " + delegate);
       }
@@ -324,7 +328,8 @@
 
             String uniqueName = sKey.substring(CF_PREFIX.length());
 
-            log.debug(this + " received '" + uniqueName + "' connection factory update " + updatedReplicantMap);
+            log.debug(this + " received '" + uniqueName +
+                                "' connection factory update " + updatedReplicantMap);
 
             ClientClusteredConnectionFactoryDelegate del =
                (ClientClusteredConnectionFactoryDelegate)delegates.get(uniqueName);
@@ -351,6 +356,8 @@
             }
 
             rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), del);
+
+            endpoint.updateClusteredClients(delArr, failoverMap);
          }
       }
       catch (Exception e)
@@ -397,7 +404,7 @@
     * @param localDelegates - Collection<ClientConnectionFactoryDelegate>
     */
    private ClientClusteredConnectionFactoryDelegate
-      createClusteredDelegate(Collection localDelegates, LoadBalancingPolicy loadBalancingPolicy)
+      createClusteredDelegate(Collection localDelegates, LoadBalancingFactory loadBalancingFactory)
       throws Exception
    {
       log.trace(this + " creating a clustered ConnectionFactoryDelegate based on " + localDelegates);
@@ -425,7 +432,8 @@
 
       return new ClientClusteredConnectionFactoryDelegate(delegates,
                                                           failoverMap,
-                                                          loadBalancingPolicy);
+                                                          loadBalancingFactory.
+                                                             createLoadBalancingPolicy(delegates));
    }
 
    private void rebindConnectionFactory(Context ic, JNDIBindings jndiBindings,

Modified: trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2007-01-18 16:12:55 UTC (rev 1981)
@@ -27,6 +27,8 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
 
 import javax.jms.JMSException;
 
@@ -58,7 +60,9 @@
    protected Map jmsClients;
    
    protected Map sessions;
-   
+
+   protected Set activeConnections; 
+
    // Constructors --------------------------------------------------
 
    public SimpleConnectionManager()
@@ -66,6 +70,8 @@
       jmsClients = new HashMap();
       
       sessions = new HashMap();
+
+      activeConnections = new HashSet();
    }
 
    // ConnectionManager ---------------------------------------------
@@ -83,6 +89,8 @@
       endpoints.put(remotingClientSessionID, endpoint);
       
       sessions.put(remotingClientSessionID, jmsClientVMId);
+
+      activeConnections.add(endpoint);
       
       log.debug("registered connection " + endpoint + " as " +
                 Util.guidToString(remotingClientSessionID));
@@ -95,7 +103,12 @@
       if (endpoints != null)
       {
          ConnectionEndpoint e = (ConnectionEndpoint)endpoints.remove(remotingClientSessionID);
-         
+
+         if (e != null)
+         {
+            endpoints.remove(e);
+         }
+
          log.debug("unregistered connection " + e + " with remoting session ID " +
                Util.guidToString(remotingClientSessionID));
          
@@ -174,7 +187,15 @@
    {
       return sessions.containsKey(remotingClientSessionID);
    }
-   
+
+   public synchronized List getActiveConnectionsList()
+   {
+      // I will make a copy to avoid ConcurrentModification
+      ArrayList list = new ArrayList();
+      list.addAll(activeConnections);
+      return list;
+   }
+
    /*
     * Used in testing only
     */

Added: trunk/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryUpdateMessage.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryUpdateMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryUpdateMessage.java	2007-01-18 16:12:55 UTC (rev 1981)
@@ -0,0 +1,90 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.jms.server.endpoint;
+
+import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
+import java.util.Map;
+import java.io.Serializable;
+
+/**
+ * This class holds the updated information about the server to ConnectionFactories
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ *          
+ *          $Id$
+ */
+public class ConnectionFactoryUpdateMessage implements Serializable
+{
+   
+
+   // Constants ------------------------------------------------------------------------------------
+
+   static final long serialVersionUID = 7978093036163402989L;
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   private ClientConnectionFactoryDelegate[] delegates;
+   private Map failoverMap;
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   public ConnectionFactoryUpdateMessage(ClientConnectionFactoryDelegate[] delegates,
+                                         Map failoverMap)
+   {
+      this.delegates = delegates;
+      this.failoverMap = failoverMap;
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   public ClientConnectionFactoryDelegate[] getDelegates()
+   {
+      return delegates;
+   }
+
+   public void setDelegates(ClientConnectionFactoryDelegate[] delegates)
+   {
+      this.delegates = delegates;
+   }
+
+   public Map getFailoverMap()
+   {
+      return failoverMap;
+   }
+
+   public void setFailoverMap(Map failoverMap)
+   {
+      this.failoverMap = failoverMap;
+   }
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   // Private --------------------------------------------------------------------------------------
+
+   // Inner classes --------------------------------------------------------------------------------
+
+}


Property changes on: trunk/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryUpdateMessage.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-01-18 16:12:55 UTC (rev 1981)
@@ -114,6 +114,7 @@
    private int defaultTempQueueFullSize;
    private int defaultTempQueuePageSize;
    private int defaultTempQueueDownCacheSize;
+   private ServerConnectionFactoryEndpoint cfendpoint;
 
    private byte usingVersion;
 
@@ -126,14 +127,17 @@
     * @param failedNodeID - zero or positive values mean connection creation attempt is result of
     *        failover. Negative values are ignored (mean regular connection creation attempt).
     */
-   protected ServerConnectionEndpoint(ServerPeer serverPeer, String clientID,
+   public ServerConnectionEndpoint(ServerPeer serverPeer, String clientID,
                                       String username, String password, int prefetchSize,
                                       int defaultTempQueueFullSize,
                                       int defaultTempQueuePageSize,
                                       int defaultTempQueueDownCacheSize,
-                                      int failedNodeID) throws Exception
+                                      int failedNodeID,
+                                      ServerConnectionFactoryEndpoint cfendpoint) throws Exception
    {
       this.serverPeer = serverPeer;
+
+      this.cfendpoint = cfendpoint;
       
       sm = serverPeer.getSecurityManager();
       tr = serverPeer.getTxRepository();
@@ -162,7 +166,7 @@
          this.failedNodeID = new Integer(failedNodeID);
       }
    }
-   
+
    // ConnectionDelegate implementation ------------------------------------------------------------
    
    public SessionDelegate createSessionDelegate(boolean transacted,
@@ -203,7 +207,7 @@
          log.debug("created and registered " + ep);
 
          ClientSessionDelegate d = new ClientSessionDelegate(sessionID);
-                 
+
          log.debug("created " + d);
 
          return d;
@@ -346,7 +350,7 @@
          cm.unregisterConnection(jmsClientVMId, remotingClientSessionId);
    
          JMSDispatcher.instance.unregisterTarget(new Integer(id));
-         
+
          closed = true;
       }
       catch (Throwable t)
@@ -483,7 +487,12 @@
                    "must be using pull callbacks");
       }
    }
-   
+
+   public ServerInvokerCallbackHandler getCallbackHandler()
+   {
+      return callbackHandler;
+   }
+
    // IOC
    public void setRemotingInformation(String jmsClientVMId, String remotingClientSessionId)
    {
@@ -504,7 +513,12 @@
    {
       return serverPeer;
    }
-        
+
+   public ServerConnectionFactoryEndpoint getConnectionFactoryEndpoint()
+   {
+      return cfendpoint;
+   }
+
    public String toString()
    {
       return "ConnectionEndpoint[" + id + "]";
@@ -537,11 +551,6 @@
       return defaultTempQueueDownCacheSize;
    }
    
-   ServerInvokerCallbackHandler getCallbackHandler()
-   {
-      return callbackHandler;
-   }   
-   
    int getConnectionID()
    {
       return id;

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java	2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java	2007-01-18 16:12:55 UTC (rev 1981)
@@ -23,6 +23,7 @@
 
 import javax.jms.JMSException;
 import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
 import org.jboss.jms.delegate.ConnectionDelegate;
 import org.jboss.jms.server.ServerPeer;
 import org.jboss.jms.server.connectionfactory.JNDIBindings;
@@ -31,6 +32,12 @@
 import org.jboss.jms.util.ExceptionUtil;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.plugin.IDBlock;
+import org.jboss.remoting.callback.Callback;
+import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
+import java.util.List;
+import java.util.Map;
+import java.util.Iterator;
 
 /**
  * Concrete implementation of ConnectionFactoryEndpoint
@@ -151,7 +158,7 @@
                                                                int failedNodeID)
       throws Exception
    {
-      log.debug("creating a new connection for user " + username);        
+      log.debug("creating a new connection for user " + username);
 
       // authenticate the user
       serverPeer.getSecurityManager().authenticate(username, password);
@@ -173,7 +180,7 @@
       ServerConnectionEndpoint endpoint =
          new ServerConnectionEndpoint(serverPeer, clientID, username, password, prefetchSize,
                                       defaultTempQueueFullSize, defaultTempQueuePageSize,
-                                      defaultTempQueueDownCacheSize, failedNodeID);
+                                      defaultTempQueueDownCacheSize, failedNodeID, this);
 
       int connectionID = endpoint.getConnectionID();
 
@@ -209,7 +216,6 @@
       }
    }
 
-
    // Public ---------------------------------------------------------------------------------------
    
    public int getID()
@@ -222,6 +228,42 @@
       return jndiBindings;
    }
 
+
+   /** Sends an update message on ClusteredConnectionFactories.
+    * Observation: I have placed here, because if we decide to lock the ServerEndpoint
+    * while we send updates, we would need the method here to perform WriteLocks on objects */
+   public void updateClusteredClients(ClientConnectionFactoryDelegate[] delegates, Map failoverMap)
+      throws Exception
+   {
+
+      // Should we lock the CFEndpoint now allowing new connections to come while doing this?
+
+      List connectionList = serverPeer.getConnectionManager().getActiveConnectionsList();
+
+
+      log.info("Sending update list to active connections. It got " +
+                 connectionList.size() + " elements");
+
+      ConnectionFactoryUpdateMessage message = new ConnectionFactoryUpdateMessage(delegates,
+                                                          failoverMap);
+      Callback callback = new Callback(message);
+
+      for (Iterator iter = connectionList.iterator(); iter.hasNext();)
+      {
+         ServerConnectionEndpoint connEndpoint = (ServerConnectionEndpoint) iter.next();
+         log.trace("Updating connection " + connEndpoint);
+         try
+         {
+            connEndpoint.getCallbackHandler().handleCallback(callback);
+         }
+         catch (Exception e)
+         {
+            log.error("Callback failed on connection " + connEndpoint, e);
+         }
+      }
+
+   }
+
    public String toString()
    {
       return "ConnectionFactoryEndpoint[" + id + "]";

Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java	2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java	2007-01-18 16:12:55 UTC (rev 1981)
@@ -162,7 +162,7 @@
       handleVersion(obj, dos);
 
       try
-      {         
+      {
          if (obj instanceof InvocationRequest)
          {
             if (trace) { log.trace("writing InvocationRequest"); }
@@ -180,8 +180,15 @@
                if (params != null && params.length > 0 && params[0] instanceof Callback)
                {
                   Callback callback = (Callback) params[0];
-                  MessagingMarshallable mm = (MessagingMarshallable)callback.getParameter();
-                  param = mm.getLoad();
+                  if (callback.getParameter() instanceof MessagingMarshallable)
+                  {
+                     MessagingMarshallable mm = (MessagingMarshallable)callback.getParameter();
+                     param = mm.getLoad();
+                  }
+                  else
+                  {
+                     param = callback.getParameter();
+                  }
                }
                else
                {

Added: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringAspectInternalTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringAspectInternalTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringAspectInternalTest.java	2007-01-18 16:12:55 UTC (rev 1981)
@@ -0,0 +1,105 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.test.messaging.jms.clustering;
+
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.jms.client.container.ClusteringAspect;
+import java.util.Map;
+import java.lang.reflect.Method;
+
+/**
+ * This class tests internal methods of ClusteringAspect
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ *
+ *          $Id$
+ */
+public class ClusteringAspectInternalTest extends MessagingTestCase
+{
+
+   // Constants ------------------------------------------------------------------------------------
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   public ClusteringAspectInternalTest(String name)
+   {
+      super(name);
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   public void testGuessFailoverMap() throws Exception
+   {
+      Map failoverMapTest = new java.util.HashMap();
+      failoverMapTest.put(new Integer(1), new Integer(3));
+      failoverMapTest.put(new Integer(3), new Integer(4));
+      failoverMapTest.put(new Integer(4), new Integer(1));
+
+      assertEquals(new Integer(1), callGuessFailoverID(failoverMapTest, new Integer(0)));
+      assertEquals(new Integer(3), callGuessFailoverID(failoverMapTest, new Integer(2)));
+      assertEquals(new Integer(1), callGuessFailoverID(failoverMapTest, new Integer(5)));
+      assertEquals(new Integer(3), callGuessFailoverID(failoverMapTest, new Integer(1)));
+
+   }
+
+
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();    //To change body of overridden methods use File | Settings | File Templates.
+   }
+
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();    //To change body of overridden methods use File | Settings | File Templates.
+   }
+
+   // Private --------------------------------------------------------------------------------------
+
+   /**
+    * guessFailoverID is a private method that I want to test.
+    * This method will use reflection to call that method instead of making it public
+    * @param map
+    * @param value
+    */
+   private Integer callGuessFailoverID(Map map, Integer value) throws Exception
+   {
+      Method method = ClusteringAspect.class.getDeclaredMethod("guessFailoverID",
+         new Class[]{Map.class, Integer.class});
+
+      method.setAccessible(true);
+
+      return (Integer) method.invoke(null, new Object[]{map, value});
+   }
+   // Inner classes --------------------------------------------------------------------------------
+
+}


Property changes on: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringAspectInternalTest.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Added: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ConnectionFactoryUpdateTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ConnectionFactoryUpdateTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ConnectionFactoryUpdateTest.java	2007-01-18 16:12:55 UTC (rev 1981)
@@ -0,0 +1,156 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.test.messaging.jms.clustering;
+
+import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
+import javax.jms.Connection;
+import javax.jms.Session;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ *
+ *          $Id$
+ */
+public class ConnectionFactoryUpdateTest extends ClusteringTestBase
+{
+
+   // Constants ------------------------------------------------------------------------------------
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   public ConnectionFactoryUpdateTest(String name)
+   {
+      super(name);
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+   /**
+    */
+   public void testUpdateConnectionFactory() throws Exception
+   {
+      Connection conn = cf.createConnection();
+      JBossConnectionFactory jbcf = (JBossConnectionFactory) cf;
+      ClientClusteredConnectionFactoryDelegate cfDelegate =
+         (ClientClusteredConnectionFactoryDelegate) jbcf.getDelegate();
+      assertEquals(3, cfDelegate.getDelegates().length);
+
+
+      Connection conn1 = cf.createConnection();
+
+      assertEquals(1, getServerId(conn1));
+
+      ServerManagement.killAndWait(1);
+
+      Thread.sleep(5000);
+
+      // first part of the test, verifies if the CF was updated
+      assertEquals(2, cfDelegate.getDelegates().length);
+      conn.close();
+
+      Thread.sleep(25000);
+
+      // Second part, verifies a possible racing condition on failoverMap and handleFilover
+
+      log.info("ServerId=" + getServerId(conn1));
+      assertTrue(1 != getServerId(conn1));
+
+      //Session sess = conn1.createSession(true, Session.SESSION_TRANSACTED);
+      conn1.close();
+
+   }
+
+   /**
+    * Test if an update on failoverMap on the connectionFactory would
+    * cause any problems during failover
+    */
+   public void testUpdateConnectionFactoryRaceCondition() throws Exception
+   {
+      // This connection needs to be opened, as we need the callback to update CF from this conn
+      Connection conn = cf.createConnection();
+      JBossConnectionFactory jbcf = (JBossConnectionFactory) cf;
+      ClientClusteredConnectionFactoryDelegate cfDelegate =
+         (ClientClusteredConnectionFactoryDelegate) jbcf.getDelegate();
+      assertEquals(3, cfDelegate.getDelegates().length);
+
+      Connection conn1 = cf.createConnection();
+
+      Connection conn2 = cf.createConnection();
+
+      assertEquals(2, getServerId(conn2));
+
+      assertEquals(1, getServerId(conn1));
+
+      ConnectionState state = this.getConnectionState(conn1);
+
+      // Disable Leasing for Failover
+      state.getRemotingConnection().removeConnectionListener();
+
+      ServerManagement.killAndWait(1);
+
+      Thread.sleep(15000);
+
+      // This will force Failover from Valve to kick in
+      Session sess = conn1.createSession(true, Session.SESSION_TRANSACTED);
+
+      // first part of the test, verifies if the CF was updated
+      assertEquals(2, cfDelegate.getDelegates().length);
+
+      log.info("ServerId=" + getServerId(conn1));
+      assertTrue(1 != getServerId(conn1));
+
+      conn.close();
+      conn1.close();
+      conn2.close();
+
+   }
+
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   protected void setUp() throws Exception
+   {
+      nodeCount = 3;
+      super.setUp();
+   }
+
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
+   // Private --------------------------------------------------------------------------------------
+
+   // Inner classes --------------------------------------------------------------------------------
+
+}


Property changes on: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ConnectionFactoryUpdateTest.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2007-01-18 12:10:23 UTC (rev 1980)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2007-01-18 16:12:55 UTC (rev 1981)
@@ -1635,6 +1635,9 @@
          }
          conn.start();
 
+         // Disable Lease for this test.. as the ValveAspect should capture this
+         getConnectionState(conn).getRemotingConnection().removeConnectionListener();
+
          // make sure we're connecting to node 1
 
          int nodeID = ((ConnectionState)((DelegateSupport)((JBossConnection)conn).




More information about the jboss-cvs-commits mailing list