[jboss-cvs] JBoss Messaging SVN: r1605 - in branches/Branch_Client_Failover_Experiment: src/main/org/jboss/jms/client/ha src/main/org/jboss/jms/client/state tests/src/org/jboss/test/messaging/core/ha

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Nov 21 00:48:36 EST 2006


Author: clebert.suconic at jboss.com
Date: 2006-11-21 00:48:33 -0500 (Tue, 21 Nov 2006)
New Revision: 1605

Modified:
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/ha/ClusteredConnectionFactoryDelegate.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConnectionState.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ClusterecConnectionTest.java
Log:
Implementing ConnectionListener on remoting

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/ha/ClusteredConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/ha/ClusteredConnectionFactoryDelegate.java	2006-11-21 05:19:36 UTC (rev 1604)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/ha/ClusteredConnectionFactoryDelegate.java	2006-11-21 05:48:33 UTC (rev 1605)
@@ -1,11 +1,15 @@
 package org.jboss.jms.client.ha;
 
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
 import java.io.Serializable;
 import javax.jms.JMSException;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
 import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
+import org.jboss.jms.client.state.ConnectionState;
 import org.jboss.jms.delegate.ConnectionDelegate;
 import org.jboss.jms.delegate.ConnectionFactoryDelegate;
 import org.jboss.messaging.core.plugin.IdBlock;
+import org.jboss.remoting.Client;
 
 /**
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
@@ -18,6 +22,8 @@
 
    private int current;
    ClientConnectionFactoryDelegate[] delegates;
+   private ConcurrentHashMap clientsMap = new ConcurrentHashMap();
+   transient private Listener listener = new Listener();
 
    public ClusteredConnectionFactoryDelegate(ClientConnectionFactoryDelegate[] delegates)
    {
@@ -26,10 +32,34 @@
 
    public ConnectionDelegate createConnectionDelegate(String username, String password) throws JMSException
    {
-      return getRoundRobbinFactoryDelegate().createConnectionDelegate(username, password);
+      ClientConnectionDelegate delegate = (ClientConnectionDelegate)getRoundRobbinFactoryDelegate().createConnectionDelegate(username, password);
+      clientsMap.put(delegate.getRemotingConnection().getInvokingClient(),delegate); // register before listening: Listener resolves the delegate through this map
+      delegate.getRemotingConnection().getInvokingClient().addConnectionListener(new Listener()); // from here on a connection failure is handed to Listener
+      return delegate;
    }
 
+   class Listener implements org.jboss.remoting.ConnectionListener // fails a broken connection over to another factory delegate
+   {
+      public void handleConnectionException(Throwable throwable, Client client)
+      {
+         try
+         {
+            System.out.println("Failing over client " + client);
+            ClientConnectionFactoryDelegate alternateFactory = getAlternateDelegate(client);
+            ClientConnectionDelegate delegate = (ClientConnectionDelegate) clientsMap.get(client); // entry is added by createConnectionDelegate
+            ConnectionState connectionState = (ConnectionState) delegate.getState();
+            ClientConnectionDelegate newConnection = (ClientConnectionDelegate) alternateFactory.createConnectionDelegate(connectionState.getUser(), connectionState.getPassword()); // reuses the user/password held by the connection state
+            delegate.failOver(newConnection);
+         }
+         catch (Throwable e) // NOTE(review): any failure (including a missing map entry) is only printed, never propagated
+         {
+            e.printStackTrace();
+         }
+      }
+   }
 
+
+
    public synchronized ClientConnectionFactoryDelegate getRoundRobbinFactoryDelegate()
    {
       ClientConnectionFactoryDelegate currentDelegate = delegates[current++];
@@ -63,4 +93,34 @@
          delegates[i].init();
       }
    }
+
+   // private
+
+   /**
+    * Return the factory delegate that follows, in array order, the one serving
+    * the failed client; wraps to the first delegate (also used when no match).
+    * @param client remoting client whose connection failed
+    * @return the factory delegate to fail over to
+    */
+   private ClientConnectionFactoryDelegate getAlternateDelegate(Client client)
+   {
+      String serverURI = client.getInvoker().getLocator().getLocatorURI();
+      int local = -1;
+      for (int i = 0; i < delegates.length; i++)
+      {
+         if (delegates[i].getServerLocatorURI().equals(serverURI))
+         {
+            local = i;
+            break;
+         }
+      }
+
+      // Advance and wrap; '>=' because delegates.length is not a valid index.
+      local++;
+      if (local >= delegates.length)
+      {
+         local = 0;
+      }
+      return delegates[local];
+   }
 }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConnectionState.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConnectionState.java	2006-11-21 05:19:36 UTC (rev 1604)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConnectionState.java	2006-11-21 05:48:33 UTC (rev 1605)
@@ -21,21 +21,18 @@
   */
 package org.jboss.jms.client.state;
 
+import EDU.oswego.cs.dl.util.concurrent.SyncSet;
+import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
 import java.util.HashSet;
-
+import javax.jms.ExceptionListener;
+import org.jboss.jms.client.delegate.DelegateSupport;
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
-import org.jboss.jms.client.delegate.DelegateSupport;
 import org.jboss.jms.delegate.ConnectionDelegate;
 import org.jboss.jms.message.MessageIdGenerator;
 import org.jboss.jms.server.Version;
 import org.jboss.jms.tx.ResourceManager;
 import org.jboss.logging.Logger;
 
-import EDU.oswego.cs.dl.util.concurrent.SyncSet;
-import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
-
-import javax.jms.ExceptionListener;
-
 /**
  * 
  * State corresponding to a connection. This state is acessible inside aspects/interceptors.
@@ -62,6 +59,12 @@
 
    private ConnectionDelegate delegate;
 
+   // This is filled from and for the HA interceptors only
+   private transient String user;
+
+   // This is filled from and for the HA interceptors only
+   private transient String password;
+
    protected boolean started;
 
 
@@ -132,32 +135,55 @@
    }
 
 
-    public DelegateSupport getDelegate()
-    {
-        return (DelegateSupport)delegate;
-    }
+   public DelegateSupport getDelegate()
+   {
+      return (DelegateSupport) delegate;
+   }
 
-    public void setDelegate(DelegateSupport delegate)
-    {
-        this.delegate=(ConnectionDelegate)delegate;
-    }
+   public void setDelegate(DelegateSupport delegate)
+   {
+      this.delegate = (ConnectionDelegate) delegate;
+   }
 
-    /** Connection doesn't have a parent */
-    public void setParent(HierarchicalState parent)
-    {
-    }
+   /**
+    * Connection doesn't have a parent
+    */
+   public void setParent(HierarchicalState parent)
+   {
+   }
 
-    public boolean isStarted()
-    {
-        return started;
-    }
+   public boolean isStarted()
+   {
+      return started;
+   }
 
-    public void setStarted(boolean started)
-    {
-        this.started = started;
-    }
+   public void setStarted(boolean started)
+   {
+      this.started = started;
+   }
 
-    public String getClientID()
+
+   public String getPassword() // password field is transient and filled only for the HA interceptors
+   {
+      return password;
+   }
+
+   public void setPassword(String password) // stores the password for later use by the HA interceptors
+   {
+      this.password = password;
+   }
+
+   public String getUser() // user field is transient and filled only for the HA interceptors
+   {
+      return user;
+   }
+
+   public void setUser(String user) // stores the user name for later use by the HA interceptors
+   {
+      this.user = user;
+   }
+
+   public String getClientID()
     {
         return clientID;
     }

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ClusterecConnectionTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ClusterecConnectionTest.java	2006-11-21 05:19:36 UTC (rev 1604)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ClusterecConnectionTest.java	2006-11-21 05:48:33 UTC (rev 1605)
@@ -1,5 +1,8 @@
 package org.jboss.test.messaging.core.ha;
 
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
 import org.jboss.jms.client.JBossConnection;
 import org.jboss.jms.client.JBossConnectionFactory;
 import org.jboss.jms.client.delegate.ClientConnectionDelegate;
@@ -27,13 +30,23 @@
 
       assertEquals(2,delegate.getDelegates().length);
 
+      Destination destination = (Destination)getCtx1().lookup("topic/testTopic");
 
       for (int i=0;i<100;i++)
       {
          JBossConnection conn2 = (JBossConnection)factory.createConnection();
+         conn2.setClientID("client" + i);
+         conn2.start();
+         Session session = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(destination); 
          ConnectionState state = (ConnectionState) ((ClientConnectionDelegate)conn2.getDelegate()).getState();
          log.info("state.serverId=" + state.getServerID());
-         conn2.close();
+         //conn2.close();
       }
+
+      System.out.println("****************** finish *********************");
+
+
+      Thread.sleep(120000);
    }
 }




More information about the jboss-cvs-commits mailing list