[jboss-cvs] JBoss Messaging SVN: r1714 - in branches/Branch_Client_Failover_Experiment: src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/server src/main/org/jboss/jms/server/connectionfactory src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/endpoint/advised src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms/clustering
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Dec 6 22:17:54 EST 2006
Author: clebert.suconic at jboss.com
Date: 2006-12-06 22:17:47 -0500 (Wed, 06 Dec 2006)
New Revision: 1714
Modified:
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/ServerPeer.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
Log:
Fixing clustering CF & adding server redirect logic into HAAspect
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java 2006-12-06 22:37:53 UTC (rev 1713)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java 2006-12-07 03:17:47 UTC (rev 1714)
@@ -25,9 +25,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-
import javax.jms.JMSException;
-
import org.jboss.aop.joinpoint.Invocation;
import org.jboss.aop.joinpoint.MethodInvocation;
import org.jboss.aop.metadata.SimpleMetaData;
@@ -37,6 +35,7 @@
import org.jboss.jms.client.delegate.ClientConsumerDelegate;
import org.jboss.jms.client.delegate.ClientProducerDelegate;
import org.jboss.jms.client.delegate.ClientSessionDelegate;
+import org.jboss.jms.client.delegate.ClusteredClientConnectionFactoryDelegate;
import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.remoting.CallbackManager;
import org.jboss.jms.client.remoting.MessageCallbackHandler;
@@ -47,7 +46,6 @@
import org.jboss.jms.client.state.ProducerState;
import org.jboss.jms.client.state.SessionState;
import org.jboss.jms.destination.JBossDestination;
-import org.jboss.jms.server.remoting.MetaDataConstants;
import org.jboss.logging.Logger;
import org.jboss.remoting.Client;
import org.jboss.remoting.ConnectionListener;
@@ -103,6 +101,8 @@
else
{
//Non clustered
+
+ log.info("Assumed non clustered");
return invocation.invokeNext();
}
@@ -123,15 +123,31 @@
private synchronized ClientConnectionFactoryDelegate[] getServers(Invocation invocation)
{
if (delegates == null)
- {
+ {
+ log.info("Looking for delegates");
SimpleMetaData metaData = invocation.getMetaData();
-
- delegates = (ClientConnectionFactoryDelegate[])metaData.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CF_DELEGATES);
-
+
+ //delegates = (ClientConnectionFactoryDelegate[])metaData.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CF_DELEGATES);
+
+ MethodInvocation methodInvoke = (MethodInvocation)invocation;
+ //delegates = (ClientConnectionFactoryDelegate[])metaData.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CF_DELEGATES);
+
+
+ // this is a hack, but I couldn't get this working through metaData
+ // TODO: FIX THIS! metaData should contain CF_DELEGATES
+ Object target = methodInvoke.getTargetObject();
+ if (target instanceof ClusteredClientConnectionFactoryDelegate)
+ {
+ delegates = ((ClusteredClientConnectionFactoryDelegate)target).getDelegates();
+ }
+
+
if (delegates != null)
- {
- failoverIndexes = (int[])metaData.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CF_FAILOVER_INDEXES);
-
+ {
+ //TODO: Fix this! metadata should contain CF_FAILOVER_INDEXES
+ //failoverIndexes = (int[])metaData.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CF_FAILOVER_INDEXES);
+ failoverIndexes = (int[])((ClusteredClientConnectionFactoryDelegate)target).getFailoverIndexes();
+
if (failoverIndexes == null)
{
throw new IllegalStateException("Cannot find failoverIndexes!");
@@ -145,6 +161,7 @@
private ClientConnectionDelegate createConnection(ClientConnectionFactoryDelegate cf, String username, String password)
throws Exception
{
+ log.info("createConnection");
ClientConnectionDelegate connDelegate = (ClientConnectionDelegate)cf.createConnectionDelegate(username, password);
//Add a connection listener
@@ -161,11 +178,11 @@
private void handleFailure(ClientConnectionDelegate failedConnection) throws Exception
{
ClientConnectionFactoryDelegate newCF = getAlternateDelegate(failedConnection);
-
+
//TODO implement client side valve to prevent invocations occurring whilst failover is occurring
ConnectionState state = (ConnectionState)((DelegateSupport)failedConnection).getState();
-
+
ClientConnectionDelegate newConnection = createConnection(newCF, state.getUser(), state.getPassword());
try
@@ -181,7 +198,7 @@
}
}
- private ClientConnectionFactoryDelegate getAlternateDelegate(ClientConnectionDelegate currentDelegate)
+ private ClientConnectionFactoryDelegate getAlternateDelegate(ClientConnectionDelegate currentDelegate) throws JMSException
{
//We need to choose which delegate to fail over to
@@ -189,13 +206,6 @@
//The fail over delegate for delegates[i] is given by
//delegates[failoverIndexes[i]]
- //The technique (TODO) is to try that server. There is a possibility the expected failover server
- //is not actually the real failover server, e.g. since the client side array and server side state
- //are not totally in synch.
- //In this case we need to implement hopping, so the incorrect server redirects us to the correct server.
- //This may require several hops to get the right server (since another failure(s) can occur when the hop is
- //happening!)
-
ConnectionState currentState = (ConnectionState)((DelegateSupport)currentDelegate).getState();
String currentLocator =
@@ -229,7 +239,33 @@
throw new IllegalStateException("Cannot failover connection since no servers to fail over onto");
}
- return delegates[failoverIndexes[local]];
+ ClientConnectionFactoryDelegate delegateFound = delegates[failoverIndexes[local]];
+
+
+ // Redirect connection routine.
+ // Verify the failureMap on the server and if out of sync find the correct delegate
+
+ int failoverNode = delegateFound.getFailoverNode(currentState.getServerID());
+ if (failoverNode!=delegateFound.getServerId())
+ {
+ delegateFound = null;
+ for (int i = 0; i < delegates.length; i++)
+ {
+ if (delegates[i].getServerId() == failoverNode)
+ {
+ delegateFound = delegates[i];
+ }
+ }
+
+ if (delegateFound==null)
+ {
+ throw new IllegalStateException("Cannot find failover node on current map for nodeId=" + failoverNode);
+ }
+
+ }
+
+ return delegateFound;
+
}
private void failover(ClientConnectionDelegate failedConnection, ClientConnectionDelegate newConnection) throws Exception
@@ -244,6 +280,9 @@
failedState.copy(newState);
+ // this is necessary so the connection will start "talking" to the new server instead
+ failedState.setRemotingConnection(newState.getRemotingConnection());
+
if (failedState.getClientID() != null)
{
newConnection.setClientID(failedState.getClientID());
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2006-12-06 22:37:53 UTC (rev 1713)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2006-12-07 03:17:47 UTC (rev 1714)
@@ -23,9 +23,7 @@
import java.util.HashMap;
import java.util.Map;
-
import javax.jms.JMSException;
-
import org.jboss.aop.Advised;
import org.jboss.aop.Dispatcher;
import org.jboss.aop.joinpoint.Invocation;
@@ -70,6 +68,9 @@
//This data is needed in order to create a connection
protected String serverLocatorURI;
protected Version serverVersion;
+
+ // This property is used on redirect on failover logic (verify if a new delegate could be used during a failover)
+ protected int serverId;
protected boolean clientPing;
private transient boolean trace;
@@ -101,12 +102,13 @@
// Constructors --------------------------------------------------
- public ClientConnectionFactoryDelegate(int objectID, String serverLocatorURI,
+ public ClientConnectionFactoryDelegate(int objectID, int serverId, String serverLocatorURI,
Version serverVersion,
boolean clientPing)
{
super(objectID);
+ this.serverId = serverId;
this.serverLocatorURI = serverLocatorURI;
this.serverVersion = serverVersion;
this.clientPing = clientPing;
@@ -147,6 +149,16 @@
throw new IllegalStateException("This invocation should not be handled here!");
}
+ public int getFailoverNode(int node) throws JMSException
+ {
+ throw new IllegalStateException("This invocation should not be handled here!");
+ }
+
+ public boolean stillAround(int node) throws JMSException
+ {
+ throw new IllegalStateException("This invocation should not be handled here!");
+ }
+
// Public --------------------------------------------------------
public synchronized Object invoke(Invocation invocation) throws Throwable
@@ -157,7 +169,7 @@
if (trace) { log.trace("invoking " + methodName + " on server"); }
SimpleMetaData md = mi.getMetaData();
-
+
md.addMetaData(Dispatcher.DISPATCHER,
Dispatcher.OID,
new Integer(id),
@@ -174,19 +186,19 @@
* difficulties in knowing when to close it.
*/
- Client client;
-
+ Client client;
+
JMSRemotingConnection remotingConnection = null;
if ("createConnectionDelegate".equals(methodName))
{
// Create a new connection
-
+
remotingConnection = new JMSRemotingConnection(serverLocatorURI, clientPing);
remotingConnection.start();
-
+
client = remotingConnection.getInvokingClient();
-
+
md.addMetaData(MetaDataConstants.JMS,
MetaDataConstants.REMOTING_SESSION_ID,
client.getSessionId(),
@@ -200,42 +212,42 @@
else
{
//getClientAOPConfig or getIDBlock
-
+
// Create a client - make sure pinging is off
-
+
Map configuration = new HashMap();
configuration.put(Client.ENABLE_LEASE, String.valueOf(false));
-
- client = new Client(new InvokerLocator(serverLocatorURI), configuration);
-
+
+ client = new Client(new InvokerLocator(serverLocatorURI), configuration);
+
client.setSubsystem(ServerPeer.REMOTING_JMS_SUBSYSTEM);
-
- client.connect();
-
+
+ client.connect();
+
client.setMarshaller(new JMSWireFormat());
- client.setUnMarshaller(new JMSWireFormat());
+ client.setUnMarshaller(new JMSWireFormat());
}
//What version should we use for invocations on this connection factory?
Version version = getVersionToUse(serverVersion);
byte v = version.getProviderIncrementingVersion();
-
+
MessagingMarshallable request = new MessagingMarshallable(v, mi);
-
+
MessagingMarshallable response;
try
{
response = (MessagingMarshallable)client.invoke(request, null);
-
+
if (trace) { log.trace("got server response for " + methodName); }
}
catch (Throwable t)
{
//If we were invoking createConnectionDelegate and failure occurs then we need to clear
//up the JMSRemotingConnection
-
+
if (remotingConnection != null)
{
try
@@ -246,25 +258,25 @@
{
}
}
-
+
throw t;
}
Object ret = response.getLoad();
-
+
if (remotingConnection != null)
{
// It was a call to createConnectionDelegate - set the remoting connection to use
ClientConnectionDelegate connectionDelegate = (ClientConnectionDelegate)ret;
-
+
//We set the version for the connection and the remoting connection on the meta-data
//this is so the StateCreationAspect can pick it up
-
+
SimpleMetaData metaData = ((Advised)connectionDelegate)._getInstanceAdvisor().getMetaData();
-
+
metaData.addMetaData(MetaDataConstants.JMS, MetaDataConstants.REMOTING_CONNECTION,
remotingConnection, PayloadKey.TRANSIENT);
-
+
metaData.addMetaData(MetaDataConstants.JMS, MetaDataConstants.CONNECTION_VERSION,
version, PayloadKey.TRANSIENT);
@@ -284,6 +296,11 @@
return serverLocatorURI;
}
+ public int getServerId()
+ {
+ return serverId;
+ }
+
// Protected -----------------------------------------------------
protected Client getClient()
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java 2006-12-06 22:37:53 UTC (rev 1713)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java 2006-12-07 03:17:47 UTC (rev 1714)
@@ -59,10 +59,10 @@
// Constructors --------------------------------------------------
- public ClusteredClientConnectionFactoryDelegate(int objectID, String serverLocatorURI,
+ public ClusteredClientConnectionFactoryDelegate(int objectID, int serverId, String serverLocatorURI,
Version serverVersion, boolean clientPing)
{
- super(objectID, serverLocatorURI, serverVersion, clientPing);
+ super(objectID, serverId, serverLocatorURI, serverVersion, clientPing);
}
// DelegateSupport overrides -------------------------------------
@@ -105,6 +105,12 @@
return delegates;
}
+ /** As metadata is not working, I'm exposing this temporarily */
+ public int[] getFailoverIndexes()
+ {
+ return failoverIndexes;
+ }
+
public String toString()
{
return "ClusteredClientConnectionFactoryDelegate[" + id + "] with delegates.length = " + (delegates==null?"null":Integer.toString(delegates.length));
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/ServerPeer.java 2006-12-06 22:37:53 UTC (rev 1713)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/ServerPeer.java 2006-12-07 03:17:47 UTC (rev 1714)
@@ -21,16 +21,15 @@
*/
package org.jboss.jms.server;
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.net.URL;
import java.util.Map;
import java.util.Set;
-
import javax.management.Attribute;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-
import org.jboss.aop.AspectXmlLoader;
import org.jboss.jms.server.connectionfactory.ConnectionFactoryJNDIMapper;
import org.jboss.jms.server.connectionmanager.SimpleConnectionManager;
@@ -58,8 +57,6 @@
import org.jboss.system.ServiceMBeanSupport;
import org.w3c.dom.Element;
-import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
-
/**
* A JMS server peer.
*
@@ -591,6 +588,16 @@
{
return jmsUserManager;
}
+
+ public Replicator getReplicator() throws Exception
+ {
+ PostOffice postOffice = getQueuePostOfficeInstance();
+ if (!(postOffice instanceof Replicator))
+ {
+ throw new IllegalAccessException("This operations is only legal on clustering configurations");
+ }
+ return (Replicator)postOffice;
+ }
public PostOffice getQueuePostOfficeInstance() throws Exception
{
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2006-12-06 22:37:53 UTC (rev 1713)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2006-12-07 03:17:47 UTC (rev 1714)
@@ -127,14 +127,15 @@
{
//Replicator might still be null since we might be deploying a clustered cf in a non clustered
//post office (which is ok)
- delegate = new ClusteredClientConnectionFactoryDelegate(id, locatorURI, version, clientPing);
+ delegate = new ClusteredClientConnectionFactoryDelegate(id, serverPeer.getServerPeerID(), locatorURI, version, clientPing);
}
}
if (delegate == null)
{
//Local
- delegate = new ClientConnectionFactoryDelegate(id, locatorURI, version, clientPing);
+ delegate = new ClientConnectionFactoryDelegate(id, serverPeer.getServerPeerID(),
+ locatorURI, version, clientPing);
}
log.trace("Adding delegates factory " + uniqueName + " pointing to " + delegate);
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java 2006-12-06 22:37:53 UTC (rev 1713)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java 2006-12-07 03:17:47 UTC (rev 1714)
@@ -22,7 +22,6 @@
package org.jboss.jms.server.endpoint;
import javax.jms.JMSException;
-
import org.jboss.jms.delegate.ConnectionDelegate;
import org.jboss.messaging.core.plugin.IdBlock;
@@ -44,5 +43,25 @@
byte[] getClientAOPConfig() throws JMSException;
IdBlock getIdBlock(int size) throws JMSException;
+
+ /** Return the node that should take over a given node */
+ int getFailoverNode(int node) throws JMSException;
+
+ /**
+ * This method will be used by HAAspect interceptor to ask other servers if
+ * a given server still around.
+ *
+ * This will be useful on retries from client2server communication. If a communication exception happened
+ * on the communication with a given server we will check with other peers to confirm if the server really
+ * diead before restabilishing a connection.
+ *
+ * This method is not very useful on leasing but it will be usefull on Exception listeners.
+ *
+ * Say, if a communication exception happens on an interceptor, the interceptor could ask other servers
+ * about their current membership and decide about a retry or a failover.
+ *
+ * TODO: a better name for this method?
+ * */
+ boolean stillAround(int node) throws JMSException;
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2006-12-06 22:37:53 UTC (rev 1713)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2006-12-07 03:17:47 UTC (rev 1714)
@@ -22,7 +22,6 @@
package org.jboss.jms.server.endpoint;
import javax.jms.JMSException;
-
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.delegate.ConnectionDelegate;
import org.jboss.jms.server.ServerPeer;
@@ -137,7 +136,36 @@
throw ExceptionUtil.handleJMSInvocation(t, this + " createConnectionDelegate");
}
}
-
+
+
+ public int getFailoverNode(int node) throws JMSException
+ {
+ try
+ {
+ ServerPeer peer = (ServerPeer )serverPeer.getInstance();
+ return peer.getReplicator().getFailoverNodeID(node);
+ }
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMSInvocation(t, this + " getFailoverNode");
+ }
+ }
+
+
+ // TODO: Better name
+ public boolean stillAround(int node) throws JMSException
+ {
+ try
+ {
+ ServerPeer peer = (ServerPeer )serverPeer.getInstance();
+ return peer.getReplicator().stillAround(node);
+ }
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMSInvocation(t, this + " stillAround");
+ }
+ }
+
public byte[] getClientAOPConfig() throws JMSException
{
try
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java 2006-12-06 22:37:53 UTC (rev 1713)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java 2006-12-07 03:17:47 UTC (rev 1714)
@@ -22,7 +22,6 @@
package org.jboss.jms.server.endpoint.advised;
import javax.jms.JMSException;
-
import org.jboss.jms.delegate.ConnectionDelegate;
import org.jboss.jms.server.endpoint.ConnectionFactoryEndpoint;
import org.jboss.messaging.core.plugin.IdBlock;
@@ -74,6 +73,17 @@
return endpoint.getIdBlock(size);
}
+ public int getFailoverNode(int node) throws JMSException
+ {
+ return endpoint.getFailoverNode(node);
+ }
+
+ // TODO: find a better name... look at javadoc on ConnectionFactoryInterface
+ public boolean stillAround(int node) throws JMSException
+ {
+ return endpoint.stillAround(node);
+ }
+
// AdvisedSupport override ---------------------------------------
public Object getEndpoint()
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java 2006-12-06 22:37:53 UTC (rev 1713)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java 2006-12-07 03:17:47 UTC (rev 1714)
@@ -79,4 +79,18 @@
* returns the original nodeID.
*/
int getFailoverNodeID(int nodeID);
+
+
+ /**
+ *
+ * This method will be used by HAAspect interceptor to ask other servers if
+ * a given server still around.
+ *
+ * This will be useful on retries from client2server communication. If a communication exception happened
+ * on the communication with a given server we will check with other peers to confirm if the server really
+ * diead before restabilishing a connection
+ *
+ * TODO: a better name for this method?
+ * */
+ public boolean stillAround(int nodeId);
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-06 22:37:53 UTC (rev 1713)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-07 03:17:47 UTC (rev 1714)
@@ -608,6 +608,15 @@
}
}
+ // TODO: find a better name
+ public boolean stillAround(int nodeId)
+ {
+ synchronized (failoverMap)
+ {
+ return failoverMap.get(new Integer(nodeId))!=null;
+ }
+ }
+
// Replicator implementation --------------------------------------------------------------------------
public Map get(Serializable key) throws Exception
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2006-12-06 22:37:53 UTC (rev 1713)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2006-12-07 03:17:47 UTC (rev 1714)
@@ -22,10 +22,22 @@
package org.jboss.test.messaging.jms.clustering;
+import javax.jms.Destination;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.jms.Topic;
+import org.jboss.jms.client.JBossConnection;
import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.JBossMessageConsumer;
+import org.jboss.jms.client.JBossSession;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.delegate.ClientSessionDelegate;
import org.jboss.jms.client.delegate.ClusteredClientConnectionFactoryDelegate;
+import org.jboss.jms.client.state.ConsumerState;
+import org.jboss.jms.client.state.SessionState;
import org.jboss.jms.message.MessageProxy;
import org.jboss.test.messaging.tools.ServerManagement;
@@ -53,146 +65,145 @@
// Public --------------------------------------------------------
- public void testConnectionFactoryConnect() throws Exception
- {
- try
- {
- JBossConnectionFactory factory = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
- ClusteredClientConnectionFactoryDelegate delegate =
- (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
- log.info ("number of delegates = " + delegate.getDelegates().length);
- log.info ("number of servers = " + ServerManagement.getServer(0).getNumberOfNodesOnCluster());
-
- assertEquals(3, ServerManagement.getServer(0).getNumberOfNodesOnCluster());
- assertEquals(3, delegate.getDelegates().length);
-
-
- ServerManagement.log(ServerManagement.INFO,"Stopping server 2 as part of testConnectionFactoryConnect");
- ServerManagement.stop(2,true);
-
- ServerManagement.log(ServerManagement.INFO,"##### Looking up ConnectionFactory at testConnectionFactoryConnect");
-
- factory = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
- delegate = (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-
- assertEquals(2, ServerManagement.getServer(0).getNumberOfNodesOnCluster());
- assertEquals(2, delegate.getDelegates().length);
- }
- finally
- {
- ServerManagement.start("all", 2);
- }
-
-
- }
-
-
-// public void testTopicSubscriber() throws Exception
+// public void testConnectionFactoryConnect() throws Exception
// {
// try
// {
-// log.info("++testTopicSubscriber");
+// JBossConnectionFactory factory = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+// ClusteredClientConnectionFactoryDelegate delegate =
+// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+// log.info ("number of delegates = " + delegate.getDelegates().length);
+// log.info ("number of servers = " + ServerManagement.getServer(0).getNumberOfNodesOnCluster());
//
-// log.info(">>Lookup Queue");
-// Destination destination = (Destination) ic1.lookup("topic/testDistributedTopic");
+// delegate.init();
//
-// log.info("Creating connection server1");
-// JBossConnection conn = (JBossConnection) cf1.createConnection();
-// conn.setClientID("testClient");
-// conn.start();
+// for (int i = 0; i < 3; i++)
+// {
+// int failNode = delegate.getDelegates()[0].getFailoverNode(i);
+// log.info("Failover node for server" + i + " = " + failNode);
+// log.info("InvokerLocator for server " + i + " = " + delegate.getDelegates()[i].getServerLocatorURI());
//
-// JBossSession session = (JBossSession) conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
-// ClientSessionDelegate clientSessionDelegate = (ClientSessionDelegate) session.getDelegate();
-// SessionState sessionState = (SessionState) clientSessionDelegate.getState();
-//
-// MessageConsumer consumerHA = session.createDurableSubscriber((Topic) destination, "T1");
-// JBossMessageConsumer jbossConsumerHA = (JBossMessageConsumer) consumerHA;
-//
-// org.jboss.jms.client.delegate.ClientConsumerDelegate clientDelegate = (org.jboss.jms.client.delegate.ClientConsumerDelegate) jbossConsumerHA.getDelegate();
-// ConsumerState consumerState = (ConsumerState) clientDelegate.getState();
-//
-// log.info("subscriptionName=" + consumerState.getSubscriptionName());
-//
-//
-// log.info(">>Creating Producer");
-// MessageProducer producer = session.createProducer(destination);
-// log.info(">>creating Message");
-// Message message = session.createTextMessage("Hello Before");
-// log.info(">>sending Message");
-// producer.send(message);
-// session.commit();
-//
-// receiveMessage("consumerHA", consumerHA, true, false);
-//
-// session.commit();
-// //if (true) return;
-//
-// Object txID = sessionState.getCurrentTxId();
-//
-// producer.send(session.createTextMessage("Hello again before failover"));
-//
-// ClientConnectionDelegate delegate = (ClientConnectionDelegate) conn.getDelegate();
-//
-// JMSRemotingConnection originalRemoting = delegate.getRemotingConnection();
-//
-// log.info(">>Creating alternate connection");
-// JBossConnection conn2 = (JBossConnection) cf2.createConnection();
-// log.info("NewConnectionCreated=" + conn2);
-//
-// log.info(">>Failling over");
-// assertSame(originalRemoting, delegate.getRemotingConnection());
-// conn.getDelegate().failOver(conn2.getDelegate());
-//
-//
-//
-// try
-// {
-// originalRemoting.stop();
-// } catch (Throwable throwable)
-// {
-// throwable.printStackTrace();
+// assertEquals("Server1 should have the same failoverMapping",
+// failNode, delegate.getDelegates()[1].getFailoverNode(i));
+// assertEquals("Server2 should have the same failoverMapping",
+// failNode, delegate.getDelegates()[2].getFailoverNode(i));
// }
//
-// ServerManagement.stop(0, false);
+// assertEquals(3, ServerManagement.getServer(0).getNumberOfNodesOnCluster());
+// assertEquals(3, delegate.getDelegates().length);
//
-// assertNotSame(originalRemoting, delegate.getRemotingConnection());
//
-// //System.out.println("Kill server1"); Thread.sleep(10000);
+// ServerManagement.log(ServerManagement.INFO,"Stopping server 2 as part of testConnectionFactoryConnect");
+// ServerManagement.stop(2,true);
//
-// message = session.createTextMessage("Hello After");
-// log.info(">>Sending new message");
-// producer.send(message);
+// ServerManagement.log(ServerManagement.INFO,"##### Looking up ConnectionFactory at testConnectionFactoryConnect");
//
-// assertEquals(txID, sessionState.getCurrentTxId());
-// System.out.println("TransactionID on client = " + txID);
-// log.info(">>Final commit");
+// factory = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+// delegate = (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
//
-// /* JBossConnection connSecondServer = (JBossConnection)this.factoryServer2.createConnection();
-// connSecondServer.start();
-// JBossSession sessionSecondServer = (JBossSession)connSecondServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
-// MessageConsumer consumerSecondServer = sessionSecondServer.createConsumer(destination); */
-//
-// session.commit();
-//
-// /* receiveMessage("consumerSecondServer",consumerSecondServer,true,false);
-// receiveMessage("consumerSecondServer",consumerSecondServer,true,false);
-// receiveMessage("consumerSecondServer",consumerSecondServer,true,true); */
-//
-// log.info("Calling alternate receiver");
-// receiveMessage("consumerHA", consumerHA, true, false);
-// receiveMessage("consumerHA", consumerHA, true, false);
-// receiveMessage("consumerHA", consumerHA, true, true);
-//
-//
-// session.commit();
-//
+// assertEquals(2, ServerManagement.getServer(0).getNumberOfNodesOnCluster());
+// assertEquals(2, delegate.getDelegates().length);
// }
// finally
// {
-// // restart the server as it was probably stopped (tearDown will need that)
-// ServerManagement.start("all", 0, true);
+// ServerManagement.start("all", 2);
// }
+//
+//
// }
+
+
+ public void testTopicSubscriber() throws Exception
+ {
+ try
+ {
+ log.info("++testTopicSubscriber");
+
+
+ JBossConnectionFactory jbcf1 = (JBossConnectionFactory)cf1;
+ assertTrue(jbcf1.getDelegate() instanceof ClusteredClientConnectionFactoryDelegate);
+
+ log.info(">>Lookup Queue");
+ Destination destination = (Destination) ic1.lookup("topic/testDistributedTopic");
+
+ log.info("Creating connection server1");
+ JBossConnection conn = (JBossConnection) cf1.createConnection();
+ conn.setClientID("testClient");
+ conn.start();
+
+ JBossSession session = (JBossSession) conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ ClientSessionDelegate clientSessionDelegate = (ClientSessionDelegate) session.getDelegate();
+ SessionState sessionState = (SessionState) clientSessionDelegate.getState();
+
+ MessageConsumer consumerHA = session.createDurableSubscriber((Topic) destination, "T1");
+ JBossMessageConsumer jbossConsumerHA = (JBossMessageConsumer) consumerHA;
+
+ org.jboss.jms.client.delegate.ClientConsumerDelegate clientDelegate = (org.jboss.jms.client.delegate.ClientConsumerDelegate) jbossConsumerHA.getDelegate();
+ ConsumerState consumerState = (ConsumerState) clientDelegate.getState();
+
+ log.info("subscriptionName=" + consumerState.getSubscriptionName());
+
+
+ log.info(">>Creating Producer");
+ MessageProducer producer = session.createProducer(destination);
+ log.info(">>creating Message");
+ Message message = session.createTextMessage("Hello Before");
+ log.info(">>sending Message");
+ producer.send(message);
+ session.commit();
+
+ receiveMessage("consumerHA", consumerHA, true, false);
+
+ session.commit();
+ //if (true) return;
+
+ Object txID = sessionState.getCurrentTxId();
+
+ producer.send(session.createTextMessage("Hello again before failover"));
+
+ ClientConnectionDelegate delegate = (ClientConnectionDelegate) conn.getDelegate();
+
+ ServerManagement.stop(0, false);
+
+ Thread.sleep(25000);
+
+ //System.out.println("Kill server1"); Thread.sleep(10000);
+
+ message = session.createTextMessage("Hello After");
+ log.info(">>Sending new message");
+ producer.send(message);
+
+ assertEquals(txID, sessionState.getCurrentTxId());
+ System.out.println("TransactionID on client = " + txID);
+ log.info(">>Final commit");
+
+ /* JBossConnection connSecondServer = (JBossConnection)this.factoryServer2.createConnection();
+ connSecondServer.start();
+ JBossSession sessionSecondServer = (JBossSession)connSecondServer.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumerSecondServer = sessionSecondServer.createConsumer(destination); */
+
+ session.commit();
+
+ /* receiveMessage("consumerSecondServer",consumerSecondServer,true,false);
+ receiveMessage("consumerSecondServer",consumerSecondServer,true,false);
+ receiveMessage("consumerSecondServer",consumerSecondServer,true,true); */
+
+ log.info("Calling alternate receiver");
+ receiveMessage("consumerHA", consumerHA, true, false);
+ receiveMessage("consumerHA", consumerHA, true, false);
+ receiveMessage("consumerHA", consumerHA, true, true);
+
+
+ session.commit();
+
+ }
+ finally
+ {
+ // restart the server as it was probably stopped (tearDown will need that)
+ ServerManagement.start("all", 0);
+ }
+ }
+
//
// public void testQueueHA() throws Exception
// {
More information about the jboss-cvs-commits
mailing list