[jboss-cvs] JBoss Messaging SVN: r1763 - in branches/Branch_Client_Failover_Experiment: src/etc src/etc/server/default/deploy src/main/org/jboss/jms/client src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/server/connectionfactory src/main/org/jboss/jms/server/container src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/endpoint/advised src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/etc tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms/clustering
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Dec 11 10:27:44 EST 2006
Author: timfox
Date: 2006-12-11 10:27:26 -0500 (Mon, 11 Dec 2006)
New Revision: 1763
Added:
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/CreateConnectionResult.java
Removed:
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionStatus.java
Modified:
branches/Branch_Client_Failover_Experiment/src/etc/aop-messaging-client.xml
branches/Branch_Client_Failover_Experiment/src/etc/aop-messaging-server.xml
branches/Branch_Client_Failover_Experiment/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossConnectionFactory.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/StateCreationAspect.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/container/InjectionAspect.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/container/SecurityAspect.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java
branches/Branch_Client_Failover_Experiment/tests/etc/log4j.xml
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FailoverMapperTest.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
Log:
More HA fixes.
Modified: branches/Branch_Client_Failover_Experiment/src/etc/aop-messaging-client.xml
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/etc/aop-messaging-client.xml 2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/etc/aop-messaging-client.xml 2006-12-11 15:27:26 UTC (rev 1763)
@@ -28,7 +28,7 @@
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate->createConnectionDelegate(..))">
<advice name="handleCreateConnectionDelegate" aspect="org.jboss.jms.client.container.StateCreationAspect"/>
- </bind>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate->createConnectionDelegate(..))">
<advice name="handleCreateConnectionDelegate" aspect="org.jboss.jms.client.container.HAAspect"/>
</bind>
Modified: branches/Branch_Client_Failover_Experiment/src/etc/aop-messaging-server.xml
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/etc/aop-messaging-server.xml 2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/etc/aop-messaging-server.xml 2006-12-11 15:27:26 UTC (rev 1763)
@@ -37,13 +37,8 @@
<bind pointcut="execution(* org.jboss.jms.server.endpoint.advised.ConsumerAdvised->$implementing{org.jboss.jms.server.endpoint.ConsumerEndpoint}(..))">
<interceptor-ref name="org.jboss.jms.server.container.ServerLogInterceptor"/>
</bind>
- <bind pointcut="execution(* org.jboss.jms.server.endpoint.advised.ConsumerAdvised->activate(..))">
- <advice name="handleActivate" aspect="org.jboss.jms.server.container.SecurityAspect"/>
- </bind>
- <bind pointcut="execution(* org.jboss.jms.server.endpoint.advised.ConsumerAdvised->getMessageNow(..))">
- <advice name="handleGetMessageNow" aspect="org.jboss.jms.server.container.SecurityAspect"/>
- </bind>
+
<!-- Browser -->
<bind pointcut="execution(* org.jboss.jms.server.endpoint.advised.BrowserAdvised->$implementing{org.jboss.jms.server.endpoint.BrowserEndpoint}(..))">
<interceptor-ref name="org.jboss.jms.server.container.ServerLogInterceptor"/>
Modified: branches/Branch_Client_Failover_Experiment/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml 2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml 2006-12-11 15:27:26 UTC (rev 1763)
@@ -87,7 +87,8 @@
<AUTOCONF down_thread="false" up_thread="false"/>
<PING timeout="2000" down_thread="false" num_initial_members="3" up_thread="false"/>
<MERGE2 max_interval="10000" down_thread="false" min_interval="5000" up_thread="false"/>
- <FD timeout="2000" max_tries="3" down_thread="false" up_thread="false" shun="true"/>
+ <FD_SOCK down_thread="false" up_thread="false"/>
+ <FD timeout="20000" max_tries="3" down_thread="false" up_thread="false" shun="true"/>
<VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
<pbcast.NAKACK max_xmit_size="8192" down_thread="false" use_mcast_xmit="true" gc_lag="50" up_thread="false"
retransmit_timeout="100,200,600,1200,2400,4800"/>
@@ -107,7 +108,8 @@
<AUTOCONF down_thread="false" up_thread="false"/>
<PING timeout="2000" down_thread="false" num_initial_members="3" up_thread="false"/>
<MERGE2 max_interval="10000" down_thread="false" min_interval="5000" up_thread="false"/>
- <FD timeout="2000" max_tries="3" down_thread="false" up_thread="false" shun="true"/>
+ <FD_SOCK down_thread="false" up_thread="false"/>
+ <FD timeout="20000" max_tries="3" down_thread="false" up_thread="false" shun="true"/>
<VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
<pbcast.NAKACK max_xmit_size="8192" down_thread="false" use_mcast_xmit="true" gc_lag="50" up_thread="false"
retransmit_timeout="100,200,600,1200,2400,4800"/>
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossConnectionFactory.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossConnectionFactory.java 2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/JBossConnectionFactory.java 2006-12-11 15:27:26 UTC (rev 1763)
@@ -42,9 +42,9 @@
import org.jboss.aop.Advised;
import org.jboss.jms.client.container.JmsClientAspectXMLLoader;
import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
-import org.jboss.jms.delegate.ConnectionDelegate;
import org.jboss.jms.delegate.ConnectionFactoryDelegate;
import org.jboss.jms.referenceable.SerializableObjectRefAddr;
+import org.jboss.jms.server.endpoint.CreateConnectionResult;
import org.jboss.jms.util.ThreadContextClassLoaderChanger;
import org.jboss.logging.Logger;
@@ -199,9 +199,9 @@
// The version used by the connection is the minimum of the server version for the
// connection factory and the client code version
- ConnectionDelegate cd = delegate.createConnectionDelegate(username, password);
+ CreateConnectionResult res = delegate.createConnectionDelegate(username, password, -1);
- return new JBossConnection(cd, type);
+ return new JBossConnection(res.getDelegate(), type);
}
finally
{
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java 2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java 2006-12-11 15:27:26 UTC (rev 1763)
@@ -48,7 +48,7 @@
import org.jboss.jms.client.state.ProducerState;
import org.jboss.jms.client.state.SessionState;
import org.jboss.jms.destination.JBossDestination;
-import org.jboss.jms.server.endpoint.ConnectionStatus;
+import org.jboss.jms.server.endpoint.CreateConnectionResult;
import org.jboss.logging.Logger;
import org.jboss.remoting.Client;
import org.jboss.remoting.ConnectionListener;
@@ -103,7 +103,7 @@
ClientConnectionDelegate connDelegate = createConnection(cfDelegate, username, password);
- return connDelegate;
+ return new CreateConnectionResult(connDelegate);
}
else
{
@@ -163,8 +163,11 @@
throws Exception
{
log.info("createConnection");
+
+ CreateConnectionResult res = (CreateConnectionResult)cf.createConnectionDelegate(username, password, -1);
+
ClientConnectionDelegate connDelegate =
- (ClientConnectionDelegate)cf.createConnectionDelegate(username, password);
+ (ClientConnectionDelegate)res.getDelegate();
initialiseConnection(connDelegate);
@@ -194,26 +197,28 @@
ConnectionState state = (ConnectionState)((DelegateSupport)failedConnection).getState();
- log.info("Failing over connection");
+ log.info("calling createFailoverConnectionDelegate");
- ConnectionStatus status =
- newCF.createFailoverConnectionDelegate(state.getUser(), state.getPassword(), state.getServerID());
+ CreateConnectionResult res =
+ newCF.createConnectionDelegate(state.getUser(), state.getPassword(), state.getServerID());
log.info("returned from createFailoverConnectionDelegate");
- if (status.getDelegate() != null)
+ if (res.getDelegate() != null)
{
log.info("Got connection");
- ClientConnectionDelegate newConnection = (ClientConnectionDelegate)state.getDelegate();
+ ClientConnectionDelegate newConnection = (ClientConnectionDelegate)res.getDelegate();
+ log.info("newconnection is " + newConnection);
+
//We got the right server and created a new connection
failover(failedConnection, newConnection);
}
else
{
- if (status.getActualFailoverNode() == -1)
+ if (res.getActualFailoverNode() == -1)
{
//No trace of failover was detected on the server side - this might happen if the client side
//network fails temporarily so the client connection breaks but the server side network is still
@@ -272,32 +277,6 @@
throw new IllegalStateException("Cannot find failover delegate for node " + failoverServerID.intValue());
}
- // Redirect connection routine.
- // Verify the failureMap on the server and if out of sync find the correct delegate
-
-
- //THIS IS WRONG - cannot use getFailoverNode method since failover node might change
- //between getting the result and actually failing over
-
-// int failoverNode = delegateFound.getFailoverNode(currentState.getServerID());
-// if (failoverNode!=delegateFound.getServerId())
-// {
-// delegateFound = null;
-// for (int i = 0; i < delegates.length; i++)
-// {
-// if (delegates[i].getServerId() == failoverNode)
-// {
-// delegateFound = delegates[i];
-// }
-// }
-//
-// if (delegateFound==null)
-// {
-// throw new IllegalStateException("Cannot find failover node on current map for nodeId=" + failoverNode);
-// }
-//
-// }
-
return del;
}
@@ -312,12 +291,14 @@
int oldServerID = failedState.getServerID();
ConnectionState newState = (ConnectionState)newConnection.getState();
+
+ log.info("new state is: " + newState);
failedState.copy(newState);
// this is necessary so the connection will start "talking" to the new server instead
failedState.setRemotingConnection(newState.getRemotingConnection());
-
+
if (failedState.getClientID() != null)
{
newConnection.setClientID(failedState.getClientID());
@@ -325,26 +306,30 @@
// Transfering state from newDelegate to currentDelegate
failedConnection.copyState(newConnection);
+
+ log.info("failing over children");
- if (failedState.isStarted())
- {
- failedConnection.start();
- }
-
for(Iterator i = failedState.getChildren().iterator(); i.hasNext(); )
{
SessionState failedSessionState = (SessionState)i.next();
+ log.info("Creating session");
+
ClientSessionDelegate newSessionDelegate = (ClientSessionDelegate)newConnection.
createSessionDelegate(failedSessionState.isTransacted(),
failedSessionState.getAcknowledgeMode(),
failedSessionState.isXA());
-
+
+ log.info("Created session");
+
ClientSessionDelegate failedSessionDelegate =
(ClientSessionDelegate)failedSessionState.getDelegate();
failedSessionDelegate.copyState(newSessionDelegate);
+
+ log.info("copied state");
+
if (trace) { log.trace("replacing session (" + failedSessionDelegate + ") with a new failover session " + newSessionDelegate); }
//TODO Clebert please add comment as to why this clone is necessary
@@ -375,6 +360,13 @@
}
}
}
+
+ //We must not start the connection until the end
+ if (failedState.isStarted())
+ {
+ failedConnection.start();
+ }
+
log.info("Failover done");
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/StateCreationAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/StateCreationAspect.java 2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/StateCreationAspect.java 2006-12-11 15:27:26 UTC (rev 1763)
@@ -28,7 +28,6 @@
import org.jboss.aop.joinpoint.MethodInvocation;
import org.jboss.aop.metadata.SimpleMetaData;
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
-import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
import org.jboss.jms.client.delegate.ClientProducerDelegate;
import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.remoting.JMSRemotingConnection;
@@ -39,6 +38,7 @@
import org.jboss.jms.client.state.ProducerState;
import org.jboss.jms.client.state.SessionState;
import org.jboss.jms.delegate.BrowserDelegate;
+import org.jboss.jms.delegate.ConnectionFactoryDelegate;
import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.delegate.ProducerDelegate;
import org.jboss.jms.delegate.SessionDelegate;
@@ -46,6 +46,7 @@
import org.jboss.jms.message.MessageIdGenerator;
import org.jboss.jms.message.MessageIdGeneratorFactory;
import org.jboss.jms.server.Version;
+import org.jboss.jms.server.endpoint.CreateConnectionResult;
import org.jboss.jms.server.remoting.MetaDataConstants;
import org.jboss.jms.tx.ResourceManager;
import org.jboss.jms.tx.ResourceManagerFactory;
@@ -81,40 +82,49 @@
public Object handleCreateConnectionDelegate(Invocation inv) throws Throwable
{
- ClientConnectionFactoryDelegate cfd = (ClientConnectionFactoryDelegate)inv.getTargetObject();
- ClientConnectionDelegate connectionDelegate = (ClientConnectionDelegate)inv.invokeNext();
- connectionDelegate.init();
-
- SimpleMetaData md = ((Advised)connectionDelegate)._getInstanceAdvisor().getMetaData();
+ ConnectionFactoryDelegate cfd = (ConnectionFactoryDelegate)inv.getTargetObject();
- int serverID =
- ((Integer)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.SERVER_ID)).intValue();
+ CreateConnectionResult res = (CreateConnectionResult)inv.invokeNext();
- Version connectionVersion =
- (Version)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CONNECTION_VERSION);
+ ClientConnectionDelegate connectionDelegate = (ClientConnectionDelegate)res.getDelegate();
- JMSRemotingConnection connection =
- (JMSRemotingConnection)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.REMOTING_CONNECTION);
-
- if (connectionVersion == null)
+ if (connectionDelegate != null)
{
- throw new IllegalStateException("Connection version is null");
+
+ connectionDelegate.init();
+
+ SimpleMetaData md = ((Advised)connectionDelegate)._getInstanceAdvisor().getMetaData();
+
+ int serverID =
+ ((Integer)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.SERVER_ID)).intValue();
+
+ Version connectionVersion =
+ (Version)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CONNECTION_VERSION);
+
+ JMSRemotingConnection connection =
+ (JMSRemotingConnection)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.REMOTING_CONNECTION);
+
+ if (connectionVersion == null)
+ {
+ throw new IllegalStateException("Connection version is null");
+ }
+
+ // We have one resource manager per unique server
+ ResourceManager rm = ResourceManagerFactory.instance.checkOutResourceManager(serverID);
+
+ //We have one message id generator per unique server
+ MessageIdGenerator gen =
+ MessageIdGeneratorFactory.instance.checkOutGenerator(serverID, cfd);
+
+ ConnectionState connectionState =
+ new ConnectionState(serverID, connectionDelegate,
+ connection,
+ connectionVersion, rm, gen);
+
+ connectionDelegate.setState(connectionState);
}
-
- //We have one resource manager per unique server
- ResourceManager rm = ResourceManagerFactory.instance.checkOutResourceManager(serverID);
- //We have one message id generator per unique server
- MessageIdGenerator gen =
- MessageIdGeneratorFactory.instance.checkOutGenerator(serverID, cfd);
-
- ConnectionState connectionState =
- new ConnectionState(serverID, connectionDelegate,
- connection,
- connectionVersion, rm, gen);
-
- connectionDelegate.setState(connectionState);
- return connectionDelegate;
+ return res;
}
public Object handleCreateSessionDelegate(Invocation invocation) throws Throwable
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2006-12-11 15:27:26 UTC (rev 1763)
@@ -23,7 +23,9 @@
import java.util.HashMap;
import java.util.Map;
+
import javax.jms.JMSException;
+
import org.jboss.aop.Advised;
import org.jboss.aop.Dispatcher;
import org.jboss.aop.joinpoint.Invocation;
@@ -32,11 +34,10 @@
import org.jboss.aop.util.PayloadKey;
import org.jboss.jms.client.container.JMSClientVMIdentifier;
import org.jboss.jms.client.remoting.JMSRemotingConnection;
-import org.jboss.jms.delegate.ConnectionDelegate;
import org.jboss.jms.delegate.ConnectionFactoryDelegate;
import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.server.Version;
-import org.jboss.jms.server.endpoint.ConnectionStatus;
+import org.jboss.jms.server.endpoint.CreateConnectionResult;
import org.jboss.jms.server.remoting.JMSWireFormat;
import org.jboss.jms.server.remoting.MessagingMarshallable;
import org.jboss.jms.server.remoting.MetaDataConstants;
@@ -117,28 +118,16 @@
}
// ConnectionFactoryDelegateImplementation -----------------------
-
+
/**
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
- * @see org.jboss.jms.client.container.StateCreationAspect#handleCreateConnectionDelegate(org.jboss.aop.joinpoint.Invocation)
- * @see org.jboss.jms.server.endpoint.ServerConnectionFactoryEndpoint#createConnectionDelegate(String, String)
*/
- public ConnectionDelegate createConnectionDelegate(String username, String password)
+ public CreateConnectionResult createConnectionDelegate(String username, String password, int failedNodeId)
throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
}
-
- /**
- * This invocation should either be handled by the client-side interceptor chain or by the
- * server-side endpoint.
- */
- public ConnectionStatus createFailoverConnectionDelegate(String username, String password, int failedNodeId)
- throws JMSException
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
/**
* This invocation should either be handled by the client-side interceptor chain or by the
@@ -268,19 +257,37 @@
if (remotingConnection != null)
{
// It was a call to createConnectionDelegate - set the remoting connection to use
- ClientConnectionDelegate connectionDelegate = (ClientConnectionDelegate)ret;
+
+ CreateConnectionResult res = (CreateConnectionResult)ret;
+
+ ClientConnectionDelegate connectionDelegate = (ClientConnectionDelegate)res.getDelegate();
+
+ if (connectionDelegate != null)
+ {
+ //We set the version for the connection and the remoting connection on the meta-data
+ //this is so the StateCreationAspect can pick it up
+
+ SimpleMetaData metaData = ((Advised)connectionDelegate)._getInstanceAdvisor().getMetaData();
+
+ metaData.addMetaData(MetaDataConstants.JMS, MetaDataConstants.REMOTING_CONNECTION,
+ remotingConnection, PayloadKey.TRANSIENT);
+
+ metaData.addMetaData(MetaDataConstants.JMS, MetaDataConstants.CONNECTION_VERSION,
+ version, PayloadKey.TRANSIENT);
+ }
+ else
+ {
+ //Wrong server redirect on failure
+ //close the remoting connection
+ try
+ {
+ remotingConnection.stop();
+ }
+ catch (Throwable ignore)
+ {
+ }
+ }
- //We set the version for the connection and the remoting connection on the meta-data
- //this is so the StateCreationAspect can pick it up
-
- SimpleMetaData metaData = ((Advised)connectionDelegate)._getInstanceAdvisor().getMetaData();
-
- metaData.addMetaData(MetaDataConstants.JMS, MetaDataConstants.REMOTING_CONNECTION,
- remotingConnection, PayloadKey.TRANSIENT);
-
- metaData.addMetaData(MetaDataConstants.JMS, MetaDataConstants.CONNECTION_VERSION,
- version, PayloadKey.TRANSIENT);
-
}
return ret;
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2006-12-11 15:27:26 UTC (rev 1763)
@@ -83,6 +83,7 @@
// Enable client pinging. Server leasing is enabled separately on the server side
Map config = new HashMap();
+
config.put(Client.ENABLE_LEASE, String.valueOf(clientPing));
client = new Client(serverLocator, config);
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2006-12-11 15:27:26 UTC (rev 1763)
@@ -22,14 +22,15 @@
package org.jboss.jms.server.connectionfactory;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
+
import org.jboss.jms.client.JBossConnectionFactory;
import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
import org.jboss.jms.client.delegate.ClusteredClientConnectionFactoryDelegate;
@@ -382,11 +383,9 @@
private void recalculateFailoverMap(Map nodeAddressMap) throws Exception
{
- List nodes = new ArrayList(nodeAddressMap.keySet());
-
FailoverMapper mapper = replicator.getFailoverMapper();
- failoverMap = mapper.generateMapping(nodes);
+ failoverMap = mapper.generateMapping(nodeAddressMap.keySet());
}
/**
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/container/InjectionAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/container/InjectionAspect.java 2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/container/InjectionAspect.java 2006-12-11 15:27:26 UTC (rev 1763)
@@ -24,6 +24,7 @@
import org.jboss.aop.joinpoint.Invocation;
import org.jboss.aop.joinpoint.MethodInvocation;
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.server.endpoint.CreateConnectionResult;
import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
import org.jboss.jms.server.endpoint.advised.ConnectionAdvised;
import org.jboss.jms.server.remoting.JMSDispatcher;
@@ -66,51 +67,56 @@
throw new IllegalStateException("Can't find handler");
}
- ClientConnectionDelegate del = (ClientConnectionDelegate)invocation.invokeNext();
+ CreateConnectionResult res = (CreateConnectionResult)invocation.invokeNext();
- ConnectionAdvised advised =
- (ConnectionAdvised)JMSDispatcher.instance.getRegistered(new Integer(del.getID()));
+ ClientConnectionDelegate del = (ClientConnectionDelegate)res.getDelegate();
- ServerConnectionEndpoint endpoint = (ServerConnectionEndpoint)advised.getEndpoint();
-
- endpoint.setCallbackClient(handler.getCallbackClient());
-
- // Then we inject the remoting session id of the client
- String sessionId =
- (String)mi.getMetaData(MetaDataConstants.JMS,
- MetaDataConstants.REMOTING_SESSION_ID);
-
- if (sessionId == null)
- {
- throw new IllegalStateException("Can't find session id");
+ if (del != null)
+ {
+ ConnectionAdvised advised =
+ (ConnectionAdvised)JMSDispatcher.instance.getRegistered(new Integer(del.getID()));
+
+ ServerConnectionEndpoint endpoint = (ServerConnectionEndpoint)advised.getEndpoint();
+
+ endpoint.setCallbackClient(handler.getCallbackClient());
+
+ // Then we inject the remoting session id of the client
+ String sessionId =
+ (String)mi.getMetaData(MetaDataConstants.JMS,
+ MetaDataConstants.REMOTING_SESSION_ID);
+
+ if (sessionId == null)
+ {
+ throw new IllegalStateException("Can't find session id");
+ }
+
+ // Then we inject the unique id of the client VM
+ String jmsClientVMID =
+ (String)mi.getMetaData(MetaDataConstants.JMS,
+ MetaDataConstants.JMS_CLIENT_VM_ID);
+
+ if (jmsClientVMID == null)
+ {
+ throw new IllegalStateException("Can't find jms client id");
+ }
+
+ endpoint.setRemotingInformation(jmsClientVMID, sessionId);
+
+ // Then we inject the version number from to be used
+
+ Byte ver =
+ (Byte)mi.getMetaData(MetaDataConstants.JMS,
+ MetaDataConstants.VERSION_NUMBER);
+
+ if (ver == null)
+ {
+ throw new IllegalStateException("Can't find version");
+ }
+
+ endpoint.setUsingVersion(ver.byteValue());
}
- // Then we inject the unique id of the client VM
- String jmsClientVMID =
- (String)mi.getMetaData(MetaDataConstants.JMS,
- MetaDataConstants.JMS_CLIENT_VM_ID);
-
- if (jmsClientVMID == null)
- {
- throw new IllegalStateException("Can't find jms client id");
- }
-
- endpoint.setRemotingInformation(jmsClientVMID, sessionId);
-
- // Then we inject the version number from to be used
-
- Byte ver =
- (Byte)mi.getMetaData(MetaDataConstants.JMS,
- MetaDataConstants.VERSION_NUMBER);
-
- if (ver == null)
- {
- throw new IllegalStateException("Can't find version");
- }
-
- endpoint.setUsingVersion(ver.byteValue());
-
- return del;
+ return res;
}
// Package protected ---------------------------------------------
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/container/SecurityAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/container/SecurityAspect.java 2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/container/SecurityAspect.java 2006-12-11 15:27:26 UTC (rev 1763)
@@ -150,20 +150,6 @@
return invocation.invokeNext();
}
- public Object handleGetMessageNow(Invocation invocation) throws Throwable
- {
- checkConsumerAccess(invocation);
-
- return invocation.invokeNext();
- }
-
- public Object handleActivate(Invocation invocation) throws Throwable
- {
- checkConsumerAccess(invocation);
-
- return invocation.invokeNext();
- }
-
protected void checkConsumerAccess(Invocation invocation) throws Throwable
{
ConsumerAdvised del = (ConsumerAdvised)invocation.getTargetObject();
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java 2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java 2006-12-11 15:27:26 UTC (rev 1763)
@@ -22,7 +22,7 @@
package org.jboss.jms.server.endpoint;
import javax.jms.JMSException;
-import org.jboss.jms.delegate.ConnectionDelegate;
+
import org.jboss.messaging.core.plugin.IdBlock;
/**
@@ -36,15 +36,12 @@
* $Id$
*/
public interface ConnectionFactoryEndpoint
-{
- ConnectionDelegate createConnectionDelegate(String username, String password)
+{
+ CreateConnectionResult createConnectionDelegate(String username,
+ String password,
+ int failedNodeId)
throws JMSException;
- ConnectionStatus createFailoverConnectionDelegate(String username,
- String password,
- int failedNodeId)
- throws JMSException;
-
byte[] getClientAOPConfig() throws JMSException;
IdBlock getIdBlock(int size) throws JMSException;
Deleted: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionStatus.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionStatus.java 2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionStatus.java 2006-12-11 15:27:26 UTC (rev 1763)
@@ -1,65 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.jms.server.endpoint;
-
-import java.io.Serializable;
-
-import org.jboss.jms.delegate.ConnectionDelegate;
-
-/**
- * A ConnectionStatus
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-public class ConnectionStatus implements Serializable
-{
- private static final long serialVersionUID = 4311863642735135167L;
-
- private ConnectionDelegate delegate;
-
- private int actualFailoverNodeId;
-
- public ConnectionStatus(ConnectionDelegate del)
- {
- this.delegate = del;
- }
-
- public ConnectionStatus(int actualFailoverNode)
- {
- this.actualFailoverNodeId = actualFailoverNode;
- }
-
- public ConnectionDelegate getDelegate()
- {
- return delegate;
- }
-
- public int getActualFailoverNode()
- {
- return actualFailoverNodeId;
- }
-
-}
Copied: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/CreateConnectionResult.java (from rev 1760, branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionStatus.java)
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionStatus.java 2006-12-11 01:05:12 UTC (rev 1760)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/CreateConnectionResult.java 2006-12-11 15:27:26 UTC (rev 1763)
@@ -0,0 +1,66 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.server.endpoint;
+
+import java.io.Serializable;
+
+import org.jboss.jms.delegate.ConnectionDelegate;
+
+/**
+ *
+ * A CreateConnectionResult
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class CreateConnectionResult implements Serializable
+{
+ private static final long serialVersionUID = 4311863642735135167L;
+
+ private ConnectionDelegate delegate;
+
+ private int actualFailoverNodeId;
+
+ public CreateConnectionResult(ConnectionDelegate del)
+ {
+ this.delegate = del;
+ }
+
+ public CreateConnectionResult(int actualFailoverNode)
+ {
+ this.actualFailoverNodeId = actualFailoverNode;
+ }
+
+ public ConnectionDelegate getDelegate()
+ {
+ return delegate;
+ }
+
+ public int getActualFailoverNode()
+ {
+ return actualFailoverNodeId;
+ }
+
+}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2006-12-11 15:27:26 UTC (rev 1763)
@@ -173,6 +173,8 @@
{
log.debug("creating session " + (transacted ? "transacted" :"non transacted")+ ", " + ToString.acknowledgmentMode(acknowledgmentMode) + ", " + (isXA ? "XA": "non XA"));
+ log.info("********** CREATING NEW SESSION");
+
if (closed)
{
throw new IllegalStateException("Connection is closed");
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2006-12-11 15:27:26 UTC (rev 1763)
@@ -94,30 +94,33 @@
// ConnectionFactoryDelegate implementation ----------------------
- /**
- * Called when a new connection is to be created for failover
- * The ConnectionStatus instance returns either contains the delegate or the node id of the actual node
- * Note we cannot query the failover node in a separate call since that might change between
- * querying it and actually creating the delegate
- */
- public ConnectionStatus createFailoverConnectionDelegate(String username, String password, int failedNodeId)
+ public CreateConnectionResult createConnectionDelegate(String username, String password, int failedNodeId)
throws JMSException
{
try
{
- //Wait for server side failover to complete
- int failoverNodeId = serverPeer.waitForFailover(failedNodeId);
-
- if (failoverNodeId == -1 || failoverNodeId != serverPeer.getServerPeerID())
+ if (failedNodeId == -1)
{
- //We are on the wrong node - or no failover has occurred
- return new ConnectionStatus(failoverNodeId);
+ //Just a standard createConnection
+ return new CreateConnectionResult(createConnectionDelegateInternal(username, password));
}
else
{
- //We are on the right node, and failover has completed
- //we can now create a connection delegate
- return new ConnectionStatus(createConnectionDelegateInternal(username, password));
+ //Failover
+ //Wait for server side failover to complete
+ int failoverNodeId = serverPeer.waitForFailover(failedNodeId);
+
+ if (failoverNodeId == -1 || failoverNodeId != serverPeer.getServerPeerID())
+ {
+ //We are on the wrong node - or no failover has occurred
+ return new CreateConnectionResult(failoverNodeId);
+ }
+ else
+ {
+ //We are on the right node, and failover has completed
+ //we can now create a connection delegate
+ return new CreateConnectionResult(createConnectionDelegateInternal(username, password));
+ }
}
}
catch (Throwable t)
@@ -162,19 +165,6 @@
return new ClientConnectionDelegate(connectionID, serverPeer.getServerPeerID());
}
- public ConnectionDelegate createConnectionDelegate(String username, String password)
- throws JMSException
- {
- try
- {
- return createConnectionDelegateInternal(username, password);
- }
- catch (Throwable t)
- {
- throw ExceptionUtil.handleJMSInvocation(t, this + " createConnectionDelegate");
- }
- }
-
public byte[] getClientAOPConfig() throws JMSException
{
try
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java 2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java 2006-12-11 15:27:26 UTC (rev 1763)
@@ -24,7 +24,7 @@
import javax.jms.JMSException;
import org.jboss.jms.delegate.ConnectionDelegate;
import org.jboss.jms.server.endpoint.ConnectionFactoryEndpoint;
-import org.jboss.jms.server.endpoint.ConnectionStatus;
+import org.jboss.jms.server.endpoint.CreateConnectionResult;
import org.jboss.messaging.core.plugin.IdBlock;
/**
@@ -58,17 +58,11 @@
// ConnectionFactoryEndpoint implementation -----------------------
- public ConnectionDelegate createConnectionDelegate(String username, String password)
+ public CreateConnectionResult createConnectionDelegate(String username, String password, int failedNodeId)
throws JMSException
{
- return endpoint.createConnectionDelegate(username, password);
+ return endpoint.createConnectionDelegate(username, password, failedNodeId);
}
-
- public ConnectionStatus createFailoverConnectionDelegate(String username, String password, int failedNodeId)
- throws JMSException
- {
- return endpoint.createFailoverConnectionDelegate(username, password, failedNodeId);
- }
public byte[] getClientAOPConfig() throws JMSException
{
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java 2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java 2006-12-11 15:27:26 UTC (rev 1763)
@@ -21,8 +21,8 @@
*/
package org.jboss.messaging.core.plugin.contract;
-import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* A FailoverMapper
@@ -37,5 +37,5 @@
*/
public interface FailoverMapper
{
- Map generateMapping(List nodes);
+ Map generateMapping(Set nodes);
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2006-12-11 15:27:26 UTC (rev 1763)
@@ -627,7 +627,6 @@
Map valuesOnNode = (Map)entry.getValue();
out.println("<tr><td>Key</td><td>Value</td><td>Class of Value</td></tr>");
- int bindingCount=0;
for (Iterator valuesIterator=valuesOnNode.entrySet().iterator();valuesIterator.hasNext();)
{
Map.Entry entry2 = (Map.Entry)valuesIterator.next();
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-11 15:27:26 UTC (rev 1763)
@@ -168,6 +168,8 @@
private ReplicationListener nodeAddressMapListener;
private boolean started;
+
+ private QueuedExecutor viewExecutor;
/*
* Constructor using Element for configuration
@@ -283,6 +285,8 @@
failoverMap = new LinkedHashMap();
leftSet = new HashSet();
+
+ viewExecutor = new QueuedExecutor();
}
// MessagingComponent overrides
@@ -664,21 +668,6 @@
return listBindingsForConditionInternal(condition, false);
}
- public int getFailoverNodeID(int nodeId)
- {
- synchronized (failoverMap)
- {
- Integer failoverNode = (Integer)failoverMap.get(new Integer(nodeId));
-
- if (failoverNode == null)
- {
- return nodeId;
- }
-
- return failoverNode.intValue();
- }
- }
-
public FailoverMapper getFailoverMapper()
{
return failoverMapper;
@@ -762,8 +751,11 @@
public void putReplicantLocally(int originatorNodeID, Serializable key, Serializable replicant)
throws Exception
{
+ log.info("##########putReplicantLocally received, before lock");
+
synchronized (replicatedData)
{
+ log.info("putReplicantLocally received, after lock");
Map m = (Map)replicatedData.get(key);
if (m == null)
@@ -777,6 +769,8 @@
notifyListeners(key, m, true, originatorNodeID);
}
+
+ log.info("putReplicantLocally, completed");
}
/**
@@ -862,6 +856,8 @@
{
lock.writeLock().release();
}
+
+ log.info("****** binding added");
}
/*
@@ -1430,7 +1426,7 @@
try
{
- log.info("Preparing failover against node " + failedNodeId);
+ log.info(this.currentNodeId + " is performing failover for node " + failedNodeId);
/*
We make sure a FailoverStatus object is put in the replicated data for the node
@@ -1456,9 +1452,11 @@
status.startFailingOverForNode(failedNodeId);
+ log.info("Putting state that failover is starting");
+
put(FAILED_OVER_FOR_KEY, status);
- log.info("Put into failed over map that starting failover");
+ log.info("Put state that failover is starting");
//Get the map of queues for the failed node
@@ -1523,13 +1521,11 @@
this.deleteBinding(failedNodeId, queueName);
log.info("deleted binding for " + queueName);
+
+ //Note we do not need to send an unbind request across the cluster - this is because
+ //when the node crashes a view change will hit the other nodes and that will cause
+ //all binding data for that node to be removed anyway
- //Then an unbind request is sent - this cause other nodes to also remove it from the in memory
- //condition and name maps
- UnbindRequest unbindRequest = new UnbindRequest(failedNodeId, queueName);
-
- syncSendRequest(unbindRequest);
-
//If there is already a queue registered with the same name, then we set a flag "failed" on the
//binding and then the queue will go into a special list of failed bindings
//otherwise we treat at as a normal queue
@@ -1586,7 +1582,9 @@
//TODO - should this be in a finally? I'm not sure
status.finishFailingOver();
+ log.info("Putting state that failover has completed");
put(FAILED_OVER_FOR_KEY, status);
+ log.info("Put state that failover has completed");
}
finally
{
@@ -2105,15 +2103,6 @@
}
}
- /**
- * Is the current node the failover node for node <nodeId>?
- * @param nodeId
- */
- private boolean isFailoverNodeForNode(int nodeId)
- {
- return this.currentNodeId == getFailoverNodeID(nodeId);
- }
-
private byte[] getStateAsBytes() throws Exception
{
List bindings = new ArrayList();
@@ -2322,26 +2311,55 @@
//Cleanup any hanging transactions - we do this irrespective of whether we crashed
check(theNodeId);
- //Need to evaluate this before we regenerate the failover map
- boolean isFailover = isFailoverNodeForNode(theNodeId.intValue());
-
- log.info("Am I failover node for node " + theNodeId + "? " + isFailover);
-
- log.info("Crashed: " + crashed);
-
- //Remove any replicant data and non durable bindings for the node - again we need to do this
- //irrespective of whether we crashed
- //This will notify any listeners which will recalculate the connection factory delegates and failover delegates
- removeDataForNode(theNodeId);
-
- if (crashed && isFailover)
- {
- //The node crashed and we are the failover node
- //so let's perform failover
-
- //TODO server side valve
-
- failOver(theNodeId.intValue());
+ synchronized (failoverMap)
+ {
+ //Need to evaluate this before we regenerate the failover map
+ Integer failoverNode = (Integer)failoverMap.get(theNodeId);
+
+ if (failoverNode == null)
+ {
+ throw new IllegalStateException("Cannot find failover node for node " + theNodeId);
+ }
+
+ //debug dump failover map
+
+ Iterator iter = failoverMap.entrySet().iterator();
+
+ log.info("Dumping failover map");
+ while (iter.hasNext())
+ {
+ Map.Entry entry = (Map.Entry)iter.next();
+
+ Integer nodeId = (Integer)entry.getKey();
+
+ Integer failoverNodeId = (Integer)entry.getValue();
+
+ log.info("node->failover node: " + nodeId + " --> " + failoverNodeId);
+ }
+ log.info("end dump");
+
+ //end debug
+
+ boolean isFailover = failoverNode.intValue() == this.currentNodeId;
+
+ log.info("Am I failover node for node " + theNodeId + "? " + isFailover);
+
+ log.info("Crashed: " + crashed);
+
+ //Remove any replicant data and non durable bindings for the node - again we need to do this
+ //irrespective of whether we crashed
+ //This will notify any listeners which will recalculate the connection factory delegates and failover delegates
+ removeDataForNode(theNodeId);
+
+ if (crashed && isFailover)
+ {
+ //The node crashed and we are the failover node
+ //so let's perform failover
+
+ //TODO server side valve
+
+ failOver(theNodeId.intValue());
+ }
}
}
@@ -2442,26 +2460,17 @@
public void viewAccepted(View newView)
{
- //TODO: (by Clebert) Most JBoss Services use info on viewAccepted,
- //TODO: can't we do the same since this is pretty useful?
- log.info(currentNodeId + " got new view: " + newView + " postOffice:"
- + DefaultClusteredPostOffice.this.getOfficeName());
-
- // JGroups will make sure this method is never called by more than one thread concurrently
-
- View oldView = currentView;
- currentView = newView;
-
try
{
- verifyMembership(oldView, newView);
+ //We queue up changes and execute them asynchronously.
+ //This is because JGroups will not let us do stuff like send synch messages
+ //using the same thread that delivered the view change and this is what we need to
+ //do in failover, for example.
+ viewExecutor.execute(new HandleViewAcceptedRunnable(newView));
}
- catch (Throwable e)
+ catch (InterruptedException e)
{
- log.error("Caught Exception in MembershipListener", e);
- IllegalStateException e2 = new IllegalStateException(e.getMessage());
- e2.setStackTrace(e.getStackTrace());
- throw e2;
+ log.warn("Caught InterruptedException", e);
}
}
@@ -2471,9 +2480,47 @@
return null;
}
}
+
+ private void handleViewAccepted(View newView)
+ {
+ //TODO: (by Clebert) Most JBoss Services use info on viewAccepted,
+ //TODO: can't we do the same since this is pretty useful?
+ log.info(currentNodeId + " got new view: " + newView + " postOffice:"
+ + DefaultClusteredPostOffice.this.getOfficeName());
+ // JGroups will make sure this method is never called by more than one thread concurrently
+ View oldView = currentView;
+ currentView = newView;
+ try
+ {
+ verifyMembership(oldView, newView);
+ }
+ catch (Throwable e)
+ {
+ log.error("Caught Exception in MembershipListener", e);
+ IllegalStateException e2 = new IllegalStateException(e.getMessage());
+ e2.setStackTrace(e.getStackTrace());
+ throw e2;
+ }
+ }
+
+ private class HandleViewAcceptedRunnable implements Runnable
+ {
+ private View newView;
+
+ HandleViewAcceptedRunnable(View newView)
+ {
+ this.newView = newView;
+ }
+
+ public void run()
+ {
+ handleViewAccepted(newView);
+ }
+ }
+
/*
* This class is used to listen for messages on the async channel
*/
@@ -2585,7 +2632,7 @@
private void generateFailoverMap(Map nodeAddressMap)
{
- failoverMap = failoverMapper.generateMapping(new ArrayList(nodeAddressMap.keySet()));
+ failoverMap = failoverMapper.generateMapping(nodeAddressMap.keySet());
}
}
}
\ No newline at end of file
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java 2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java 2006-12-11 15:27:26 UTC (rev 1763)
@@ -21,10 +21,10 @@
*/
package org.jboss.messaging.core.plugin.postoffice.cluster;
-import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.plugin.contract.FailoverMapper;
@@ -45,15 +45,17 @@
{
private static final Logger log = Logger.getLogger(DefaultFailoverMapper.class);
- /** This receives a List<Integer> of nodes and returns a Map<Integer,Integer> of nodes to be used on failover logic */
- public Map generateMapping(List nodes)
+ /*
+ * Generate a mapping given a set of nodes - nodes will be sorted by the method
+ * @see org.jboss.messaging.core.plugin.contract.FailoverMapper#generateMapping(java.util.Set)
+ */
+ public Map generateMapping(Set nodes)
{
- if (!(nodes instanceof ArrayList))
- {
- //Convert to array list for fast index access
- nodes = new ArrayList(nodes);
- }
+ Integer[] nodesArr = (Integer[])nodes.toArray(new Integer[nodes.size()]);
+ //First sort them so every node has a consistent view
+ Arrays.sort(nodesArr);
+
int s = nodes.size();
log.info("Generating failover mapping, node size= "+ s);
@@ -70,7 +72,7 @@
j = 0;
}
- failoverNodes.put(nodes.get(i), nodes.get(j));
+ failoverNodes.put(nodesArr[i], nodesArr[j]);
}
return failoverNodes;
Modified: branches/Branch_Client_Failover_Experiment/tests/etc/log4j.xml
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/etc/log4j.xml 2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/tests/etc/log4j.xml 2006-12-11 15:27:26 UTC (rev 1763)
@@ -37,9 +37,13 @@
</category>
<category name="org.jgroups">
- <priority value="WARN"/>
+ <priority value="TRACE"/>
</category>
+ <category name="org.jboss.remoting">
+ <priority value="TRACE"/>
+ </category>
+
<category name="org.jboss">
<priority value="INFO"/>
</category>
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FailoverMapperTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FailoverMapperTest.java 2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FailoverMapperTest.java 2006-12-11 15:27:26 UTC (rev 1763)
@@ -22,9 +22,12 @@
package org.jboss.test.messaging.core.plugin.postoffice.cluster;
-import java.util.ArrayList;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
+
import junit.framework.TestCase;
+
import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultFailoverMapper;
/**
@@ -38,18 +41,18 @@
public void testMapper()
{
- ArrayList list = new ArrayList();
+ Set set = new HashSet();
- list.add(new Integer(50));
- list.add(new Integer(25));
- list.add(new Integer(15));
+ set.add(new Integer(50));
+ set.add(new Integer(25));
+ set.add(new Integer(15));
DefaultFailoverMapper mapper = new DefaultFailoverMapper();
- Map map = mapper.generateMapping(list);
+ Map map = mapper.generateMapping(set);
- assertEquals(new Integer(15),map.get(new Integer(50)));
- assertEquals(new Integer(50),map.get(new Integer(25)));
- assertEquals(new Integer(25),map.get(new Integer(15)));
+ assertEquals(new Integer(15),map.get(new Integer(25)));
+ assertEquals(new Integer(50),map.get(new Integer(15)));
+ assertEquals(new Integer(25),map.get(new Integer(50)));
}
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2006-12-11 11:52:22 UTC (rev 1762)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2006-12-11 15:27:26 UTC (rev 1763)
@@ -434,27 +434,44 @@
ClientConnectionFactoryDelegate cf2 = delegates[1];
+ ClientConnectionFactoryDelegate cf3 = delegates[2];
+
int server0Id = cf1.getServerId();
int server1Id = cf2.getServerId();
+
+ int server2Id = cf3.getServerId();
+
+ log.info("server 0 id: " + server0Id);
+
+ log.info("server 1 id: " + server1Id);
+
+ log.info("server 2 id: " + server2Id);
Map failoverMap = delegate.getFailoverMap();
- int server0FailoverId = ((Integer)failoverMap.get(new Integer(server0Id))).intValue();
+ log.info(failoverMap.get(new Integer(server0Id)));
+ log.info(failoverMap.get(new Integer(server1Id)));
+ log.info(failoverMap.get(new Integer(server2Id)));
- // server 0 should failover onto server 1
+ int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
- assertEquals(server1Id, server0FailoverId);
+ // server 1 should failover onto server 2
+ assertEquals(server2Id, server1FailoverId);
+
Connection conn = null;
try
{
- //Send a message into a queue on server 0
+ //Get a connection on server 1
+ conn = factory.createConnection(); //connection on server 0
- conn = factory.createConnection();
+ conn.close();
+ conn = factory.createConnection(); //connection on server 1
+
JBossConnection jbc = (JBossConnection)conn;
ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
@@ -463,13 +480,13 @@
int initialServerID = state.getServerID();
- assertEquals(0, initialServerID);
+ assertEquals(1, initialServerID);
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod = sess.createProducer(queue0);
+ MessageProducer prod = sess.createProducer(queue1);
- MessageConsumer cons = sess.createConsumer(queue0);
+ MessageConsumer cons = sess.createConsumer(queue1);
final int NUM_MESSAGES = 100;
@@ -480,17 +497,17 @@
prod.send(tm);
}
- //So now, messages should be in queue0 on server 0
- //So we now kill server 0
+ //So now, messages should be in queue1 on server 1
+ //So we now kill server 1
//Which should cause transparent failover of connection conn onto server 1
- log.info("************ KILLING (CRASHING) SERVER 0");
+ log.info("************ KILLING (CRASHING) SERVER 1");
- ServerManagement.getServer(0).destroy();
+ ServerManagement.getServer(1).destroy();
log.info("killed server, now waiting");
- Thread.sleep(25000);
+ Thread.sleep(5000);
log.info("done wait");
@@ -500,16 +517,12 @@
log.info("final server id= " + finalServerID);
- //server id should now be 1
+ //server id should now be 2
- assertEquals(1, finalServerID);
+ assertEquals(2, finalServerID);
- log.info("here 2");
-
conn.start();
- log.info("here 3");
-
for (int i = 0; i < NUM_MESSAGES; i++)
{
TextMessage tm = (TextMessage)cons.receive(1000);
@@ -520,7 +533,7 @@
assertEquals("message:" + i, tm.getText());
}
- log.info("here 4");
+ log.info("done");
}
finally
{
@@ -539,42 +552,42 @@
}
- public void testEvenSimplerFailover() throws Exception
- {
- JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-
- Connection conn = null;
-
- try
- {
- conn = factory.createConnection();
-
- log.info("************ KILLING (CRASHING) SERVER 0");
-
- ServerManagement.getServer(0).destroy();
-
- log.info("killed server, now waiting");
-
- Thread.sleep(25000);
-
- log.info("done wait");
- }
- finally
- {
- if (conn != null)
- {
- try
- {
- conn.close();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- }
-
- }
+// public void testEvenSimplerFailover() throws Exception
+// {
+// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+//
+// Connection conn = null;
+//
+// try
+// {
+// conn = factory.createConnection();
+//
+// log.info("************ KILLING (CRASHING) SERVER 0");
+//
+// ServerManagement.getServer(0).destroy();
+//
+// log.info("killed server, now waiting");
+//
+// Thread.sleep(25000);
+//
+// log.info("done wait");
+// }
+// finally
+// {
+// if (conn != null)
+// {
+// try
+// {
+// conn.close();
+// }
+// catch (Exception e)
+// {
+// e.printStackTrace();
+// }
+// }
+// }
+//
+// }
// public void testConnectionFactoryConnect() throws Exception
More information about the jboss-cvs-commits
mailing list