[Jboss-cvs] JBoss Messaging SVN: r1353 - in trunk: src/etc/server/default/deploy src/etc/xmdesc src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/plugin/base tests/src/org/jboss/test/messaging/core/plugin/postoffice tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Sep 22 15:47:17 EDT 2006
Author: timfox
Date: 2006-09-22 15:47:06 -0400 (Fri, 22 Sep 2006)
New Revision: 1353
Added:
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
Modified:
trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml
trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
Log:
More clustering debugging
Modified: trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml 2006-09-22 19:46:49 UTC (rev 1352)
+++ trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml 2006-09-22 19:47:06 UTC (rev 1353)
@@ -60,6 +60,7 @@
<attribute name="StateTimeout">5000</attribute>
<attribute name="CastTimeout">5000</attribute>
<attribute name="PullSize">1</attribute>
+ <attribute name="StatsSendPeriod">1000</attribute>
<attribute name="SyncChannelConfig">
<UDP mcast_addr="228.8.8.8" mcast_port="45568"
ip_ttl="8" ip_mcast="true"
Modified: trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml 2006-09-22 19:46:49 UTC (rev 1352)
+++ trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml 2006-09-22 19:47:06 UTC (rev 1353)
@@ -65,6 +65,12 @@
<type>int</type>
</attribute>
+ <attribute access="read-write" getMethod="getStatsSendPeriod" setMethod="setStatsSendPeriod">
+ <description>The period in milliseconds between a post office casting its statistics across the cluster</description>
+ <name>StatsSendPeriod</name>
+ <type>long</type>
+ </attribute>
+
<attribute access="read-write" getMethod="getSyncChannelConfig" setMethod="setSyncChannelConfig">
<description>The JGroups stack configuration for the synchronous channel</description>
<name>SyncChannelConfig</name>
Modified: trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java 2006-09-22 19:46:49 UTC (rev 1352)
+++ trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java 2006-09-22 19:47:06 UTC (rev 1353)
@@ -73,6 +73,8 @@
private int pullSize = 1;
+ private long statsSendPeriod = 1000;
+
// Constructors --------------------------------------------------------
public ClusteredPostOfficeService()
@@ -178,6 +180,16 @@
return pullSize;
}
+ public void setStatsSendPeriod(long period)
+ {
+ this.statsSendPeriod = period;
+ }
+
+ public long getStatsSendPeriod()
+ {
+ return statsSendPeriod;
+ }
+
// ServiceMBeanSupport overrides ---------------------------------
protected synchronized void startService() throws Exception
@@ -218,7 +230,8 @@
syncChannelConfig, asyncChannelConfig,
stateTimeout, castTimeout,
pullPolicy, rf,
- pullSize);
+ pullSize,
+ statsSendPeriod);
postOffice.start();
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-09-22 19:46:49 UTC (rev 1352)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-09-22 19:47:06 UTC (rev 1353)
@@ -128,6 +128,10 @@
private int pullSize;
private Map routerMap;
+
+ private StatsSender statsSender;
+
+ private long statsSendPeriod;
public DefaultClusteredPostOffice()
{
@@ -157,11 +161,12 @@
long stateTimeout, long castTimeout,
MessagePullPolicy redistributionPolicy,
ClusterRouterFactory rf,
- int pullSize) throws Exception
+ int pullSize,
+ long statsSendPeriod) throws Exception
{
this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
pm, tr, filterFactory, pool, groupName, stateTimeout, castTimeout, redistributionPolicy,
- rf, pullSize);
+ rf, pullSize, statsSendPeriod);
this.syncChannelConfigE = syncChannelConfig;
this.asyncChannelConfigE = asyncChannelConfig;
@@ -183,11 +188,12 @@
long stateTimeout, long castTimeout,
MessagePullPolicy redistributionPolicy,
ClusterRouterFactory rf,
- int pullSize) throws Exception
+ int pullSize,
+ long statsSendPeriod) throws Exception
{
this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
pm, tr, filterFactory, pool, groupName, stateTimeout, castTimeout, redistributionPolicy,
- rf, pullSize);
+ rf, pullSize, statsSendPeriod);
this.syncChannelConfigS = syncChannelConfig;
this.asyncChannelConfigS = asyncChannelConfig;
@@ -204,7 +210,8 @@
long stateTimeout, long castTimeout,
MessagePullPolicy redistributionPolicy,
ClusterRouterFactory rf,
- int pullSize)
+ int pullSize,
+ long statsSendPeriod)
{
super (ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms, pm, tr, filterFactory,
pool);
@@ -223,8 +230,12 @@
this.pullSize = pullSize;
+ this.statsSendPeriod = statsSendPeriod;
+
routerMap = new HashMap();
+ statsSender = new StatsSender(this, statsSendPeriod);
+
init();
}
@@ -274,11 +285,15 @@
handleAddressNodeMapping(currentAddress, nodeId);
syncSendRequest(new SendNodeIdRequest(currentAddress, nodeId));
+
+ statsSender.start();
}
public void stop() throws Exception
{
super.stop();
+
+ statsSender.stop();
syncChannel.close();
@@ -1111,8 +1126,16 @@
ClusterRequest req = new PullMessagesRequest(this.nodeId, tx.getId(), remoteQueue.getChannelID(),
localQueue.getName(), num);
- List msgs = (List)syncSendRequest(req, fromAddress);
+ byte[] bytes = (byte[])syncSendRequest(req, fromAddress);
+ PullMessagesResponse response = new PullMessagesResponse();
+
+ StreamUtils.fromBytes(response, bytes);
+
+ List msgs = response.getMessages();
+
+ log.info("I have " + msgs.size() + " messages");
+
Iterator iter = msgs.iterator();
while (iter.hasNext())
@@ -1155,9 +1178,12 @@
//and send a checkrequest
//This applies to a normal message and messages requests too
- req = new PullMessagesRequest(this.nodeId, tx.getId());
-
- asyncSendRequest(req, fromAddress);
+ if (!msgs.isEmpty())
+ {
+ req = new PullMessagesRequest(this.nodeId, tx.getId());
+
+ asyncSendRequest(req, fromAddress);
+ }
}
@@ -1544,7 +1570,9 @@
ClusterRequest request = readRequest(bytes);
- request.execute(DefaultClusteredPostOffice.this);
+ Object result = request.execute(DefaultClusteredPostOffice.this);
+
+ return result;
}
catch (Throwable e)
{
@@ -1552,8 +1580,7 @@
IllegalStateException e2 = new IllegalStateException(e.getMessage());
e2.setStackTrace(e.getStackTrace());
throw e2;
- }
- return null;
+ }
}
}
}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java 2006-09-22 19:46:49 UTC (rev 1352)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java 2006-09-22 19:47:06 UTC (rev 1353)
@@ -54,13 +54,16 @@
{
QueueStats stats = queue.getStats();
- int cnt = stats.getMessageCount();
-
- if (cnt > maxMessages)
- {
- maxMessages = cnt;
+ if (stats != null)
+ {
+ int cnt = stats.getMessageCount();
- chosenQueue = queue;
+ if (cnt > maxMessages)
+ {
+ maxMessages = cnt;
+
+ chosenQueue = queue;
+ }
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java 2006-09-22 19:46:49 UTC (rev 1352)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java 2006-09-22 19:47:06 UTC (rev 1353)
@@ -27,7 +27,9 @@
import java.util.Iterator;
import java.util.List;
+import org.jboss.logging.Logger;
import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.util.StreamUtils;
/**
* A PullMessagesRequest
@@ -40,6 +42,8 @@
*/
public class PullMessagesRequest extends TransactionRequest implements ClusterTransaction
{
+ private static final Logger log = Logger.getLogger(PullMessagesRequest.class);
+
private String queueName;
private int numMessages;
@@ -68,12 +72,16 @@
Object execute(PostOfficeInternal office) throws Throwable
{
+ log.info("********* executign pull messages requiest");
+
TransactionId id = new TransactionId(nodeId, txId);
if (hold)
{
List dels = office.getDeliveries(queueName, numMessages);
+ log.info("Got a list of " + dels.size() + " deliveries");
+
PullMessagesResponse response = new PullMessagesResponse(dels.size());
if (!dels.isEmpty())
@@ -107,7 +115,12 @@
office.holdTransaction(id, this);
}
- return response;
+ log.info("returning response:" + response);
+
+ //Convert to bytes since the response isn't serializable (nor do we want it to be)
+ byte[] bytes = StreamUtils.toBytes(response);
+
+ return bytes;
}
else
{
@@ -184,16 +197,26 @@
public void read(DataInputStream in) throws Exception
{
- queueName = in.readUTF();
+ super.read(in);
- numMessages = in.readInt();
+ if (hold)
+ {
+ queueName = in.readUTF();
+
+ numMessages = in.readInt();
+ }
}
public void write(DataOutputStream out) throws Exception
{
- out.writeUTF(queueName);
+ super.write(out);
- out.writeInt(numMessages);
+ if (hold)
+ {
+ out.writeUTF(queueName);
+
+ out.writeInt(numMessages);
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java 2006-09-22 19:46:49 UTC (rev 1352)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java 2006-09-22 19:47:06 UTC (rev 1353)
@@ -23,6 +23,7 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -40,10 +41,14 @@
* $Id$
*
*/
-public class PullMessagesResponse implements Streamable
+public class PullMessagesResponse implements Streamable, Serializable
{
private List messages;
+ PullMessagesResponse()
+ {
+ }
+
PullMessagesResponse(int size)
{
messages = new ArrayList(size);
@@ -53,6 +58,11 @@
{
messages.add(msg);
}
+
+ List getMessages()
+ {
+ return messages;
+ }
public void read(DataInputStream in) throws Exception
{
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java 2006-09-22 19:46:49 UTC (rev 1352)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java 2006-09-22 19:47:06 UTC (rev 1353)
@@ -0,0 +1,107 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.plugin.contract.MessagingComponent;
+
+/**
+ * A StatsSender
+ *
+ * Periodically invokes sendStats() on a post office from a daemon
+ * java.util.Timer. The first invocation is delayed by a random fraction of
+ * the period so that senders created at the same moment do not all fire
+ * together.
+ *
+ * start() and stop() are synchronized and idempotent: starting an already
+ * started sender, or stopping a sender that is not running, does nothing.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class StatsSender implements MessagingComponent
+{
+   // Log under this class' own category rather than the post office's
+   private static final Logger log = Logger.getLogger(StatsSender.class);
+
+   private final PostOfficeInternal office;
+
+   private final long period;
+
+   // Guarded by the monitor of this object (see start() / stop())
+   private boolean started;
+
+   // Non-null only while started
+   private Timer timer;
+
+   /**
+    * @param office the post office whose sendStats() method is invoked
+    * @param period the interval in milliseconds between invocations
+    */
+   StatsSender(PostOfficeInternal office, long period)
+   {
+      this.office = office;
+
+      this.period = period;
+   }
+
+   /**
+    * Starts the periodic sending. Does nothing if already started.
+    *
+    * @throws IllegalArgumentException if the timer rejects the schedule,
+    *         which java.util.Timer does for a non-positive period
+    */
+   public synchronized void start() throws Exception
+   {
+      if (started)
+      {
+         return;
+      }
+
+      //Needs to be daemon
+      Timer newTimer = new Timer(true);
+
+      //Add a random delay to prevent all timers starting at once
+      long delay = (long)(period * Math.random());
+
+      try
+      {
+         newTimer.schedule(new SendStatsTimerTask(), delay, period);
+      }
+      catch (RuntimeException e)
+      {
+         //Don't leave an idle timer thread behind if scheduling is rejected
+         newTimer.cancel();
+
+         throw e;
+      }
+
+      timer = newTimer;
+
+      //Without this stop() would never cancel the timer and a second
+      //start() would create another one
+      started = true;
+   }
+
+   /**
+    * Cancels the timer. Does nothing if not started.
+    */
+   public synchronized void stop() throws Exception
+   {
+      if (!started)
+      {
+         return;
+      }
+
+      timer.cancel();
+
+      timer = null;
+
+      started = false;
+   }
+
+   /**
+    * The task run by the timer. A failed send is logged and otherwise
+    * ignored, so the task stays scheduled for the next period.
+    */
+   class SendStatsTimerTask extends TimerTask
+   {
+      public void run()
+      {
+         try
+         {
+            office.sendStats();
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to send statistics", e);
+         }
+      }
+   }
+
+}
Added: trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java 2006-09-22 19:46:49 UTC (rev 1352)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java 2006-09-22 19:47:06 UTC (rev 1353)
@@ -0,0 +1,321 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.core.plugin.base;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.naming.InitialContext;
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+
+import org.jboss.jms.server.QueuedExecutorPool;
+import org.jboss.messaging.core.FilterFactory;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.Queue;
+import org.jboss.messaging.core.plugin.IdManager;
+import org.jboss.messaging.core.plugin.JDBCPersistenceManager;
+import org.jboss.messaging.core.plugin.SimpleMessageStore;
+import org.jboss.messaging.core.plugin.contract.MessageStore;
+import org.jboss.messaging.core.plugin.contract.PersistenceManager;
+import org.jboss.messaging.core.plugin.contract.PostOffice;
+import org.jboss.messaging.core.plugin.postoffice.DefaultPostOffice;
+import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.messaging.core.tx.TransactionRepository;
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.core.SimpleFilterFactory;
+import org.jboss.test.messaging.core.SimpleReceiver;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.jmx.ServiceContainer;
+import org.jboss.test.messaging.util.CoreMessageFactory;
+import org.jboss.tm.TransactionManagerService;
+
+/**
+ *
+ * A ClusteringTestBase
+ *
+ * Base class for post office tests. setUp() starts a ServiceContainer, a
+ * JDBC persistence manager, a transaction repository and a message store;
+ * the protected helpers create a post office, send messages, verify what a
+ * receiver got and inspect the messaging database tables.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class ClusteringTestBase extends MessagingTestCase
+{
+   // Constants -----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   protected ServiceContainer sc;
+
+   protected IdManager im;
+
+   protected PersistenceManager pm;
+
+   protected MessageStore ms;
+
+   protected TransactionRepository tr;
+
+   protected QueuedExecutorPool pool;
+
+   // Constructors --------------------------------------------------
+
+   public ClusteringTestBase(String name)
+   {
+      super(name);
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Starts the service container and creates and starts the persistence
+    * manager, transaction repository and message store used by the tests.
+    */
+   public void setUp() throws Exception
+   {
+      super.setUp();
+
+      sc = new ServiceContainer("all");
+
+      sc.start();
+
+      pm =
+         new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(), null,
+                                    true, true, true, 100);
+      pm.start();
+
+      tr = new TransactionRepository(pm, new IdManager("TRANSACTION_ID", 10, pm));
+      tr.start();
+
+      ms = new SimpleMessageStore();
+      ms.start();
+
+      pool = new QueuedExecutorPool(10);
+
+      im = new IdManager("CHANNEL_ID", 10, pm);
+
+      log.debug("setup done");
+   }
+
+   /**
+    * Stops what setUp() started. The service container is only stopped
+    * when the server is not remote.
+    */
+   public void tearDown() throws Exception
+   {
+      // NOTE(review): the container is stopped before the persistence
+      // manager that was built on its data source - confirm this order
+      // is intended
+      if (!ServerManagement.isRemote())
+      {
+         sc.stop();
+         sc = null;
+      }
+      pm.stop();
+      tr.stop();
+      ms.stop();
+
+      super.tearDown();
+   }
+
+   // Protected -----------------------------------------------------
+
+   /**
+    * Creates and starts a non clustered post office for node "node1".
+    *
+    * @return the started post office
+    */
+   protected PostOffice createPostOffice() throws Exception
+   {
+      FilterFactory ff = new SimpleFilterFactory();
+
+      DefaultPostOffice postOffice =
+         new DefaultPostOffice(sc.getDataSource(), sc.getTransactionManager(),
+                               null, true, "node1", "Simple", ms, pm, tr, ff, pool);
+
+      postOffice.start();
+
+      return postOffice;
+   }
+
+   /**
+    * Queries the JMS_POSTOFFICE table.
+    *
+    * Note that, despite the name, the result is true when binding data IS
+    * present.
+    *
+    * @return true if the table contains at least one row
+    */
+   protected boolean checkNoBindingData() throws Exception
+   {
+      return anyTableHasRows(new String[] { "JMS_POSTOFFICE" });
+   }
+
+   /**
+    * Queries the JMS_MESSAGE_REFERENCE table and, if that is empty, the
+    * JMS_MESSAGE table.
+    *
+    * Note that, despite the name, the result is true when message data IS
+    * present.
+    *
+    * @return true if either table contains at least one row
+    */
+   protected boolean checkNoMessageData() throws Exception
+   {
+      return anyTableHasRows(new String[] { "JMS_MESSAGE_REFERENCE", "JMS_MESSAGE" });
+   }
+
+   /**
+    * Creates num messages with ids 1..num and routes each of them through
+    * the given post office, asserting that every one was routed. Sleeps
+    * for one second before returning.
+    *
+    * NOTE(review): the tx argument is not used - route() is always called
+    * with null as its third argument. Confirm whether callers expect the
+    * transaction to be passed through.
+    *
+    * @param condition the routing condition passed to route()
+    * @param persistent passed to the message factory for each message
+    * @param office the post office to route through
+    * @param num the number of messages to send
+    * @param tx currently ignored
+    * @return the messages sent, in send order
+    */
+   protected List sendMessages(String condition, boolean persistent, PostOffice office, int num, Transaction tx) throws Exception
+   {
+      List list = new ArrayList();
+
+      for (int i = 0; i < num; i++)
+      {
+         Message msg = CoreMessageFactory.createCoreMessage(i + 1, persistent, null);
+
+         MessageReference ref = ms.reference(msg);
+
+         boolean routed = office.route(ref, condition, null);
+
+         assertTrue(routed);
+
+         list.add(msg);
+      }
+
+      // presumably gives delivery time to complete before the caller
+      // inspects its receivers - TODO confirm
+      Thread.sleep(1000);
+
+      return list;
+   }
+
+   /**
+    * Asserts that the receiver holds exactly one message with the same id
+    * as msg, acknowledges it, asserts that browsing the queue then yields
+    * nothing and clears the receiver.
+    */
+   protected void checkContainsAndAcknowledge(Message msg, SimpleReceiver receiver, Queue queue) throws Throwable
+   {
+      List msgs = receiver.getMessages();
+      assertNotNull(msgs);
+      assertEquals(1, msgs.size());
+      Message msgRec = (Message)msgs.get(0);
+      assertEquals(msg.getMessageID(), msgRec.getMessageID());
+      receiver.acknowledge(msgRec, null);
+      msgs = queue.browse();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());
+      receiver.clear();
+   }
+
+   /**
+    * Asserts that the receiver holds the same number of messages as
+    * msgList with matching ids in the same order, acknowledges each of
+    * them, asserts that browsing the queue then yields nothing and clears
+    * the receiver.
+    */
+   protected void checkContainsAndAcknowledge(List msgList, SimpleReceiver receiver, Queue queue) throws Throwable
+   {
+      List msgs = receiver.getMessages();
+      assertNotNull(msgs);
+      assertEquals(msgList.size(), msgs.size());
+
+      for (int i = 0; i < msgList.size(); i++)
+      {
+         Message msgRec = (Message)msgs.get(i);
+         Message msgCheck = (Message)msgList.get(i);
+         assertEquals(msgCheck.getMessageID(), msgRec.getMessageID());
+         receiver.acknowledge(msgRec, null);
+      }
+
+      msgs = queue.browse();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());
+      receiver.clear();
+   }
+
+   /**
+    * Asserts that the receiver has a non null, empty message list.
+    */
+   protected void checkEmpty(SimpleReceiver receiver) throws Throwable
+   {
+      List msgs = receiver.getMessages();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());
+   }
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Checks the given tables in order, inside a new transaction, and
+    * stops at the first one that contains a row. Any transaction
+    * associated with the calling thread is suspended for the duration and
+    * resumed afterwards.
+    *
+    * The clean up steps are nested so that a failure in one of them (for
+    * example closing the connection) does not prevent the new transaction
+    * from being committed or the suspended one from being resumed.
+    *
+    * @param tableNames the tables to query, in order
+    * @return true if any of the tables contains at least one row
+    */
+   private boolean anyTableHasRows(String[] tableNames) throws Exception
+   {
+      InitialContext ctx = new InitialContext();
+
+      TransactionManager mgr = (TransactionManager)ctx.lookup(TransactionManagerService.JNDI_NAME);
+      DataSource ds = (DataSource)ctx.lookup("java:/DefaultDS");
+
+      javax.transaction.Transaction txOld = mgr.suspend();
+      mgr.begin();
+
+      try
+      {
+         Connection conn = ds.getConnection();
+
+         try
+         {
+            for (int i = 0; i < tableNames.length; i++)
+            {
+               if (tableHasRows(conn, tableNames[i]))
+               {
+                  return true;
+               }
+            }
+
+            return false;
+         }
+         finally
+         {
+            conn.close();
+         }
+      }
+      finally
+      {
+         try
+         {
+            mgr.commit();
+         }
+         finally
+         {
+            if (txOld != null)
+            {
+               mgr.resume(txOld);
+            }
+         }
+      }
+   }
+
+   /**
+    * Runs "SELECT * FROM tableName" on the given connection and always
+    * closes the statement and result set it created.
+    *
+    * @return true if the query returned at least one row
+    */
+   private boolean tableHasRows(Connection conn, String tableName) throws Exception
+   {
+      PreparedStatement ps = conn.prepareStatement("SELECT * FROM " + tableName);
+
+      try
+      {
+         ResultSet rs = ps.executeQuery();
+
+         try
+         {
+            return rs.next();
+         }
+         finally
+         {
+            rs.close();
+         }
+      }
+      finally
+      {
+         ps.close();
+      }
+   }
+
+   // Inner classes -------------------------------------------------
+}
+
+
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java 2006-09-22 19:46:49 UTC (rev 1352)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java 2006-09-22 19:47:06 UTC (rev 1353)
@@ -54,6 +54,7 @@
import org.jboss.test.messaging.core.SimpleFilter;
import org.jboss.test.messaging.core.SimpleFilterFactory;
import org.jboss.test.messaging.core.SimpleReceiver;
+import org.jboss.test.messaging.core.plugin.base.ClusteringTestBase;
import org.jboss.test.messaging.tools.ServerManagement;
import org.jboss.test.messaging.tools.jmx.ServiceContainer;
import org.jboss.test.messaging.util.CoreMessageFactory;
@@ -71,7 +72,7 @@
* $Id$
*
*/
-public class DefaultPostOfficeTest extends MessagingTestCase
+public class DefaultPostOfficeTest extends ClusteringTestBase
{
// Constants -----------------------------------------------------
@@ -79,18 +80,6 @@
// Attributes ----------------------------------------------------
- protected ServiceContainer sc;
-
- protected IdManager im;
-
- protected PersistenceManager pm;
-
- protected MessageStore ms;
-
- protected TransactionRepository tr;
-
- protected QueuedExecutorPool pool;
-
// Constructors --------------------------------------------------
public DefaultPostOfficeTest(String name)
@@ -103,40 +92,11 @@
public void setUp() throws Exception
{
super.setUp();
-
- sc = new ServiceContainer("all");
-
- sc.start();
-
- pm =
- new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(), null,
- true, true, true, 100);
- pm.start();
-
- tr = new TransactionRepository(pm, new IdManager("TRANSACTION_ID", 10, pm));
- tr.start();
-
- ms = new SimpleMessageStore();
- ms.start();
-
- pool = new QueuedExecutorPool(10);
-
- im = new IdManager("CHANNEL_ID", 10, pm);
-
- log.debug("setup done");
+
}
public void tearDown() throws Exception
- {
- if (!ServerManagement.isRemote())
- {
- sc.stop();
- sc = null;
- }
- pm.stop();
- tr.stop();
- ms.stop();
-
+ {
super.tearDown();
}
@@ -1140,122 +1100,8 @@
assertEquals(binding1.getQueue().isRecoverable(), binding2.getQueue().isRecoverable());
}
- protected PostOffice createPostOffice() throws Exception
- {
- FilterFactory ff = new SimpleFilterFactory();
-
- DefaultPostOffice postOffice =
- new DefaultPostOffice(sc.getDataSource(), sc.getTransactionManager(),
- null, true, "node1", "Simple", ms, pm, tr, ff, pool);
-
- postOffice.start();
-
- return postOffice;
- }
- protected boolean checkNoBindingData() throws Exception
- {
- InitialContext ctx = new InitialContext();
-
- TransactionManager mgr = (TransactionManager)ctx.lookup(TransactionManagerService.JNDI_NAME);
- DataSource ds = (DataSource)ctx.lookup("java:/DefaultDS");
-
- javax.transaction.Transaction txOld = mgr.suspend();
- mgr.begin();
-
- Connection conn = null;
-
- PreparedStatement ps = null;
-
- ResultSet rs = null;
-
- try
- {
- conn = ds.getConnection();
- String sql = "SELECT * FROM JMS_POSTOFFICE";
- ps = conn.prepareStatement(sql);
-
- rs = ps.executeQuery();
-
- return rs.next();
- }
- finally
- {
- if (rs != null) rs.close();
-
- if (ps != null) ps.close();
-
- if (conn != null) conn.close();
-
- mgr.commit();
-
- if (txOld != null)
- {
- mgr.resume(txOld);
- }
-
- }
- }
- protected boolean checkNoMessageData() throws Exception
- {
- InitialContext ctx = new InitialContext();
-
- TransactionManager mgr = (TransactionManager)ctx.lookup(TransactionManagerService.JNDI_NAME);
- DataSource ds = (DataSource)ctx.lookup("java:/DefaultDS");
-
- javax.transaction.Transaction txOld = mgr.suspend();
- mgr.begin();
-
- Connection conn = null;
-
- PreparedStatement ps = null;
-
- ResultSet rs = null;
-
- try
- {
- conn = ds.getConnection();
- String sql = "SELECT * FROM JMS_MESSAGE_REFERENCE";
- ps = conn.prepareStatement(sql);
-
- rs = ps.executeQuery();
-
- boolean exists = rs.next();
-
- if (!exists)
- {
- rs.close();
-
- ps.close();
-
- ps = conn.prepareStatement("SELECT * FROM JMS_MESSAGE");
-
- rs = ps.executeQuery();
-
- exists = rs.next();
- }
-
- return exists;
- }
- finally
- {
- if (rs != null) rs.close();
-
- if (ps != null) ps.close();
-
- if (conn != null) conn.close();
-
- mgr.commit();
-
- if (txOld != null)
- {
- mgr.resume(txOld);
- }
-
- }
- }
-
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java 2006-09-22 19:46:49 UTC (rev 1352)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java 2006-09-22 19:47:06 UTC (rev 1353)
@@ -1178,7 +1178,7 @@
//========================
log.info("******** sending");
- List msgs = sendMessages(persistent, office1, 3, null);
+ List msgs = sendMessages("topic", persistent, office1, 3, null);
log.info("********** sent");
//n2
@@ -1211,7 +1211,7 @@
//Send 3 messages at node2
//========================
- msgs = sendMessages(persistent, office2, 3, null);
+ msgs = sendMessages("topic", persistent, office2, 3, null);
//n2
checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1242,7 +1242,7 @@
//Send 3 messages at node3
//========================
- msgs = sendMessages(persistent, office3, 3, null);
+ msgs = sendMessages("topic", persistent, office3, 3, null);
//n2
checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1281,7 +1281,7 @@
// * node6: 1 shared durable (shared2), 1 non durable
// * node7: 1 shared durable (shared2)
- msgs = sendMessages(persistent, office4, 3, null);
+ msgs = sendMessages("topic", persistent, office4, 3, null);
//n2
checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1312,7 +1312,7 @@
//Send 3 messages at node5
//========================
- msgs = sendMessages(persistent, office5, 3, null);
+ msgs = sendMessages("topic", persistent, office5, 3, null);
//n2
checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1343,7 +1343,7 @@
//Send 3 messages at node6
//========================
- msgs = sendMessages(persistent, office6, 3, null);
+ msgs = sendMessages("topic", persistent, office6, 3, null);
//n2
checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1375,7 +1375,7 @@
//Send 3 messages at node7
//========================
- msgs = sendMessages(persistent, office7, 3, null);
+ msgs = sendMessages("topic", persistent, office7, 3, null);
//n2
checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -2101,7 +2101,7 @@
groupName,
JGroupsUtil.getControlStackProperties(),
JGroupsUtil.getDataStackProperties(),
- 5000, 5000, pullPolicy, rf, 1);
+ 5000, 5000, pullPolicy, rf, 1, 1000);
postOffice.start();
@@ -2110,65 +2110,7 @@
// Private -------------------------------------------------------
- private List sendMessages(boolean persistent, PostOffice office, int num, Transaction tx) throws Exception
- {
- List list = new ArrayList();
-
- Message msg = CoreMessageFactory.createCoreMessage(1, persistent, null);
-
- MessageReference ref = ms.reference(msg);
-
- boolean routed = office.route(ref, "topic", null);
-
- assertTrue(routed);
-
- list.add(msg);
-
- Thread.sleep(1000);
-
- return list;
- }
- private void checkContainsAndAcknowledge(Message msg, SimpleReceiver receiver, Queue queue) throws Throwable
- {
- List msgs = receiver.getMessages();
- assertNotNull(msgs);
- assertEquals(1, msgs.size());
- Message msgRec = (Message)msgs.get(0);
- assertEquals(msg.getMessageID(), msgRec.getMessageID());
- receiver.acknowledge(msgRec, null);
- msgs = queue.browse();
- assertNotNull(msgs);
- assertTrue(msgs.isEmpty());
- receiver.clear();
- }
-
- private void checkContainsAndAcknowledge(List msgList, SimpleReceiver receiver, Queue queue) throws Throwable
- {
- List msgs = receiver.getMessages();
- assertNotNull(msgs);
- assertEquals(msgList.size(), msgs.size());
-
- for (int i = 0; i < msgList.size(); i++)
- {
- Message msgRec = (Message)msgs.get(i);
- Message msgCheck = (Message)msgList.get(i);
- assertEquals(msgCheck.getMessageID(), msgRec.getMessageID());
- receiver.acknowledge(msgRec, null);
- }
-
- msgs = queue.browse();
- assertNotNull(msgs);
- assertTrue(msgs.isEmpty());
- receiver.clear();
- }
-
- private void checkEmpty(SimpleReceiver receiver) throws Throwable
- {
- List msgs = receiver.getMessages();
- assertNotNull(msgs);
- assertTrue(msgs.isEmpty());
- }
// Inner classes -------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java 2006-09-22 19:46:49 UTC (rev 1352)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java 2006-09-22 19:47:06 UTC (rev 1353)
@@ -21,21 +21,10 @@
*/
package org.jboss.test.messaging.core.plugin.postoffice.cluster;
-import java.util.ArrayList;
import java.util.List;
-import org.jboss.jms.server.QueuedExecutorPool;
import org.jboss.messaging.core.FilterFactory;
-import org.jboss.messaging.core.Message;
-import org.jboss.messaging.core.MessageReference;
-import org.jboss.messaging.core.Queue;
-import org.jboss.messaging.core.plugin.IdManager;
-import org.jboss.messaging.core.plugin.JDBCPersistenceManager;
-import org.jboss.messaging.core.plugin.SimpleMessageStore;
import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
-import org.jboss.messaging.core.plugin.contract.MessageStore;
-import org.jboss.messaging.core.plugin.contract.PersistenceManager;
-import org.jboss.messaging.core.plugin.contract.PostOffice;
import org.jboss.messaging.core.plugin.postoffice.Binding;
import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
@@ -43,14 +32,9 @@
import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
-import org.jboss.messaging.core.tx.Transaction;
-import org.jboss.messaging.core.tx.TransactionRepository;
-import org.jboss.test.messaging.MessagingTestCase;
import org.jboss.test.messaging.core.SimpleFilterFactory;
import org.jboss.test.messaging.core.SimpleReceiver;
-import org.jboss.test.messaging.tools.ServerManagement;
-import org.jboss.test.messaging.tools.jmx.ServiceContainer;
-import org.jboss.test.messaging.util.CoreMessageFactory;
+import org.jboss.test.messaging.core.plugin.base.ClusteringTestBase;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
@@ -64,7 +48,7 @@
* $Id$
*
*/
-public class DefaultRouterTest extends MessagingTestCase
+public class DefaultRouterTest extends ClusteringTestBase
{
// Constants -----------------------------------------------------
@@ -72,18 +56,6 @@
// Attributes ----------------------------------------------------
- protected ServiceContainer sc;
-
- protected IdManager im;
-
- protected PersistenceManager pm;
-
- protected MessageStore ms;
-
- protected TransactionRepository tr;
-
- protected QueuedExecutorPool pool;
-
// Constructors --------------------------------------------------
public DefaultRouterTest(String name)
@@ -96,40 +68,10 @@
public void setUp() throws Exception
{
super.setUp();
-
- sc = new ServiceContainer("all");
-
- sc.start();
-
- pm =
- new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(), null,
- true, true, true, 100);
- pm.start();
-
- tr = new TransactionRepository(pm, new IdManager("TRANSACTION_ID", 10, pm));
- tr.start();
-
- ms = new SimpleMessageStore();
- ms.start();
-
- pool = new QueuedExecutorPool(10);
-
- im = new IdManager("CHANNEL_ID", 10, pm);
-
- log.debug("setup done");
}
public void tearDown() throws Exception
{
- if (!ServerManagement.isRemote())
- {
- sc.stop();
- sc = null;
- }
- pm.stop();
- tr.stop();
- ms.stop();
-
super.tearDown();
}
@@ -206,49 +148,49 @@
SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue5.add(receiver5);
- List msgs = sendMessages(persistent, office1, 3, null);
+ List msgs = sendMessages("topic", persistent, office1, 3, null);
checkContainsAndAcknowledge(msgs, receiver1, queue1);
checkEmpty(receiver2);
checkEmpty(receiver3);
checkEmpty(receiver4);
checkEmpty(receiver5);
- msgs = sendMessages(persistent, office1, 3, null);
+ msgs = sendMessages("topic", persistent, office1, 3, null);
checkEmpty(receiver1);
checkContainsAndAcknowledge(msgs, receiver2, queue1);
checkEmpty(receiver3);
checkEmpty(receiver4);
checkEmpty(receiver5);
- msgs = sendMessages(persistent, office1, 3, null);
+ msgs = sendMessages("topic", persistent, office1, 3, null);
checkEmpty(receiver1);
checkEmpty(receiver2);
checkContainsAndAcknowledge(msgs, receiver3, queue1);
checkEmpty(receiver4);
checkEmpty(receiver5);
- msgs = sendMessages(persistent, office1, 3, null);
+ msgs = sendMessages("topic", persistent, office1, 3, null);
checkEmpty(receiver1);
checkEmpty(receiver2);
checkEmpty(receiver3);
checkContainsAndAcknowledge(msgs, receiver4, queue1);
checkEmpty(receiver5);
- msgs = sendMessages(persistent, office1, 3, null);
+ msgs = sendMessages("topic", persistent, office1, 3, null);
checkEmpty(receiver1);
checkEmpty(receiver2);
checkEmpty(receiver3);
checkEmpty(receiver4);
checkContainsAndAcknowledge(msgs, receiver5, queue1);
- msgs = sendMessages(persistent, office1, 3, null);
+ msgs = sendMessages("topic", persistent, office1, 3, null);
checkContainsAndAcknowledge(msgs, receiver1, queue1);
checkEmpty(receiver2);
checkEmpty(receiver3);
checkEmpty(receiver4);
checkEmpty(receiver5);
- msgs = sendMessages(persistent, office1, 3, null);
+ msgs = sendMessages("topic", persistent, office1, 3, null);
checkEmpty(receiver1);
checkContainsAndAcknowledge(msgs, receiver2, queue1);
checkEmpty(receiver3);
@@ -345,21 +287,21 @@
SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue5.add(receiver5);
- List msgs = sendMessages(persistent, office2, 3, null);
+ List msgs = sendMessages("topic", persistent, office2, 3, null);
checkContainsAndAcknowledge(msgs, receiver1, queue1);
checkEmpty(receiver2);
checkEmpty(receiver3);
checkEmpty(receiver4);
checkEmpty(receiver5);
- msgs = sendMessages(persistent, office2, 3, null);
+ msgs = sendMessages("topic", persistent, office2, 3, null);
checkContainsAndAcknowledge(msgs, receiver1, queue1);
checkEmpty(receiver2);
checkEmpty(receiver3);
checkEmpty(receiver4);
checkEmpty(receiver5);
- msgs = sendMessages(persistent, office2, 3, null);
+ msgs = sendMessages("topic", persistent, office2, 3, null);
checkContainsAndAcknowledge(msgs, receiver1, queue1);
checkEmpty(receiver2);
checkEmpty(receiver3);
@@ -367,21 +309,21 @@
checkEmpty(receiver5);
- msgs = sendMessages(persistent, office3, 3, null);
+ msgs = sendMessages("topic", persistent, office3, 3, null);
checkEmpty(receiver1);
checkContainsAndAcknowledge(msgs, receiver2, queue1);
checkEmpty(receiver3);
checkEmpty(receiver4);
checkEmpty(receiver5);
- msgs = sendMessages(persistent, office3, 3, null);
+ msgs = sendMessages("topic", persistent, office3, 3, null);
checkEmpty(receiver1);
checkContainsAndAcknowledge(msgs, receiver2, queue1);
checkEmpty(receiver3);
checkEmpty(receiver4);
checkEmpty(receiver5);
- msgs = sendMessages(persistent, office3, 3, null);
+ msgs = sendMessages("topic", persistent, office3, 3, null);
checkEmpty(receiver1);
checkContainsAndAcknowledge(msgs, receiver2, queue1);
checkEmpty(receiver3);
@@ -440,7 +382,7 @@
groupName,
JGroupsUtil.getControlStackProperties(),
JGroupsUtil.getDataStackProperties(),
- 5000, 5000, redistPolicy, rf, 1);
+ 5000, 5000, redistPolicy, rf, 1, 1000);
postOffice.start();
@@ -449,70 +391,7 @@
// Private -------------------------------------------------------
- //TODO these methods are duplicated from DefaultClusteredPostOfficeTest - put in common super class or somewhere
- //else
- private List sendMessages(boolean persistent, PostOffice office, int num, Transaction tx) throws Exception
- {
- List list = new ArrayList();
-
- Message msg = CoreMessageFactory.createCoreMessage(1, persistent, null);
-
- MessageReference ref = ms.reference(msg);
-
- boolean routed = office.route(ref, "topic", null);
-
- assertTrue(routed);
-
- list.add(msg);
-
- Thread.sleep(1000);
-
- return list;
- }
-
-
- private void checkContainsAndAcknowledge(Message msg, SimpleReceiver receiver, Queue queue) throws Throwable
- {
- List msgs = receiver.getMessages();
- assertNotNull(msgs);
- assertEquals(1, msgs.size());
- Message msgRec = (Message)msgs.get(0);
- assertEquals(msg.getMessageID(), msgRec.getMessageID());
- receiver.acknowledge(msgRec, null);
- msgs = queue.browse();
- assertNotNull(msgs);
- assertTrue(msgs.isEmpty());
- receiver.clear();
- }
-
- private void checkContainsAndAcknowledge(List msgList, SimpleReceiver receiver, Queue queue) throws Throwable
- {
- List msgs = receiver.getMessages();
- assertNotNull(msgs);
- assertEquals(msgList.size(), msgs.size());
-
- for (int i = 0; i < msgList.size(); i++)
- {
- Message msgRec = (Message)msgs.get(i);
- Message msgCheck = (Message)msgList.get(i);
- assertEquals(msgCheck.getMessageID(), msgRec.getMessageID());
- receiver.acknowledge(msgRec, null);
- }
-
- msgs = queue.browse();
- assertNotNull(msgs);
- assertTrue(msgs.isEmpty());
- receiver.clear();
- }
-
- private void checkEmpty(SimpleReceiver receiver) throws Throwable
- {
- List msgs = receiver.getMessages();
- assertNotNull(msgs);
- assertTrue(msgs.isEmpty());
- }
-
// Inner classes -------------------------------------------------
Added: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java 2006-09-22 19:46:49 UTC (rev 1352)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java 2006-09-22 19:47:06 UTC (rev 1353)
@@ -0,0 +1,270 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.core.plugin.postoffice.cluster;
+
+import java.util.List;
+
+import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.DeliveryObserver;
+import org.jboss.messaging.core.FilterFactory;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.Receiver;
+import org.jboss.messaging.core.SimpleDelivery;
+import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.Binding;
+import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultMessagePullPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
+import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
+import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.test.messaging.core.SimpleFilterFactory;
+import org.jboss.test.messaging.core.plugin.base.ClusteringTestBase;
+
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
+
+public class RedistributionTest extends ClusteringTestBase
+{
+ // Constants -----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public RedistributionTest(String name)
+ {
+ super(name);
+ }
+
+ // Public --------------------------------------------------------
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ public void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ public void testRedist() throws Throwable
+ {
+ redistTest(true);
+ }
+
+ /*
+ *
+ *
+ *
+ */
+ public void redistTest(boolean persistent) throws Throwable
+ {
+ ClusteredPostOffice office1 = null;
+
+ ClusteredPostOffice office2 = null;
+
+ ClusteredPostOffice office3 = null;
+
+ ClusteredPostOffice office4 = null;
+
+ ClusteredPostOffice office5 = null;
+
+ try
+ {
+ office1 = createClusteredPostOffice("node1", "testgroup");
+
+ office2 = createClusteredPostOffice("node2", "testgroup");
+
+ office3 = createClusteredPostOffice("node3", "testgroup");
+
+ office4 = createClusteredPostOffice("node4", "testgroup");
+
+ office5 = createClusteredPostOffice("node5", "testgroup");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
+ PullingReceiver receiver1 = new PullingReceiver();
+ queue1.add(receiver1);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
+ PullingReceiver receiver2 = new PullingReceiver();
+ queue2.add(receiver2);
+
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, "node3", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ Binding binding3 = office3.bindClusteredQueue("queue1", queue3);
+ PullingReceiver receiver3 = new PullingReceiver();
+ queue3.add(receiver3);
+
+ LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, "node4", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ Binding binding4 = office4.bindClusteredQueue("queue1", queue4);
+ PullingReceiver receiver4 = new PullingReceiver();
+ queue4.add(receiver4);
+
+ LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, "node5", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+ Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
+ PullingReceiver receiver5 = new PullingReceiver();
+ queue5.add(receiver5);
+
+ //Send 30 messages to each queue
+ this.sendMessages("queue1", persistent, office1, 30, null);
+ this.sendMessages("queue1", persistent, office2, 30, null);
+ this.sendMessages("queue1", persistent, office3, 30, null);
+ this.sendMessages("queue1", persistent, office4, 30, null);
+ this.sendMessages("queue1", persistent, office5, 30, null);
+
+ Thread.sleep(500);
+
+ List msgs = queue1.browse();
+ assertEquals(30, msgs.size());
+
+ msgs = queue2.browse();
+ assertEquals(30, msgs.size());
+
+ msgs = queue3.browse();
+ assertEquals(30, msgs.size());
+
+ msgs = queue4.browse();
+ assertEquals(30, msgs.size());
+
+ msgs = queue5.browse();
+ assertEquals(30, msgs.size());
+
+ //Consume all the messages from queue 3
+ for (int i = 0; i < 30; i++)
+ {
+ Delivery del = receiver3.getDelivery();
+ log.info("Got delivery: " + del.getReference().getMessageID());
+ del.acknowledge(null);
+ queue3.deliver(false);
+ }
+
+ msgs = queue3.browse();
+ assertEquals(0, msgs.size());
+
+ queue3.deliver(false);
+
+ Delivery del = receiver3.getDelivery();
+
+ log.info("delivery is " + del);
+
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ office1.stop();
+ }
+
+ if (office2 != null)
+ {
+ office2.stop();
+ }
+
+ if (office3 != null)
+ {
+ office3.stop();
+ }
+
+ if (office4 != null)
+ {
+ office4.stop();
+ }
+
+ if (office5 != null)
+ {
+ office5.stop();
+ }
+ }
+ }
+
+ class PullingReceiver implements Receiver
+ {
+ private Delivery del;
+
+ public synchronized Delivery handle(DeliveryObserver observer, MessageReference reference, Transaction tx)
+ {
+ if (del != null)
+ {
+ return null;
+ }
+
+ del = new SimpleDelivery(observer, reference, false);
+
+ this.notify();
+
+ return del;
+ }
+
+ public synchronized Delivery getDelivery()
+ {
+ while (del == null)
+ {
+ try
+ {
+ this.wait();
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ Delivery ret = del;
+ del = null;
+ return ret;
+ }
+
+ }
+
+ protected ClusteredPostOffice createClusteredPostOffice(String nodeId, String groupName) throws Exception
+ {
+ MessagePullPolicy pullPolicy = new DefaultMessagePullPolicy();
+
+ FilterFactory ff = new SimpleFilterFactory();
+
+ ClusterRouterFactory rf = new DefaultRouterFactory();
+
+ DefaultClusteredPostOffice postOffice =
+ new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
+ null, true, nodeId, "Clustered", ms, pm, tr, ff, pool,
+ groupName,
+ JGroupsUtil.getControlStackProperties(),
+ JGroupsUtil.getDataStackProperties(),
+ 5000, 5000, pullPolicy, rf, 1, 1000);
+
+ postOffice.start();
+
+ return postOffice;
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
+
+
+
More information about the jboss-cvs-commits
mailing list