[jboss-cvs] JBoss Messaging SVN: r1605 - in branches/Branch_Client_Failover_Experiment: src/main/org/jboss/jms/client/ha src/main/org/jboss/jms/client/state tests/src/org/jboss/test/messaging/core/ha
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Nov 21 00:48:36 EST 2006
Author: clebert.suconic at jboss.com
Date: 2006-11-21 00:48:33 -0500 (Tue, 21 Nov 2006)
New Revision: 1605
Modified:
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/ha/ClusteredConnectionFactoryDelegate.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConnectionState.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ClusterecConnectionTest.java
Log:
Implementing ConnectionListener on remoting
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/ha/ClusteredConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/ha/ClusteredConnectionFactoryDelegate.java 2006-11-21 05:19:36 UTC (rev 1604)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/ha/ClusteredConnectionFactoryDelegate.java 2006-11-21 05:48:33 UTC (rev 1605)
@@ -1,11 +1,15 @@
package org.jboss.jms.client.ha;
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import java.io.Serializable;
import javax.jms.JMSException;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
+import org.jboss.jms.client.state.ConnectionState;
import org.jboss.jms.delegate.ConnectionDelegate;
import org.jboss.jms.delegate.ConnectionFactoryDelegate;
import org.jboss.messaging.core.plugin.IdBlock;
+import org.jboss.remoting.Client;
/**
* @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
@@ -18,6 +22,8 @@
private int current;
ClientConnectionFactoryDelegate[] delegates;
+ private ConcurrentHashMap clientsMap = new ConcurrentHashMap();
+ transient private Listener listener = new Listener();
public ClusteredConnectionFactoryDelegate(ClientConnectionFactoryDelegate[] delegates)
{
@@ -26,10 +32,34 @@
public ConnectionDelegate createConnectionDelegate(String username, String password) throws JMSException
{
- return getRoundRobbinFactoryDelegate().createConnectionDelegate(username, password);
+ ClientConnectionDelegate delegate = (ClientConnectionDelegate)getRoundRobbinFactoryDelegate().createConnectionDelegate(username, password);
+ delegate.getRemotingConnection().getInvokingClient().addConnectionListener(new Listener());
+ clientsMap.put(delegate.getRemotingConnection().getInvokingClient(),delegate);
+ return delegate;
}
+ class Listener implements org.jboss.remoting.ConnectionListener // callback type registered on each invoking client by createConnectionDelegate
+ {
+ public void handleConnectionException(Throwable throwable, Client client) // reacts to a broken remoting client; the throwable argument itself is not inspected
+ {
+ try
+ {
+ System.out.println("Failing over client " + client);
+ ClientConnectionFactoryDelegate alternateFactory = getAlternateDelegate(client); // factory to reconnect through, chosen from the delegates array
+ ClientConnectionDelegate delegate = (ClientConnectionDelegate) clientsMap.get(client); // NOTE(review): no null check - an unregistered client leads to an NPE that the catch below only prints
+ ConnectionState connectionState = (ConnectionState) delegate.getState();
+ ClientConnectionDelegate newConnection = (ClientConnectionDelegate) alternateFactory.createConnectionDelegate(connectionState.getUser(), connectionState.getPassword()); // reuses the user/password read from the connection state; presumably populated elsewhere - TODO confirm
+ delegate.failOver(newConnection); // hands the freshly created connection delegate to the failed one
+ }
+ catch (Throwable e) // any failure during failover is printed and not propagated to the caller
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+
public synchronized ClientConnectionFactoryDelegate getRoundRobbinFactoryDelegate()
{
ClientConnectionFactoryDelegate currentDelegate = delegates[current++];
@@ -63,4 +93,34 @@
delegates[i].init();
}
}
+
+ // private
+
+   /**
+    * Returns the factory delegate that follows the failed client's server in the delegates array.
+    * @param client the remoting client whose connection failed; its locator URI identifies the server
+    * @return the next delegate, wrapping to index 0 past the end (index 0 too when no URI matches)
+    */
+   private ClientConnectionFactoryDelegate getAlternateDelegate(Client client)
+   {
+      String serverURI = client.getInvoker().getLocator().getLocatorURI();
+      int local = -1;
+      for (int i = 0; i < delegates.length; i++)
+      {
+         if (delegates[i].getServerLocatorURI().equals(serverURI))
+         {
+            local = i;
+            break;
+         }
+      }
+
+      local++;
+      if (local >= delegates.length) // valid indexes stop at length - 1, so equality must wrap as well
+      {
+         local = 0;
+      }
+
+      return delegates[local];
+
+   }
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConnectionState.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConnectionState.java 2006-11-21 05:19:36 UTC (rev 1604)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConnectionState.java 2006-11-21 05:48:33 UTC (rev 1605)
@@ -21,21 +21,18 @@
*/
package org.jboss.jms.client.state;
+import EDU.oswego.cs.dl.util.concurrent.SyncSet;
+import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
import java.util.HashSet;
-
+import javax.jms.ExceptionListener;
+import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.remoting.JMSRemotingConnection;
-import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.delegate.ConnectionDelegate;
import org.jboss.jms.message.MessageIdGenerator;
import org.jboss.jms.server.Version;
import org.jboss.jms.tx.ResourceManager;
import org.jboss.logging.Logger;
-import EDU.oswego.cs.dl.util.concurrent.SyncSet;
-import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
-
-import javax.jms.ExceptionListener;
-
/**
*
* State corresponding to a connection. This state is acessible inside aspects/interceptors.
@@ -62,6 +59,12 @@
private ConnectionDelegate delegate;
+ // This is filled from and for the HA interceptors only
+ private transient String user;
+
+ // This is filled from and for the HA interceptors only
+ private transient String password;
+
protected boolean started;
@@ -132,32 +135,55 @@
}
- public DelegateSupport getDelegate()
- {
- return (DelegateSupport)delegate;
- }
+ public DelegateSupport getDelegate()
+ {
+ return (DelegateSupport) delegate;
+ }
- public void setDelegate(DelegateSupport delegate)
- {
- this.delegate=(ConnectionDelegate)delegate;
- }
+ public void setDelegate(DelegateSupport delegate)
+ {
+ this.delegate = (ConnectionDelegate) delegate;
+ }
- /** Connection doesn't have a parent */
- public void setParent(HierarchicalState parent)
- {
- }
+ /**
+ * Connection doesn't have a parent
+ */
+ public void setParent(HierarchicalState parent)
+ {
+ }
- public boolean isStarted()
- {
- return started;
- }
+ public boolean isStarted()
+ {
+ return started;
+ }
- public void setStarted(boolean started)
- {
- this.started = started;
- }
+ public void setStarted(boolean started)
+ {
+ this.started = started;
+ }
- public String getClientID()
+
+ public String getPassword()
+ {
+ return password;
+ }
+
+ public void setPassword(String password)
+ {
+ this.password = password;
+ }
+
+ public String getUser()
+ {
+ return user;
+ }
+
+ public void setUser(String user)
+ {
+ this.user = user;
+ }
+
+ public String getClientID()
{
return clientID;
}
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ClusterecConnectionTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ClusterecConnectionTest.java 2006-11-21 05:19:36 UTC (rev 1604)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/ha/ClusterecConnectionTest.java 2006-11-21 05:48:33 UTC (rev 1605)
@@ -1,5 +1,8 @@
package org.jboss.test.messaging.core.ha;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
import org.jboss.jms.client.JBossConnection;
import org.jboss.jms.client.JBossConnectionFactory;
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
@@ -27,13 +30,23 @@
assertEquals(2,delegate.getDelegates().length);
+ Destination destination = (Destination)getCtx1().lookup("topic/testTopic");
for (int i=0;i<100;i++)
{
JBossConnection conn2 = (JBossConnection)factory.createConnection();
+ conn2.setClientID("client" + i);
+ conn2.start();
+ Session session = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(destination);
ConnectionState state = (ConnectionState) ((ClientConnectionDelegate)conn2.getDelegate()).getState();
log.info("state.serverId=" + state.getServerID());
- conn2.close();
+ //conn2.close();
}
+
+ System.out.println("****************** finish *********************");
+
+
+ Thread.sleep(120000);
}
}
More information about the jboss-cvs-commits
mailing list