[jboss-cvs] JBoss Messaging SVN: r1770 - in trunk: . src/etc/server/default/deploy src/etc/xmdesc src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/client/state src/main/org/jboss/jms/server src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/endpoint/advised src/main/org/jboss/jms/server/remoting src/main/org/jboss/jms/tx src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests tests/src/org/jboss/test/messaging/core tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms/clustering
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Dec 12 05:50:19 EST 2006
Author: timfox
Date: 2006-12-12 05:49:42 -0500 (Tue, 12 Dec 2006)
New Revision: 1770
Added:
trunk/jboss-remoting-npe-temp-fix.jar
trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedDestinationsTest.java
Removed:
trunk/jboss-remoting-npe-fix.jar
trunk/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java
Modified:
trunk/.classpath
trunk/src/etc/server/default/deploy/messaging-service.xml
trunk/src/etc/xmdesc/ServerPeer-xmbean.xml
trunk/src/main/org/jboss/jms/client/container/HAAspect.java
trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
trunk/src/main/org/jboss/jms/client/state/SessionState.java
trunk/src/main/org/jboss/jms/server/ServerPeer.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
trunk/src/main/org/jboss/jms/tx/AckInfo.java
trunk/src/main/org/jboss/messaging/core/Channel.java
trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
trunk/tests/build.xml
trunk/tests/src/org/jboss/test/messaging/core/SimpleChannel.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicyTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
Log:
More HA work
Modified: trunk/.classpath
===================================================================
--- trunk/.classpath 2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/.classpath 2006-12-12 10:49:42 UTC (rev 1770)
@@ -11,7 +11,6 @@
<classpathentry kind="lib" path="lib/jboss-system.jar"/>
<classpathentry kind="lib" path="lib/jboss-transaction.jar"/>
<classpathentry kind="lib" path="lib/jnp-client.jar"/>
- <classpathentry kind="lib" path="thirdparty/jboss/common/lib/jboss-common.jar"/>
<classpathentry kind="lib" path="thirdparty/oswego-concurrent/lib/concurrent.jar"/>
<classpathentry kind="lib" path="tests/lib/jboss-common-jdbc-wrapper.jar"/>
<classpathentry kind="lib" path="tests/lib/jboss-jca.jar"/>
@@ -41,9 +40,6 @@
<classpathentry kind="lib" path="thirdparty/jboss/aop/lib/jdk14-pluggable-instrumentor.jar"/>
<classpathentry kind="lib" path="thirdparty/jboss/aop/lib/jrockit-pluggable-instrumentor.jar"/>
<classpathentry kind="lib" path="thirdparty/jboss/aop/lib/pluggable-instrumentor.jar"/>
- <classpathentry kind="lib" path="thirdparty/jboss/common/lib/jboss-archive-browsing.jar"/>
- <classpathentry kind="lib" path="thirdparty/jboss/common/lib/jboss-common-client.jar"/>
- <classpathentry kind="lib" path="thirdparty/jboss/common/lib/namespace.jar"/>
<classpathentry kind="lib" path="thirdparty/jboss/jbossxb/lib/jboss-xml-binding.jar"/>
<classpathentry kind="lib" path="thirdparty/retrotranslator/lib/backport-util-concurrent.jar"/>
<classpathentry kind="lib" path="thirdparty/retrotranslator/lib/retrotranslator-runtime.jar"/>
@@ -51,5 +47,8 @@
<classpathentry kind="lib" path="thirdparty/trove/lib/trove.jar"/>
<classpathentry kind="lib" path="thirdparty/jboss/remoting/lib/jboss-remoting.jar"/>
<classpathentry kind="var" path="ANT_HOME/lib/ant-junit.jar"/>
+ <classpathentry kind="lib" path="thirdparty/jboss/common-logging-log4j/lib/jboss-logging-log4j.jar"/>
+ <classpathentry kind="lib" path="thirdparty/jboss/common-logging-spi/lib/jboss-logging-spi.jar"/>
+ <classpathentry kind="lib" path="thirdparty/jboss/common-core/lib/jboss-common-core.jar"/>
<classpathentry kind="output" path="bin"/>
</classpath>
Deleted: trunk/jboss-remoting-npe-fix.jar
===================================================================
(Binary files differ)
Added: trunk/jboss-remoting-npe-temp-fix.jar
===================================================================
(Binary files differ)
Property changes on: trunk/jboss-remoting-npe-temp-fix.jar
___________________________________________________________________
Name: svn:mime-type
+ application/octet-stream
Modified: trunk/src/etc/server/default/deploy/messaging-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/messaging-service.xml 2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/etc/server/default/deploy/messaging-service.xml 2006-12-12 10:49:42 UTC (rev 1770)
@@ -35,6 +35,8 @@
</attribute>
<attribute name="MaxDeliveryAttempts">10</attribute>
<attribute name="DLQName">DLQ</attribute>
+ <attribute name="FailoverStartTimeout">3000</attribute>
+ <attribute name="FailoverCompleteTimeout">12000</attribute>
</mbean>
</server>
\ No newline at end of file
Modified: trunk/src/etc/xmdesc/ServerPeer-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/ServerPeer-xmbean.xml 2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/etc/xmdesc/ServerPeer-xmbean.xml 2006-12-12 10:49:42 UTC (rev 1770)
@@ -150,7 +150,19 @@
<description>The JNDI name of the DLQ</description>
<name>DLQName</name>
<type>java.lang.String</type>
- </attribute>
+ </attribute>
+
+ <attribute access="read-write" getMethod="getFailoverStartTimeout" setMethod="setFailoverStartTimeout">
+ <description>The maximum amount of time to wait for failover to begin</description>
+ <name>FailoverStartTimeout</name>
+ <type>long</type>
+ </attribute>
+
+ <attribute access="read-write" getMethod="getFailoverCompleteTimeout" setMethod="setFailoverCompleteTimeout">
+ <description>The maximum amount of time to wait for failover to complete</description>
+ <name>FailoverCompleteTimeout</name>
+ <type>long</type>
+ </attribute>
<!-- Managed operations -->
Modified: trunk/src/main/org/jboss/jms/client/container/HAAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/HAAspect.java 2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/jms/client/container/HAAspect.java 2006-12-12 10:49:42 UTC (rev 1770)
@@ -49,6 +49,7 @@
import org.jboss.jms.client.state.SessionState;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.server.endpoint.CreateConnectionResult;
+import org.jboss.jms.tx.AckInfo;
import org.jboss.logging.Logger;
import org.jboss.remoting.Client;
import org.jboss.remoting.ConnectionListener;
@@ -328,8 +329,23 @@
failedSessionDelegate.copyState(newSessionDelegate);
log.info("copied state");
+
+ //Now we remove any unacked non-persistent (np) messages - this is because we don't want to ack them
+ //since the server won't know about them and will barf
+ Iterator iter = failedSessionState.getToAck().iterator();
+
+ while (iter.hasNext())
+ {
+ AckInfo info = (AckInfo)iter.next();
+
+ if (!info.getMessage().getMessage().isReliable())
+ {
+ iter.remove();
+ }
+ }
+
+ //TODO remove any unacked from the resource manager
-
if (trace) { log.trace("replacing session (" + failedSessionDelegate + ") with a new failover session " + newSessionDelegate); }
//TODO Clebert please add comment as to why this clone is necessary
@@ -359,8 +375,32 @@
handleFailoverOnBrowser((BrowserState)sessionChild, newSessionDelegate);
}
}
+
+ /* Now we must send the list of unacked AckInfos to the server - so the consumers'
+ * delivery lists can be repopulated
+ */
+ List ackInfos = null;
+
+ if (!failedSessionState.isTransacted())
+ {
+ //Get the ack infos from the list in the session state
+ ackInfos = failedSessionState.getToAck();
+ }
+ else
+ {
+ //Transacted session - we need to get the acks
+ //TODO
+ }
+
+ //TODO for a transacted session the ackinfos will be in the resource manager!!
+
+ if (!ackInfos.isEmpty())
+ {
+ newSessionDelegate.sendUnackedAckInfos(ackInfos);
+ }
+
}
-
+
//We must not start the connection until the end
if (failedState.isStarted())
{
@@ -369,6 +409,8 @@
log.info("Failover done");
}
+
+
private void handleFailoverOnConsumer(ClientConnectionDelegate connectionDelegate,
ConnectionState failedConnectionState,
@@ -387,11 +429,11 @@
ClientConsumerDelegate newConsumerDelegate = (ClientConsumerDelegate)failedSessionDelegate.
failOverConsumer((JBossDestination)failedConsumerState.getDestination(),
- failedConsumerState.getSelector(),
- failedConsumerState.isNoLocal(),
- failedConsumerState.getSubscriptionName(),
- failedConsumerState.isConnectionConsumer(),
- failedConsumerDelegate.getChannelId());
+ failedConsumerState.getSelector(),
+ failedConsumerState.isNoLocal(),
+ failedConsumerState.getSubscriptionName(),
+ failedConsumerState.isConnectionConsumer(),
+ failedConsumerDelegate.getChannelId());
if (trace) { log.trace("handleFailoverOnConsumer: alternate consumer created"); }
@@ -418,6 +460,9 @@
MessageCallbackHandler handler = cm.unregisterHandler(oldServerID, oldConsumerID);
handler.setConsumerId(failedConsumerState.getConsumerID());
+
+ //Clear the buffer of the handler
+ handler.clearBuffer();
cm.registerHandler(failedConnectionState.getServerID(),
failedConsumerState.getConsumerID(),
@@ -426,8 +471,8 @@
failedSessionState.addCallbackHandler(handler);
log.info("failed over consumer");
-
}
+
private void handleFailoverOnProducer(ProducerState failedProducerState,
ClientSessionDelegate failedSessionDelegate)
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2006-12-12 10:49:42 UTC (rev 1770)
@@ -441,6 +441,15 @@
throw new IllegalStateException("This invocation should not be handled here!");
}
+ /**
+ * This invocation should either be handled by the client-side interceptor chain or by the
+ * server-side endpoint.
+ */
+ public void sendUnackedAckInfos(List ackInfos) throws JMSException
+ {
+ throw new IllegalStateException("This invocation should not be handled here!");
+ }
+
// Public --------------------------------------------------------
@@ -457,6 +466,7 @@
return ((ConnectionState)state.getParent()).getRemotingConnection().getInvokingClient();
}
+
// Package Private -----------------------------------------------
// Private -------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2006-12-12 10:49:42 UTC (rev 1770)
@@ -94,6 +94,8 @@
public void registerHandler(int serverID, int consumerID, MessageCallbackHandler handler)
{
+ log.info(this + " registeringHandler, serverID:" + serverID + " consumerID:" + consumerID);
+
Long lookup = computeLookup(serverID, consumerID);
callbackHandlers.put(lookup, handler);
@@ -101,6 +103,8 @@
public MessageCallbackHandler unregisterHandler(int serverID, int consumerID)
{
+ log.info(this + " unregisteringHandler, serverID:" + serverID + " consumerID:" + consumerID);
+
Long lookup = computeLookup(serverID, consumerID);
return (MessageCallbackHandler)callbackHandlers.remove(lookup);
Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-12-12 10:49:42 UTC (rev 1770)
@@ -316,6 +316,9 @@
// otherwise the messages wouldn't get cancelled until the corresponding session died.
// So if another consumer in another session tried to consume from the channel before that
// session died it wouldn't receive those messages.
+ // We can't just cancel all the messages in the SCE since some of those messages might
+ // have actually been delivered (unlike these) and we may want to acknowledge them
+ // later, after this consumer has been closed
List ackInfos = new ArrayList();
@@ -514,6 +517,11 @@
}
}
+ public void clearBuffer()
+ {
+ buffer.clear();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java 2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java 2006-12-12 10:49:42 UTC (rev 1770)
@@ -48,7 +48,6 @@
*/
public class SessionState extends HierarchicalStateSupport
{
-
protected static Logger log = Logger.getLogger(SessionState.class);
private int acknowledgeMode;
@@ -205,16 +204,5 @@
{
return new ArrayList(callbackHandlers.values());
}
-
- /*** used for HA Handling */
- public void cleanCallBackHandlers()
- {
- if (log.isTraceEnabled())
- {
- log.trace("Clearing callBackHandlers size=" + callbackHandlers.size());
- }
- callbackHandlers.clear();
- }
-
}
Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java 2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java 2006-12-12 10:49:42 UTC (rev 1770)
@@ -111,6 +111,10 @@
private String dlqName;
private Object failoverStatusLock;
+
+ private long failoverStartTimeout = 3000;
+
+ private long failoverCompleteTimeout = 12000;
// wired components
@@ -135,9 +139,6 @@
protected ObjectName postOfficeObjectName;
protected PostOffice postOffice;
-// protected ObjectName topicPostOfficeObjectName;
-// protected PostOffice topicPostOffice;
-
protected ObjectName jmsUserManagerObjectName;
protected JMSUserManager jmsUserManager;
@@ -352,16 +353,6 @@
postOfficeObjectName = on;
}
-// public ObjectName getTopicPostOffice()
-// {
-// return topicPostOfficeObjectName;
-// }
-//
-// public void setTopicPostOffice(ObjectName on)
-// {
-// topicPostOfficeObjectName = on;
-// }
-
public ObjectName getJmsUserManager()
{
return jmsUserManagerObjectName;
@@ -478,6 +469,28 @@
{
this.queuedExecutorPoolSize = poolSize;
}
+
+ public long getFailoverStartTimeout()
+ {
+ return this.failoverStartTimeout;
+ }
+
+ public void setFailoverStartTimeout(long timeout)
+ {
+ this.failoverStartTimeout = timeout;
+ }
+
+ public long getFailoverCompleteTimeout()
+ {
+ return this.failoverCompleteTimeout;
+ }
+
+ public void setFailoverCompleteTimeout(long timeout)
+ {
+ this.failoverCompleteTimeout = timeout;
+ }
+
+
// JMX Operations ------------------------------------------------
@@ -729,14 +742,11 @@
Replicator replicator = getReplicator();
- //TODO - these must be configurable
- final long FAILOVER_START_TIMEOUT = 15000;
+ //Failover
+
+ long startToWait = failoverStartTimeout;
- final long FAILOVER_COMPLETE_TIMEOUT = 25000;
-
- long startToWait = FAILOVER_START_TIMEOUT;
-
- long completeToWait = FAILOVER_COMPLETE_TIMEOUT;
+ long completeToWait = failoverCompleteTimeout;
//Must lock here
synchronized (failoverStatusLock)
@@ -1104,5 +1114,4 @@
}
}
}
-
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2006-12-12 10:49:42 UTC (rev 1770)
@@ -31,9 +31,9 @@
import org.jboss.jms.selector.Selector;
import org.jboss.jms.server.remoting.JMSDispatcher;
import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Channel;
import org.jboss.messaging.core.Filter;
import org.jboss.messaging.core.Routable;
-import org.jboss.messaging.core.local.PagingFilteredQueue;
/**
* Concrete implementation of BrowserEndpoint.
@@ -66,7 +66,7 @@
// Constructors --------------------------------------------------
protected ServerBrowserEndpoint(ServerSessionEndpoint session, int id,
- PagingFilteredQueue destination, String messageSelector)
+ Channel destination, String messageSelector)
throws JMSException
{
this.session = session;
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-12-12 10:49:42 UTC (rev 1770)
@@ -42,6 +42,7 @@
import org.jboss.jms.server.remoting.MessagingMarshallable;
import org.jboss.jms.util.ExceptionUtil;
import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Channel;
import org.jboss.messaging.core.Delivery;
import org.jboss.messaging.core.DeliveryObserver;
import org.jboss.messaging.core.MessageReference;
@@ -49,7 +50,6 @@
import org.jboss.messaging.core.Receiver;
import org.jboss.messaging.core.Routable;
import org.jboss.messaging.core.SimpleDelivery;
-import org.jboss.messaging.core.local.PagingFilteredQueue;
import org.jboss.messaging.core.plugin.contract.PostOffice;
import org.jboss.messaging.core.plugin.postoffice.Binding;
import org.jboss.messaging.core.tx.Transaction;
@@ -87,7 +87,7 @@
private int id;
- private PagingFilteredQueue messageQueue;
+ private Channel messageQueue;
private String queueName;
@@ -131,7 +131,7 @@
// Constructors --------------------------------------------------
- protected ServerConsumerEndpoint(int id, PagingFilteredQueue messageQueue, String queueName,
+ protected ServerConsumerEndpoint(int id, Channel messageQueue, String queueName,
ServerSessionEndpoint sessionEndpoint,
String selector, boolean noLocal, JBossDestination dest,
int prefetchSize, Queue dlq)
@@ -633,7 +633,24 @@
}
}
-
+
+ protected void createDeliveries(List messageIds) throws Throwable
+ {
+ List dels = messageQueue.createDeliveries(messageIds);
+
+ synchronized (lock)
+ {
+ Iterator iter = dels.iterator();
+
+ while (iter.hasNext())
+ {
+ Delivery del = (Delivery)iter.next();
+
+ deliveries.put(new Long(del.getReference().getMessageID()), del);
+ }
+ }
+ }
+
protected void cancelDelivery(Long messageID, int deliveryCount) throws Throwable
{
Delivery del = (Delivery)deliveries.remove(messageID);
@@ -653,7 +670,7 @@
throw new IllegalStateException("Cannot find delivery to cancel:" + id);
}
}
-
+
protected void start()
{
synchronized (lock)
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-12-12 10:49:42 UTC (rev 1770)
@@ -27,6 +27,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -186,11 +187,11 @@
int prefetchSize = connectionEndpoint.getPrefetchSize();
ServerConsumerEndpoint ep =
- new ServerConsumerEndpoint(consumerID, (PagingFilteredQueue)binding.getQueue(),
+
+ new ServerConsumerEndpoint(consumerID, binding.getQueue(),
binding.getQueue().getName(), this, selectorString, noLocal,
jmsDestination, prefetchSize, dlq);
-
JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
ClientConsumerDelegate stub =
@@ -755,6 +756,56 @@
throw ExceptionUtil.handleJMSInvocation(t, this + " cancelDeliveries");
}
}
+
+ public void sendUnackedAckInfos(List ackInfos) throws JMSException
+ {
+ try
+ {
+ //Sort into different list for each consumer
+ Map ackMap = new HashMap();
+
+ for (int i = ackInfos.size() - 1; i >= 0; i--)
+ {
+ AckInfo ack = (AckInfo)ackInfos.get(i);
+
+ ServerConsumerEndpoint consumer =
+ this.connectionEndpoint.getConsumerEndpoint(ack.getConsumerID());
+
+ if (consumer == null)
+ {
+ throw new IllegalArgumentException("Cannot find consumer id: " + ack.getConsumerID());
+ }
+
+ LinkedList acks = (LinkedList)ackMap.get(consumer);
+
+ if (acks == null)
+ {
+ acks = new LinkedList();
+
+ ackMap.put(consumer, acks);
+ }
+
+ acks.addFirst(new Long(ack.getMessageID()));
+ }
+
+ Iterator iter = ackMap.entrySet().iterator();
+
+ while (iter.hasNext())
+ {
+ Map.Entry entry = (Map.Entry)iter.next();
+
+ ServerConsumerEndpoint consumer = (ServerConsumerEndpoint)entry.getKey();
+
+ List acks = (List)entry.getValue();
+
+ consumer.createDeliveries(acks);
+ }
+ }
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMSInvocation(t, this + " sendUnackedAckInfos");
+ }
+ }
public void addTemporaryDestination(JBossDestination dest) throws JMSException
{
Modified: trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java 2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java 2006-12-12 10:49:42 UTC (rev 1770)
@@ -120,5 +120,14 @@
* @param ackInfos
*/
void cancelDeliveries(List ackInfos) throws JMSException;
+
+
+ /**
+ * Send a list of unacked ackInfos to the server so the delivery lists can be repopulated.
+ * Used at failover.
+ * @param ackInfos
+ * @throws JMSException
+ */
+ void sendUnackedAckInfos(List ackInfos) throws JMSException;
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2006-12-12 10:49:42 UTC (rev 1770)
@@ -147,6 +147,11 @@
{
endpoint.cancelDeliveries(ackInfos);
}
+
+ public void sendUnackedAckInfos(List ackInfos) throws JMSException
+ {
+ endpoint.sendUnackedAckInfos(ackInfos);
+ }
// AdvisedSupport overrides --------------------------------------
@@ -161,6 +166,7 @@
return "SessionAdvised->" + endpoint;
}
+
// Public --------------------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java 2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java 2006-12-12 10:49:42 UTC (rev 1770)
@@ -37,8 +37,8 @@
import org.jboss.aop.Dispatcher;
import org.jboss.aop.joinpoint.MethodInvocation;
+import org.jboss.jms.client.remoting.CallbackManager;
import org.jboss.jms.client.remoting.HandleMessageResponse;
-import org.jboss.jms.client.remoting.CallbackManager;
import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.server.Version;
@@ -95,6 +95,7 @@
protected static final byte MORE = 5;
protected static final byte SEND_TRANSACTION = 6;
protected static final byte GET_ID_BLOCK = 7;
+ protected static final byte UNACKED_ACKINFOS = 8;
// The response codes - start from 100
@@ -312,6 +313,28 @@
if (trace) { log.trace("wrote cancelDeliveries()"); }
}
+ else if ("sendUnackedAckInfos".equals(methodName) && mi.getArguments() != null)
+ {
+ dos.writeByte(UNACKED_ACKINFOS);
+
+ writeHeader(mi, dos);
+
+ List ids = (List)mi.getArguments()[0];
+
+ dos.writeInt(ids.size());
+
+ Iterator iter = ids.iterator();
+
+ while (iter.hasNext())
+ {
+ AckInfo ack = (AckInfo)iter.next();
+ ack.write(dos);
+ }
+
+ dos.flush();
+
+ if (trace) { log.trace("wrote sendUnackedAckInfos()"); }
+ }
else
{
dos.write(SERIALIZED);
@@ -700,6 +723,35 @@
return request;
}
+ case UNACKED_ACKINFOS:
+ {
+ MethodInvocation mi = readHeader(dis);
+
+ int size = dis.readInt();
+
+ List acks = new ArrayList(size);
+
+ for (int i = 0; i < size; i++)
+ {
+ AckInfo ack = new AckInfo();
+
+ ack.read(dis);
+
+ acks.add(ack);
+ }
+
+ Object[] args = new Object[] {acks};
+
+ mi.setArguments(args);
+
+ InvocationRequest request =
+ new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
+ new MessagingMarshallable(version, mi), null, null, null);
+
+ if (trace) { log.trace("read unackedAckInfos()"); }
+
+ return request;
+ }
case ID_BLOCK_RESPONSE:
{
IdBlock block = new IdBlock();
Modified: trunk/src/main/org/jboss/jms/tx/AckInfo.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/AckInfo.java 2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/jms/tx/AckInfo.java 2006-12-12 10:49:42 UTC (rev 1770)
@@ -86,7 +86,7 @@
/** Used to change ack's id during failover */
public void setConsumerID(int consumerID)
{
- this.consumerID=consumerID;
+ this.consumerID = consumerID;
}
public MessageProxy getMessage()
Modified: trunk/src/main/org/jboss/messaging/core/Channel.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/Channel.java 2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/messaging/core/Channel.java 2006-12-12 10:49:42 UTC (rev 1770)
@@ -145,6 +145,8 @@
void deactivate();
boolean isActive();
+
+ List createDeliveries(List messageIds);
}
Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-12-12 10:49:42 UTC (rev 1770)
@@ -491,6 +491,66 @@
}
}
}
+
+ public List createDeliveries(List messageIds)
+ {
+ //debug
+ Iterator iter = messageIds.iterator();
+
+ log.info("***** createdeliveries");
+ while (iter.hasNext())
+ {
+ Long l = (Long)iter.next();
+
+ log.info("Creating delivery for " + l);
+ }
+ log.info("**** end dump");
+
+ iter = messageIds.iterator();
+
+ List dels = new ArrayList();
+
+ synchronized (refLock)
+ {
+ synchronized (deliveryLock)
+ {
+ ListIterator liter = messageRefs.iterator();
+
+ while (iter.hasNext())
+ {
+ Long id = (Long)iter.next();
+
+ //Scan the queue
+ while (true)
+ {
+ if (!liter.hasNext())
+ {
+ // TODO we need to look in paging state too - currently not supported
+
+ throw new IllegalStateException("Cannot find ref in queue! (Might be paged!)");
+ }
+
+ MessageReference ref = (MessageReference)liter.next();
+
+ if (ref.getMessageID() == id.longValue())
+ {
+ liter.remove();
+
+ Delivery del = new SimpleDelivery(this, ref);
+
+ dels.add(del);
+
+ this.deliveries.add(del);
+
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ return dels;
+ }
// Public --------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java 2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java 2006-12-12 10:49:42 UTC (rev 1770)
@@ -272,4 +272,9 @@
{
return "RemoteQueueStub(node=" + this.nodeId + " name=" + this.name + " channelId=" + this.id + ")";
}
+
+ public List createDeliveries(List messageIds)
+ {
+ throw new UnsupportedOperationException();
+ }
}
Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml 2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/tests/build.xml 2006-12-12 10:49:42 UTC (rev 1770)
@@ -745,7 +745,7 @@
haltonerror="${junit.batchtest.haltonerror}">
<formatter type="plain" usefile="${junit.formatter.usefile}"/>
<fileset dir="${build.tests.classes}">
- <include name="**/jms/clustering/*Test.class"/>
+ <include name="**/jms/clustering/HATest.class"/>
<!--
<include name="**/jms/clustering/SimpleClusteringTest.class"/>
-->
Modified: trunk/tests/src/org/jboss/test/messaging/core/SimpleChannel.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/SimpleChannel.java 2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/tests/src/org/jboss/test/messaging/core/SimpleChannel.java 2006-12-12 10:49:42 UTC (rev 1770)
@@ -216,6 +216,11 @@
throw new UnsupportedOperationException();
}
+ public List createDeliveries(List messageIds)
+ {
+ throw new UnsupportedOperationException();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicyTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicyTest.java 2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicyTest.java 2006-12-12 10:49:42 UTC (rev 1770)
@@ -369,6 +369,12 @@
// TODO Auto-generated method stub
return false;
}
+
+ public List createDeliveries(List messageIds)
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java 2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java 2006-12-12 10:49:42 UTC (rev 1770)
@@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+
import org.jboss.messaging.core.Delivery;
import org.jboss.messaging.core.DeliveryObserver;
import org.jboss.messaging.core.Filter;
@@ -608,6 +609,12 @@
// TODO Auto-generated method stub
return false;
}
+
+ public List createDeliveries(List messageIds)
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
Copied: trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedDestinationsTest.java (from rev 1767, trunk/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java)
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java 2006-12-12 02:16:50 UTC (rev 1767)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedDestinationsTest.java 2006-12-12 10:49:42 UTC (rev 1770)
@@ -0,0 +1,1023 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms.clustering;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
+
+/**
+ *
+ * A DistributedDestinationsTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class DistributedDestinationsTest extends ClusteringTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Passes the given name straight through to the ClusteringTestBase constructor.
+ public DistributedDestinationsTest(String name)
+ {
+ super(name);
+ }
+
+ // Public --------------------------------------------------------
+
+ // Runs clusteredQueueLocalConsumer with persistent=false (NON_PERSISTENT delivery mode).
+ public void testClusteredQueueLocalConsumerNonPersistent() throws Exception
+ {
+ clusteredQueueLocalConsumer(false);
+ }
+
+ // Runs clusteredQueueLocalConsumer with persistent=true (PERSISTENT delivery mode).
+ public void testClusteredQueueLocalConsumerPersistent() throws Exception
+ {
+ clusteredQueueLocalConsumer(true);
+ }
+
+ // Runs clusteredTopicNonDurable with persistent=false (NON_PERSISTENT delivery mode).
+ public void testClusteredTopicNonDurableNonPersistent() throws Exception
+ {
+ clusteredTopicNonDurable(false);
+ }
+
+ // Runs clusteredTopicNonDurable with persistent=true (PERSISTENT delivery mode).
+ public void testClusteredTopicNonDurablePersistent() throws Exception
+ {
+ clusteredTopicNonDurable(true);
+ }
+
+ // Runs clusteredTopicNonDurableWithSelectors with persistent=false (NON_PERSISTENT delivery mode).
+ public void testClusteredTopicNonDurableWithSelectorsNonPersistent() throws Exception
+ {
+ clusteredTopicNonDurableWithSelectors(false);
+ }
+
+ // Runs clusteredTopicNonDurableWithSelectors with persistent=true (PERSISTENT delivery mode).
+ public void testClusteredTopicNonDurableWithSelectorsPersistent() throws Exception
+ {
+ clusteredTopicNonDurableWithSelectors(true);
+ }
+
+ // Runs clusteredTopicDurable with persistent=false (NON_PERSISTENT delivery mode).
+ public void testClusteredTopicDurableNonPersistent() throws Exception
+ {
+ clusteredTopicDurable(false);
+ }
+
+ // Runs clusteredTopicDurable with persistent=true (PERSISTENT delivery mode).
+ public void testClusteredTopicDurablePersistent() throws Exception
+ {
+ clusteredTopicDurable(true);
+ }
+
+ // Runs clusteredTopicSharedDurableLocalConsumer with persistent=false (NON_PERSISTENT delivery mode).
+ public void testClusteredTopicSharedDurableLocalConsumerNonPersistent() throws Exception
+ {
+ clusteredTopicSharedDurableLocalConsumer(false);
+ }
+
+ // Runs clusteredTopicSharedDurableLocalConsumer with persistent=true (PERSISTENT delivery mode).
+ public void testClusteredTopicSharedDurableLocalConsumerPersistent() throws Exception
+ {
+ clusteredTopicSharedDurableLocalConsumer(true);
+ }
+
+ // Runs clusteredTopicSharedDurableNoLocalSub with persistent=false (NON_PERSISTENT delivery mode).
+ public void testClusteredTopicSharedDurableNoLocalSubNonPersistent() throws Exception
+ {
+ clusteredTopicSharedDurableNoLocalSub(false);
+ }
+
+ // Runs clusteredTopicSharedDurableNoLocalSub with persistent=true (PERSISTENT delivery mode).
+ public void testClusteredTopicSharedDurableNoLocalSubPersistent() throws Exception
+ {
+ clusteredTopicSharedDurableNoLocalSub(true);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // No fixture work of its own: simply delegates to the superclass setUp.
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ // No cleanup of its own: simply delegates to the superclass tearDown.
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+
+ /*
+ * Create a consumer on the queue on each node.
+ * Send messages from each node in turn.
+ * Ensure that only the consumer local to the sending node gets the messages.
+ */
+ protected void clusteredQueueLocalConsumer(boolean persistent) throws Exception
+ {
+ // persistent selects DeliveryMode.PERSISTENT (true) or NON_PERSISTENT (false) for every producer below
+ Connection conn1 = null;
+ Connection conn2 = null;
+ Connection conn3 = null;
+
+ try
+ {
+ // This will create 3 different connections on 3 different nodes, since
+ // the cf is clustered
+ conn1 = cf.createConnection();
+ conn2 = cf.createConnection();
+ conn3 = cf.createConnection();
+
+ log.info("Created connections");
+
+ checkConnectionsDifferentServers(conn1, conn2, conn3);
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ log.info("Created sessions");
+
+ // One consumer per connection, each on its own queue reference
+ MessageConsumer cons1 = sess1.createConsumer(queue0);
+ MessageConsumer cons2 = sess2.createConsumer(queue1);
+ MessageConsumer cons3 = sess3.createConsumer(queue2);
+
+ log.info("Created consumers");
+
+ conn1.start();
+ conn2.start();
+ conn3.start();
+
+ // Send at node 0
+
+ MessageProducer prod = sess1.createProducer(queue0);
+
+ prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess1.createTextMessage("message" + i);
+
+ prod.send(tm);
+ }
+
+ log.info("Sent messages");
+
+ // The consumer on the sending connection must get every message, in send order
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons1.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ // ...and the other two consumers must receive nothing
+ Message m = cons2.receive(2000);
+
+ assertNull(m);
+
+ m = cons3.receive(2000);
+
+ assertNull(m);
+
+ // Send at node 1
+
+ MessageProducer prod1 = sess2.createProducer(queue1);
+
+ prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess2.createTextMessage("message" + i);
+
+ prod1.send(tm);
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons2.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ m = cons1.receive(2000);
+
+ assertNull(m);
+
+ m = cons3.receive(2000);
+
+ assertNull(m);
+
+ // Send at node 2
+
+ MessageProducer prod2 = sess3.createProducer(queue2);
+
+ prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess3.createTextMessage("message" + i);
+
+ prod2.send(tm);
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons3.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ m = cons1.receive(2000);
+
+ assertNull(m);
+
+ m = cons2.receive(2000);
+
+ assertNull(m);
+ }
+ finally
+ {
+ // Close whichever connections were opened, even if an assertion failed
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+
+ if (conn3 != null)
+ {
+ conn3.close();
+ }
+ }
+ }
+
+ // Private -------------------------------------------------------
+
+ /*
+ * Create non durable subscriptions on all nodes of the cluster.
+ * Ensure all messages are received as appropriate
+ */
+ private void clusteredTopicNonDurable(boolean persistent) throws Exception
+ {
+ // persistent selects DeliveryMode.PERSISTENT (true) or NON_PERSISTENT (false) for the producer below
+ Connection conn1 = null;
+ Connection conn2 = null;
+ Connection conn3 = null;
+ try
+ {
+ // This will create 3 different connections on 3 different nodes, since
+ // the cf is clustered
+ conn1 = cf.createConnection();
+ conn2 = cf.createConnection();
+ conn3 = cf.createConnection();
+
+ log.info("Created connections");
+
+ checkConnectionsDifferentServers(conn1, conn2, conn3);
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons1 = sess1.createConsumer(topic0);
+ MessageConsumer cons2 = sess2.createConsumer(topic1);
+ MessageConsumer cons3 = sess3.createConsumer(topic2);
+
+ // Second subscriber on sess1 and on sess2, so two sessions carry two consumers each
+ MessageConsumer cons4 = sess1.createConsumer(topic0);
+
+ MessageConsumer cons5 = sess2.createConsumer(topic1);
+
+ conn1.start();
+ conn2.start();
+ conn3.start();
+
+ // Send at node 0
+
+ MessageProducer prod = sess1.createProducer(topic0);
+
+ prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess1.createTextMessage("message" + i);
+
+ prod.send(tm);
+ }
+
+ // Each of the five subscribers must receive all messages, in send order
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons1.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons2.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons3.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons4.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons5.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+ }
+ finally
+ {
+ // Close whichever connections were opened, even if an assertion failed
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+
+ if (conn3 != null)
+ {
+ conn3.close();
+ }
+ }
+ }
+
+ /*
+ * Create non durable subscriptions on all nodes of the cluster.
+ * Include some with selectors
+ * Ensure all messages are received as appropriate
+ */
+ private void clusteredTopicNonDurableWithSelectors(boolean persistent) throws Exception
+ {
+ // persistent selects DeliveryMode.PERSISTENT (true) or NON_PERSISTENT (false) for the producer below
+ Connection conn1 = null;
+ Connection conn2 = null;
+ Connection conn3 = null;
+
+ try
+ {
+ // This will create 3 different connections on 3 different nodes, since
+ // the cf is clustered
+ conn1 = cf.createConnection();
+ conn2 = cf.createConnection();
+ conn3 = cf.createConnection();
+
+ log.info("Created connections");
+
+ checkConnectionsDifferentServers(conn1, conn2, conn3);
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // cons1..cons3 have no selector; cons4 and cons5 filter on the COLOUR property
+ MessageConsumer cons1 = sess1.createConsumer(topic0);
+ MessageConsumer cons2 = sess2.createConsumer(topic1);
+ MessageConsumer cons3 = sess3.createConsumer(topic2);
+
+ MessageConsumer cons4 = sess1.createConsumer(topic0, "COLOUR='red'");
+
+ MessageConsumer cons5 = sess2.createConsumer(topic1, "COLOUR='blue'");
+
+ conn1.start();
+ conn2.start();
+ conn3.start();
+
+ // Send at node 0
+
+ MessageProducer prod = sess1.createProducer(topic0);
+
+ prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess1.createTextMessage("message" + i);
+
+ // i % 3 == 0 -> red, i % 3 == 1 -> blue, i % 3 == 2 -> no COLOUR property at all
+ int c = i % 3;
+ if (c == 0)
+ {
+ tm.setStringProperty("COLOUR", "red");
+ }
+ else if (c == 1)
+ {
+ tm.setStringProperty("COLOUR", "blue");
+ }
+
+ prod.send(tm);
+ }
+
+ // The selector-less subscribers must each receive all messages, in send order
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons1.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons2.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons3.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ // The red subscriber should see the i % 3 == 0 messages, in order
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ int c = i % 3;
+
+ if (c == 0)
+ {
+ TextMessage tm = (TextMessage)cons4.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+ }
+
+ // The blue subscriber should see the i % 3 == 1 messages, in order
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ int c = i % 3;
+
+ if (c == 1)
+ {
+ TextMessage tm = (TextMessage)cons5.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+ }
+ }
+ finally
+ {
+ // Close whichever connections were opened, even if an assertion failed
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+
+ if (conn3 != null)
+ {
+ conn3.close();
+ }
+ }
+ }
+
+
+
+ /*
+ * Create durable subscriptions on all nodes of the cluster.
+ * No selectors are used (unlike clusteredTopicNonDurableWithSelectors).
+ * Ensure all messages are received as appropriate
+ * None of the durable subs are shared
+ */
+ private void clusteredTopicDurable(boolean persistent) throws Exception
+ {
+ // persistent selects DeliveryMode.PERSISTENT (true) or NON_PERSISTENT (false) for the producer below
+ Connection conn1 = null;
+ Connection conn2 = null;
+ Connection conn3 = null;
+ try
+ {
+ // This will create 3 different connections on 3 different nodes, since
+ // the cf is clustered
+ conn1 = cf.createConnection();
+ conn2 = cf.createConnection();
+ conn3 = cf.createConnection();
+
+ log.info("Created connections");
+
+ checkConnectionsDifferentServers(conn1, conn2, conn3);
+
+ conn1.setClientID("wib1");
+ conn2.setClientID("wib1");
+ conn3.setClientID("wib1");
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Best-effort removal of the five subscription names used below; any failure
+ // is deliberately ignored (presumably the subscription just does not exist yet)
+ try
+ {
+ sess1.unsubscribe("sub");
+ }
+ catch (Exception ignore) {}
+ try
+ {
+ sess2.unsubscribe("sub1");
+ }
+ catch (Exception ignore) {}
+ try
+ {
+ sess3.unsubscribe("sub2");
+ }
+ catch (Exception ignore) {}
+ try
+ {
+ sess1.unsubscribe("sub3");
+ }
+ catch (Exception ignore) {}
+ try
+ {
+ sess2.unsubscribe("sub4");
+ }
+ catch (Exception ignore) {}
+
+ // Five durable subscriptions, each with its own name, so none is shared
+ MessageConsumer cons1 = sess1.createDurableSubscriber(topic0, "sub");
+ MessageConsumer cons2 = sess2.createDurableSubscriber(topic1, "sub1");
+ MessageConsumer cons3 = sess3.createDurableSubscriber(topic2, "sub2");
+ MessageConsumer cons4 = sess1.createDurableSubscriber(topic0, "sub3");
+ MessageConsumer cons5 = sess2.createDurableSubscriber(topic1, "sub4");
+
+ conn1.start();
+ conn2.start();
+ conn3.start();
+
+ // Send at node 0
+
+ MessageProducer prod = sess1.createProducer(topic0);
+
+ prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ // Create the message on the session that owns the producer (was sess2),
+ // matching the other helpers in this class
+ TextMessage tm = sess1.createTextMessage("message" + i);
+
+ prod.send(tm);
+ }
+
+ // Each of the five durable subscribers must receive all messages, in send order
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons1.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons2.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons3.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons4.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons5.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ // Consumers are closed before their subscriptions are removed
+ cons1.close();
+ cons2.close();
+ cons3.close();
+ cons4.close();
+ cons5.close();
+
+ sess1.unsubscribe("sub");
+ sess2.unsubscribe("sub1");
+ sess3.unsubscribe("sub2");
+ sess1.unsubscribe("sub3");
+ sess2.unsubscribe("sub4");
+
+ }
+ finally
+ {
+ // Close whichever connections were opened, even if an assertion failed
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+
+ if (conn3 != null)
+ {
+ conn3.close();
+ }
+ }
+ }
+
+
+
+
+ /*
+ * Create shared durable subs on multiple nodes, the local instance should always get the message
+ */
+ protected void clusteredTopicSharedDurableLocalConsumer(boolean persistent) throws Exception
+ {
+ // persistent selects DeliveryMode.PERSISTENT (true) or NON_PERSISTENT (false) for every producer below
+ Connection conn1 = null;
+ Connection conn2 = null;
+ Connection conn3 = null;
+ try
+
+ {
+ // This will create 3 different connections on 3 different nodes, since
+ // the cf is clustered
+ conn1 = cf.createConnection();
+ conn2 = cf.createConnection();
+ conn3 = cf.createConnection();
+
+ log.info("Created connections");
+
+ checkConnectionsDifferentServers(conn1, conn2, conn3);
+ conn1.setClientID("wib1");
+ conn2.setClientID("wib1");
+ conn3.setClientID("wib1");
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Best-effort removal of "sub" through each session; any failure is
+ // deliberately ignored (presumably the subscription just does not exist yet)
+ try
+ {
+ sess1.unsubscribe("sub");
+ }
+ catch (Exception ignore) {}
+ try
+ {
+ sess2.unsubscribe("sub");
+ }
+ catch (Exception ignore) {}
+ try
+ {
+ sess3.unsubscribe("sub");
+ }
+ catch (Exception ignore) {}
+
+ // Same client id and same subscription name on every connection: the durable sub is shared
+ MessageConsumer cons1 = sess1.createDurableSubscriber(topic0, "sub");
+ MessageConsumer cons2 = sess2.createDurableSubscriber(topic1, "sub");
+ MessageConsumer cons3 = sess3.createDurableSubscriber(topic2, "sub");
+
+ conn1.start();
+ conn2.start();
+ conn3.start();
+
+ // Send at node 0
+
+ MessageProducer prod = sess1.createProducer(topic0);
+
+ prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess1.createTextMessage("message" + i);
+
+ prod.send(tm);
+ }
+
+ // The subscriber on the sending connection must get every message, in send order
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons1.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ // ...and the other two subscribers must receive nothing
+ Message m = cons2.receive(2000);
+
+ assertNull(m);
+
+ m = cons3.receive(2000);
+
+ assertNull(m);
+
+ // Send at node 1
+
+ MessageProducer prod1 = sess2.createProducer(topic1);
+
+ prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ // Create the message on the session that owns prod1 (was sess3),
+ // matching the node 0 and node 2 phases of this test
+ TextMessage tm = sess2.createTextMessage("message" + i);
+
+ prod1.send(tm);
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons2.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ m = cons1.receive(2000);
+
+ assertNull(m);
+
+ m = cons3.receive(2000);
+
+ assertNull(m);
+
+ // Send at node 2
+
+ MessageProducer prod2 = sess3.createProducer(topic2);
+
+ prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess3.createTextMessage("message" + i);
+
+ prod2.send(tm);
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons3.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ m = cons1.receive(2000);
+
+ assertNull(m);
+
+ m = cons2.receive(2000);
+
+ assertNull(m);
+
+ cons1.close();
+ cons2.close();
+ cons3.close();
+
+ // Need to unsubscribe on any node that the durable sub was created on
+
+ sess1.unsubscribe("sub");
+ sess2.unsubscribe("sub");
+ sess3.unsubscribe("sub");
+ }
+ finally
+ {
+ // Close whichever connections were opened, even if an assertion failed
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+
+ if (conn3 != null)
+ {
+ conn3.close();
+ }
+ }
+ }
+
+
+
+ /*
+ * Create shared durable subs on multiple nodes, but without sub on local node
+ * should round robin
+ * note that this test assumes round robin
+ */
+ protected void clusteredTopicSharedDurableNoLocalSub(boolean persistent) throws Exception
+ {
+ // persistent selects DeliveryMode.PERSISTENT (true) or NON_PERSISTENT (false) for the producer below
+ Connection conn1 = null;
+ Connection conn2 = null;
+ Connection conn3 = null;
+
+ try
+ {
+ // This will create 3 different connections on 3 different nodes, since
+ // the cf is clustered
+ conn1 = cf.createConnection();
+ conn2 = cf.createConnection();
+ conn3 = cf.createConnection();
+
+ log.info("Created connections");
+
+ checkConnectionsDifferentServers(conn1, conn2, conn3);
+
+ // conn1 gets no client id and no subscriber: it is only used for sending
+ conn2.setClientID("wib1");
+ conn3.setClientID("wib1");
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Best-effort removal of "sub"; any failure is deliberately ignored
+ try
+ {
+ sess2.unsubscribe("sub");
+ }
+ catch (Exception ignore) {}
+ try
+ {
+ sess3.unsubscribe("sub");
+ }
+ catch (Exception ignore) {}
+
+ // Note: cons1/cons2 belong to sess2/sess3, not sess1
+ MessageConsumer cons1 = sess2.createDurableSubscriber(topic1, "sub");
+ MessageConsumer cons2 = sess3.createDurableSubscriber(topic2, "sub");
+
+ conn2.start();
+ conn3.start();
+
+ // Send at node 0
+
+ //Should round robin between the other 2 since there is no active consumer on sub on node 0
+
+ MessageProducer prod = sess1.createProducer(topic0);
+
+ prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess1.createTextMessage("message" + i);
+
+ prod.send(tm);
+ }
+
+ // cons1 is expected to get the even-numbered messages (0, 2, 4, ...)
+ for (int i = 0; i < NUM_MESSAGES / 2; i++)
+ {
+ TextMessage tm = (TextMessage)cons1.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i * 2, tm.getText());
+ }
+
+ // cons2 is expected to get the odd-numbered messages (1, 3, 5, ...)
+ for (int i = 0; i < NUM_MESSAGES / 2; i++)
+ {
+ TextMessage tm = (TextMessage)cons2.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + (i * 2 + 1), tm.getText());
+ }
+
+ cons1.close();
+ cons2.close();
+
+ sess2.unsubscribe("sub");
+ sess3.unsubscribe("sub");
+
+ }
+ finally
+ {
+ // Close whichever connections were opened, even if an assertion failed
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+
+ if (conn3 != null)
+ {
+ conn3.close();
+ }
+ }
+ }
+
+ // Listener that logs the int property "count" of each message it receives,
+ // tagged with the number given at construction.
+ // NOTE(review): nothing visible in this file instantiates it - confirm it is still needed.
+ class MyListener implements MessageListener
+ {
+ private int listenerNumber;
+
+ MyListener(int listenerNumber)
+ {
+ this.listenerNumber = listenerNumber;
+ }
+
+ public void onMessage(Message m)
+ {
+ try
+ {
+ // Reading the property may throw; in that case nothing is logged
+ log.info("Listener " + listenerNumber + " received message " + m.getIntProperty("count"));
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+
+ // Inner classes -------------------------------------------------
+
+
+
+}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2006-12-12 10:49:42 UTC (rev 1770)
@@ -26,6 +26,7 @@
import java.util.Set;
import javax.jms.Connection;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
@@ -36,7 +37,6 @@
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
import org.jboss.jms.client.delegate.ClusteredClientConnectionFactoryDelegate;
-import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.state.ConnectionState;
import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
import org.jboss.test.messaging.tools.ServerManagement;
@@ -69,355 +69,490 @@
}
// Public --------------------------------------------------------
+//
+// /*
+// * Test that connections created using a clustered connection factory are created round robin on
+// * different servers
+// */
+// public void testRoundRobinConnectionCreation() throws Exception
+// {
+// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+//
+// ClusteredClientConnectionFactoryDelegate delegate =
+// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//
+// log.info ("number of delegates = " + delegate.getDelegates().length);
+// log.info ("number of servers = " + ServerManagement.getServer(0).getNodeIDView().size());
+//
+// assertEquals(3, delegate.getDelegates().length);
+//
+// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+//
+// ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+//
+// ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+//
+// assertEquals(0, cf1.getServerId());
+//
+// assertEquals(1, cf2.getServerId());
+//
+// assertEquals(2, cf3.getServerId());
+//
+// assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
+//
+// Connection conn1 = null;
+//
+// Connection conn2 = null;
+//
+// Connection conn3 = null;
+//
+// Connection conn4 = null;
+//
+// Connection conn5 = null;
+//
+// try
+// {
+// conn1 = factory.createConnection();
+//
+// conn2 = factory.createConnection();
+//
+// conn3 = factory.createConnection();
+//
+// conn4 = factory.createConnection();
+//
+// conn5 = factory.createConnection();
+//
+// ConnectionState state1 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn1).getDelegate()).getState());
+//
+// ConnectionState state2 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn2).getDelegate()).getState());
+//
+// ConnectionState state3 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn3).getDelegate()).getState());
+//
+// ConnectionState state4 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn4).getDelegate()).getState());
+//
+// ConnectionState state5 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn5).getDelegate()).getState());
+//
+// int serverID1 = state1.getServerID();
+//
+// int serverID2 = state2.getServerID();
+//
+// int serverID3 = state3.getServerID();
+//
+// int serverID4 = state4.getServerID();
+//
+// int serverID5 = state5.getServerID();
+//
+// log.info("server id 1: " + serverID1);
+//
+// log.info("server id 2: " + serverID2);
+//
+// log.info("server id 3: " + serverID3);
+//
+// log.info("server id 4: " + serverID4);
+//
+// log.info("server id 5: " + serverID5);
+//
+// assertEquals(0, serverID1);
+//
+// assertEquals(1, serverID2);
+//
+// assertEquals(2, serverID3);
+//
+// assertEquals(0, serverID4);
+//
+// assertEquals(1, serverID5);
+// }
+// finally
+// {
+// if (conn1 != null)
+// {
+// conn1.close();
+// }
+//
+// if (conn2 != null)
+// {
+// conn2.close();
+// }
+//
+// if (conn3 != null)
+// {
+// conn3.close();
+// }
+//
+// if (conn4 != null)
+// {
+// conn4.close();
+// }
+//
+// if (conn5 != null)
+// {
+// conn5.close();
+// }
+// }
+//
+// }
+//
+// /*
+// * Test that the failover mapping is created correctly and updated properly when nodes leave
+// * or join
+// */
+// public void testDefaultFailoverMap() throws Exception
+// {
+// {
+// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+//
+// ClusteredClientConnectionFactoryDelegate delegate =
+// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//
+// assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
+//
+// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//
+// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+//
+// ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+//
+// ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+//
+// //The order here depends on the order the servers were started in
+//
+// //If any servers get stopped and then started then the order will change
+//
+// log.info("cf1 serverid=" + cf1.getServerId());
+//
+// log.info("cf2 serverid=" + cf2.getServerId());
+//
+// log.info("cf3 serverid=" + cf3.getServerId());
+//
+//
+// assertEquals(0, cf1.getServerId());
+//
+// assertEquals(1, cf2.getServerId());
+//
+// assertEquals(2, cf3.getServerId());
+//
+// Map failoverMap = delegate.getFailoverMap();
+//
+// assertEquals(3, delegates.length);
+//
+// assertEquals(3, failoverMap.size());
+//
+// // Default failover policy just chooses the node to the right
+//
+// assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+//
+// assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+//
+// assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
+// }
+//
+// //Now cleanly stop one of the servers
+//
+// log.info("************** STOPPING SERVER 0");
+// ServerManagement.stop(0);
+//
+// log.info("server stopped");
+//
+// assertEquals(2, ServerManagement.getServer(1).getNodeIDView().size());
+//
+// {
+// //Lookup another connection factory
+//
+// JBossConnectionFactory factory = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+//
+// log.info("Got connection factory");
+//
+// ClusteredClientConnectionFactoryDelegate delegate =
+// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//
+// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//
+// Map failoverMap = delegate.getFailoverMap();
+//
+// log.info("Got failover map");
+//
+// assertEquals(2, delegates.length);
+//
+// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+//
+// ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+//
+// //Order here depends on order servers were started in
+//
+// log.info("cf1 serverid=" + cf1.getServerId());
+//
+// log.info("cf2 serverid=" + cf2.getServerId());
+//
+// assertEquals(1, cf1.getServerId());
+//
+// assertEquals(2, cf2.getServerId());
+//
+//
+// assertEquals(2, failoverMap.size());
+//
+// assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+//
+// assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+// }
+//
+// //Cleanly stop another server
+//
+// log.info("Server 1 is started: " + ServerManagement.getServer(1).isServerPeerStarted());
+//
+// ServerManagement.stop(1);
+//
+// assertEquals(1, ServerManagement.getServer(2).getNodeIDView().size());
+//
+// {
+// //Lookup another connection factory
+//
+// JBossConnectionFactory factory = (JBossConnectionFactory )ic2.lookup("/ConnectionFactory");
+//
+// ClusteredClientConnectionFactoryDelegate delegate =
+// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//
+// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//
+// Map failoverMap = delegate.getFailoverMap();
+//
+// assertEquals(1, delegates.length);
+//
+// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+//
+// assertEquals(2, cf1.getServerId());
+//
+//
+// assertEquals(1, failoverMap.size());
+//
+// assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+// }
+//
+// //Restart server 0
+//
+// ServerManagement.start("all", 0);
+//
+// {
+// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+//
+// log.info("Got connection factory");
+//
+// ClusteredClientConnectionFactoryDelegate delegate =
+// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//
+// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//
+// Map failoverMap = delegate.getFailoverMap();
+//
+// log.info("Got failover map");
+//
+// assertEquals(2, delegates.length);
+//
+// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+//
+// ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+//
+// log.info("cf1 serverid=" + cf1.getServerId());
+//
+// log.info("cf2 serverid=" + cf2.getServerId());
+//
+// assertEquals(2, cf1.getServerId());
+//
+// assertEquals(0, cf2.getServerId());
+//
+//
+// assertEquals(2, failoverMap.size());
+//
+// assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+//
+// assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+// }
+//
+//
+// //Restart server 1
+//
+// ServerManagement.start("all", 1);
+//
+// {
+// JBossConnectionFactory factory = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+//
+// log.info("Got connection factory");
+//
+// ClusteredClientConnectionFactoryDelegate delegate =
+// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//
+// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//
+// Map failoverMap = delegate.getFailoverMap();
+//
+// log.info("Got failover map");
+//
+// assertEquals(3, delegates.length);
+//
+// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+//
+// ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+//
+// ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+//
+// log.info("cf1 serverid=" + cf1.getServerId());
+//
+// log.info("cf2 serverid=" + cf2.getServerId());
+//
+// log.info("cf3 serverid=" + cf3.getServerId());
+//
+// assertEquals(2, cf1.getServerId());
+//
+// assertEquals(0, cf2.getServerId());
+//
+// assertEquals(1, cf3.getServerId());
+//
+//
+// assertEquals(3, failoverMap.size());
+//
+// assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+//
+// assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+//
+// assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
+// }
+// }
+//
+// public void testSimpleFailover() throws Exception
+// {
+// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+//
+// ClusteredClientConnectionFactoryDelegate delegate =
+// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//
+// Set nodeIDView = ServerManagement.getServer(0).getNodeIDView();
+// assertEquals(3, nodeIDView.size());
+//
+// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//
+// ClientConnectionFactoryDelegate cf1 = delegates[0];
+//
+// ClientConnectionFactoryDelegate cf2 = delegates[1];
+//
+// ClientConnectionFactoryDelegate cf3 = delegates[2];
+//
+// int server0Id = cf1.getServerId();
+//
+// int server1Id = cf2.getServerId();
+//
+// int server2Id = cf3.getServerId();
+//
+// log.info("server 0 id: " + server0Id);
+//
+// log.info("server 1 id: " + server1Id);
+//
+// log.info("server 2 id: " + server2Id);
+//
+// Map failoverMap = delegate.getFailoverMap();
+//
+// log.info(failoverMap.get(new Integer(server0Id)));
+// log.info(failoverMap.get(new Integer(server1Id)));
+// log.info(failoverMap.get(new Integer(server2Id)));
+//
+// int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
+//
+// // server 1 should failover onto server 2
+//
+// assertEquals(server2Id, server1FailoverId);
+//
+// Connection conn = null;
+//
+// try
+// {
+//
+// //Get a connection on server 1
+// conn = factory.createConnection(); //connection on server 0
+//
+// conn.close();
+//
+// conn = factory.createConnection(); //connection on server 1
+//
+// JBossConnection jbc = (JBossConnection)conn;
+//
+// ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
+//
+// ConnectionState state = (ConnectionState)del.getState();
+//
+// int initialServerID = state.getServerID();
+//
+// assertEquals(1, initialServerID);
+//
+// Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//
+// MessageProducer prod = sess.createProducer(queue1);
+//
+// MessageConsumer cons = sess.createConsumer(queue1);
+//
+// final int NUM_MESSAGES = 100;
+//
+// for (int i = 0; i < NUM_MESSAGES; i++)
+// {
+// TextMessage tm = sess.createTextMessage("message:" + i);
+//
+// prod.send(tm);
+// }
+//
+// //So now, messages should be in queue1 on server 1
+// //So we now kill server 1
+// //Which should cause transparent failover of connection conn onto server 2
+//
+// log.info("************ KILLING (CRASHING) SERVER 1");
+//
+// ServerManagement.getServer(1).destroy();
+//
+// log.info("killed server, now waiting");
+//
+// Thread.sleep(5000);
+//
+// log.info("done wait");
+//
+// state = (ConnectionState)del.getState();
+//
+// int finalServerID = state.getServerID();
+//
+// log.info("final server id= " + finalServerID);
+//
+// //server id should now be 2
+//
+// assertEquals(2, finalServerID);
+//
+// conn.start();
+//
+// for (int i = 0; i < NUM_MESSAGES; i++)
+// {
+// TextMessage tm = (TextMessage)cons.receive(1000);
+//
+// log.info("message is " + tm);
+//
+// assertNotNull(tm);
+//
+// assertEquals("message:" + i, tm.getText());
+// }
+// log.info("done");
+// }
+// finally
+// {
+// if (conn != null)
+// {
+// try
+// {
+// conn.close();
+// }
+// catch (Exception e)
+// {
+// e.printStackTrace();
+// }
+// }
+// }
+//
+// }
- /*
- * Test that connections created using a clustered connection factory are created round robin on
- * different servers
- */
- public void testRoundRobinConnectionCreation() throws Exception
- {
- JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-
- ClusteredClientConnectionFactoryDelegate delegate =
- (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-
- log.info ("number of delegates = " + delegate.getDelegates().length);
- log.info ("number of servers = " + ServerManagement.getServer(0).getNodeIDView().size());
-
- assertEquals(3, delegate.getDelegates().length);
-
- ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-
- ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-
- ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-
- assertEquals(0, cf1.getServerId());
-
- assertEquals(1, cf2.getServerId());
-
- assertEquals(2, cf3.getServerId());
-
- assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
-
- Connection conn1 = null;
-
- Connection conn2 = null;
-
- Connection conn3 = null;
-
- Connection conn4 = null;
-
- Connection conn5 = null;
-
- try
- {
- conn1 = factory.createConnection();
-
- conn2 = factory.createConnection();
-
- conn3 = factory.createConnection();
-
- conn4 = factory.createConnection();
-
- conn5 = factory.createConnection();
-
- ConnectionState state1 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn1).getDelegate()).getState());
-
- ConnectionState state2 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn2).getDelegate()).getState());
-
- ConnectionState state3 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn3).getDelegate()).getState());
-
- ConnectionState state4 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn4).getDelegate()).getState());
-
- ConnectionState state5 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn5).getDelegate()).getState());
-
- int serverID1 = state1.getServerID();
-
- int serverID2 = state2.getServerID();
-
- int serverID3 = state3.getServerID();
-
- int serverID4 = state4.getServerID();
-
- int serverID5 = state5.getServerID();
-
- log.info("server id 1: " + serverID1);
-
- log.info("server id 2: " + serverID2);
-
- log.info("server id 3: " + serverID3);
-
- log.info("server id 4: " + serverID4);
-
- log.info("server id 5: " + serverID5);
-
- assertEquals(0, serverID1);
-
- assertEquals(1, serverID2);
-
- assertEquals(2, serverID3);
-
- assertEquals(0, serverID4);
-
- assertEquals(1, serverID5);
- }
- finally
- {
- if (conn1 != null)
- {
- conn1.close();
- }
-
- if (conn2 != null)
- {
- conn2.close();
- }
-
- if (conn3 != null)
- {
- conn3.close();
- }
-
- if (conn4 != null)
- {
- conn4.close();
- }
-
- if (conn5 != null)
- {
- conn5.close();
- }
- }
-
- }
-
- /*
- * Test that the failover mapping is created correctly and updated properly when nodes leave
- * or join
- */
- public void testDefaultFailoverMap() throws Exception
- {
- {
- JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-
- ClusteredClientConnectionFactoryDelegate delegate =
- (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-
- assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
-
- ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-
- ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-
- ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-
- ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-
- //The order here depends on the order the servers were started in
-
- //If any servers get stopped and then started then the order will change
-
- log.info("cf1 serverid=" + cf1.getServerId());
-
- log.info("cf2 serverid=" + cf2.getServerId());
-
- log.info("cf3 serverid=" + cf3.getServerId());
-
-
- assertEquals(0, cf1.getServerId());
-
- assertEquals(1, cf2.getServerId());
-
- assertEquals(2, cf3.getServerId());
-
- Map failoverMap = delegate.getFailoverMap();
-
- assertEquals(3, delegates.length);
-
- assertEquals(3, failoverMap.size());
-
- // Default failover policy just chooses the node to the right
-
- assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-
- assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-
- assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
- }
-
- //Now cleanly stop one of the servers
-
- log.info("************** STOPPING SERVER 0");
- ServerManagement.stop(0);
-
- log.info("server stopped");
-
- assertEquals(2, ServerManagement.getServer(1).getNodeIDView().size());
-
- {
- //Lookup another connection factory
-
- JBossConnectionFactory factory = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
-
- log.info("Got connection factory");
-
- ClusteredClientConnectionFactoryDelegate delegate =
- (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-
- ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-
- Map failoverMap = delegate.getFailoverMap();
-
- log.info("Got failover map");
-
- assertEquals(2, delegates.length);
-
- ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-
- ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-
- //Order here depends on order servers were started in
-
- log.info("cf1 serverid=" + cf1.getServerId());
-
- log.info("cf2 serverid=" + cf2.getServerId());
-
- assertEquals(1, cf1.getServerId());
-
- assertEquals(2, cf2.getServerId());
-
-
- assertEquals(2, failoverMap.size());
-
- assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-
- assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
- }
-
- //Cleanly stop another server
-
- log.info("Server 1 is started: " + ServerManagement.getServer(1).isServerPeerStarted());
-
- ServerManagement.stop(1);
-
- assertEquals(1, ServerManagement.getServer(2).getNodeIDView().size());
-
- {
- //Lookup another connection factory
-
- JBossConnectionFactory factory = (JBossConnectionFactory )ic2.lookup("/ConnectionFactory");
-
- ClusteredClientConnectionFactoryDelegate delegate =
- (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-
- ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-
- Map failoverMap = delegate.getFailoverMap();
-
- assertEquals(1, delegates.length);
-
- ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-
- assertEquals(2, cf1.getServerId());
-
-
- assertEquals(1, failoverMap.size());
-
- assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
- }
-
- //Restart server 0
-
- ServerManagement.start("all", 0);
-
- {
- JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-
- log.info("Got connection factory");
-
- ClusteredClientConnectionFactoryDelegate delegate =
- (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-
- ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-
- Map failoverMap = delegate.getFailoverMap();
-
- log.info("Got failover map");
-
- assertEquals(2, delegates.length);
-
- ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-
- ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-
- log.info("cf1 serverid=" + cf1.getServerId());
-
- log.info("cf2 serverid=" + cf2.getServerId());
-
- assertEquals(2, cf1.getServerId());
-
- assertEquals(0, cf2.getServerId());
-
-
- assertEquals(2, failoverMap.size());
-
- assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-
- assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
- }
-
-
- //Restart server 1
-
- ServerManagement.start("all", 1);
-
- {
- JBossConnectionFactory factory = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
-
- log.info("Got connection factory");
-
- ClusteredClientConnectionFactoryDelegate delegate =
- (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-
- ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-
- Map failoverMap = delegate.getFailoverMap();
-
- log.info("Got failover map");
-
- assertEquals(3, delegates.length);
-
- ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-
- ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-
- ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-
- log.info("cf1 serverid=" + cf1.getServerId());
-
- log.info("cf2 serverid=" + cf2.getServerId());
-
- log.info("cf3 serverid=" + cf3.getServerId());
-
- assertEquals(2, cf1.getServerId());
-
- assertEquals(0, cf2.getServerId());
-
- assertEquals(1, cf3.getServerId());
-
-
- assertEquals(3, failoverMap.size());
-
- assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-
- assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-
- assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
- }
- }
- public void testSimpleFailover() throws Exception
+ public void testFailoverWithUnackedMessagesClientAcknowledge() throws Exception
{
JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
@@ -462,8 +597,7 @@
Connection conn = null;
try
- {
-
+ {
//Get a connection on server 1
conn = factory.createConnection(); //connection on server 0
@@ -481,7 +615,7 @@
assertEquals(1, initialServerID);
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer prod = sess.createProducer(queue1);
@@ -496,6 +630,20 @@
prod.send(tm);
}
+ conn.start();
+
+ //Now consume half of the messages but don't ack them; these will end up in
+ //the client side toAck list
+
+ for (int i = 0; i < NUM_MESSAGES / 2; i++)
+ {
+ TextMessage tm = (TextMessage)cons.receive(500);
+
+ assertNotNull(tm);
+
+ assertEquals("message:" + i, tm.getText());
+ }
+
//So now, messages should be in queue1 on server 1
//So we now kill server 1
//Which should cause transparent failover of connection conn onto server 1
@@ -503,7 +651,7 @@
log.info("************ KILLING (CRASHING) SERVER 1");
ServerManagement.getServer(1).kill();
-
+
log.info("killed server, now waiting");
Thread.sleep(5000);
@@ -522,17 +670,41 @@
conn.start();
- for (int i = 0; i < NUM_MESSAGES; i++)
+ //Now should be able to consume the rest of the messages
+
+ log.info("here1");
+
+ TextMessage tm = null;
+
+ for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
{
- TextMessage tm = (TextMessage)cons.receive(1000);
+ tm = (TextMessage)cons.receive(500);
+
+ log.info("message is " + (tm == null ? null : tm.getText())); //null-safe: receive() returns null on timeout, so let assertNotNull below report the missing message instead of an NPE here
- log.info("message is " + tm);
-
assertNotNull(tm);
assertEquals("message:" + i, tm.getText());
}
- log.info("done");
+
+ log.info("here2");
+
+ //Now should be able to acknowledge them
+
+ tm.acknowledge();
+
+ //Now check there are no more messages there
+ sess.close();
+
+ sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ cons = sess.createConsumer(queue1);
+
+ Message m = cons.receive(500);
+
+ assertNull(m);
+
+ log.info("got to end of test");
}
finally
{
@@ -551,6 +723,182 @@
}
+// public void testFailoverWithUnackedMessagesTransactional() throws Exception
+// {
+// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+//
+// ClusteredClientConnectionFactoryDelegate delegate =
+// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//
+// Set nodeIDView = ServerManagement.getServer(0).getNodeIDView();
+// assertEquals(3, nodeIDView.size());
+//
+// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//
+// ClientConnectionFactoryDelegate cf1 = delegates[0];
+//
+// ClientConnectionFactoryDelegate cf2 = delegates[1];
+//
+// ClientConnectionFactoryDelegate cf3 = delegates[2];
+//
+// int server0Id = cf1.getServerId();
+//
+// int server1Id = cf2.getServerId();
+//
+// int server2Id = cf3.getServerId();
+//
+// log.info("server 0 id: " + server0Id);
+//
+// log.info("server 1 id: " + server1Id);
+//
+// log.info("server 2 id: " + server2Id);
+//
+// Map failoverMap = delegate.getFailoverMap();
+//
+// log.info(failoverMap.get(new Integer(server0Id)));
+// log.info(failoverMap.get(new Integer(server1Id)));
+// log.info(failoverMap.get(new Integer(server2Id)));
+//
+// int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
+//
+// // server 1 should failover onto server 2
+//
+// assertEquals(server2Id, server1FailoverId);
+//
+// Connection conn = null;
+//
+// try
+// {
+// //Get a connection on server 1
+// conn = factory.createConnection(); //connection on server 0
+//
+// conn.close();
+//
+// conn = factory.createConnection(); //connection on server 1
+//
+// JBossConnection jbc = (JBossConnection)conn;
+//
+// ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
+//
+// ConnectionState state = (ConnectionState)del.getState();
+//
+// int initialServerID = state.getServerID();
+//
+// assertEquals(1, initialServerID);
+//
+// Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+//
+// MessageProducer prod = sess.createProducer(queue1);
+//
+// MessageConsumer cons = sess.createConsumer(queue1);
+//
+// final int NUM_MESSAGES = 100;
+//
+// for (int i = 0; i < NUM_MESSAGES; i++)
+// {
+// TextMessage tm = sess.createTextMessage("message:" + i);
+//
+// prod.send(tm);
+// }
+//
+// sess.commit();
+//
+// conn.start();
+//
+// //Now consume half of the messages but don't commit them; these will end up in
+// //the client side resource manager
+//
+// for (int i = 0; i < NUM_MESSAGES / 2; i++)
+// {
+// TextMessage tm = (TextMessage)cons.receive(500);
+//
+// assertNotNull(tm);
+//
+// assertEquals("message:" + i, tm.getText());
+// }
+//
+// //So now, messages should be in queue1 on server 1
+// //So we now kill server 1
+// //Which should cause transparent failover of connection conn onto server 2
+//
+// log.info("************ KILLING (CRASHING) SERVER 1");
+//
+// ServerManagement.getServer(1).kill();
+//
+// log.info("killed server, now waiting");
+//
+// Thread.sleep(5000);
+//
+// log.info("done wait");
+//
+// state = (ConnectionState)del.getState();
+//
+// int finalServerID = state.getServerID();
+//
+// log.info("final server id= " + finalServerID);
+//
+// //server id should now be 2
+//
+// assertEquals(2, finalServerID);
+//
+// conn.start();
+//
+// //Now should be able to consume the rest of the messages
+//
+// log.info("here1");
+//
+// TextMessage tm = null;
+//
+// for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
+// {
+// tm = (TextMessage)cons.receive(500);
+//
+// log.info("message is " + tm.getText());
+//
+// assertNotNull(tm);
+//
+// assertEquals("message:" + i, tm.getText());
+// }
+//
+// log.info("here2");
+//
+// //Now should be able to commit them
+//
+// sess.commit();
+//
+// //Now check there are no more messages there
+// sess.close();
+//
+// sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//
+// cons = sess.createConsumer(queue1);
+//
+// Message m = cons.receive(500);
+//
+// assertNull(m);
+//
+// log.info("got to end of test");
+// }
+// finally
+// {
+// if (conn != null)
+// {
+// try
+// {
+// conn.close();
+// }
+// catch (Exception e)
+// {
+// e.printStackTrace();
+// }
+// }
+// }
+//
+// }
+
+
+
+
// public void testEvenSimplerFailover() throws Exception
// {
// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
Deleted: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java 2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java 2006-12-12 10:49:42 UTC (rev 1770)
@@ -1,1025 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.messaging.jms.clustering;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
-
-/**
- *
- * A ManualClusteringTest
- *
- * Nodes must be started up in order node1, node2, node3
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-public class ManualClusteringTest extends ClusteringTestBase
-{
-
- // Constants -----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public ManualClusteringTest(String name)
- {
- super(name);
- }
-
- // Public --------------------------------------------------------
-
- public void testClusteredQueueLocalConsumerNonPersistent() throws Exception
- {
- clusteredQueueLocalConsumer(false);
- }
-
- public void testClusteredQueueLocalConsumerPersistent() throws Exception
- {
- clusteredQueueLocalConsumer(true);
- }
-
- public void testClusteredTopicNonDurableNonPersistent() throws Exception
- {
- clusteredTopicNonDurable(false);
- }
-
- public void testClusteredTopicNonDurablePersistent() throws Exception
- {
- clusteredTopicNonDurable(true);
- }
-
- public void testClusteredTopicNonDurableWithSelectorsNonPersistent() throws Exception
- {
- clusteredTopicNonDurableWithSelectors(false);
- }
-
- public void testClusteredTopicNonDurableWithSelectorsPersistent() throws Exception
- {
- clusteredTopicNonDurableWithSelectors(true);
- }
-
- public void testClusteredTopicDurableNonPersistent() throws Exception
- {
- clusteredTopicDurable(false);
- }
-
- public void testClusteredTopicDurablePersistent() throws Exception
- {
- clusteredTopicDurable(true);
- }
-
- public void testClusteredTopicSharedDurableLocalConsumerNonPersistent() throws Exception
- {
- clusteredTopicSharedDurableLocalConsumer(false);
- }
-
- public void testClusteredTopicSharedDurableLocalConsumerPersistent() throws Exception
- {
- clusteredTopicSharedDurableLocalConsumer(true);
- }
-
- public void testClusteredTopicSharedDurableNoLocalSubNonPersistent() throws Exception
- {
- clusteredTopicSharedDurableNoLocalSub(false);
- }
-
- public void testClusteredTopicSharedDurableNoLocalSubPersistent() throws Exception
- {
- clusteredTopicSharedDurableNoLocalSub(true);
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- protected void setUp() throws Exception
- {
- super.setUp();
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
- }
-
-
- /*
- * Create a consumer on each queue on each node.
- * Send messages in turn from all nodes.
- * Ensure that the local consumer gets the message
- */
- protected void clusteredQueueLocalConsumer(boolean persistent) throws Exception
- {
- Connection conn1 = null;
- Connection conn2 = null;
- Connection conn3 = null;
-
- try
- {
- //This will create 3 different connection on 3 different nodes, since
- //the cf is clustered
- conn1 = cf.createConnection();
- conn2 = cf.createConnection();
- conn3 = cf.createConnection();
-
- log.info("Created connections");
-
- checkConnectionsDifferentServers(conn1, conn2, conn3);
-
- Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- log.info("Created sessions");
-
- MessageConsumer cons1 = sess1.createConsumer(queue0);
- MessageConsumer cons2 = sess2.createConsumer(queue1);
- MessageConsumer cons3 = sess3.createConsumer(queue2);
-
- log.info("Created consumers");
-
- conn1.start();
- conn2.start();
- conn3.start();
-
- // Send at node 0
-
- MessageProducer prod = sess1.createProducer(queue0);
-
- prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- final int NUM_MESSAGES = 100;
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess1.createTextMessage("message" + i);
-
- prod.send(tm);
- }
-
- log.info("Sent messages");
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons1.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- Message m = cons2.receive(2000);
-
- assertNull(m);
-
- m = cons3.receive(2000);
-
- assertNull(m);
-
- // Send at node 1
-
- MessageProducer prod1 = sess2.createProducer(queue1);
-
- prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess2.createTextMessage("message" + i);
-
- prod1.send(tm);
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons2.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- m = cons1.receive(2000);
-
- assertNull(m);
-
- m = cons3.receive(2000);
-
- assertNull(m);
-
- // Send at node 2
-
- MessageProducer prod2 = sess3.createProducer(queue2);
-
- prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess3.createTextMessage("message" + i);
-
- prod2.send(tm);
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons3.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- m = cons1.receive(2000);
-
- assertNull(m);
-
- m = cons2.receive(2000);
-
- assertNull(m);
- }
- finally
- {
- if (conn1 != null)
- {
- conn1.close();
- }
-
- if (conn2 != null)
- {
- conn2.close();
- }
-
- if (conn3 != null)
- {
- conn3.close();
- }
- }
- }
-
- // Private -------------------------------------------------------
-
- /*
- * Create non durable subscriptions on all nodes of the cluster.
- * Ensure all messages are receive as appropriate
- */
- private void clusteredTopicNonDurable(boolean persistent) throws Exception
- {
- Connection conn1 = null;
- Connection conn2 = null;
- Connection conn3 = null;
- try
- {
- //This will create 3 different connection on 3 different nodes, since
- //the cf is clustered
- conn1 = cf.createConnection();
- conn2 = cf.createConnection();
- conn3 = cf.createConnection();
-
- log.info("Created connections");
-
- checkConnectionsDifferentServers(conn1, conn2, conn3);
-
- Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons1 = sess1.createConsumer(topic0);
- MessageConsumer cons2 = sess2.createConsumer(topic1);
- MessageConsumer cons3 = sess3.createConsumer(topic2);
-
- MessageConsumer cons4 = sess1.createConsumer(topic0);
-
- MessageConsumer cons5 = sess2.createConsumer(topic1);
-
- conn1.start();
- conn2.start();
- conn3.start();
-
- // Send at node 0
-
- MessageProducer prod = sess1.createProducer(topic0);
-
- prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- final int NUM_MESSAGES = 100;
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess1.createTextMessage("message" + i);
-
- prod.send(tm);
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons1.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons2.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons3.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons4.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons5.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
- }
- finally
- {
- if (conn1 != null)
- {
- conn1.close();
- }
-
- if (conn2 != null)
- {
- conn2.close();
- }
-
- if (conn3 != null)
- {
- conn3.close();
- }
- }
- }
-
- /*
- * Create non durable subscriptions on all nodes of the cluster.
- * Include some with selectors
- * Ensure all messages are receive as appropriate
- */
- private void clusteredTopicNonDurableWithSelectors(boolean persistent) throws Exception
- {
- Connection conn1 = null;
- Connection conn2 = null;
- Connection conn3 = null;
-
- try
- {
- //This will create 3 different connection on 3 different nodes, since
- //the cf is clustered
- conn1 = cf.createConnection();
- conn2 = cf.createConnection();
- conn3 = cf.createConnection();
-
- log.info("Created connections");
-
- checkConnectionsDifferentServers(conn1, conn2, conn3);
-
- Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons1 = sess1.createConsumer(topic0);
- MessageConsumer cons2 = sess2.createConsumer(topic1);
- MessageConsumer cons3 = sess3.createConsumer(topic2);
-
- MessageConsumer cons4 = sess1.createConsumer(topic0, "COLOUR='red'");
-
- MessageConsumer cons5 = sess2.createConsumer(topic1, "COLOUR='blue'");
-
- conn1.start();
- conn2.start();
- conn3.start();
-
- // Send at node 0
-
- MessageProducer prod = sess1.createProducer(topic0);
-
- prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- final int NUM_MESSAGES = 100;
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess1.createTextMessage("message" + i);
-
- int c = i % 3;
- if (c == 0)
- {
- tm.setStringProperty("COLOUR", "red");
- }
- else if (c == 1)
- {
- tm.setStringProperty("COLOUR", "blue");
- }
-
- prod.send(tm);
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons1.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons2.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons3.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- int c = i % 3;
-
- if (c == 0)
- {
- TextMessage tm = (TextMessage)cons4.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- int c = i % 3;
-
- if (c == 1)
- {
- TextMessage tm = (TextMessage)cons5.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
- }
- }
- finally
- {
- if (conn1 != null)
- {
- conn1.close();
- }
-
- if (conn2 != null)
- {
- conn2.close();
- }
-
- if (conn3 != null)
- {
- conn3.close();
- }
- }
- }
-
-
-
- /*
- * Create durable subscriptions on all nodes of the cluster.
- * Include a couple with selectors
- * Ensure all messages are receive as appropriate
- * None of the durable subs are shared
- */
- private void clusteredTopicDurable(boolean persistent) throws Exception
- {
- Connection conn1 = null;
- Connection conn2 = null;
- Connection conn3 = null;
- try
- {
- //This will create 3 different connection on 3 different nodes, since
- //the cf is clustered
- conn1 = cf.createConnection();
- conn2 = cf.createConnection();
- conn3 = cf.createConnection();
-
- log.info("Created connections");
-
- checkConnectionsDifferentServers(conn1, conn2, conn3);
-
- conn1.setClientID("wib1");
- conn2.setClientID("wib1");
- conn3.setClientID("wib1");
-
- Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- try
- {
- sess1.unsubscribe("sub");
- }
- catch (Exception ignore) {}
- try
- {
- sess2.unsubscribe("sub1");
- }
- catch (Exception ignore) {}
- try
- {
- sess3.unsubscribe("sub2");
- }
- catch (Exception ignore) {}
- try
- {
- sess1.unsubscribe("sub3");
- }
- catch (Exception ignore) {}
- try
- {
- sess2.unsubscribe("sub4");
- }
- catch (Exception ignore) {}
-
- MessageConsumer cons1 = sess1.createDurableSubscriber(topic0, "sub");
- MessageConsumer cons2 = sess2.createDurableSubscriber(topic1, "sub1");
- MessageConsumer cons3 = sess3.createDurableSubscriber(topic2, "sub2");
- MessageConsumer cons4 = sess1.createDurableSubscriber(topic0, "sub3");
- MessageConsumer cons5 = sess2.createDurableSubscriber(topic1, "sub4");
-
- conn1.start();
- conn2.start();
- conn3.start();
-
- // Send at node 0
-
- MessageProducer prod = sess1.createProducer(topic0);
-
- prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- final int NUM_MESSAGES = 100;
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess2.createTextMessage("message" + i);
-
- prod.send(tm);
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons1.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons2.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons3.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons4.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons5.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- cons1.close();
- cons2.close();
- cons3.close();
- cons4.close();
- cons5.close();
-
- sess1.unsubscribe("sub");
- sess2.unsubscribe("sub1");
- sess3.unsubscribe("sub2");
- sess1.unsubscribe("sub3");
- sess2.unsubscribe("sub4");
-
- }
- finally
- {
- if (conn1 != null)
- {
- conn1.close();
- }
-
- if (conn2 != null)
- {
- conn2.close();
- }
-
- if (conn3 != null)
- {
- conn3.close();
- }
- }
- }
-
-
-
-
- /*
- * Create shared durable subs on multiple nodes, the local instance should always get the message
- */
- protected void clusteredTopicSharedDurableLocalConsumer(boolean persistent) throws Exception
- {
- Connection conn1 = null;
- Connection conn2 = null;
- Connection conn3 = null;
- try
-
- {
- //This will create 3 different connection on 3 different nodes, since
- //the cf is clustered
- conn1 = cf.createConnection();
- conn2 = cf.createConnection();
- conn3 = cf.createConnection();
-
- log.info("Created connections");
-
- checkConnectionsDifferentServers(conn1, conn2, conn3);
- conn1.setClientID("wib1");
- conn2.setClientID("wib1");
- conn3.setClientID("wib1");
-
- Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- try
- {
- sess1.unsubscribe("sub");
- }
- catch (Exception ignore) {}
- try
- {
- sess2.unsubscribe("sub");
- }
- catch (Exception ignore) {}
- try
- {
- sess3.unsubscribe("sub");
- }
- catch (Exception ignore) {}
-
- MessageConsumer cons1 = sess1.createDurableSubscriber(topic0, "sub");
- MessageConsumer cons2 = sess2.createDurableSubscriber(topic1, "sub");
- MessageConsumer cons3 = sess3.createDurableSubscriber(topic2, "sub");
-
- conn1.start();
- conn2.start();
- conn3.start();
-
- // Send at node 0
-
- MessageProducer prod = sess1.createProducer(topic0);
-
- prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- final int NUM_MESSAGES = 100;
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess1.createTextMessage("message" + i);
-
- prod.send(tm);
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons1.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- Message m = cons2.receive(2000);
-
- assertNull(m);
-
- m = cons3.receive(2000);
-
- assertNull(m);
-
- // Send at node 1
-
- MessageProducer prod1 = sess2.createProducer(topic1);
-
- prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess3.createTextMessage("message" + i);
-
- prod1.send(tm);
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons2.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- m = cons1.receive(2000);
-
- assertNull(m);
-
- m = cons3.receive(2000);
-
- assertNull(m);
-
- // Send at node 2
-
- MessageProducer prod2 = sess3.createProducer(topic2);
-
- prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess3.createTextMessage("message" + i);
-
- prod2.send(tm);
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons3.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- m = cons1.receive(2000);
-
- assertNull(m);
-
- m = cons2.receive(2000);
-
- assertNull(m);
-
- cons1.close();
- cons2.close();
- cons3.close();
-
- // Need to unsubscribe on any node that the durable sub was created on
-
- sess1.unsubscribe("sub");
- sess2.unsubscribe("sub");
- sess3.unsubscribe("sub");
- }
- finally
- {
- if (conn1 != null)
- {
- conn1.close();
- }
-
- if (conn2 != null)
- {
- conn2.close();
- }
-
- if (conn3 != null)
- {
- conn3.close();
- }
- }
- }
-
-
-
- /*
- * Create shared durable subs on multiple nodes, but without sub on local node
- * should round robin
- * note that this test assumes round robin
- */
- protected void clusteredTopicSharedDurableNoLocalSub(boolean persistent) throws Exception
- {
- Connection conn1 = null;
- Connection conn2 = null;
- Connection conn3 = null;
-
- try
- {
- //This will create 3 different connection on 3 different nodes, since
- //the cf is clustered
- conn1 = cf.createConnection();
- conn2 = cf.createConnection();
- conn3 = cf.createConnection();
-
- log.info("Created connections");
-
- checkConnectionsDifferentServers(conn1, conn2, conn3);
-
- conn2.setClientID("wib1");
- conn3.setClientID("wib1");
-
- Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- try
- {
- sess2.unsubscribe("sub");
- }
- catch (Exception ignore) {}
- try
- {
- sess3.unsubscribe("sub");
- }
- catch (Exception ignore) {}
-
- MessageConsumer cons1 = sess2.createDurableSubscriber(topic1, "sub");
- MessageConsumer cons2 = sess3.createDurableSubscriber(topic2, "sub");
-
- conn2.start();
- conn3.start();
-
- // Send at node 0
-
- //Should round robin between the other 2 since there is no active consumer on sub on node 0
-
- MessageProducer prod = sess1.createProducer(topic0);
-
- prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- final int NUM_MESSAGES = 100;
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess1.createTextMessage("message" + i);
-
- prod.send(tm);
- }
-
- for (int i = 0; i < NUM_MESSAGES / 2; i++)
- {
- TextMessage tm = (TextMessage)cons1.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i * 2, tm.getText());
- }
-
- for (int i = 0; i < NUM_MESSAGES / 2; i++)
- {
- TextMessage tm = (TextMessage)cons2.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + (i * 2 + 1), tm.getText());
- }
-
- cons1.close();
- cons2.close();
-
- sess2.unsubscribe("sub");
- sess3.unsubscribe("sub");
-
- }
- finally
- {
- if (conn1 != null)
- {
- conn1.close();
- }
-
- if (conn2 != null)
- {
- conn2.close();
- }
-
- if (conn3 != null)
- {
- conn3.close();
- }
- }
- }
-
- class MyListener implements MessageListener
- {
- private int i;
-
- MyListener(int i)
- {
- this.i = i;
- }
-
- public void onMessage(Message m)
- {
- try
- {
- int count = m.getIntProperty("count");
-
- log.info("Listener " + i + " received message " + count);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
-
- }
-
-
- // Inner classes -------------------------------------------------
-
-
-
-}
More information about the jboss-cvs-commits
mailing list