[jboss-cvs] JBoss Messaging SVN: r1771 - in trunk: src/main/org/jboss/jms/client src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/client/state src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/endpoint/advised src/main/org/jboss/jms/server/remoting src/main/org/jboss/jms/tx tests tests/src/org/jboss/test/messaging/jms tests/src/org/jboss/test/messaging/jms/clustering
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Dec 12 13:26:15 EST 2006
Author: timfox
Date: 2006-12-12 13:25:55 -0500 (Tue, 12 Dec 2006)
New Revision: 1771
Removed:
trunk/src/main/org/jboss/jms/tx/ResourceManagerFactory.java
Modified:
trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
trunk/src/main/org/jboss/jms/client/JBossSession.java
trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java
trunk/src/main/org/jboss/jms/client/container/HAAspect.java
trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java
trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
trunk/src/main/org/jboss/jms/client/state/BrowserState.java
trunk/src/main/org/jboss/jms/client/state/ConnectionState.java
trunk/src/main/org/jboss/jms/client/state/ConsumerState.java
trunk/src/main/org/jboss/jms/client/state/HierarchicalState.java
trunk/src/main/org/jboss/jms/client/state/ProducerState.java
trunk/src/main/org/jboss/jms/client/state/SessionState.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
trunk/src/main/org/jboss/jms/server/remoting/MetaDataConstants.java
trunk/src/main/org/jboss/jms/tx/ResourceManager.java
trunk/src/main/org/jboss/jms/tx/TxState.java
trunk/tests/build.xml
trunk/tests/src/org/jboss/test/messaging/jms/ConnectionTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
Log:
More HA stuff
Modified: trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -143,7 +143,7 @@
// call pre or postDeliver so messages won't be acked, or stored in session/tx
sess = conn.createSessionDelegate(false, Session.CLIENT_ACKNOWLEDGE, false);
- cons = sess.createConsumerDelegate(dest, messageSelector, false, subName, true);
+ cons = sess.createConsumerDelegate(dest, messageSelector, false, subName, true, -1);
}
finally
{
Modified: trunk/src/main/org/jboss/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossSession.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/JBossSession.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -252,7 +252,7 @@
tccc.set(getClass().getClassLoader());
ConsumerDelegate consumerDelegate = delegate.
- createConsumerDelegate((JBossDestination)d, messageSelector, noLocal, null, false);
+ createConsumerDelegate((JBossDestination)d, messageSelector, noLocal, null, false, -1);
return new JBossMessageConsumer(consumerDelegate);
}
@@ -305,7 +305,7 @@
tccc.set(getClass().getClassLoader());
ConsumerDelegate consumerDelegate =
- delegate.createConsumerDelegate((JBossTopic)topic, null, false, name, false);
+ delegate.createConsumerDelegate((JBossTopic)topic, null, false, name, false, -1);
return new JBossMessageConsumer(consumerDelegate);
}
@@ -339,7 +339,7 @@
messageSelector = null;
}
ConsumerDelegate consumerDelegate =
- delegate.createConsumerDelegate((JBossTopic)topic, messageSelector, noLocal, name, false);
+ delegate.createConsumerDelegate((JBossTopic)topic, messageSelector, noLocal, name, false, -1);
return new JBossMessageConsumer(consumerDelegate);
}
Modified: trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -30,9 +30,7 @@
import org.jboss.jms.client.JBossConnectionMetaData;
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.client.state.ConnectionState;
-import org.jboss.jms.delegate.ConnectionDelegate;
import org.jboss.jms.message.MessageIdGeneratorFactory;
-import org.jboss.jms.tx.ResourceManagerFactory;
import org.jboss.logging.Logger;
import org.jboss.remoting.Client;
import org.jboss.remoting.ConnectionListener;
@@ -200,10 +198,7 @@
// Finished with the connection - we need to shutdown callback server
state.getRemotingConnection().stop();
-
- // Remove reference to resource manager
- ResourceManagerFactory.instance.checkInResourceManager(state.getServerID());
-
+
// Remove reference to message id generator
MessageIdGeneratorFactory.instance.checkInGenerator(state.getServerID());
Modified: trunk/src/main/org/jboss/jms/client/container/HAAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/HAAspect.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/container/HAAspect.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -23,9 +23,11 @@
package org.jboss.jms.client.container;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.jms.JMSException;
@@ -50,6 +52,7 @@
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.server.endpoint.CreateConnectionResult;
import org.jboss.jms.tx.AckInfo;
+import org.jboss.jms.tx.ResourceManager;
import org.jboss.logging.Logger;
import org.jboss.remoting.Client;
import org.jboss.remoting.ConnectionListener;
@@ -77,6 +80,7 @@
//Cache this here
private ClientConnectionFactoryDelegate[] delegates;
+ //Cache this here
private Map failoverMap;
private int currentRobinIndex;
@@ -125,6 +129,7 @@
{
currentRobinIndex = 0;
}
+
return currentDelegate;
}
@@ -136,7 +141,6 @@
MethodInvocation methodInvoke = (MethodInvocation)invocation;
- // TODO: FIX THIS! metaData should contain CF_DELEGATES
Object target = methodInvoke.getTargetObject();
if (target instanceof ClusteredClientConnectionFactoryDelegate)
@@ -146,8 +150,6 @@
if (delegates != null)
{
- //TODO: Fix this! metadata should contain CF_FAILOVER_INDEXES
- //failoverIndexes = (int[])metaData.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CF_FAILOVER_INDEXES);
failoverMap = ((ClusteredClientConnectionFactoryDelegate)target).getFailoverMap();
if (failoverMap == null)
@@ -167,15 +169,14 @@
CreateConnectionResult res = (CreateConnectionResult)cf.createConnectionDelegate(username, password, -1);
- ClientConnectionDelegate connDelegate =
- (ClientConnectionDelegate)res.getDelegate();
+ ClientConnectionDelegate connDelegate = (ClientConnectionDelegate)res.getDelegate();
- initialiseConnection(connDelegate);
+ addListener(connDelegate);
return connDelegate;
}
- private void initialiseConnection(ClientConnectionDelegate connDelegate)
+ private void addListener(ClientConnectionDelegate connDelegate)
{
//Add a connection listener
@@ -186,20 +187,19 @@
state.getRemotingConnection().getInvokingClient().addConnectionListener(listener);
}
-
//The connection has failed
private void handleFailure(ClientConnectionDelegate failedConnection) throws Exception
{
log.info("Handling failure");
+ //Get the connection factory we are going to failover onto
ClientConnectionFactoryDelegate newCF = getFailoverDelegate(failedConnection);
-
- //TODO implement client side valve to prevent invocations occurring whilst failover is occurring
-
+
ConnectionState state = (ConnectionState)((DelegateSupport)failedConnection).getState();
log.info("calling createFailoverConnectionDelegate");
+ //Create a connection using that connection factory
CreateConnectionResult res =
newCF.createConnectionDelegate(state.getUser(), state.getPassword(), state.getServerID());
@@ -209,12 +209,12 @@
{
log.info("Got connection");
+ //We got the right server and created a new connection ok
+
ClientConnectionDelegate newConnection = (ClientConnectionDelegate)res.getDelegate();
log.info("newconnection is " + newConnection);
- //We got the right server and created a new connection
-
failover(failedConnection, newConnection);
}
else
@@ -289,71 +289,54 @@
ConnectionState failedState = (ConnectionState)failedConnection.getState();
- int oldServerID = failedState.getServerID();
-
ConnectionState newState = (ConnectionState)newConnection.getState();
- log.info("new state is: " + newState);
-
- failedState.copy(newState);
-
- // this is necessary so the connection will start "talking" to the new server instead
- failedState.setRemotingConnection(newState.getRemotingConnection());
-
if (failedState.getClientID() != null)
{
newConnection.setClientID(failedState.getClientID());
}
- // Transfering state from newDelegate to currentDelegate
- failedConnection.copyState(newConnection);
+ // Transfer attributes from newDelegate to failedDelegate
+ failedConnection.copyAttributes(newConnection);
+ int oldServerId = failedState.getServerID();
+
+ CallbackManager oldCallbackManager = failedState.getRemotingConnection().getCallbackManager();
+
+ //We need to update some of the attributes on the state
+ failedState.copyState(newState);
+
log.info("failing over children");
for(Iterator i = failedState.getChildren().iterator(); i.hasNext(); )
{
SessionState failedSessionState = (SessionState)i.next();
- log.info("Creating session");
-
+ ClientSessionDelegate failedSessionDelegate =
+ (ClientSessionDelegate)failedSessionState.getDelegate();
+
ClientSessionDelegate newSessionDelegate = (ClientSessionDelegate)newConnection.
createSessionDelegate(failedSessionState.isTransacted(),
failedSessionState.getAcknowledgeMode(),
failedSessionState.isXA());
- log.info("Created session");
-
- ClientSessionDelegate failedSessionDelegate =
- (ClientSessionDelegate)failedSessionState.getDelegate();
+ SessionState newSessionState = (SessionState)newSessionDelegate.getState();
- failedSessionDelegate.copyState(newSessionDelegate);
+ failedSessionDelegate.copyAttributes(newSessionDelegate);
- log.info("copied state");
+ //We need to update some of the attributes on the state
+ newSessionState.copyState(newSessionState);
- //Now we remove any unacked np messages - this is because we don't want to ack them
- //since the server won't know about them and will barf
- Iterator iter = failedSessionState.getToAck().iterator();
+ if (trace) { log.trace("replacing session (" + failedSessionDelegate + ") with a new failover session " + newSessionDelegate); }
- while (iter.hasNext())
- {
- AckInfo info = (AckInfo)iter.next();
-
- if (!info.getMessage().getMessage().isReliable())
- {
- iter.remove();
- }
- }
+ List children = new ArrayList();
- //TODO remove any unacked from the resource manager
-
- if (trace) { log.trace("replacing session (" + failedSessionDelegate + ") with a new failover session " + newSessionDelegate); }
-
- //TODO Clebert please add comment as to why this clone is necessary
- //In general, please comment more - there is a serious lack of comments!!
- List children = new ArrayList();
+ // TODO Why is this clone necessary?
children.addAll(failedSessionState.getChildren());
+
+ Set consumerIds = new HashSet();
- for(Iterator j = children.iterator(); j.hasNext(); )
+ for (Iterator j = children.iterator(); j.hasNext(); )
{
HierarchicalStateSupport sessionChild = (HierarchicalStateSupport)j.next();
@@ -362,45 +345,83 @@
handleFailoverOnProducer((ProducerState)sessionChild, newSessionDelegate);
}
else if (sessionChild instanceof ConsumerState)
- {
+ {
handleFailoverOnConsumer(failedConnection,
failedState,
failedSessionState,
(ConsumerState)sessionChild,
failedSessionDelegate,
- oldServerID);
+ oldServerId,
+ oldCallbackManager);
+
+ // We add the new consumer id to the list of old ids
+ consumerIds.add(new Integer(((ConsumerState)sessionChild).getConsumerID()));
+
}
else if (sessionChild instanceof BrowserState)
{
handleFailoverOnBrowser((BrowserState)sessionChild, newSessionDelegate);
}
}
-
+
/* Now we must send the list of unacked AckInfos to the server - so the consumers
* delivery lists can be repopulated
*/
List ackInfos = null;
- if (!failedSessionState.isTransacted())
+ if (!failedSessionState.isTransacted() && !failedSessionState.isXA())
{
+ /*
+ Now we remove any unacked non-persistent (np) messages - this is because we don't want to ack them
+ since the server won't know about them and will barf
+ */
+
+ Iterator iter = newSessionState.getToAck().iterator();
+
+ while (iter.hasNext())
+ {
+ AckInfo info = (AckInfo)iter.next();
+
+ if (!info.getMessage().getMessage().isReliable())
+ {
+ iter.remove();
+ }
+ }
+
//Get the ack infos from the list in the session state
ackInfos = failedSessionState.getToAck();
}
else
- {
- //Transacted session - we need to get the acks
- //TODO
+ {
+ //Transacted session - we need to get the acks from the resource manager
+ //btw we have kept the old resource manager
+ ResourceManager rm = failedState.getResourceManager();
+
+ // Remove any non persistent acks - so server doesn't barf on commit
+
+ rm.removeNonPersistentAcks(consumerIds);
+
+ ackInfos = rm.getAckInfosForConsumerIds(consumerIds);
}
- //TODO for a transacted session the ackinfos will be in the resource manager!!
-
if (!ackInfos.isEmpty())
{
+ log.info("Sending " + ackInfos.size() + " unacked");
newSessionDelegate.sendUnackedAckInfos(ackInfos);
- }
-
+ }
}
+// problem - what if the consumer has closed - but there are still acks in the session or rm?
+//
+// we still need to replace them but with what?
+//
+// in this case we can't recreate a consumer on the server since it has closed
+//
+// solution here is to store by session id - major reworking!!!!!!!!
+
+
+ // todo need to replace consumer id
+
//We must not start the connection until the end
if (failedState.isStarted())
{
@@ -410,14 +431,13 @@
log.info("Failover done");
}
-
-
- private void handleFailoverOnConsumer(ClientConnectionDelegate connectionDelegate,
+ private void handleFailoverOnConsumer(ClientConnectionDelegate failedConnectionDelegate,
ConnectionState failedConnectionState,
SessionState failedSessionState,
ConsumerState failedConsumerState,
ClientSessionDelegate failedSessionDelegate,
- int oldServerID)
+ int oldServerID,
+ CallbackManager oldCallbackManager)
throws JMSException
{
log.info("Failing over consumer");
@@ -428,48 +448,63 @@
if (trace) { log.trace("handleFailoverOnConsumer: creating alternate consumer"); }
ClientConsumerDelegate newConsumerDelegate = (ClientConsumerDelegate)failedSessionDelegate.
- failOverConsumer((JBossDestination)failedConsumerState.getDestination(),
- failedConsumerState.getSelector(),
- failedConsumerState.isNoLocal(),
- failedConsumerState.getSubscriptionName(),
- failedConsumerState.isConnectionConsumer(),
- failedConsumerDelegate.getChannelId());
+ createConsumerDelegate((JBossDestination)failedConsumerState.getDestination(),
+ failedConsumerState.getSelector(),
+ failedConsumerState.isNoLocal(),
+ failedConsumerState.getSubscriptionName(),
+ failedConsumerState.isConnectionConsumer(),
+ failedConsumerState.getChannelId());
if (trace) { log.trace("handleFailoverOnConsumer: alternate consumer created"); }
+
+ //Copy the attributes from the new consumer to the old consumer
+ failedConsumerDelegate.copyAttributes(newConsumerDelegate);
- failedConsumerDelegate.copyState(newConsumerDelegate);
-
+ ConsumerState newState = (ConsumerState)newConsumerDelegate.getState();
+
int oldConsumerID = failedConsumerState.getConsumerID();
+
+ //Update attributes on the old state
+ failedConsumerState.copyState(newState);
- ConsumerState newState = (ConsumerState)newConsumerDelegate.getState();
- failedConsumerState.copy(newState);
-
- if (failedSessionState.isTransacted())
+ if (failedSessionState.isTransacted() || failedSessionState.isXA())
{
//Replace the old consumer id with the new consumer id
- //TODO what about XA?? - may have done work in many transactions - so need to replace all
+ ResourceManager rm = failedConnectionState.getResourceManager();
- failedConnectionState.getResourceManager().
- handleFailover(failedSessionState.getCurrentTxId(),
- oldConsumerID,
- failedConsumerState.getConsumerID());
+ rm.handleFailover(oldConsumerID, failedConsumerState.getConsumerID());
}
-
- CallbackManager cm = failedConnectionState.getRemotingConnection().getCallbackManager();
-
- MessageCallbackHandler handler = cm.unregisterHandler(oldServerID, oldConsumerID);
- handler.setConsumerId(failedConsumerState.getConsumerID());
+
+ //We need to re-use the existing message callback handler
+
+ log.info("Old server id:" + oldServerID + " old consumer id:" + oldConsumerID);
+ MessageCallbackHandler oldHandler = oldCallbackManager.unregisterHandler(oldServerID, oldConsumerID);
- //Clear the buffer of the handler
- handler.clearBuffer();
-
- cm.registerHandler(failedConnectionState.getServerID(),
- failedConsumerState.getConsumerID(),
- handler);
+ ConnectionState newConnectionState = (ConnectionState)failedConnectionDelegate.getState();
- failedSessionState.addCallbackHandler(handler);
+ CallbackManager newCallbackManager = newConnectionState.getRemotingConnection().getCallbackManager();
+ log.info("New server id:" + newConnectionState.getServerID() + " new consuer id:" + newState.getConsumerID());
+
+ //Remove the new handler
+ MessageCallbackHandler newHandler = newCallbackManager.unregisterHandler(newConnectionState.getServerID(),
+ newState.getConsumerID());
+
+ log.info("New handler is " + System.identityHashCode(newHandler));
+
+ //But we need to update some fields from the new one
+ oldHandler.copyState(newHandler);
+
+ //Now we re-register the old handler with the new callback manager
+
+ newCallbackManager.registerHandler(newConnectionState.getServerID(),
+ newState.getConsumerID(),
+ oldHandler);
+
+ //We don't need to add the handler to the session state since it is already there - we
+ //are re-using the old handler
+
log.info("failed over consumer");
}
@@ -484,8 +519,10 @@
ClientProducerDelegate failedProducerDelegate =
(ClientProducerDelegate)failedProducerState.getDelegate();
- failedProducerDelegate.copyState(newProducerDelegate);
-
+ failedProducerDelegate.copyAttributes(newProducerDelegate);
+
+ failedProducerState.copyState((ProducerState)newProducerDelegate.getState());
+
if (trace) { log.trace("handling fail over on producerDelegate " + failedProducerDelegate + " destination=" + failedProducerState.getDestination()); }
}
@@ -499,8 +536,10 @@
ClientBrowserDelegate failedBrowserDelegate =
(ClientBrowserDelegate)failedBrowserState.getDelegate();
- failedBrowserDelegate.copyState(newBrowserDelegate);
-
+ failedBrowserDelegate.copyAttributes(newBrowserDelegate);
+
+ failedBrowserState.copyState((BrowserState)newBrowserDelegate.getState());
+
if (trace) { log.trace("handling fail over on browserDelegate " + failedBrowserDelegate + " destination=" + failedBrowserState.getJmsDestination() + " selector=" + failedBrowserState.getMessageSelector()); }
}
Modified: trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -23,11 +23,10 @@
import javax.jms.Destination;
-import org.jboss.aop.Advised;
import org.jboss.aop.joinpoint.Invocation;
import org.jboss.aop.joinpoint.MethodInvocation;
-import org.jboss.aop.metadata.SimpleMetaData;
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.delegate.ClientConsumerDelegate;
import org.jboss.jms.client.delegate.ClientProducerDelegate;
import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.remoting.JMSRemotingConnection;
@@ -39,7 +38,6 @@
import org.jboss.jms.client.state.SessionState;
import org.jboss.jms.delegate.BrowserDelegate;
import org.jboss.jms.delegate.ConnectionFactoryDelegate;
-import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.delegate.ProducerDelegate;
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.destination.JBossDestination;
@@ -47,9 +45,6 @@
import org.jboss.jms.message.MessageIdGeneratorFactory;
import org.jboss.jms.server.Version;
import org.jboss.jms.server.endpoint.CreateConnectionResult;
-import org.jboss.jms.server.remoting.MetaDataConstants;
-import org.jboss.jms.tx.ResourceManager;
-import org.jboss.jms.tx.ResourceManagerFactory;
/**
* Maintains the hierarchy of parent and child state objects. For each delegate, this interceptor
@@ -89,37 +84,29 @@
ClientConnectionDelegate connectionDelegate = (ClientConnectionDelegate)res.getDelegate();
if (connectionDelegate != null)
- {
-
+ {
connectionDelegate.init();
- SimpleMetaData md = ((Advised)connectionDelegate)._getInstanceAdvisor().getMetaData();
+ //SimpleMetaData md = ((Advised)connectionDelegate)._getInstanceAdvisor().getMetaData();
- int serverID =
- ((Integer)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.SERVER_ID)).intValue();
+ int serverID = connectionDelegate.getServerId();
- Version connectionVersion =
- (Version)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CONNECTION_VERSION);
+ Version versionToUse = connectionDelegate.getVersionToUse();
- JMSRemotingConnection connection =
- (JMSRemotingConnection)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.REMOTING_CONNECTION);
+ JMSRemotingConnection connection = connectionDelegate.getRemotingConnection();
- if (connectionVersion == null)
+ if (versionToUse == null)
{
throw new IllegalStateException("Connection version is null");
}
- // We have one resource manager per unique server
- ResourceManager rm = ResourceManagerFactory.instance.checkOutResourceManager(serverID);
-
//We have one message id generator per unique server
MessageIdGenerator gen =
MessageIdGeneratorFactory.instance.checkOutGenerator(serverID, cfd);
ConnectionState connectionState =
new ConnectionState(serverID, connectionDelegate,
- connection,
- connectionVersion, rm, gen);
+ connection, versionToUse, gen);
connectionDelegate.setState(connectionState);
}
@@ -150,7 +137,7 @@
public Object handleCreateConsumerDelegate(Invocation invocation) throws Throwable
{
- ConsumerDelegate consumerDelegate = (ConsumerDelegate)invocation.invokeNext();
+ ClientConsumerDelegate consumerDelegate = (ClientConsumerDelegate)invocation.invokeNext();
DelegateSupport delegate = (DelegateSupport)consumerDelegate;
delegate.init();
@@ -164,21 +151,18 @@
String subscriptionName = (String)mi.getArguments()[3];
boolean connectionConsumer = ((Boolean)mi.getArguments()[4]).booleanValue();
- SimpleMetaData md = ((Advised)consumerDelegate)._getInstanceAdvisor().getMetaData();
+ int consumerID = consumerDelegate.getID();
- int consumerID =
- ((Integer)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CONSUMER_ID)).intValue();
+ int prefetchSize = consumerDelegate.getPrefetchSize();
- int prefetchSize =
- ((Integer)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.PREFETCH_SIZE)).intValue();
+ int maxDeliveries = consumerDelegate.getMaxDeliveries();
- int maxDeliveries =
- ((Integer)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.MAX_DELIVERIES)).intValue();
+ long channelId = consumerDelegate.getChannelId();
ConsumerState consumerState =
new ConsumerState(sessionState, consumerDelegate, dest, selector, noLocal,
subscriptionName, consumerID, connectionConsumer, prefetchSize,
- maxDeliveries);
+ maxDeliveries, channelId);
delegate.setState(consumerState);
return consumerDelegate;
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -28,13 +28,12 @@
import javax.jms.ServerSessionPool;
import javax.transaction.xa.Xid;
-import org.jboss.aop.util.PayloadKey;
import org.jboss.jms.client.JBossConnectionConsumer;
import org.jboss.jms.client.remoting.JMSRemotingConnection;
import org.jboss.jms.client.state.ConnectionState;
import org.jboss.jms.delegate.ConnectionDelegate;
import org.jboss.jms.delegate.SessionDelegate;
-import org.jboss.jms.server.remoting.MetaDataConstants;
+import org.jboss.jms.server.Version;
import org.jboss.jms.tx.TransactionRequest;
import org.jboss.remoting.Client;
@@ -57,11 +56,12 @@
// Attributes ----------------------------------------------------
- // This should not be exposed other than through meta data
private int serverId;
private transient JMSRemotingConnection remotingConnection;
+ private Version versionToUse;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -223,8 +223,6 @@
public void init()
{
super.init();
- getMetaData().addMetaData(MetaDataConstants.JMS, MetaDataConstants.SERVER_ID,
- new Integer(serverId), PayloadKey.TRANSIENT);
}
public void setRemotingConnection(JMSRemotingConnection conn)
@@ -236,8 +234,32 @@
{
return remotingConnection;
}
+
+ public int getServerId()
+ {
+ return serverId;
+ }
+
+ public Version getVersionToUse()
+ {
+ return versionToUse;
+ }
+
+ public void setVersionToUse(Version versionToUse)
+ {
+ this.versionToUse = versionToUse;
+ }
+
+ public void copyAttributes(DelegateSupport newDelegate)
+ {
+ super.copyAttributes(newDelegate);
+
+ this.remotingConnection = ((ClientConnectionDelegate)newDelegate).getRemotingConnection();
+
+ this.versionToUse = ((ClientConnectionDelegate)newDelegate).getVersionToUse();
+ }
- // Protected -----------------------------------------------------
+ // Protected -----------------------------------------------------
protected Client getClient()
{
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -26,7 +26,6 @@
import javax.jms.JMSException;
-import org.jboss.aop.Advised;
import org.jboss.aop.Dispatcher;
import org.jboss.aop.joinpoint.Invocation;
import org.jboss.aop.joinpoint.MethodInvocation;
@@ -68,13 +67,14 @@
// Attributes ----------------------------------------------------
//This data is needed in order to create a connection
- protected String serverLocatorURI;
- protected Version serverVersion;
-
- // This property is used on redirect on failover logic (verify if a new delegate could be used during a failover)
- protected int serverId;
- protected boolean clientPing;
+ private String serverLocatorURI;
+
+ private Version serverVersion;
+
+ private int serverId;
+ private boolean clientPing;
+
private transient boolean trace;
// Static --------------------------------------------------------
@@ -271,18 +271,9 @@
if (connectionDelegate != null)
{
- //We set the version for the connection and the remoting connection on the meta-data
- //this is so the StateCreationAspect can pick it up
-
- SimpleMetaData metaData = ((Advised)connectionDelegate)._getInstanceAdvisor().getMetaData();
-
- metaData.addMetaData(MetaDataConstants.JMS, MetaDataConstants.REMOTING_CONNECTION,
- remotingConnection, PayloadKey.TRANSIENT);
-
- metaData.addMetaData(MetaDataConstants.JMS, MetaDataConstants.CONNECTION_VERSION,
- version, PayloadKey.TRANSIENT);
-
connectionDelegate.setRemotingConnection(remotingConnection);
+
+ connectionDelegate.setVersionToUse(version);
}
else
{
@@ -296,7 +287,6 @@
{
}
}
-
}
return ret;
@@ -307,7 +297,6 @@
return "ClientConnectionFactoryDelegate[" + id + "]";
}
- //This MUST ONLY be used in testing
public String getServerLocatorURI()
{
return serverLocatorURI;
@@ -317,9 +306,26 @@
{
return serverId;
}
+
+ public boolean getClientPing()
+ {
+ return clientPing;
+ }
+
+ public Version getServerVersion()
+ {
+ return serverVersion;
+ }
+
+ public void copyAttributes(DelegateSupport newDelegate)
+ {
+ super.copyAttributes(newDelegate);
+ }
// Protected -----------------------------------------------------
+
+
protected Client getClient()
{
return null;
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -25,11 +25,9 @@
import javax.jms.Message;
import javax.jms.MessageListener;
-import org.jboss.aop.util.PayloadKey;
import org.jboss.jms.client.state.ConnectionState;
import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.destination.JBossDestination;
-import org.jboss.jms.server.remoting.MetaDataConstants;
import org.jboss.remoting.Client;
/**
@@ -50,11 +48,10 @@
// Attributes ----------------------------------------------------
- // This should not be exposed other than through meta data
private int bufferSize;
- protected int maxDeliveries;
+
+ private int maxDeliveries;
- // This should not be exposed other than through meta data
private long channelId;
// Static --------------------------------------------------------
@@ -73,11 +70,6 @@
{
}
- public long getChannelId()
- {
- return channelId;
- }
-
// ConsumerDelegate implementation -------------------------------
/**
@@ -182,24 +174,41 @@
// Public --------------------------------------------------------
- public void init()
- {
- super.init();
- getMetaData().addMetaData(MetaDataConstants.JMS, MetaDataConstants.CONSUMER_ID,
- new Integer(id), PayloadKey.TRANSIENT);
- getMetaData().addMetaData(MetaDataConstants.JMS, MetaDataConstants.PREFETCH_SIZE,
- new Integer(bufferSize), PayloadKey.TRANSIENT);
- getMetaData().addMetaData(MetaDataConstants.JMS, MetaDataConstants.MAX_DELIVERIES,
- new Integer(maxDeliveries), PayloadKey.TRANSIENT);
- }
-
public String toString()
{
return "ConsumerDelegate[" + id + "](ChannelId=" + this.channelId+")" ;
}
+
+ public int getPrefetchSize()
+ {
+ return bufferSize;
+ }
+
+ public int getMaxDeliveries()
+ {
+ return maxDeliveries;
+ }
+
+ public long getChannelId()
+ {
+ return channelId;
+ }
+
+ public void copyAttributes(DelegateSupport newDelegate)
+ {
+ super.copyAttributes(newDelegate);
+
+ this.bufferSize = ((ClientConsumerDelegate)newDelegate).getPrefetchSize();
+
+ this.maxDeliveries = ((ClientConsumerDelegate)newDelegate).getMaxDeliveries();
+
+ this.channelId = ((ClientConsumerDelegate)newDelegate).getChannelId();
+ }
// Protected -----------------------------------------------------
+
+
protected Client getClient()
{
// Use the Client in the Connection's state
@@ -207,20 +216,7 @@
getInvokingClient();
}
- public void copyState(DelegateSupport newDelegate)
- {
- super.copyState(newDelegate);
- this.channelId = ((ClientConsumerDelegate)newDelegate).channelId;
- this.getMetaData().removeMetaData(MetaDataConstants.JMS, MetaDataConstants.CONSUMER_ID);
- this.getMetaData().addMetaData(MetaDataConstants.JMS,
- MetaDataConstants.CONSUMER_ID,
- newDelegate.getMetaData().
- getMetaData(MetaDataConstants.JMS,
- MetaDataConstants.CONSUMER_ID),
- PayloadKey.TRANSIENT);
- }
-
// Package Private -----------------------------------------------
// Private -------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -187,25 +187,12 @@
*/
public ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
boolean noLocal, String subscriptionName,
- boolean connectionConsumer) throws JMSException
+ boolean connectionConsumer, long failoverChannelId) throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
}
/**
- * @see org.jboss.jms.server.endpoint.ServerSessionEndpoint#failOverConsumer(org.jboss.jms.destination.JBossDestination, String, boolean, String, boolean, long, int)
- * @see org.jboss.jms.client.container.StateCreationAspect#handleCreateConsumerDelegate(org.jboss.aop.joinpoint.Invocation)
- * */
- public ConsumerDelegate failOverConsumer(JBossDestination jmsDestination,
- String selectorString,
- boolean noLocal, String subscriptionName,
- boolean connectionConsumer,
- long oldChannelID) throws JMSException
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
- /**
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -73,8 +73,8 @@
ClientConnectionFactoryDelegate[] delegates,
Map failoverMap)
{
- this(mainDelegate.getID(), mainDelegate.serverId, mainDelegate.serverLocatorURI,
- mainDelegate.serverVersion, mainDelegate.clientPing, delegates, failoverMap);
+ this(mainDelegate.getID(), mainDelegate.getServerId(), mainDelegate.getServerLocatorURI(),
+ mainDelegate.getServerVersion(), mainDelegate.getClientPing(), delegates, failoverMap);
}
// DelegateSupport overrides -------------------------------------
@@ -89,18 +89,7 @@
{
delegates[i].init();
}
- }
-
- //This doesn't seem to be used so I'm commenting it out
-
-// // We add this to the meta data so the failOver aspect can get access to it
-// getMetaData().addMetaData(MetaDataConstants.JMS,
-// MetaDataConstants.CF_DELEGATES,
-// delegates, PayloadKey.TRANSIENT);
-//
-// getMetaData().addMetaData(MetaDataConstants.JMS,
-// MetaDataConstants.FAILOVER_MAP,
-// failoverMap, PayloadKey.TRANSIENT);
+ }
}
// Public --------------------------------------------------------
@@ -112,7 +101,6 @@
return delegates;
}
- /** TODO As metadata is not working, I'm exposing this temporarily */
public Map getFailoverMap()
{
return failoverMap;
Modified: trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -152,7 +152,7 @@
* has to be transferred to this actual object. For example, a Connection will have to assume the
* ObjectID of the new connection endpoint and the new RemotingConnection.
*/
- public void copyState(DelegateSupport newDelegate)
+ public void copyAttributes(DelegateSupport newDelegate)
{
id = newDelegate.getID();
}
Modified: trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -94,17 +94,13 @@
public void registerHandler(int serverID, int consumerID, MessageCallbackHandler handler)
{
- log.info(this + " registeringHandler, serverID:" + serverID + " consumerID:" + consumerID);
-
Long lookup = computeLookup(serverID, consumerID);
callbackHandlers.put(lookup, handler);
}
public MessageCallbackHandler unregisterHandler(int serverID, int consumerID)
- {
- log.info(this + " unregisteringHandler, serverID:" + serverID + " consumerID:" + consumerID);
-
+ {
Long lookup = computeLookup(serverID, consumerID);
return (MessageCallbackHandler)callbackHandlers.remove(lookup);
Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -36,6 +36,7 @@
import org.jboss.jms.message.MessageProxy;
import org.jboss.jms.tx.AckInfo;
import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Message;
import org.jboss.messaging.util.Future;
import org.jboss.remoting.callback.HandleCallbackException;
@@ -216,9 +217,9 @@
* or -1 if closed
*/
public HandleMessageResponse handleMessage(List msgs) throws HandleCallbackException
- {
+ {
if (trace) { log.trace(this + " receiving " + msgs.size() + " message(s) from the remoting layer"); }
-
+
synchronized (mainLock)
{
if (closed)
@@ -516,12 +517,7 @@
messagesAdded();
}
}
-
- public void clearBuffer()
- {
- buffer.clear();
- }
-
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -780,6 +776,22 @@
consumerDelegate.confirmDelivery(count);
}
}
+
+ public void copyState(MessageCallbackHandler newHandler)
+ {
+ synchronized (mainLock)
+ {
+ this.consumerID = newHandler.consumerID;
+
+ this.consumerDelegate = newHandler.consumerDelegate;
+
+ this.sessionDelegate = newHandler.sessionDelegate;
+
+ this.serverSending = false;
+
+ this.buffer.clear();
+ }
+ }
}
Modified: trunk/src/main/org/jboss/jms/client/state/BrowserState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/BrowserState.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/state/BrowserState.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -67,7 +67,6 @@
return parent.getVersionToUse();
}
-
public org.jboss.jms.destination.JBossDestination getJmsDestination()
{
return jmsDestination;
@@ -77,15 +76,23 @@
{
return messageSelector;
}
-
+
public void setParent(HierarchicalState parent)
{
this.parent=(SessionState)parent;
}
+
public HierarchicalState getParent()
{
return parent;
}
-
+
+ // When failing over a browser, we keep the old browser's state but there are certain fields
+ // we need to update
+ public void copyState(BrowserState newState)
+ {
+ //Actually only one field
+ this.delegate = newState.delegate;
+ }
}
Modified: trunk/src/main/org/jboss/jms/client/state/ConnectionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/ConnectionState.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/state/ConnectionState.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -70,7 +70,6 @@
protected boolean started;
-
/** This property used to be declared on ConnectionAspect */
private String clientID;
@@ -83,10 +82,9 @@
/** This property used to be declared on ConnectionAspect */
private boolean listenerAdded;
-
public ConnectionState(int serverID, ConnectionDelegate delegate,
JMSRemotingConnection remotingConnection, Version versionToUse,
- ResourceManager rm, MessageIdGenerator gen)
+ MessageIdGenerator gen)
throws Exception
{
super(null, (DelegateSupport)delegate);
@@ -99,7 +97,10 @@
this.versionToUse = versionToUse;
- this.resourceManager = rm;
+ //Each connection has its own resource manager
+ //If we can failover all connections with the same server id at the same time
+ //then we can maintain one rm per unique server as opposed to per connection
+ this.resourceManager = new ResourceManager();
this.idGenerator = gen;
@@ -230,12 +231,19 @@
{
return null;
}
-
- public void copy(ConnectionState newState)
+
+ //When failing over a connection, we keep the old connection's state but there are certain fields
+ //we need to update
+ public void copyState(ConnectionState newState)
{
- this.serverID = newState.serverID;
- this.idGenerator = newState.idGenerator;
+ this.remotingConnection = newState.remotingConnection;
+
+ this.idGenerator = newState.idGenerator;
+
+ this.serverID = newState.serverID;
+
+ this.versionToUse = newState.versionToUse;
+
+ this.delegate = newState.delegate;
}
-
-
}
Modified: trunk/src/main/org/jboss/jms/client/state/ConsumerState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/ConsumerState.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/state/ConsumerState.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -63,9 +63,12 @@
private int maxDeliveries;
+ //Needed for failover
+ private long channelId;
+
public ConsumerState(SessionState parent, ConsumerDelegate delegate, Destination dest,
String selector, boolean noLocal, String subscriptionName, int consumerID,
- boolean isCC, int prefetchSize, int maxDeliveries)
+ boolean isCC, int prefetchSize, int maxDeliveries, long channelId)
{
super(parent, (DelegateSupport)delegate);
children = Collections.EMPTY_SET;
@@ -77,6 +80,7 @@
this.prefetchSize = prefetchSize;
this.subscriptionName=subscriptionName;
this.maxDeliveries = maxDeliveries;
+ this.channelId = channelId;
}
public DelegateSupport getDelegate()
@@ -155,14 +159,25 @@
this.subscriptionName = subscriptionName;
}
- public void copy(ConsumerState newState)
- {
- this.consumerID = newState.consumerID;
- }
-
public int getMaxDeliveries()
{
return maxDeliveries;
}
+
+ public long getChannelId()
+ {
+ return channelId;
+ }
+
+ // When failing over a consumer, we keep the old consumer's state but there are certain fields
+ // we need to update
+ public void copyState(ConsumerState newState)
+ {
+ this.consumerID = newState.consumerID;
+
+ this.delegate = newState.delegate;
+
+ this.channelId = newState.channelId;
+ }
}
Modified: trunk/src/main/org/jboss/jms/client/state/HierarchicalState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/HierarchicalState.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/state/HierarchicalState.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -41,9 +41,11 @@
Set getChildren();
DelegateSupport getDelegate();
+
void setDelegate(DelegateSupport delegate);
HierarchicalState getParent();
+
void setParent(HierarchicalState parent);
Version getVersionToUse();
Modified: trunk/src/main/org/jboss/jms/client/state/ProducerState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/ProducerState.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/state/ProducerState.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -56,102 +56,107 @@
private SessionState parent;
private ProducerDelegate delegate;
-
+
public ProducerState(SessionState parent, ProducerDelegate delegate, Destination dest)
{
super(parent, (DelegateSupport)delegate);
children = Collections.EMPTY_SET;
this.destination = dest;
}
-
+
public Destination getDestination()
{
return destination;
}
-
-
- public DelegateSupport getDelegate()
- {
- return (DelegateSupport)delegate;
- }
-
- public void setDelegate(DelegateSupport delegate)
- {
- this.delegate=(ProducerDelegate)delegate;
- }
-
-
- public void setParent(HierarchicalState parent)
- {
- this.parent = (SessionState)parent;
- }
- public HierarchicalState getParent()
- {
- return parent;
- }
-
-
+
+ public DelegateSupport getDelegate()
+ {
+ return (DelegateSupport)delegate;
+ }
+
+ public void setDelegate(DelegateSupport delegate)
+ {
+ this.delegate=(ProducerDelegate)delegate;
+ }
+
+ public void setParent(HierarchicalState parent)
+ {
+ this.parent = (SessionState)parent;
+ }
+
+ public HierarchicalState getParent()
+ {
+ return parent;
+ }
+
public void setDestination(Destination dest)
{
this.destination = dest;
-
+
}
public boolean isDisableMessageID()
{
return disableMessageID;
}
-
+
public void setDisableMessageID(boolean disableMessageID)
{
this.disableMessageID = disableMessageID;
}
-
+
public boolean isDisableMessageTimestamp()
{
return disableMessageTimestamp;
}
-
+
public void setDisableMessageTimestamp(boolean disableMessageTimestamp)
{
this.disableMessageTimestamp = disableMessageTimestamp;
}
-
+
public int getPriority()
{
return priority;
}
-
+
public void setPriority(int priority)
{
this.priority = priority;
}
-
+
public long getTimeToLive()
{
return timeToLive;
}
-
+
public void setTimeToLive(long timeToLive)
{
this.timeToLive = timeToLive;
}
-
+
public int getDeliveryMode()
{
return deliveryMode;
}
-
+
public void setDeliveryMode(int deliveryMode)
{
this.deliveryMode = deliveryMode;
}
-
+
public Version getVersionToUse()
{
return parent.getVersionToUse();
}
-
-
+
+ // When failing over a producer, we keep the old producer's state but there are certain fields
+ // we need to update
+ public void copyState(ProducerState newState)
+ {
+ //Actually only one field
+ this.delegate = newState.delegate;
+ }
+
}
Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -69,64 +69,65 @@
private List toAck;
private ConnectionState parent;
+
private SessionDelegate delegate;
private Map callbackHandlers;
public SessionState(ConnectionState parent, SessionDelegate delegate,
- boolean transacted, int ackMode, boolean xa)
+ boolean transacted, int ackMode, boolean xa)
{
super(parent, (DelegateSupport)delegate);
children = new HashSet();
this.acknowledgeMode = ackMode;
this.transacted = transacted;
this.xa = xa;
-
+
if (xa)
{
// Create an XA resource
xaResource = new MessagingXAResource(parent.getResourceManager(), this);
}
-
+
// If session is transacted and XA, the currentTxId will be updated when the XAResource will
// be enrolled with a global transaction.
-
+
if (transacted & !xa)
{
// Create a local tx
currentTxId = parent.getResourceManager().createLocalTx();
}
-
+
executor = new QueuedExecutor(new LinkedQueue());
toAck = new ArrayList();
-
+
// TODO could optimise this to use the same map of callbackmanagers (which holds refs
// to callbackhandlers) in the connection, instead of maintaining another map
callbackHandlers = new HashMap();
}
-
-
- public void setParent(HierarchicalState parent)
- {
- this.parent = (ConnectionState)parent;
- }
- public HierarchicalState getParent()
- {
- return parent;
- }
-
- public DelegateSupport getDelegate()
- {
- return (DelegateSupport)delegate;
- }
-
- public void setDelegate(DelegateSupport delegate)
- {
- this.delegate=(SessionDelegate)delegate;
- }
-
-
+
+
+ public void setParent(HierarchicalState parent)
+ {
+ this.parent = (ConnectionState)parent;
+ }
+ public HierarchicalState getParent()
+ {
+ return parent;
+ }
+
+ public DelegateSupport getDelegate()
+ {
+ return (DelegateSupport)delegate;
+ }
+
+ public void setDelegate(DelegateSupport delegate)
+ {
+ this.delegate=(SessionDelegate)delegate;
+ }
+
+
/**
* @return List<AckInfo>
*/
@@ -134,7 +135,7 @@
{
return toAck;
}
-
+
public int getAcknowledgeMode()
{
return acknowledgeMode;
@@ -204,5 +205,13 @@
{
return new ArrayList(callbackHandlers.values());
}
+
+ // When failing over a session, we keep the old session's state but there are certain fields
+ // we need to update
+ public void copyState(SessionState newState)
+ {
+ //Actually only one field
+ this.delegate = newState.delegate;
+ }
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -649,6 +649,9 @@
deliveries.put(new Long(del.getReference().getMessageID()), del);
}
}
+
+ //Prompt delivery
+ messageQueue.deliver(false);
}
protected void cancelDelivery(Long messageID, int deliveryCount) throws Throwable
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -27,6 +27,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -59,6 +60,7 @@
import org.jboss.jms.util.ExceptionUtil;
import org.jboss.jms.util.MessageQueueNameHelper;
import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Delivery;
import org.jboss.messaging.core.Queue;
import org.jboss.messaging.core.local.PagingFilteredQueue;
import org.jboss.messaging.core.plugin.IdManager;
@@ -117,7 +119,7 @@
private int nodeId;
private int maxDeliveryAttempts;
private Queue dlq;
-
+
// Constructors --------------------------------------------------
protected ServerSessionEndpoint(int sessionID, ServerConnectionEndpoint connectionEndpoint)
@@ -144,349 +146,41 @@
dlq = sp.getDLQ();
tr = sp.getTxRepository();
maxDeliveryAttempts = sp.getMaxDeliveryAttempts();
+
}
-
+
// SessionDelegate implementation --------------------------------
-
- public ConsumerDelegate failOverConsumer(JBossDestination jmsDestination,
- String selectorString,
- boolean noLocal, String subscriptionName,
- boolean connectionConsumer,
- long oldChannelID) throws JMSException
- {
- //TODO we must ensure that the server side failover has completed first before
- //letting this method run
-
- try
- {
- // fail over channel
- if (postOffice.isLocal())
- {
- throw new IllegalStateException("Cannot failover on a non clustered post office!");
- }
-
- // this is a Clustered operation... so postOffice here must be Clustered
- Binding binding = ((ClusteredPostOffice)postOffice).getBindingforChannelId(oldChannelID);
- if (binding == null)
- {
- throw new IllegalStateException("Can't find failed over channel " + oldChannelID);
- }
-
- // TODO - Remove this log.info before merging into trunk
- if (binding.getQueue() instanceof RemoteQueueStub)
- {
- log.info("OldChannelId=" + oldChannelID + " while currentChannelId=" + ((RemoteQueueStub)binding.getQueue()).getChannelID());
- }
- else
- {
- log.info("OldChannelId=" + oldChannelID + " while currentChannelId=" + ((PagingFilteredQueue)binding.getQueue()).getChannelID());
- }
-
- int consumerID = connectionEndpoint.getServerPeer().getNextObjectID();
-
- int prefetchSize = connectionEndpoint.getPrefetchSize();
-
- ServerConsumerEndpoint ep =
-
- new ServerConsumerEndpoint(consumerID, binding.getQueue(),
- binding.getQueue().getName(), this, selectorString, noLocal,
- jmsDestination, prefetchSize, dlq);
-
- JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
-
- ClientConsumerDelegate stub =
- new ClientConsumerDelegate(consumerID, binding.getQueue().getChannelID(),
- prefetchSize, maxDeliveryAttempts);
-
-
- putConsumerEndpoint(consumerID, ep); // caching consumer locally
-
- connectionEndpoint.getServerPeer().putConsumerEndpoint(consumerID, ep); // cachin consumer in server peer
-
- return stub;
- }
- catch (Throwable t)
- {
- throw ExceptionUtil.handleJMSInvocation(t, this + " createConsumerDelegate");
- }
- }
-
- /*
- * Please don't put failover logic in here
- */
- public ConsumerDelegate createConsumerDelegate(JBossDestination jmsDestination,
+
+ public ConsumerDelegate createConsumerDelegate(JBossDestination jmsDestination,
String selectorString,
boolean noLocal,
String subscriptionName,
- boolean isCC) throws JMSException
+ boolean isCC,
+ long failoverChannelId) throws JMSException
{
try
{
- if (closed)
+ if (failoverChannelId == -1)
{
- throw new IllegalStateException("Session is closed");
- }
-
- if ("".equals(selectorString))
- {
- selectorString = null;
- }
-
- log.debug("creating consumer for " + jmsDestination + ", selector " + selectorString + ", " + (noLocal ? "noLocal, " : "") + "subscription " + subscriptionName);
-
- ManagedDestination mDest = dm.getDestination(jmsDestination.getName(), jmsDestination.isQueue());
-
- if (mDest == null)
- {
- throw new InvalidDestinationException("No such destination: " + jmsDestination);
- }
-
- if (jmsDestination.isTemporary())
- {
- // Can only create a consumer for a temporary destination on the same connection
- // that created it
- if (!connectionEndpoint.hasTemporaryDestination(jmsDestination))
- {
- String msg = "Cannot create a message consumer on a different connection " +
- "to that which created the temporary destination";
- throw new IllegalStateException(msg);
- }
- }
-
- int consumerID = connectionEndpoint.getServerPeer().getNextObjectID();
-
- Binding binding = null;
-
- // Always validate the selector first
- Selector selector = null;
- if (selectorString != null)
- {
- selector = new Selector(selectorString);
- }
-
- if (jmsDestination.isTopic())
- {
- JMSCondition topicCond = new JMSCondition(false, jmsDestination.getName());
+ //Standard createConsumerDelegate
- if (subscriptionName == null)
- {
- // non-durable subscription
- if (log.isTraceEnabled()) { log.trace("creating new non-durable subscription on " + jmsDestination); }
-
- //Create the non durable sub
- QueuedExecutor executor = (QueuedExecutor)pool.get();
-
- PagingFilteredQueue q;
-
- if (postOffice.isLocal())
- {
- q = new PagingFilteredQueue(new GUID().toString(), idm.getId(), ms, pm, true, false,
- executor, selector,
- mDest.getFullSize(),
- mDest.getPageSize(),
- mDest.getDownCacheSize());
-
- binding = postOffice.bindQueue(topicCond, q);
- }
- else
- {
- q = new LocalClusteredQueue(postOffice, nodeId, new GUID().toString(), idm.getId(), ms, pm, true, false,
- executor, selector, tr,
- mDest.getFullSize(),
- mDest.getPageSize(),
- mDest.getDownCacheSize());
-
- ClusteredPostOffice cpo = (ClusteredPostOffice)postOffice;
-
- if (mDest.isClustered())
- {
- binding = cpo.bindClusteredQueue(topicCond, (LocalClusteredQueue)q);
- }
- else
- {
- binding = cpo.bindQueue(topicCond, q);
- }
- }
- }
- else
- {
- if (jmsDestination.isTemporary())
- {
- throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic");
- }
-
- // we have a durable subscription, look it up
- String clientID = connectionEndpoint.getClientID();
- if (clientID == null)
- {
- throw new JMSException("Cannot create durable subscriber without a valid client ID");
- }
-
- // See if there any bindings with the same client_id.subscription_name name
-
- String name = MessageQueueNameHelper.createSubscriptionName(clientID, subscriptionName);
-
- binding = postOffice.getBindingForQueueName(name);
-
- if (binding == null)
- {
- //Does not already exist
-
- if (trace) { log.trace("creating new durable subscription on " + jmsDestination); }
-
- QueuedExecutor executor = (QueuedExecutor)pool.get();
- PagingFilteredQueue q;
-
- if (postOffice.isLocal())
- {
- q = new PagingFilteredQueue(name, idm.getId(), ms, pm, true, true,
- executor, selector,
- mDest.getFullSize(),
- mDest.getPageSize(),
- mDest.getDownCacheSize());
-
- binding = postOffice.bindQueue(topicCond, q);
- }
- else
- {
- q = new LocalClusteredQueue(postOffice, nodeId, name, idm.getId(), ms, pm, true, true,
- executor, selector, tr,
- mDest.getFullSize(),
- mDest.getPageSize(),
- mDest.getDownCacheSize());
-
- ClusteredPostOffice cpo = (ClusteredPostOffice)postOffice;
-
- if (mDest.isClustered())
- {
- binding = cpo.bindClusteredQueue(topicCond, (LocalClusteredQueue)q);
- }
- else
- {
- binding = cpo.bindQueue(topicCond, q);
- }
- }
- }
- else
- {
- //Durable sub already exists
-
- if (trace) { log.trace("subscription " + subscriptionName + " already exists"); }
-
- // From javax.jms.Session Javadoc (and also JMS 1.1 6.11.1):
- // A client can change an existing durable subscription by creating a durable
- // TopicSubscriber with the same name and a new topic and/or message selector.
- // Changing a durable subscriber is equivalent to unsubscribing (deleting) the old
- // one and creating a new one.
-
- String filterString = binding.getQueue().getFilter() != null ? binding.getQueue().getFilter().getFilterString() : null;
-
- boolean selectorChanged =
- (selectorString == null && filterString != null) ||
- (filterString == null && selectorString != null) ||
- (filterString != null && selectorString != null &&
- !filterString.equals(selectorString));
-
- if (trace) { log.trace("selector " + (selectorChanged ? "has" : "has NOT") + " changed"); }
-
- boolean topicChanged = !binding.getCondition().equals(jmsDestination.getName());
-
- if (log.isTraceEnabled()) { log.trace("topic " + (topicChanged ? "has" : "has NOT") + " changed"); }
-
- if (selectorChanged || topicChanged)
- {
- if (trace) { log.trace("topic or selector changed so deleting old subscription"); }
-
- // Unbind the durable subscription
-
- if (mDest.isClustered() && !postOffice.isLocal())
- {
- ClusteredPostOffice cpo = (ClusteredPostOffice)postOffice;
-
- cpo.unbindClusteredQueue(name);
- }
- else
- {
- postOffice.unbindQueue(name);
- }
-
- // create a fresh new subscription
-
- QueuedExecutor executor = (QueuedExecutor)pool.get();
- PagingFilteredQueue q;
-
- if (postOffice.isLocal())
- {
- q = new PagingFilteredQueue(name, idm.getId(), ms, pm, true, true,
- executor, selector,
- mDest.getFullSize(),
- mDest.getPageSize(),
- mDest.getDownCacheSize());
- binding = postOffice.bindQueue(topicCond, q);
- }
- else
- {
- q = new LocalClusteredQueue(postOffice, nodeId, name, idm.getId(), ms, pm, true, true,
- executor, selector, tr,
- mDest.getFullSize(),
- mDest.getPageSize(),
- mDest.getDownCacheSize());
-
- ClusteredPostOffice cpo = (ClusteredPostOffice)postOffice;
-
- if (mDest.isClustered())
- {
- binding = cpo.bindClusteredQueue(topicCond, (LocalClusteredQueue)q);
- }
- else
- {
- binding = cpo.bindQueue(topicCond, (LocalClusteredQueue)q);
- }
- }
- }
- }
- }
+ return createConsumerDelegateInternal(jmsDestination, selectorString, noLocal, subscriptionName,
+ isCC);
}
else
{
- //Consumer on a jms queue
-
- //Let's find the binding
- binding = postOffice.getBindingForQueueName(jmsDestination.getName());
-
- if (binding == null)
- {
- throw new IllegalStateException("Cannot find binding for jms queue: " + jmsDestination.getName());
- }
- }
-
- int prefetchSize = connectionEndpoint.getPrefetchSize();
-
- ServerConsumerEndpoint ep =
- new ServerConsumerEndpoint(consumerID, (PagingFilteredQueue)binding.getQueue(),
- binding.getQueue().getName(), this, selectorString, noLocal,
- jmsDestination, prefetchSize, dlq);
-
- JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
-
- ClientConsumerDelegate stub =
- new ClientConsumerDelegate(consumerID, binding.getQueue().getChannelID(),
- prefetchSize, maxDeliveryAttempts);
-
- putConsumerEndpoint(consumerID, ep); // caching consumer locally
-
- connectionEndpoint.getServerPeer().putConsumerEndpoint(consumerID, ep); // cachin consumer in server peer
-
- log.debug("created and registered " + ep);
-
- return stub;
+ //Failover of consumer
+
+ return failoverConsumer(jmsDestination, selectorString, noLocal, subscriptionName,
+ isCC, failoverChannelId);
+ }
}
catch (Throwable t)
{
throw ExceptionUtil.handleJMSInvocation(t, this + " createConsumerDelegate");
}
}
-
+
public BrowserDelegate createBrowserDelegate(JBossDestination jmsDestination, String messageSelector)
throws JMSException
{
@@ -1000,23 +694,7 @@
}
// Protected -----------------------------------------------------
-
- protected void acknowledgeInternal(AckInfo ackInfo) throws Throwable
- {
- //If the message was delivered via a connection consumer then the message needs to be acked
- //via the original consumer that was used to feed the connection consumer - which
- //won't be one of the consumers of this session
- //Therefore we always look in the global map of consumers held in the server peer
- ServerConsumerEndpoint consumer = this.connectionEndpoint.getConsumerEndpoint(ackInfo.getConsumerID());
-
- if (consumer == null)
- {
- throw new IllegalArgumentException("Cannot find consumer id: " + ackInfo.getConsumerID());
- }
- consumer.acknowledge(ackInfo.getMessageID());
- }
-
protected ServerConsumerEndpoint putConsumerEndpoint(int consumerID, ServerConsumerEndpoint d)
{
if (trace) { log.trace(this + " caching consumer " + consumerID); }
@@ -1072,6 +750,341 @@
}
// Private -------------------------------------------------------
+
+   /**
+    * Acknowledges one delivery described by the given AckInfo.
+    *
+    * The consumer is looked up through the connection endpoint instead of this session's own
+    * consumer cache: a message delivered via a connection consumer must be acked on the consumer
+    * that originally fed the connection consumer, and that one is not a consumer of this session.
+    *
+    * @param ackInfo supplies the consumer id and the message id to acknowledge
+    * @throws IllegalArgumentException if no consumer is found for the consumer id
+    */
+   private void acknowledgeInternal(AckInfo ackInfo) throws Throwable
+   {
+      ServerConsumerEndpoint target =
+         connectionEndpoint.getConsumerEndpoint(ackInfo.getConsumerID());
+
+      if (target != null)
+      {
+         target.acknowledge(ackInfo.getMessageID());
+
+         return;
+      }
+
+      throw new IllegalArgumentException("Cannot find consumer id: " + ackInfo.getConsumerID());
+   }
+
+   /**
+    * Creates and registers a consumer endpoint on the binding that the clustered post office
+    * reports for a failed-over channel id, and returns the client-side stub for it.
+    *
+    * subscriptionName and connectionConsumer are accepted but not read by this method.
+    *
+    * @param jmsDestination destination handed to the new consumer endpoint
+    * @param selectorString message selector handed to the new consumer endpoint
+    * @param noLocal        noLocal flag handed to the new consumer endpoint
+    * @param oldChannelID   id of the channel the consumer was attached to before failover
+    * @return the delegate stub for the newly created consumer
+    * @throws IllegalStateException if the post office is local (non clustered) or no binding
+    *         exists for oldChannelID
+    */
+   private ConsumerDelegate failoverConsumer(JBossDestination jmsDestination,
+                                             String selectorString,
+                                             boolean noLocal, String subscriptionName,
+                                             boolean connectionConsumer,
+                                             long oldChannelID) throws Exception
+   {
+      // fail over channel
+      if (postOffice.isLocal())
+      {
+         throw new IllegalStateException("Cannot failover on a non clustered post office!");
+      }
+
+      // this is a clustered operation... so postOffice here must be clustered
+      Binding binding = ((ClusteredPostOffice)postOffice).getBindingforChannelId(oldChannelID);
+
+      if (binding == null)
+      {
+         throw new IllegalStateException("Can't find failed over channel " + oldChannelID);
+      }
+
+      // TODO - Remove this log.info before merging into trunk
+      // The channel id is read straight from the queue (as done below when building the stub),
+      // so no per-implementation cast is needed just to log it.
+      log.info("OldChannelId=" + oldChannelID + " while currentChannelId=" + binding.getQueue().getChannelID());
+
+      int consumerID = connectionEndpoint.getServerPeer().getNextObjectID();
+
+      int prefetchSize = connectionEndpoint.getPrefetchSize();
+
+      ServerConsumerEndpoint ep =
+         new ServerConsumerEndpoint(consumerID, binding.getQueue(),
+                                    binding.getQueue().getName(), this, selectorString, noLocal,
+                                    jmsDestination, prefetchSize, dlq);
+
+      JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
+
+      ClientConsumerDelegate stub =
+         new ClientConsumerDelegate(consumerID, binding.getQueue().getChannelID(),
+                                    prefetchSize, maxDeliveryAttempts);
+
+      putConsumerEndpoint(consumerID, ep); // caching consumer locally
+
+      connectionEndpoint.getServerPeer().putConsumerEndpoint(consumerID, ep); // caching consumer in server peer
+
+      log.debug("created and registered " + ep);
+
+      return stub;
+   }
+
+   /**
+    * Creates and registers a new consumer endpoint for the given destination and returns the
+    * client-side stub for it.
+    *
+    * For a topic, a subscription queue is bound first: a fresh one for a non-durable
+    * subscription (subscriptionName == null), or the existing/new queue named after the client
+    * id and subscription name for a durable one. An existing durable subscription whose selector
+    * or topic is considered changed is unbound and re-created. For a JMS queue the existing
+    * binding is looked up by name.
+    *
+    * isCC is accepted but not read by this method.
+    *
+    * @param jmsDestination   destination to consume from
+    * @param selectorString   message selector; the empty string is treated as no selector
+    * @param noLocal          noLocal flag handed to the consumer endpoint
+    * @param subscriptionName durable subscription name, or null for a non-durable consumer
+    * @return the delegate stub for the newly created consumer
+    * @throws IllegalStateException if the session is closed, if a temporary destination was
+    *         created by another connection, or if no binding exists for a JMS queue
+    * @throws InvalidDestinationException if the destination is unknown, or a durable
+    *         subscription is requested on a temporary topic
+    * @throws JMSException if a durable subscription is requested without a client ID
+    */
+   private ConsumerDelegate createConsumerDelegateInternal(JBossDestination jmsDestination,
+                                                           String selectorString,
+                                                           boolean noLocal,
+                                                           String subscriptionName,
+                                                           boolean isCC) throws Throwable
+   {
+      if (closed)
+      {
+         throw new IllegalStateException("Session is closed");
+      }
+
+      if ("".equals(selectorString))
+      {
+         selectorString = null;
+      }
+
+      log.debug("creating consumer for " + jmsDestination + ", selector " + selectorString + ", " + (noLocal ? "noLocal, " : "") + "subscription " + subscriptionName);
+
+      ManagedDestination mDest = dm.getDestination(jmsDestination.getName(), jmsDestination.isQueue());
+
+      if (mDest == null)
+      {
+         throw new InvalidDestinationException("No such destination: " + jmsDestination);
+      }
+
+      if (jmsDestination.isTemporary())
+      {
+         // Can only create a consumer for a temporary destination on the same connection
+         // that created it
+         if (!connectionEndpoint.hasTemporaryDestination(jmsDestination))
+         {
+            String msg = "Cannot create a message consumer on a different connection " +
+                         "to that which created the temporary destination";
+            throw new IllegalStateException(msg);
+         }
+      }
+
+      int consumerID = connectionEndpoint.getServerPeer().getNextObjectID();
+
+      Binding binding = null;
+
+      // Always validate the selector first
+      Selector selector = null;
+      if (selectorString != null)
+      {
+         selector = new Selector(selectorString);
+      }
+
+      if (jmsDestination.isTopic())
+      {
+         JMSCondition topicCond = new JMSCondition(false, jmsDestination.getName());
+
+         if (subscriptionName == null)
+         {
+            // non-durable subscription
+            if (log.isTraceEnabled()) { log.trace("creating new non-durable subscription on " + jmsDestination); }
+
+            // Create the non durable sub under a generated, unique queue name
+            binding = bindSubscriptionQueue(topicCond, new GUID().toString(), false, selector, mDest);
+         }
+         else
+         {
+            if (jmsDestination.isTemporary())
+            {
+               throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic");
+            }
+
+            // we have a durable subscription, look it up
+            String clientID = connectionEndpoint.getClientID();
+            if (clientID == null)
+            {
+               throw new JMSException("Cannot create durable subscriber without a valid client ID");
+            }
+
+            // See if there any bindings with the same client_id.subscription_name name
+
+            String name = MessageQueueNameHelper.createSubscriptionName(clientID, subscriptionName);
+
+            binding = postOffice.getBindingForQueueName(name);
+
+            if (binding == null)
+            {
+               // Does not already exist
+
+               if (trace) { log.trace("creating new durable subscription on " + jmsDestination); }
+
+               binding = bindSubscriptionQueue(topicCond, name, true, selector, mDest);
+            }
+            else
+            {
+               // Durable sub already exists
+
+               if (trace) { log.trace("subscription " + subscriptionName + " already exists"); }
+
+               // From javax.jms.Session Javadoc (and also JMS 1.1 6.11.1):
+               // A client can change an existing durable subscription by creating a durable
+               // TopicSubscriber with the same name and a new topic and/or message selector.
+               // Changing a durable subscriber is equivalent to unsubscribing (deleting) the old
+               // one and creating a new one.
+
+               String filterString = binding.getQueue().getFilter() != null ? binding.getQueue().getFilter().getFilterString() : null;
+
+               boolean selectorChanged =
+                  (selectorString == null && filterString != null) ||
+                  (filterString == null && selectorString != null) ||
+                  (filterString != null && selectorString != null &&
+                   !filterString.equals(selectorString));
+
+               if (trace) { log.trace("selector " + (selectorChanged ? "has" : "has NOT") + " changed"); }
+
+               // NOTE(review): queues are bound above with a JMSCondition, yet the binding's
+               // condition is compared here with a String name. Unless the condition's equals()
+               // accepts a String this is always "changed", which would drop and re-create the
+               // durable subscription on every call - confirm against Binding.getCondition().
+               boolean topicChanged = !binding.getCondition().equals(jmsDestination.getName());
+
+               if (log.isTraceEnabled()) { log.trace("topic " + (topicChanged ? "has" : "has NOT") + " changed"); }
+
+               if (selectorChanged || topicChanged)
+               {
+                  if (trace) { log.trace("topic or selector changed so deleting old subscription"); }
+
+                  // Unbind the durable subscription
+
+                  if (mDest.isClustered() && !postOffice.isLocal())
+                  {
+                     ClusteredPostOffice cpo = (ClusteredPostOffice)postOffice;
+
+                     cpo.unbindClusteredQueue(name);
+                  }
+                  else
+                  {
+                     postOffice.unbindQueue(name);
+                  }
+
+                  // create a fresh new subscription
+
+                  binding = bindSubscriptionQueue(topicCond, name, true, selector, mDest);
+               }
+            }
+         }
+      }
+      else
+      {
+         // Consumer on a jms queue
+
+         // Let's find the binding
+         binding = postOffice.getBindingForQueueName(jmsDestination.getName());
+
+         if (binding == null)
+         {
+            throw new IllegalStateException("Cannot find binding for jms queue: " + jmsDestination.getName());
+         }
+      }
+
+      int prefetchSize = connectionEndpoint.getPrefetchSize();
+
+      ServerConsumerEndpoint ep =
+         new ServerConsumerEndpoint(consumerID, (PagingFilteredQueue)binding.getQueue(),
+                                    binding.getQueue().getName(), this, selectorString, noLocal,
+                                    jmsDestination, prefetchSize, dlq);
+
+      JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
+
+      ClientConsumerDelegate stub =
+         new ClientConsumerDelegate(consumerID, binding.getQueue().getChannelID(),
+                                    prefetchSize, maxDeliveryAttempts);
+
+      putConsumerEndpoint(consumerID, ep); // caching consumer locally
+
+      connectionEndpoint.getServerPeer().putConsumerEndpoint(consumerID, ep); // caching consumer in server peer
+
+      log.debug("created and registered " + ep);
+
+      return stub;
+   }
+
+   /**
+    * Builds a subscription queue for a topic and binds it in the post office: a
+    * PagingFilteredQueue on a local post office, otherwise a LocalClusteredQueue that is bound
+    * as a clustered queue only when the destination itself is clustered.
+    *
+    * @param topicCond  condition to bind the queue under
+    * @param queueName  name of the queue to create
+    * @param durableSub second boolean flag of the queue constructors; callers pass true for
+    *                   durable subscriptions and false for non-durable ones
+    * @param selector   parsed selector handed to the queue, may be null
+    * @param mDest      managed destination supplying full/page/down-cache sizes
+    * @return the binding returned by the post office
+    */
+   private Binding bindSubscriptionQueue(JMSCondition topicCond, String queueName,
+                                         boolean durableSub, Selector selector,
+                                         ManagedDestination mDest) throws Throwable
+   {
+      QueuedExecutor executor = (QueuedExecutor)pool.get();
+
+      PagingFilteredQueue q;
+
+      if (postOffice.isLocal())
+      {
+         q = new PagingFilteredQueue(queueName, idm.getId(), ms, pm, true, durableSub,
+                                     executor, selector,
+                                     mDest.getFullSize(),
+                                     mDest.getPageSize(),
+                                     mDest.getDownCacheSize());
+
+         return postOffice.bindQueue(topicCond, q);
+      }
+
+      q = new LocalClusteredQueue(postOffice, nodeId, queueName, idm.getId(), ms, pm, true, durableSub,
+                                  executor, selector, tr,
+                                  mDest.getFullSize(),
+                                  mDest.getPageSize(),
+                                  mDest.getDownCacheSize());
+
+      ClusteredPostOffice cpo = (ClusteredPostOffice)postOffice;
+
+      if (mDest.isClustered())
+      {
+         return cpo.bindClusteredQueue(topicCond, (LocalClusteredQueue)q);
+      }
+
+      return cpo.bindQueue(topicCond, q);
+   }
+
// Inner classes -------------------------------------------------
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -46,15 +46,10 @@
*/
public interface SessionEndpoint extends Closeable
{
- ConsumerDelegate failOverConsumer(JBossDestination jmsDestination,
- String selectorString,
- boolean noLocal, String subscriptionName,
- boolean connectionConsumer,
- long oldchannelID) throws JMSException;
-
ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
boolean noLocal, String subscriptionName,
- boolean connectionConsumer) throws JMSException;
+ boolean connectionConsumer,
+ long failoverChannelID) throws JMSException;
BrowserDelegate createBrowserDelegate(JBossDestination queue, String messageSelector)
throws JMSException;
Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -86,22 +86,11 @@
public ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
boolean noLocal, String subscriptionName,
- boolean connectionConsumer) throws JMSException
+ boolean connectionConsumer, long failoverChannelId) throws JMSException
{
- return endpoint.createConsumerDelegate(destination, selector, noLocal, subscriptionName, connectionConsumer);
+ return endpoint.createConsumerDelegate(destination, selector, noLocal, subscriptionName, connectionConsumer, failoverChannelId);
}
- public ConsumerDelegate failOverConsumer(JBossDestination jmsDestination,
- String selectorString,
- boolean noLocal, String subscriptionName,
- boolean connectionConsumer,
- long oldChannelID) throws JMSException
- {
- return endpoint.failOverConsumer(jmsDestination, selectorString, noLocal,
- subscriptionName, connectionConsumer,
- oldChannelID);
- }
-
public BrowserDelegate createBrowserDelegate(JBossDestination queue, String messageSelector)
throws JMSException
{
Modified: trunk/src/main/org/jboss/jms/server/remoting/MetaDataConstants.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/MetaDataConstants.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/server/remoting/MetaDataConstants.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -36,26 +36,8 @@
public static final String REMOTING_SESSION_ID = "REMOTING_SESSION_ID";
public static final String CALLBACK_HANDLER = "CALLBACK_HANDLER";
-
- public static final String CONSUMER_ID = "CONSUMER_ID";
-
- public static final String PREFETCH_SIZE = "BUFFER_SIZE";
-
- public static final String CLIENT_CONNECTION_ID = "CC_ID";
-
+
public static final String VERSION_NUMBER = "VERSION_NUMBER";
public static final String JMS_CLIENT_VM_ID = "JMS_CLIENT_VM_ID";
-
- public static final String CF_DELEGATES = "CF_DELEGATES";
-
- public static final String SERVER_ID = "SERVER_ID";
-
- public static final String REMOTING_CONNECTION = "REMOTING_CONNECTION";
-
- public static final String FAILOVER_MAP = "CF_FAIL_IND";
-
- public static final String CONNECTION_VERSION = "CONNECTION_VERSION";
-
- public static final String MAX_DELIVERIES = "MAX_DELS";
}
Modified: trunk/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ResourceManager.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/tx/ResourceManager.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -27,6 +27,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
@@ -72,11 +73,7 @@
private static final Logger log = Logger.getLogger(ResourceManager.class);
// Constructors --------------------------------------------------
-
- protected ResourceManager()
- {
- }
-
+
// Public --------------------------------------------------------
public TxState getTx(Object xid)
@@ -123,14 +120,56 @@
/**
* Navigate on ACK and change consumer ids on every ACK not sent yet.
*/
- public void handleFailover(Object xid, int oldConsumerID, int newConsumerID)
+ public void handleFailover(int oldConsumerID, int newConsumerID)
{
if (trace) { log.trace("handleFailover:: Transfering consumer id on ACKs from " + oldConsumerID + " to " + newConsumerID); }
- TxState tx = getTx(xid);
+ //TODO need to lock the rm while this is happening
- tx.handleFailover(oldConsumerID, newConsumerID);
+ //Note we need to replace ids for *all* transactions - this is because, for XA
+ //the session might have done work in many transactions
+ Iterator iter = this.transactions.values().iterator();
+
+ while (iter.hasNext())
+ {
+ TxState tx = (TxState)iter.next();
+
+ tx.handleFailover(oldConsumerID, newConsumerID);
+ }
}
+
+   /**
+    * Gathers, from every transaction held by this resource manager, the ack infos whose
+    * consumer id is in the specified set. Each transaction appends its own matches.
+    *
+    * @param consumerIds the consumer ids to match
+    * @return a new list holding the collected ack infos
+    */
+   public List getAckInfosForConsumerIds(Set consumerIds)
+   {
+      List collected = new ArrayList();
+
+      for (Iterator txs = this.transactions.values().iterator(); txs.hasNext(); )
+      {
+         TxState state = (TxState)txs.next();
+
+         state.getAckInfosForConsumerIds(collected, consumerIds);
+      }
+
+      return collected;
+   }
+
+   /**
+    * Asks every transaction held by this resource manager to discard its non-persistent acks,
+    * handing each of them the given set of consumer ids.
+    *
+    * @param consumerIds consumer ids passed through to each transaction
+    */
+   public void removeNonPersistentAcks(Set consumerIds)
+   {
+      Iterator iter = this.transactions.values().iterator();
+
+      while (iter.hasNext())
+      {
+         TxState tx = (TxState)iter.next();
+
+         tx.removeNonPersistentAcks(consumerIds);
+      }
+   }
/**
Deleted: trunk/src/main/org/jboss/jms/tx/ResourceManagerFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ResourceManagerFactory.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/tx/ResourceManagerFactory.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -1,107 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.jms.tx;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This class manages instances of ResourceManager. It ensures there is one instance per instance
- * of JMS server as specified by the server id.
- *
- * This allows different JMS connections to the same JMS server (the underlying resource is the JMS server)
- * to use the same resource manager.
- *
- * This means isSameRM() on XAResource returns true, allowing the Transaction manager to join work in one
- * tx to another thus allowing 1PC optimization which should help performance.
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version $Revision$
- *
- * $Id$
- */
-public class ResourceManagerFactory
-{
- public static final ResourceManagerFactory instance = new ResourceManagerFactory();
-
- private Map holders;
-
- private ResourceManagerFactory()
- {
- holders = new HashMap();
- }
-
- public synchronized boolean containsResourceManager(int serverID)
- {
- return holders.containsKey(new Integer(serverID));
- }
-
- /**
- * @param serverID - server peer ID.
- */
- public synchronized ResourceManager checkOutResourceManager(int serverID)
- {
- Integer in = new Integer(serverID);
-
- Holder h = (Holder)holders.get(in);
-
- if (h == null)
- {
- h = new Holder();
-
- holders.put(in, h);
- }
- else
- {
- h.refCount++;
- }
-
- return h.rm;
- }
-
- public synchronized void checkInResourceManager(int serverID)
- {
- Integer in = new Integer(serverID);
-
- Holder h = (Holder)holders.get(in);
-
- if (h == null)
- {
- throw new IllegalArgumentException("Cannot find resource manager for server: " + serverID);
- }
-
- h.refCount--;
-
- if (h.refCount == 0)
- {
- holders.remove(in);
- }
- }
-
- private static class Holder
- {
- ResourceManager rm = new ResourceManager();
-
- int refCount = 1;
- }
-
-}
Modified: trunk/src/main/org/jboss/jms/tx/TxState.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/TxState.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/tx/TxState.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -26,6 +26,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import org.jboss.jms.message.JBossMessage;
import org.jboss.messaging.core.message.MessageFactory;
@@ -110,6 +111,32 @@
}
}
}
+
+   /**
+    * Appends to ackInfos every ack of this transaction whose consumer id, boxed as an Integer,
+    * is contained in consumerIds.
+    *
+    * @param ackInfos    output list the matching acks are added to
+    * @param consumerIds set of Integer consumer ids to match
+    */
+   public void getAckInfosForConsumerIds(List ackInfos, Set consumerIds)
+   {
+      Iterator remaining = acks.iterator();
+
+      while (remaining.hasNext())
+      {
+         AckInfo candidate = (AckInfo)remaining.next();
+
+         Integer consumerKey = new Integer(candidate.getConsumerID());
+
+         if (!consumerIds.contains(consumerKey))
+         {
+            continue;
+         }
+
+         ackInfos.add(candidate);
+      }
+   }
+
+   /**
+    * Removes from this transaction the acks of non-reliable messages that belong to one of the
+    * given consumers.
+    *
+    * The consumerIds argument used to be ignored, so acks of every consumer were dropped; it is
+    * now honoured in the same way getAckInfosForConsumerIds matches consumer ids. A null set
+    * keeps the previous behaviour of matching every consumer.
+    * NOTE(review): assumes callers pass the ids currently stored in the acks - confirm the call
+    * order relative to handleFailover, which rewrites those ids.
+    *
+    * @param consumerIds set of Integer consumer ids whose non-persistent acks are removed, or
+    *                    null for all consumers
+    */
+   public void removeNonPersistentAcks(Set consumerIds)
+   {
+      for (Iterator ackIterator = acks.iterator(); ackIterator.hasNext(); )
+      {
+         AckInfo ackInfo = (AckInfo)ackIterator.next();
+
+         boolean consumerMatches =
+            consumerIds == null || consumerIds.contains(new Integer(ackInfo.getConsumerID()));
+
+         if (consumerMatches && !ackInfo.msg.getMessage().isReliable())
+         {
+            // Iterator.remove is the only safe way to delete while iterating
+            ackIterator.remove();
+         }
+      }
+   }
// Streamable implementation ---------------------------------
Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/tests/build.xml 2006-12-12 18:25:55 UTC (rev 1771)
@@ -387,7 +387,7 @@
haltonerror="${junit.batchtest.haltonerror}">
<formatter type="plain" usefile="${junit.formatter.usefile}"/>
<fileset dir="${build.tests.classes}">
- <include name="**/messaging/core/**/*Test.class"/>
+ <!-- <include name="**/messaging/core/**/*Test.class"/> -->
<include name="**/messaging/jms/**/*Test.class"/>
<exclude name="**/jms/stress/**"/>
<exclude name="**/jms/crash/*Test.class"/>
@@ -745,7 +745,7 @@
haltonerror="${junit.batchtest.haltonerror}">
<formatter type="plain" usefile="${junit.formatter.usefile}"/>
<fileset dir="${build.tests.classes}">
- <include name="**/jms/clustering/HATest.class"/>
+ <include name="**/jms/clustering/*Test.class"/>
<!--
<include name="**/jms/clustering/SimpleClusteringTest.class"/>
-->
Modified: trunk/tests/src/org/jboss/test/messaging/jms/ConnectionTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ConnectionTest.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ConnectionTest.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -44,8 +44,6 @@
import org.jboss.jms.message.MessageIdGenerator;
import org.jboss.jms.message.MessageIdGeneratorFactory;
import org.jboss.jms.server.ServerPeer;
-import org.jboss.jms.tx.ResourceManager;
-import org.jboss.jms.tx.ResourceManagerFactory;
import org.jboss.logging.Logger;
import org.jboss.remoting.InvocationRequest;
import org.jboss.remoting.ServerInvocationHandler;
@@ -121,42 +119,8 @@
Connection conn = cf.createConnection();
conn.close();
}
- }
+ }
- public void testResourceManagersForSameServer() throws Exception
- {
- Connection conn1 = cf.createConnection();
-
- ClientConnectionDelegate del1 = (ClientConnectionDelegate)((JBossConnection)conn1).getDelegate();
-
- ConnectionState state1 = (ConnectionState)del1.getState();
-
- ResourceManager rm1 = state1.getResourceManager();
-
- Connection conn2 = cf.createConnection();
-
- ClientConnectionDelegate del2 = (ClientConnectionDelegate)((JBossConnection)conn2).getDelegate();
-
- ConnectionState state2 = (ConnectionState)del2.getState();
-
- ResourceManager rm2 = state2.getResourceManager();
-
- //Two connections for same server should share the same resource manager
-
- assertTrue(rm1 == rm2);
-
- assertTrue(ResourceManagerFactory.instance.containsResourceManager(state2.getServerID()));
-
- conn1.close();
-
- //Check reference counting
- assertTrue(ResourceManagerFactory.instance.containsResourceManager(state2.getServerID()));
-
- conn2.close();
-
- assertFalse(ResourceManagerFactory.instance.containsResourceManager(state2.getServerID()));
- }
-
public void testMessageIDGeneratorsForSameServer() throws Exception
{
Connection conn1 = cf.createConnection();
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2006-12-12 18:25:55 UTC (rev 1771)
@@ -37,6 +37,7 @@
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
import org.jboss.jms.client.delegate.ClusteredClientConnectionFactoryDelegate;
+import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.state.ConnectionState;
import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
import org.jboss.test.messaging.tools.ServerManagement;
@@ -69,489 +70,489 @@
}
// Public --------------------------------------------------------
-//
-// /*
-// * Test that connections created using a clustered connection factory are created round robin on
-// * different servers
-// */
-// public void testRoundRobinConnectionCreation() throws Exception
-// {
-// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-//
-// ClusteredClientConnectionFactoryDelegate delegate =
-// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//
-// log.info ("number of delegates = " + delegate.getDelegates().length);
-// log.info ("number of servers = " + ServerManagement.getServer(0).getNodeIDView().size());
-//
-// assertEquals(3, delegate.getDelegates().length);
-//
-// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//
-// ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-//
-// ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-//
-// assertEquals(0, cf1.getServerId());
-//
-// assertEquals(1, cf2.getServerId());
-//
-// assertEquals(2, cf3.getServerId());
-//
-// assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
-//
-// Connection conn1 = null;
-//
-// Connection conn2 = null;
-//
-// Connection conn3 = null;
-//
-// Connection conn4 = null;
-//
-// Connection conn5 = null;
-//
-// try
-// {
-// conn1 = factory.createConnection();
-//
-// conn2 = factory.createConnection();
-//
-// conn3 = factory.createConnection();
-//
-// conn4 = factory.createConnection();
-//
-// conn5 = factory.createConnection();
-//
-// ConnectionState state1 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn1).getDelegate()).getState());
-//
-// ConnectionState state2 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn2).getDelegate()).getState());
-//
-// ConnectionState state3 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn3).getDelegate()).getState());
-//
-// ConnectionState state4 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn4).getDelegate()).getState());
-//
-// ConnectionState state5 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn5).getDelegate()).getState());
-//
-// int serverID1 = state1.getServerID();
-//
-// int serverID2 = state2.getServerID();
-//
-// int serverID3 = state3.getServerID();
-//
-// int serverID4 = state4.getServerID();
-//
-// int serverID5 = state5.getServerID();
-//
-// log.info("server id 1: " + serverID1);
-//
-// log.info("server id 2: " + serverID2);
-//
-// log.info("server id 3: " + serverID3);
-//
-// log.info("server id 4: " + serverID4);
-//
-// log.info("server id 5: " + serverID5);
-//
-// assertEquals(0, serverID1);
-//
-// assertEquals(1, serverID2);
-//
-// assertEquals(2, serverID3);
-//
-// assertEquals(0, serverID4);
-//
-// assertEquals(1, serverID5);
-// }
-// finally
-// {
-// if (conn1 != null)
-// {
-// conn1.close();
-// }
-//
-// if (conn2 != null)
-// {
-// conn2.close();
-// }
-//
-// if (conn3 != null)
-// {
-// conn3.close();
-// }
-//
-// if (conn4 != null)
-// {
-// conn4.close();
-// }
-//
-// if (conn5 != null)
-// {
-// conn5.close();
-// }
-// }
-//
-// }
-//
-// /*
-// * Test that the failover mapping is created correctly and updated properly when nodes leave
-// * or join
-// */
-// public void testDefaultFailoverMap() throws Exception
-// {
-// {
-// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-//
-// ClusteredClientConnectionFactoryDelegate delegate =
-// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//
-// assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
-//
-// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//
-// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//
-// ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-//
-// ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-//
-// //The order here depends on the order the servers were started in
-//
-// //If any servers get stopped and then started then the order will change
-//
-// log.info("cf1 serverid=" + cf1.getServerId());
-//
-// log.info("cf2 serverid=" + cf2.getServerId());
-//
-// log.info("cf3 serverid=" + cf3.getServerId());
-//
-//
-// assertEquals(0, cf1.getServerId());
-//
-// assertEquals(1, cf2.getServerId());
-//
-// assertEquals(2, cf3.getServerId());
-//
-// Map failoverMap = delegate.getFailoverMap();
-//
-// assertEquals(3, delegates.length);
-//
-// assertEquals(3, failoverMap.size());
-//
-// // Default failover policy just chooses the node to the right
-//
-// assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-//
-// assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-//
-// assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
-// }
-//
-// //Now cleanly stop one of the servers
-//
-// log.info("************** STOPPING SERVER 0");
-// ServerManagement.stop(0);
-//
-// log.info("server stopped");
-//
-// assertEquals(2, ServerManagement.getServer(1).getNodeIDView().size());
-//
-// {
-// //Lookup another connection factory
-//
-// JBossConnectionFactory factory = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
-//
-// log.info("Got connection factory");
-//
-// ClusteredClientConnectionFactoryDelegate delegate =
-// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//
-// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//
-// Map failoverMap = delegate.getFailoverMap();
-//
-// log.info("Got failover map");
-//
-// assertEquals(2, delegates.length);
-//
-// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//
-// ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-//
-// //Order here depends on order servers were started in
-//
-// log.info("cf1 serverid=" + cf1.getServerId());
-//
-// log.info("cf2 serverid=" + cf2.getServerId());
-//
-// assertEquals(1, cf1.getServerId());
-//
-// assertEquals(2, cf2.getServerId());
-//
-//
-// assertEquals(2, failoverMap.size());
-//
-// assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-//
-// assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-// }
-//
-// //Cleanly stop another server
-//
-// log.info("Server 1 is started: " + ServerManagement.getServer(1).isServerPeerStarted());
-//
-// ServerManagement.stop(1);
-//
-// assertEquals(1, ServerManagement.getServer(2).getNodeIDView().size());
-//
-// {
-// //Lookup another connection factory
-//
-// JBossConnectionFactory factory = (JBossConnectionFactory )ic2.lookup("/ConnectionFactory");
-//
-// ClusteredClientConnectionFactoryDelegate delegate =
-// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//
-// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//
-// Map failoverMap = delegate.getFailoverMap();
-//
-// assertEquals(1, delegates.length);
-//
-// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//
-// assertEquals(2, cf1.getServerId());
-//
-//
-// assertEquals(1, failoverMap.size());
-//
-// assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-// }
-//
-// //Restart server 0
-//
-// ServerManagement.start("all", 0);
-//
-// {
-// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-//
-// log.info("Got connection factory");
-//
-// ClusteredClientConnectionFactoryDelegate delegate =
-// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//
-// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//
-// Map failoverMap = delegate.getFailoverMap();
-//
-// log.info("Got failover map");
-//
-// assertEquals(2, delegates.length);
-//
-// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//
-// ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-//
-// log.info("cf1 serverid=" + cf1.getServerId());
-//
-// log.info("cf2 serverid=" + cf2.getServerId());
-//
-// assertEquals(2, cf1.getServerId());
-//
-// assertEquals(0, cf2.getServerId());
-//
-//
-// assertEquals(2, failoverMap.size());
-//
-// assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-//
-// assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-// }
-//
-//
-// //Restart server 1
-//
-// ServerManagement.start("all", 1);
-//
-// {
-// JBossConnectionFactory factory = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
-//
-// log.info("Got connection factory");
-//
-// ClusteredClientConnectionFactoryDelegate delegate =
-// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//
-// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//
-// Map failoverMap = delegate.getFailoverMap();
-//
-// log.info("Got failover map");
-//
-// assertEquals(3, delegates.length);
-//
-// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//
-// ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-//
-// ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-//
-// log.info("cf1 serverid=" + cf1.getServerId());
-//
-// log.info("cf2 serverid=" + cf2.getServerId());
-//
-// log.info("cf3 serverid=" + cf3.getServerId());
-//
-// assertEquals(2, cf1.getServerId());
-//
-// assertEquals(0, cf2.getServerId());
-//
-// assertEquals(1, cf3.getServerId());
-//
-//
-// assertEquals(3, failoverMap.size());
-//
-// assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-//
-// assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-//
-// assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
-// }
-// }
-//
-// public void testSimpleFailover() throws Exception
-// {
-// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-//
-// ClusteredClientConnectionFactoryDelegate delegate =
-// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//
-// Set nodeIDView = ServerManagement.getServer(0).getNodeIDView();
-// assertEquals(3, nodeIDView.size());
-//
-// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//
-// ClientConnectionFactoryDelegate cf1 = delegates[0];
-//
-// ClientConnectionFactoryDelegate cf2 = delegates[1];
-//
-// ClientConnectionFactoryDelegate cf3 = delegates[2];
-//
-// int server0Id = cf1.getServerId();
-//
-// int server1Id = cf2.getServerId();
-//
-// int server2Id = cf3.getServerId();
-//
-// log.info("server 0 id: " + server0Id);
-//
-// log.info("server 1 id: " + server1Id);
-//
-// log.info("server 2 id: " + server2Id);
-//
-// Map failoverMap = delegate.getFailoverMap();
-//
-// log.info(failoverMap.get(new Integer(server0Id)));
-// log.info(failoverMap.get(new Integer(server1Id)));
-// log.info(failoverMap.get(new Integer(server2Id)));
-//
-// int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
-//
-// // server 1 should failover onto server 2
-//
-// assertEquals(server2Id, server1FailoverId);
-//
-// Connection conn = null;
-//
-// try
-// {
-//
-// //Get a connection on server 1
-// conn = factory.createConnection(); //connection on server 0
-//
-// conn.close();
-//
-// conn = factory.createConnection(); //connection on server 1
-//
-// JBossConnection jbc = (JBossConnection)conn;
-//
-// ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
-//
-// ConnectionState state = (ConnectionState)del.getState();
-//
-// int initialServerID = state.getServerID();
-//
-// assertEquals(1, initialServerID);
-//
-// Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// MessageProducer prod = sess.createProducer(queue1);
-//
-// MessageConsumer cons = sess.createConsumer(queue1);
-//
-// final int NUM_MESSAGES = 100;
-//
-// for (int i = 0; i < NUM_MESSAGES; i++)
-// {
-// TextMessage tm = sess.createTextMessage("message:" + i);
-//
-// prod.send(tm);
-// }
-//
-// //So now, messages should be in queue1 on server 1
-// //So we now kill server 1
-// //Which should cause transparent failover of connection conn onto server 1
-//
-// log.info("************ KILLING (CRASHING) SERVER 1");
-//
-// ServerManagement.getServer(1).destroy();
-//
-// log.info("killed server, now waiting");
-//
-// Thread.sleep(5000);
-//
-// log.info("done wait");
-//
-// state = (ConnectionState)del.getState();
-//
-// int finalServerID = state.getServerID();
-//
-// log.info("final server id= " + finalServerID);
-//
-// //server id should now be 2
-//
-// assertEquals(2, finalServerID);
-//
-// conn.start();
-//
-// for (int i = 0; i < NUM_MESSAGES; i++)
-// {
-// TextMessage tm = (TextMessage)cons.receive(1000);
-//
-// log.info("message is " + tm);
-//
-// assertNotNull(tm);
-//
-// assertEquals("message:" + i, tm.getText());
-// }
-// log.info("done");
-// }
-// finally
-// {
-// if (conn != null)
-// {
-// try
-// {
-// conn.close();
-// }
-// catch (Exception e)
-// {
-// e.printStackTrace();
-// }
-// }
-// }
-//
-// }
+ /*
+ * Test that connections created using a clustered connection factory are created round robin on
+ * different servers
+ */
+ public void testRoundRobinConnectionCreation() throws Exception
+ {
+ JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+
+ ClusteredClientConnectionFactoryDelegate delegate =
+ (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+
+ log.info ("number of delegates = " + delegate.getDelegates().length);
+ log.info ("number of servers = " + ServerManagement.getServer(0).getNodeIDView().size());
+
+ assertEquals(3, delegate.getDelegates().length);
+
+ ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+
+ ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+
+ ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+
+ assertEquals(0, cf1.getServerId());
+
+ assertEquals(1, cf2.getServerId());
+
+ assertEquals(2, cf3.getServerId());
+
+ assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
+
+ Connection conn1 = null;
+
+ Connection conn2 = null;
+
+ Connection conn3 = null;
+
+ Connection conn4 = null;
+
+ Connection conn5 = null;
+
+ try
+ {
+ conn1 = factory.createConnection();
+
+ conn2 = factory.createConnection();
+
+ conn3 = factory.createConnection();
+
+ conn4 = factory.createConnection();
+
+ conn5 = factory.createConnection();
+
+ ConnectionState state1 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn1).getDelegate()).getState());
+
+ ConnectionState state2 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn2).getDelegate()).getState());
+
+ ConnectionState state3 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn3).getDelegate()).getState());
+
+ ConnectionState state4 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn4).getDelegate()).getState());
+
+ ConnectionState state5 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn5).getDelegate()).getState());
+
+ int serverID1 = state1.getServerID();
+
+ int serverID2 = state2.getServerID();
+
+ int serverID3 = state3.getServerID();
+
+ int serverID4 = state4.getServerID();
+
+ int serverID5 = state5.getServerID();
+
+ log.info("server id 1: " + serverID1);
+
+ log.info("server id 2: " + serverID2);
+
+ log.info("server id 3: " + serverID3);
+
+ log.info("server id 4: " + serverID4);
+
+ log.info("server id 5: " + serverID5);
+
+ assertEquals(0, serverID1);
+
+ assertEquals(1, serverID2);
+
+ assertEquals(2, serverID3);
+
+ assertEquals(0, serverID4);
+
+ assertEquals(1, serverID5);
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+
+ if (conn3 != null)
+ {
+ conn3.close();
+ }
+
+ if (conn4 != null)
+ {
+ conn4.close();
+ }
+
+ if (conn5 != null)
+ {
+ conn5.close();
+ }
+ }
+
+ }
+
+ /*
+ * Test that the failover mapping is created correctly and updated properly when nodes leave
+ * or join
+ */
+ public void testDefaultFailoverMap() throws Exception
+ {
+ {
+ JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+
+ ClusteredClientConnectionFactoryDelegate delegate =
+ (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+
+ assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
+
+ ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+
+ ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+
+ ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+
+ ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+
+ //The order here depends on the order the servers were started in
+
+ //If any servers get stopped and then started then the order will change
+
+ log.info("cf1 serverid=" + cf1.getServerId());
+
+ log.info("cf2 serverid=" + cf2.getServerId());
+
+ log.info("cf3 serverid=" + cf3.getServerId());
+
+
+ assertEquals(0, cf1.getServerId());
+
+ assertEquals(1, cf2.getServerId());
+
+ assertEquals(2, cf3.getServerId());
+
+ Map failoverMap = delegate.getFailoverMap();
+
+ assertEquals(3, delegates.length);
+
+ assertEquals(3, failoverMap.size());
+
+ // Default failover policy just chooses the node to the right
+
+ assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+
+ assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+
+ assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
+ }
+
+ //Now cleanly stop one of the servers
+
+ log.info("************** STOPPING SERVER 0");
+ ServerManagement.stop(0);
+
+ log.info("server stopped");
+
+ assertEquals(2, ServerManagement.getServer(1).getNodeIDView().size());
+
+ {
+ //Lookup another connection factory
+
+ JBossConnectionFactory factory = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+
+ log.info("Got connection factory");
+
+ ClusteredClientConnectionFactoryDelegate delegate =
+ (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+
+ ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+
+ Map failoverMap = delegate.getFailoverMap();
+
+ log.info("Got failover map");
+
+ assertEquals(2, delegates.length);
+
+ ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+
+ ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+
+ //Order here depends on order servers were started in
+
+ log.info("cf1 serverid=" + cf1.getServerId());
+
+ log.info("cf2 serverid=" + cf2.getServerId());
+
+ assertEquals(1, cf1.getServerId());
+
+ assertEquals(2, cf2.getServerId());
+
+
+ assertEquals(2, failoverMap.size());
+
+ assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+
+ assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+ }
+
+ //Cleanly stop another server
+
+ log.info("Server 1 is started: " + ServerManagement.getServer(1).isServerPeerStarted());
+
+ ServerManagement.stop(1);
+
+ assertEquals(1, ServerManagement.getServer(2).getNodeIDView().size());
+
+ {
+ //Lookup another connection factory
+
+ JBossConnectionFactory factory = (JBossConnectionFactory )ic2.lookup("/ConnectionFactory");
+
+ ClusteredClientConnectionFactoryDelegate delegate =
+ (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+
+ ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+
+ Map failoverMap = delegate.getFailoverMap();
+
+ assertEquals(1, delegates.length);
+
+ ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+
+ assertEquals(2, cf1.getServerId());
+
+
+ assertEquals(1, failoverMap.size());
+
+ assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+ }
+
+ //Restart server 0
+
+ ServerManagement.start("all", 0);
+
+ {
+ JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+
+ log.info("Got connection factory");
+
+ ClusteredClientConnectionFactoryDelegate delegate =
+ (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+
+ ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+
+ Map failoverMap = delegate.getFailoverMap();
+
+ log.info("Got failover map");
+
+ assertEquals(2, delegates.length);
+
+ ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+
+ ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+
+ log.info("cf1 serverid=" + cf1.getServerId());
+
+ log.info("cf2 serverid=" + cf2.getServerId());
+
+ assertEquals(2, cf1.getServerId());
+
+ assertEquals(0, cf2.getServerId());
+
+
+ assertEquals(2, failoverMap.size());
+
+ assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+
+ assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+ }
+
+
+ //Restart server 1
+
+ ServerManagement.start("all", 1);
+
+ {
+ JBossConnectionFactory factory = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+
+ log.info("Got connection factory");
+
+ ClusteredClientConnectionFactoryDelegate delegate =
+ (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+
+ ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+
+ Map failoverMap = delegate.getFailoverMap();
+
+ log.info("Got failover map");
+
+ assertEquals(3, delegates.length);
+
+ ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+
+ ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+
+ ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+
+ log.info("cf1 serverid=" + cf1.getServerId());
+
+ log.info("cf2 serverid=" + cf2.getServerId());
+
+ log.info("cf3 serverid=" + cf3.getServerId());
+
+ assertEquals(2, cf1.getServerId());
+
+ assertEquals(0, cf2.getServerId());
+
+ assertEquals(1, cf3.getServerId());
+
+
+ assertEquals(3, failoverMap.size());
+
+ assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+
+ assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+
+ assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
+ }
+ }
+ public void testSimpleFailover() throws Exception
+ {
+ JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+
+ ClusteredClientConnectionFactoryDelegate delegate =
+ (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+
+ Set nodeIDView = ServerManagement.getServer(0).getNodeIDView();
+ assertEquals(3, nodeIDView.size());
+
+ ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+
+ ClientConnectionFactoryDelegate cf1 = delegates[0];
+
+ ClientConnectionFactoryDelegate cf2 = delegates[1];
+
+ ClientConnectionFactoryDelegate cf3 = delegates[2];
+
+ int server0Id = cf1.getServerId();
+
+ int server1Id = cf2.getServerId();
+
+ int server2Id = cf3.getServerId();
+
+ log.info("server 0 id: " + server0Id);
+
+ log.info("server 1 id: " + server1Id);
+
+ log.info("server 2 id: " + server2Id);
+
+ Map failoverMap = delegate.getFailoverMap();
+
+ log.info(failoverMap.get(new Integer(server0Id)));
+ log.info(failoverMap.get(new Integer(server1Id)));
+ log.info(failoverMap.get(new Integer(server2Id)));
+
+ int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
+
+ // server 1 should failover onto server 2
+
+ assertEquals(server2Id, server1FailoverId);
+
+ Connection conn = null;
+
+ try
+ {
+
+ //Get a connection on server 1
+ conn = factory.createConnection(); //connection on server 0
+
+ conn.close();
+
+ conn = factory.createConnection(); //connection on server 1
+
+ JBossConnection jbc = (JBossConnection)conn;
+
+ ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
+
+ ConnectionState state = (ConnectionState)del.getState();
+
+ int initialServerID = state.getServerID();
+
+ assertEquals(1, initialServerID);
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue1);
+
+ MessageConsumer cons = sess.createConsumer(queue1);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess.createTextMessage("message:" + i);
+
+ prod.send(tm);
+ }
+
+ //So now, messages should be in queue1 on server 1
+ //So we now kill server 1
+ //Which should cause transparent failover of connection conn onto server 2
+
+ log.info("************ KILLING (CRASHING) SERVER 1");
+
+ ServerManagement.getServer(1).kill();
+
+ log.info("killed server, now waiting");
+
+ Thread.sleep(5000);
+
+ log.info("done wait");
+
+ state = (ConnectionState)del.getState();
+
+ int finalServerID = state.getServerID();
+
+ log.info("final server id= " + finalServerID);
+
+ //server id should now be 2
+
+ assertEquals(2, finalServerID);
+
+ conn.start();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons.receive(1000);
+
+ log.info("message is " + tm);
+
+ assertNotNull(tm);
+
+ assertEquals("message:" + i, tm.getText());
+ }
+ log.info("done");
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ }
+
+
public void testFailoverWithUnackedMessagesClientAcknowledge() throws Exception
{
JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
@@ -723,178 +724,178 @@
}
-// public void testFailoverWithUnackedMessagesTransactional() throws Exception
-// {
-// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-//
-// ClusteredClientConnectionFactoryDelegate delegate =
-// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//
-// Set nodeIDView = ServerManagement.getServer(0).getNodeIDView();
-// assertEquals(3, nodeIDView.size());
-//
-// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//
-// ClientConnectionFactoryDelegate cf1 = delegates[0];
-//
-// ClientConnectionFactoryDelegate cf2 = delegates[1];
-//
-// ClientConnectionFactoryDelegate cf3 = delegates[2];
-//
-// int server0Id = cf1.getServerId();
-//
-// int server1Id = cf2.getServerId();
-//
-// int server2Id = cf3.getServerId();
-//
-// log.info("server 0 id: " + server0Id);
-//
-// log.info("server 1 id: " + server1Id);
-//
-// log.info("server 2 id: " + server2Id);
-//
-// Map failoverMap = delegate.getFailoverMap();
-//
-// log.info(failoverMap.get(new Integer(server0Id)));
-// log.info(failoverMap.get(new Integer(server1Id)));
-// log.info(failoverMap.get(new Integer(server2Id)));
-//
-// int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
-//
-// // server 1 should failover onto server 2
-//
-// assertEquals(server2Id, server1FailoverId);
-//
-// Connection conn = null;
-//
-// try
-// {
-// //Get a connection on server 1
-// conn = factory.createConnection(); //connection on server 0
-//
-// conn.close();
-//
-// conn = factory.createConnection(); //connection on server 1
-//
-// JBossConnection jbc = (JBossConnection)conn;
-//
-// ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
-//
-// ConnectionState state = (ConnectionState)del.getState();
-//
-// int initialServerID = state.getServerID();
-//
-// assertEquals(1, initialServerID);
-//
-// Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
-//
-// MessageProducer prod = sess.createProducer(queue1);
-//
-// MessageConsumer cons = sess.createConsumer(queue1);
-//
-// final int NUM_MESSAGES = 100;
-//
-// for (int i = 0; i < NUM_MESSAGES; i++)
-// {
-// TextMessage tm = sess.createTextMessage("message:" + i);
-//
-// prod.send(tm);
-// }
-//
-// sess.commit();
-//
-// conn.start();
-//
-// //Now consume half of the messages but don't commit them these will end up in
-// //client side resource manager
-//
-// for (int i = 0; i < NUM_MESSAGES / 2; i++)
-// {
-// TextMessage tm = (TextMessage)cons.receive(500);
-//
-// assertNotNull(tm);
-//
-// assertEquals("message:" + i, tm.getText());
-// }
-//
-// //So now, messages should be in queue1 on server 1
-// //So we now kill server 1
-// //Which should cause transparent failover of connection conn onto server 1
-//
-// log.info("************ KILLING (CRASHING) SERVER 1");
-//
-// ServerManagement.getServer(1).kill();
-//
-// log.info("killed server, now waiting");
-//
-// Thread.sleep(5000);
-//
-// log.info("done wait");
-//
-// state = (ConnectionState)del.getState();
-//
-// int finalServerID = state.getServerID();
-//
-// log.info("final server id= " + finalServerID);
-//
-// //server id should now be 2
-//
-// assertEquals(2, finalServerID);
-//
-// conn.start();
-//
-// //Now should be able to consume the rest of the messages
-//
-// log.info("here1");
-//
-// TextMessage tm = null;
-//
-// for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
-// {
-// tm = (TextMessage)cons.receive(500);
-//
-// log.info("message is " + tm.getText());
-//
-// assertNotNull(tm);
-//
-// assertEquals("message:" + i, tm.getText());
-// }
-//
-// log.info("here2");
-//
-// //Now should be able to commit them
-//
-// sess.commit();
-//
-// //Now check there are no more messages there
-// sess.close();
-//
-// sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// cons = sess.createConsumer(queue1);
-//
-// Message m = cons.receive(500);
-//
-// assertNull(m);
-//
-// log.info("got to end of test");
-// }
-// finally
-// {
-// if (conn != null)
-// {
-// try
-// {
-// conn.close();
-// }
-// catch (Exception e)
-// {
-// e.printStackTrace();
-// }
-// }
-// }
-//
-// }
+ public void testFailoverWithUnackedMessagesTransactional() throws Exception
+ {
+ JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+
+ ClusteredClientConnectionFactoryDelegate delegate =
+ (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+
+ Set nodeIDView = ServerManagement.getServer(0).getNodeIDView();
+ assertEquals(3, nodeIDView.size());
+
+ ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+
+ ClientConnectionFactoryDelegate cf1 = delegates[0];
+
+ ClientConnectionFactoryDelegate cf2 = delegates[1];
+
+ ClientConnectionFactoryDelegate cf3 = delegates[2];
+
+ int server0Id = cf1.getServerId();
+
+ int server1Id = cf2.getServerId();
+
+ int server2Id = cf3.getServerId();
+
+ log.info("server 0 id: " + server0Id);
+
+ log.info("server 1 id: " + server1Id);
+
+ log.info("server 2 id: " + server2Id);
+
+ Map failoverMap = delegate.getFailoverMap();
+
+ log.info(failoverMap.get(new Integer(server0Id)));
+ log.info(failoverMap.get(new Integer(server1Id)));
+ log.info(failoverMap.get(new Integer(server2Id)));
+
+ int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
+
+ // server 1 should failover onto server 2
+
+ assertEquals(server2Id, server1FailoverId);
+
+ Connection conn = null;
+
+ try
+ {
+ //Get a connection on server 1
+ conn = factory.createConnection(); //connection on server 0
+
+ conn.close();
+
+ conn = factory.createConnection(); //connection on server 1
+
+ JBossConnection jbc = (JBossConnection)conn;
+
+ ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
+
+ ConnectionState state = (ConnectionState)del.getState();
+
+ int initialServerID = state.getServerID();
+
+ assertEquals(1, initialServerID);
+
+ Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageProducer prod = sess.createProducer(queue1);
+
+ MessageConsumer cons = sess.createConsumer(queue1);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess.createTextMessage("message:" + i);
+
+ prod.send(tm);
+ }
+
+ sess.commit();
+
+ conn.start();
+
+ //Now consume half of the messages but don't commit them; these will end up in the
+ //client side resource manager
+
+ for (int i = 0; i < NUM_MESSAGES / 2; i++)
+ {
+ TextMessage tm = (TextMessage)cons.receive(500);
+
+ assertNotNull(tm);
+
+ assertEquals("message:" + i, tm.getText());
+ }
+
+ //So now, messages should be in queue1 on server 1
+ //So we now kill server 1
+ //Which should cause transparent failover of connection conn onto server 2
+
+ log.info("************ KILLING (CRASHING) SERVER 1");
+
+ ServerManagement.getServer(1).kill();
+
+ log.info("killed server, now waiting");
+
+ Thread.sleep(5000);
+
+ log.info("done wait");
+
+ state = (ConnectionState)del.getState();
+
+ int finalServerID = state.getServerID();
+
+ log.info("final server id= " + finalServerID);
+
+ //server id should now be 2
+
+ assertEquals(2, finalServerID);
+
+ conn.start();
+
+ //Now should be able to consume the rest of the messages
+
+ log.info("here1");
+
+ TextMessage tm = null;
+
+ for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
+ {
+ tm = (TextMessage)cons.receive(500);
+
+ log.info("message is " + tm.getText());
+
+ assertNotNull(tm);
+
+ assertEquals("message:" + i, tm.getText());
+ }
+
+ log.info("here2");
+
+ //Now should be able to commit them
+
+ sess.commit();
+
+ //Now check there are no more messages there
+ sess.close();
+
+ sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ cons = sess.createConsumer(queue1);
+
+ Message m = cons.receive(500);
+
+ assertNull(m);
+
+ log.info("got to end of test");
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ }
More information about the jboss-cvs-commits
mailing list