[jboss-cvs] JBoss Messaging SVN: r1377 - in trunk: src/etc/server/default/deploy src/etc/xmdesc src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/server src/main/org/jboss/jms/server/endpoint src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests tests/src/org/jboss/test/messaging/jms
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Sep 28 16:28:49 EDT 2006
Author: timfox
Date: 2006-09-28 16:28:32 -0400 (Thu, 28 Sep 2006)
New Revision: 1377
Added:
trunk/tests/src/org/jboss/test/messaging/jms/ManualClusteringTest.java
Removed:
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CheckRequest.java
Modified:
trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
trunk/src/main/org/jboss/jms/server/ServerPeer.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
trunk/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
trunk/tests/build.xml
Log:
More tweaks, changes to recovery
Modified: trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml 2006-09-28 16:54:19 UTC (rev 1376)
+++ trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml 2006-09-28 20:28:32 UTC (rev 1377)
@@ -76,7 +76,9 @@
<attribute name="StateTimeout">5000</attribute>
<attribute name="CastTimeout">5000</attribute>
<attribute name="PullSize">1</attribute>
- <attribute name="StatsSendPeriod">1000</attribute>
+ <attribute name="StatsSendPeriod">10000</attribute>
+ <attribute name="MessagePullPolicy">org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy</attribute>
+ <attribute name="ClusterRouterFactory">org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory</attribute>
<attribute name="AsyncChannelConfig">
<config>
@@ -139,7 +141,9 @@
<attribute name="StateTimeout">5000</attribute>
<attribute name="CastTimeout">5000</attribute>
<attribute name="PullSize">1</attribute>
- <attribute name="StatsSendPeriod">1000</attribute>
+ <attribute name="StatsSendPeriod">10000</attribute>
+ <attribute name="MessagePullPolicy">org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy</attribute>
+ <attribute name="ClusterRouterFactory">org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory</attribute>
<attribute name="AsyncChannelConfig">
<config>
Modified: trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml 2006-09-28 16:54:19 UTC (rev 1376)
+++ trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml 2006-09-28 20:28:32 UTC (rev 1377)
@@ -59,6 +59,18 @@
<type>java.lang.String</type>
</attribute>
+ <attribute access="read-write" getMethod="getMessagePullPolicy" setMethod="setMessagePullPolicy">
+ <description>The fully qualified class name of the class to use to implement the message pull policy for rebalancing</description>
+ <name>MessagePullPolicy</name>
+ <type>java.lang.String</type>
+ </attribute>
+
+ <attribute access="read-write" getMethod="getClusterRouterFactory" setMethod="setClusterRouterFactory">
+ <description>The fully qualified class name of the class to use to implement the factory that generates cluster routers</description>
+ <name>ClusterRouterFactory</name>
+ <type>java.lang.String</type>
+ </attribute>
+
<attribute access="read-write" getMethod="getStateTimeout" setMethod="setStateTimeout">
<description>Timeout for getState()</description>
<name>StateTimeout</name>
Modified: trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/SessionAspect.java 2006-09-28 16:54:19 UTC (rev 1376)
+++ trunk/src/main/org/jboss/jms/client/container/SessionAspect.java 2006-09-28 20:28:32 UTC (rev 1377)
@@ -157,8 +157,6 @@
{
if (trace) { log.trace("acknowledging NON-transactionally"); }
- log.info("****************ACKNOWLEDGING");
-
List acks = state.getToAck();
AckInfo ack = (AckInfo)acks.get(0);
sd.acknowledge(ack);
Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-09-28 16:54:19 UTC (rev 1376)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-09-28 20:28:32 UTC (rev 1377)
@@ -240,8 +240,6 @@
{
if (trace) { log.trace(this + " receiving " + msgs.size() + " message(s) from the remoting layer"); }
- log.info(this + " receiving " + msgs.size() + " message(s) from the remoting layer");
-
synchronized (mainLock)
{
if (closed)
Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java 2006-09-28 16:54:19 UTC (rev 1376)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java 2006-09-28 20:28:32 UTC (rev 1377)
@@ -30,9 +30,6 @@
import javax.management.Attribute;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
import org.jboss.aop.AspectXmlLoader;
import org.jboss.jms.server.connectionfactory.ConnectionFactoryJNDIMapper;
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-09-28 16:54:19 UTC (rev 1376)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-09-28 20:28:32 UTC (rev 1377)
@@ -206,8 +206,6 @@
{
if (trace) { log.trace(this + " receives " + ref + " for delivery"); }
- log.info(this + " receives " + ref + " for delivery");
-
// This is ok to have outside lock - is volatile
if (bufferFull)
{
@@ -512,8 +510,6 @@
// acknowledge a delivery
Delivery d;
- log.info("acknowledging: " + messageID);
-
synchronized (lock)
{
d = (Delivery)deliveries.remove(new Long(messageID));
Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-09-28 16:54:19 UTC (rev 1376)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-09-28 20:28:32 UTC (rev 1377)
@@ -146,12 +146,9 @@
{
if (!active)
{
- log.info("Not active - ignoring ref");
return null;
}
- log.info("handling ref");
-
checkClosed();
Future result = new Future();
@@ -185,8 +182,6 @@
{
if (trace) { log.trace("acknowledging " + d + (tx == null ? " non-transactionally" : " transactionally in " + tx)); }
- log.info("acknowledging " + d);
-
this.acknowledgeInternal(d, tx, true, false);
}
@@ -527,9 +522,7 @@
* @see org.jboss.messaging.core.Channel#deliver()
*/
protected void deliverInternal(boolean handle) throws Throwable
- {
- log.info("in deliver internal");
-
+ {
try
{
// The iterator is used to iterate through the refs in the channel in the case that they
@@ -591,8 +584,6 @@
Delivery del = router.handle(this, ref, null);
- log.info("router returned delivery " + del);
-
if (del == null)
{
// No receiver, broken receiver or full receiver so we stop delivering; also
Modified: trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java 2006-09-28 16:54:19 UTC (rev 1376)
+++ trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java 2006-09-28 20:28:32 UTC (rev 1377)
@@ -34,8 +34,6 @@
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
-import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultMessagePullPolicy;
-import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
import org.jboss.messaging.core.tx.TransactionRepository;
import org.w3c.dom.Element;
@@ -75,6 +73,10 @@
private long statsSendPeriod = 1000;
+ private String clusterRouterFactory;
+
+ private String messagePullPolicy;
+
// Constructors --------------------------------------------------------
public ClusteredPostOfficeService()
@@ -190,6 +192,26 @@
return statsSendPeriod;
}
+ public String getClusterRouterFactory()
+ {
+ return clusterRouterFactory;
+ }
+
+ public String getMessagePullPolicy()
+ {
+ return messagePullPolicy;
+ }
+
+ public void setClusterRouterFactory(String clusterRouterFactory)
+ {
+ this.clusterRouterFactory = clusterRouterFactory;
+ }
+
+ public void setMessagePullPolicy(String messagePullPolicy)
+ {
+ this.messagePullPolicy = messagePullPolicy;
+ }
+
// ServiceMBeanSupport overrides ---------------------------------
protected synchronized void startService() throws Exception
@@ -217,12 +239,16 @@
int nodeId = serverPeer.getServerPeerID();
- MessagePullPolicy pullPolicy = new DefaultMessagePullPolicy();
+ Class clazz = Class.forName(messagePullPolicy);
+ MessagePullPolicy pullPolicy = (MessagePullPolicy)clazz.newInstance();
+
+ clazz = Class.forName(clusterRouterFactory);
+
+ ClusterRouterFactory rf = (ClusterRouterFactory)clazz.newInstance();
+
FilterFactory ff = new SelectorFactory();
-
- ClusterRouterFactory rf = new DefaultRouterFactory();
-
+
postOffice = new DefaultClusteredPostOffice(ds, tm, sqlProperties, createTablesOnStartup,
nodeId, officeName, ms,
pm, tr, ff, pool,
Modified: trunk/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java 2006-09-28 16:54:19 UTC (rev 1376)
+++ trunk/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java 2006-09-28 20:28:32 UTC (rev 1377)
@@ -81,7 +81,5 @@
*/
boolean route(MessageReference ref, String condition, Transaction tx) throws Exception;
- void recover() throws Exception;
-
boolean isLocal();
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2006-09-28 16:54:19 UTC (rev 1376)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2006-09-28 20:28:32 UTC (rev 1377)
@@ -374,8 +374,6 @@
{
lock.writeLock().acquire();
- log.info("loading bindings");
-
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
Deleted: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CheckRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CheckRequest.java 2006-09-28 16:54:19 UTC (rev 1376)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CheckRequest.java 2006-09-28 20:28:32 UTC (rev 1377)
@@ -1,73 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.plugin.postoffice.cluster;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-
-/**
- * A CheckMessage
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-class CheckRequest extends ClusterRequest
-{
- private int nodeId;
-
- static final int TYPE = 2;
-
- CheckRequest()
- {
- }
-
- CheckRequest(int nodeId)
- {
- this.nodeId = nodeId;
- }
-
- Object execute(PostOfficeInternal office) throws Throwable
- {
- office.check(nodeId);
- return null;
- }
-
- byte getType()
- {
- return TYPE;
- }
-
- public void read(DataInputStream in) throws IOException
- {
- nodeId = in.readInt();
- }
-
- public void write(DataOutputStream out) throws IOException
- {
- out.writeInt(nodeId);
- }
-}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java 2006-09-28 16:54:19 UTC (rev 1376)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java 2006-09-28 20:28:32 UTC (rev 1377)
@@ -54,11 +54,6 @@
request = new BindRequest();
break;
}
- case CheckRequest.TYPE:
- {
- request = new CheckRequest();
- break;
- }
case MessageRequest.TYPE:
{
request = new MessageRequest();
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-09-28 16:54:19 UTC (rev 1376)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-09-28 20:28:32 UTC (rev 1377)
@@ -334,15 +334,6 @@
return binding;
}
- /*
- * This is called by the server peer if it determines that the server crashed last time it was run
- */
- public void recover() throws Exception
- {
- //We send a "check" message to all nodes of the cluster
- asyncSendRequest(new CheckRequest(nodeId));
- }
-
public boolean route(MessageReference ref, String condition, Transaction tx) throws Exception
{
if (ref == null)
@@ -752,8 +743,10 @@
/**
* Check for any transactions that need to be committed or rolled back
*/
- public void check(int nodeId) throws Throwable
+ public void check(Integer nodeId) throws Throwable
{
+ lock.readLock().acquire();
+
synchronized (holdingArea)
{
Iterator iter = holdingArea.entrySet().iterator();
@@ -766,7 +759,7 @@
TransactionId id = (TransactionId)entry.getKey();
- if (id.getNodeId() == nodeId)
+ if (id.getNodeId() == nodeId.intValue())
{
ClusterTransaction tx = (ClusterTransaction)iter.next();
@@ -826,9 +819,7 @@
if (q.isActive())
{
QueueStats stats = q.getStats();
-
- //log.info(q.getNodeId() + " queue " + stats.getQueueName() + " count " + stats.getMessageCount());
-
+
//We don't bother sending the stats if there's no significant change in the values
if (q.changedSignificantly())
@@ -840,10 +831,6 @@
statsList.add(stats);
}
- else
- {
- //log.info("Not changed significantly");
- }
}
}
}
@@ -1113,11 +1100,9 @@
}
-
- private void removeBindingsForAddress(Address address) throws Exception
+ private Integer getNodeIdForAddress(Address address) throws Exception
{
- lock.writeLock().acquire();
-
+ lock.readLock().acquire();
try
{
Iterator iter = nodeIdAddressMap.entrySet().iterator();
@@ -1134,19 +1119,27 @@
nodeId = (Integer)entry.getKey();
}
}
-
- if (nodeId == null)
- {
- throw new IllegalStateException("Cannot find node id for address: " + address);
- }
-
+ return nodeId;
+ }
+ finally
+ {
+ lock.readLock().release();
+ }
+ }
+
+ private void removeBindingsForAddress(Integer nodeId) throws Exception
+ {
+ lock.writeLock().acquire();
+
+ try
+ {
Map nameMap = (Map)nameMaps.get(nodeId);
if (nameMap != null)
{
List toRemove = new ArrayList();
- iter = nameMap.values().iterator();
+ Iterator iter = nameMap.values().iterator();
while (iter.hasNext())
{
@@ -1395,9 +1388,20 @@
{
try
{
- removeBindingsForAddress(address);
+ Integer nodeId = getNodeIdForAddress(address);
+
+ if (nodeId == null)
+ {
+ throw new IllegalStateException("Cannot find node id for address: " + address);
+ }
+
+ //Perform a check - the member might have crashed and left uncommitted transactions
+ //we need to resolve this
+ check(nodeId);
+
+ removeBindingsForAddress(nodeId);
}
- catch (Exception e)
+ catch (Throwable e)
{
log.error("Caught Exception in MembershipListener", e);
IllegalStateException e2 = new IllegalStateException(e.getMessage());
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-09-28 16:54:19 UTC (rev 1376)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-09-28 20:28:32 UTC (rev 1377)
@@ -221,6 +221,8 @@
protected void deliverInternal(boolean handle) throws Throwable
{
+ log.info("in local clustered queue deliver internal");
+
int beforeSize = -1;
if (!handle)
@@ -234,6 +236,8 @@
{
int afterSize = messageRefs.size();
+ log.info("receiversready:" + receiversReady + " before size:" + beforeSize + " afterSize: " + afterSize);
+
if (receiversReady && beforeSize == 0 && afterSize == 0)
{
//Delivery has been prompted (not from handle call)
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java 2006-09-28 16:54:19 UTC (rev 1376)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java 2006-09-28 20:28:32 UTC (rev 1377)
@@ -67,8 +67,6 @@
void commitTransaction(TransactionId id) throws Throwable;
- void check(int nodeId) throws Throwable;
-
void updateQueueStats(int nodeId, List stats) throws Exception;
void sendQueueStats() throws Exception;
Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml 2006-09-28 16:54:19 UTC (rev 1376)
+++ trunk/tests/build.xml 2006-09-28 20:28:32 UTC (rev 1377)
@@ -338,7 +338,7 @@
haltonerror="${junit.batchtest.haltonerror}">
<formatter type="plain" usefile="${junit.formatter.usefile}"/>
<fileset dir="${build.tests.classes}">
- <include name="**/messaging/core/**/*Test.class"/>
+ <!-- <include name="**/messaging/core/**/*Test.class"/> -->
<include name="**/messaging/jms/**/*Test.class"/>
<exclude name="**/jms/stress/**"/>
<exclude name="**/jms/crash/*Test.class"/>
Added: trunk/tests/src/org/jboss/test/messaging/jms/ManualClusteringTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ManualClusteringTest.java 2006-09-28 16:54:19 UTC (rev 1376)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ManualClusteringTest.java 2006-09-28 20:28:32 UTC (rev 1377)
@@ -0,0 +1,955 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms;
+
+import java.util.Properties;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+
+import org.jboss.test.messaging.MessagingTestCase;
+
+/**
+ *
+ * A ManualClusteringTest
+ *
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class ManualClusteringTest extends MessagingTestCase
+{
+ protected Context ic1;
+
+ protected Context ic2;
+
+ protected Context ic3;
+
+ protected Queue queue1;
+
+ protected Topic topic1;
+
+ protected Queue queue2;
+
+ protected Topic topic2;
+
+ protected Queue queue3;
+
+ protected Topic topic3;
+
+ protected ConnectionFactory cf1;
+
+ protected ConnectionFactory cf2;
+
+ protected ConnectionFactory cf3;
+
+ public ManualClusteringTest(String name)
+ {
+ super(name);
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ Properties props1 = new Properties();
+
+ props1.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
+ props1.put(Context.PROVIDER_URL, "jnp://localhost:1199");
+ props1.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
+
+ ic1 = new InitialContext(props1);
+
+ Properties props2 = new Properties();
+
+ props2.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
+ props2.put(Context.PROVIDER_URL, "jnp://localhost:1299");
+ props2.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
+
+ ic2 = new InitialContext(props2);
+
+// Properties props3 = new Properties();
+//
+// props3.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
+// props3.put(Context.PROVIDER_URL, "jnp://localhost:1399");
+// props3.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
+//
+// ic3 = new InitialContext(props3);
+
+ queue1 = (Queue)ic1.lookup("queue/ClusteredQueue1");
+
+ queue2 = (Queue)ic2.lookup("queue/ClusteredQueue1");
+
+ //queue3 = (Queue)ic3.lookup("queue/ClusteredQueue1");
+
+ topic1 = (Topic)ic1.lookup("topic/ClusteredTopic1");
+
+ topic2 = (Topic)ic2.lookup("topic/ClusteredTopic1");
+
+ //topic3 = (Topic)ic3.lookup("topic/ClusteredTopic1");
+
+ cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
+
+ cf2 = (ConnectionFactory)ic2.lookup("/ConnectionFactory");
+
+ //cf3 = (ConnectionFactory)ic3.lookup("/ConnectionFactory");
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+
+ ic1.close();
+
+ ic2.close();
+ }
+
+ /*
+ * Each node had consumers, send message at node, make sure local consumer gets message
+ */
+ public void testClusteredQueueLocalConsumerNonPersistent() throws Exception
+ {
+ Connection conn1 = null;
+
+ Connection conn2 = null;
+ try
+ {
+ conn1 = cf1.createConnection();
+
+ conn2 = cf2.createConnection();
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons1 = sess1.createConsumer(queue1);
+
+ MessageConsumer cons2 = sess2.createConsumer(queue2);
+
+ conn1.start();
+
+ conn2.start();
+
+ MessageProducer prod1 = sess1.createProducer(queue1);
+
+ prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess1.createTextMessage("message" + i);
+
+ prod1.send(tm);
+ }
+
+ log.info("sent messages");
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ log.info("i is " + i);
+
+ TextMessage tm = (TextMessage)cons1.receive(1000);
+
+ assertNotNull(tm);
+
+ log.info("Got message:" + tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ Message m = cons2.receive(2000);
+
+ assertNull(m);
+ }
+ finally
+ {
+ try
+ {
+ if (conn1 != null) conn1.close();
+
+ if (conn2 != null) conn2.close();
+ }
+ catch (Exception ignore)
+ {
+
+ }
+ }
+ }
+
+ public void testClusteredQueueLocalConsumerPersistent() throws Exception
+ {
+ Connection conn1 = null;
+
+ Connection conn2 = null;
+ try
+ {
+ conn1 = cf1.createConnection();
+
+ conn2 = cf2.createConnection();
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons1 = sess1.createConsumer(queue1);
+
+ MessageConsumer cons2 = sess2.createConsumer(queue2);
+
+ conn1.start();
+
+ conn2.start();
+
+ MessageProducer prod1 = sess1.createProducer(queue1);
+
+ prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess1.createTextMessage("message" + i);
+
+ prod1.send(tm);
+ }
+
+ log.info("sent messages");
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ log.info("i is " + i);
+
+ TextMessage tm = (TextMessage)cons1.receive(1000);
+
+ assertNotNull(tm);
+
+ log.info("Got message:" + tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ Message m = cons2.receive(2000);
+
+ assertNull(m);
+ }
+ finally
+ {
+ try
+ {
+ if (conn1 != null) conn1.close();
+
+ if (conn2 != null) conn2.close();
+ }
+ catch (Exception ignore)
+ {
+
+ }
+ }
+ }
+
+// /*
+// * No consumer on local node, send message at node, make sure remote consumer gets messages
+// */
+// public void testClusteredQueueNoLocalConsumerNonPersistent() throws Exception
+// {
+// Connection conn1 = null;
+//
+// Connection conn2 = null;
+// try
+// {
+// conn1 = cf1.createConnection();
+//
+// conn2 = cf2.createConnection();
+//
+// Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//
+// Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//
+// MessageConsumer cons2 = sess2.createConsumer(queue2);
+//
+// conn1.start();
+//
+// conn2.start();
+//
+// MessageProducer prod1 = sess1.createProducer(queue1);
+//
+// prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+//
+// final int NUM_MESSAGES = 100;
+//
+// for (int i = 0; i < NUM_MESSAGES; i++)
+// {
+// TextMessage tm = sess1.createTextMessage("message" + i);
+//
+// prod1.send(tm);
+// }
+//
+// log.info("sent messages");
+//
+// for (int i = 0; i < NUM_MESSAGES; i++)
+// {
+// log.info("i is " + i);
+//
+// TextMessage tm = (TextMessage)cons2.receive(10000);
+//
+// assertNotNull(tm);
+//
+// log.info("Got message:" + tm);
+//
+// assertEquals("message" + i, tm.getText());
+// }
+//
+// }
+// finally
+// {
+// try
+// {
+// if (conn1 != null) conn1.close();
+//
+// if (conn2 != null) conn2.close();
+// }
+// catch (Exception ignore)
+// {
+//
+// }
+// }
+// }
+//
+//
+//
+// public void testClusteredQueueNoLocalConsumerPersistent() throws Exception
+// {
+// Connection conn1 = null;
+//
+// Connection conn2 = null;
+// try
+// {
+// conn1 = cf1.createConnection();
+//
+// conn2 = cf2.createConnection();
+//
+// Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//
+// Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//
+// MessageConsumer cons2 = sess2.createConsumer(queue2);
+//
+// conn1.start();
+//
+// conn2.start();
+//
+// MessageProducer prod1 = sess1.createProducer(queue1);
+//
+// prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+//
+// final int NUM_MESSAGES = 100;
+//
+// for (int i = 0; i < NUM_MESSAGES; i++)
+// {
+// TextMessage tm = sess1.createTextMessage("message" + i);
+//
+// prod1.send(tm);
+// }
+//
+// log.info("sent messages");
+//
+// for (int i = 0; i < NUM_MESSAGES; i++)
+// {
+// log.info("i is " + i);
+//
+// TextMessage tm = (TextMessage)cons2.receive(1000);
+//
+// assertNotNull(tm);
+//
+// log.info("Got message:" + tm);
+//
+// assertEquals("message" + i, tm.getText());
+// }
+//
+// }
+// finally
+// {
+// try
+// {
+// if (conn1 != null) conn1.close();
+//
+// if (conn2 != null) conn2.close();
+// }
+// catch (Exception ignore)
+// {
+//
+// }
+// }
+// }
+//
+
+ public void testClusteredTopicNonDurableNonPersistent() throws Exception
+ {
+ Connection conn1 = null;
+
+ Connection conn2 = null;
+ try
+ {
+ conn1 = cf1.createConnection();
+
+ conn2 = cf2.createConnection();
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons1 = sess1.createConsumer(topic1);
+
+ MessageConsumer cons2 = sess2.createConsumer(topic2);
+
+ conn1.start();
+
+ conn2.start();
+
+ MessageProducer prod1 = sess1.createProducer(topic1);
+
+ prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess1.createTextMessage("message" + i);
+
+ prod1.send(tm);
+ }
+
+ log.info("sent messages");
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ log.info("i is " + i);
+
+ TextMessage tm = (TextMessage)cons1.receive(1000);
+
+ assertNotNull(tm);
+
+ log.info("Got message:" + tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ log.info("i is " + i);
+
+ TextMessage tm = (TextMessage)cons2.receive(1000);
+
+ assertNotNull(tm);
+
+ log.info("Got message:" + tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+
+ }
+ finally
+ {
+ try
+ {
+ if (conn1 != null) conn1.close();
+
+ if (conn2 != null) conn2.close();
+ }
+ catch (Exception ignore)
+ {
+
+ }
+ }
+ }
+
+
+ public void testClusteredTopicNonDurablePersistent() throws Exception
+ {
+ Connection conn1 = null;
+
+ Connection conn2 = null;
+ try
+ {
+ conn1 = cf1.createConnection();
+
+ conn2 = cf2.createConnection();
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons1 = sess1.createConsumer(topic1);
+
+ MessageConsumer cons2 = sess2.createConsumer(topic2);
+
+ conn1.start();
+
+ conn2.start();
+
+ MessageProducer prod1 = sess1.createProducer(topic1);
+
+ prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess1.createTextMessage("message" + i);
+
+ prod1.send(tm);
+ }
+
+ log.info("sent messages");
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ log.info("i is " + i);
+
+ TextMessage tm = (TextMessage)cons1.receive(1000);
+
+ assertNotNull(tm);
+
+ log.info("Got message:" + tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ log.info("i is " + i);
+
+ TextMessage tm = (TextMessage)cons2.receive(1000);
+
+ assertNotNull(tm);
+
+ log.info("Got message:" + tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+
+ }
+ finally
+ {
+ try
+ {
+ if (conn1 != null) conn1.close();
+
+ if (conn2 != null) conn2.close();
+ }
+ catch (Exception ignore)
+ {
+
+ }
+ }
+ }
+
+
+ public void testClusteredTopicDurableNonPersistentLocal() throws Exception
+ {
+ Connection conn1 = null;
+
+ Connection conn2 = null;
+ try
+ {
+ conn1 = cf1.createConnection();
+
+ conn1.setClientID("id1");
+
+ conn2 = cf2.createConnection();
+
+ conn2.setClientID("id1");
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ try
+ {
+ sess1.unsubscribe("sub1");
+
+ sess2.unsubscribe("sub1");
+ }
+ catch (Exception ignore)
+ {
+ }
+
+ MessageConsumer durable1 = sess1.createDurableSubscriber(topic1, "sub1");
+
+ MessageConsumer durable2 = sess2.createDurableSubscriber(topic2, "sub1");
+
+ conn1.start();
+
+ conn2.start();
+
+ MessageProducer prod1 = sess1.createProducer(topic1);
+
+ prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess1.createTextMessage("message" + i);
+
+ prod1.send(tm);
+ }
+
+ log.info("sent messages");
+
+ //All the messages should be on the local sub
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ log.info("i is " + i);
+
+ TextMessage tm = (TextMessage)durable1.receive(1000);
+
+ assertNotNull(tm);
+
+ log.info("Got message:" + tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ Message m = durable2.receive(2000);
+
+ assertNull(m);
+
+
+ sess1.unsubscribe("sub1");
+
+ sess2.unsubscribe("sub1");
+
+ }
+ finally
+ {
+ try
+ {
+ if (conn1 != null) conn1.close();
+
+ if (conn2 != null) conn2.close();
+ }
+ catch (Exception ignore)
+ {
+
+ }
+ }
+ }
+
+ public void testClusteredTopicDurablePersistentLocal() throws Exception
+ {
+ Connection conn1 = null;
+
+ Connection conn2 = null;
+ try
+ {
+ conn1 = cf1.createConnection();
+
+ conn1.setClientID("id1");
+
+ conn2 = cf2.createConnection();
+
+ conn2.setClientID("id1");
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ try
+ {
+ sess1.unsubscribe("sub1");
+
+ sess2.unsubscribe("sub1");
+ }
+ catch (Exception ignore)
+ {
+ }
+
+ MessageConsumer durable1 = sess1.createDurableSubscriber(topic1, "sub1");
+
+ MessageConsumer durable2 = sess2.createDurableSubscriber(topic2, "sub1");
+
+ conn1.start();
+
+ conn2.start();
+
+ MessageProducer prod1 = sess1.createProducer(topic1);
+
+ prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess1.createTextMessage("message" + i);
+
+ prod1.send(tm);
+ }
+
+ log.info("sent messages");
+
+ //All the messages should be on the local sub
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ log.info("i is " + i);
+
+ TextMessage tm = (TextMessage)durable1.receive(1000);
+
+ assertNotNull(tm);
+
+ log.info("Got message:" + tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ Message m = durable2.receive(2000);
+
+ assertNull(m);
+
+ sess1.unsubscribe("sub1");
+
+ sess2.unsubscribe("sub1");
+
+ }
+ finally
+ {
+ try
+ {
+ if (conn1 != null) conn1.close();
+
+ if (conn2 != null) conn2.close();
+ }
+ catch (Exception ignore)
+ {
+
+ }
+ }
+ }
+
+
+ public void testClusteredTopicDurableNonPersistentNotLocal() throws Exception
+ {
+ Connection conn1 = null;
+
+ Connection conn2 = null;
+ try
+ {
+ conn1 = cf1.createConnection();
+
+ conn1.setClientID("id1");
+
+ conn2 = cf2.createConnection();
+
+ conn2.setClientID("id1");
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ try
+ {
+ sess2.unsubscribe("sub1");
+ }
+ catch (Exception ignore)
+ {
+ }
+
+ MessageConsumer durable2 = sess2.createDurableSubscriber(topic2, "sub1");
+
+ conn1.start();
+
+ conn2.start();
+
+ MessageProducer prod1 = sess1.createProducer(topic1);
+
+ prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess1.createTextMessage("message" + i);
+
+ prod1.send(tm);
+ }
+
+ log.info("sent messages");
+
+ //All the messages should be on the non local sub
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ log.info("i is " + i);
+
+ TextMessage tm = (TextMessage)durable2.receive(1000);
+
+ assertNotNull(tm);
+
+ log.info("Got message:" + tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ sess2.unsubscribe("sub1");
+
+ }
+ finally
+ {
+ try
+ {
+ if (conn1 != null) conn1.close();
+
+ if (conn2 != null) conn2.close();
+ }
+ catch (Exception ignore)
+ {
+
+ }
+ }
+ }
+
+ public void testClusteredTopicDurablePersistentNotLocal() throws Exception
+ {
+ Connection conn1 = null;
+
+ Connection conn2 = null;
+ try
+ {
+ conn1 = cf1.createConnection();
+
+ conn1.setClientID("id1");
+
+ conn2 = cf2.createConnection();
+
+ conn2.setClientID("id1");
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ try
+ {
+ sess2.unsubscribe("sub1");
+ }
+ catch (Exception ignore)
+ {
+ }
+
+ MessageConsumer durable2 = sess2.createDurableSubscriber(topic2, "sub1");
+
+ conn1.start();
+
+ conn2.start();
+
+ MessageProducer prod1 = sess1.createProducer(topic1);
+
+ prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess1.createTextMessage("message" + i);
+
+ prod1.send(tm);
+ }
+
+ log.info("sent messages");
+
+ //All the messages should be on the non local sub
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ log.info("i is " + i);
+
+ TextMessage tm = (TextMessage)durable2.receive(1000);
+
+ assertNotNull(tm);
+
+ log.info("Got message:" + tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ sess2.unsubscribe("sub1");
+
+ }
+ finally
+ {
+ try
+ {
+ if (conn1 != null) conn1.close();
+
+ if (conn2 != null) conn2.close();
+ }
+ catch (Exception ignore)
+ {
+
+ }
+ }
+ }
+
+
+ class MyListener implements MessageListener
+ {
+ private int i;
+
+ MyListener(int i)
+ {
+ this.i = i;
+ }
+
+ public void onMessage(Message m)
+ {
+ try
+ {
+ int count = m.getIntProperty("count");
+
+ log.info("Listener " + i + " received message " + count);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+}
More information about the jboss-cvs-commits
mailing list