[jboss-cvs] JBoss Messaging SVN: r2822 - in trunk: src/main/org/jboss/jms/client/container and 13 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jul 2 16:23:07 EDT 2007
Author: timfox
Date: 2007-07-02 16:23:06 -0400 (Mon, 02 Jul 2007)
New Revision: 2822
Added:
trunk/tests/src/org/jboss/test/messaging/core/clusterconnection/
trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusterConnectionManagerTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/PreserveOrderingTest.java
Modified:
trunk/src/etc/server/default/deploy/db2-persistence-service.xml
trunk/src/etc/server/default/deploy/mssql-persistence-service.xml
trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
trunk/src/etc/server/default/deploy/oracle-persistence-service.xml
trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml
trunk/src/etc/server/default/deploy/sybase-persistence-service.xml
trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
trunk/src/main/org/jboss/jms/server/ServerPeer.java
trunk/src/main/org/jboss/jms/server/destination/QueueService.java
trunk/src/main/org/jboss/jms/server/destination/TopicService.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/contract/PostOffice.java
trunk/src/main/org/jboss/messaging/core/contract/Queue.java
trunk/src/main/org/jboss/messaging/core/impl/ClusterRoundRobinDistributor.java
trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
trunk/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java
trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/MappingInfo.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
trunk/tests/src/org/jboss/test/messaging/core/NonRecoverableMessagingQueueTest.java
trunk/tests/src/org/jboss/test/messaging/core/RecoverableMessagingQueueTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_2PCTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_TTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_2PCTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_NTTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_TTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/PagingTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_2PCTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_NTTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_TTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_2PCTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_NTTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_TTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/ClusteredPostOfficeTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/PostOfficeTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
Log:
Various changes and fixes including new tests
Modified: trunk/src/etc/server/default/deploy/db2-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/db2-persistence-service.xml 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/src/etc/server/default/deploy/db2-persistence-service.xml 2007-07-02 20:23:06 UTC (rev 2822)
@@ -105,7 +105,7 @@
<attribute name="CreateTablesOnStartup">true</attribute>
<attribute name="SqlProperties"><![CDATA[
-CREATE_POSTOFFICE_TABLE=CREATE TABLE JBM_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID INTEGER, QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT, CLUSTERED CHAR(1), ALL_NODES CHAR(1))
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JBM_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID INTEGER, QUEUE_NAME VARCHAR(255), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT, CLUSTERED CHAR(1), ALL_NODES CHAR(1), PRIMARY KEY(POSTOFFICE_NAME, NODE_ID, QUEUE_NAME))
INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
Modified: trunk/src/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mssql-persistence-service.xml 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/src/etc/server/default/deploy/mssql-persistence-service.xml 2007-07-02 20:23:06 UTC (rev 2822)
@@ -108,7 +108,7 @@
<attribute name="CreateTablesOnStartup">true</attribute>
<attribute name="SqlProperties"><![CDATA[
-CREATE_POSTOFFICE_TABLE=CREATE TABLE JBM_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID SMALLINT, QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID INTEGER, CLSTERED CHAR(1), ALL_NODES CHAR(1))
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JBM_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID SMALLINT, QUEUE_NAME VARCHAR(255), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID INTEGER, CLSTERED CHAR(1), ALL_NODES CHAR(1), PRIMARY KEY(POSTOFFICE_NAME, NODE_ID, QUEUE_NAME))
INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
Modified: trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mysql-persistence-service.xml 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/src/etc/server/default/deploy/mysql-persistence-service.xml 2007-07-02 20:23:06 UTC (rev 2822)
@@ -108,7 +108,7 @@
<attribute name="CreateTablesOnStartup">true</attribute>
<attribute name="SqlProperties"><![CDATA[
-CREATE_POSTOFFICE_TABLE=CREATE TABLE JBM_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID INTEGER, QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT, CLUSTERED CHAR(1), ALL_NODES CHAR(1))
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JBM_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID INTEGER, QUEUE_NAME VARCHAR(255), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT, CLUSTERED CHAR(1), ALL_NODES CHAR(1), PRIMARY KEY(POSTOFFICE_NAME, NODE_ID, QUEUE_NAME))
INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
Modified: trunk/src/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/oracle-persistence-service.xml 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/src/etc/server/default/deploy/oracle-persistence-service.xml 2007-07-02 20:23:06 UTC (rev 2822)
@@ -108,7 +108,7 @@
<attribute name="CreateTablesOnStartup">true</attribute>
<attribute name="SqlProperties"><![CDATA[
-CREATE_POSTOFFICE_TABLE=CREATE TABLE JBM_POSTOFFICE (POSTOFFICE_NAME VARCHAR2(255), NODE_ID INTEGER, QUEUE_NAME VARCHAR2(1023), COND VARCHAR2(1023), SELECTOR VARCHAR2(1023), CHANNEL_ID INTEGER, CLUSTERED CHAR(1), ALL_NODES CHAR(1))
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JBM_POSTOFFICE (POSTOFFICE_NAME VARCHAR2(255), NODE_ID INTEGER, QUEUE_NAME VARCHAR2(255), COND VARCHAR2(1023), SELECTOR VARCHAR2(1023), CHANNEL_ID INTEGER, CLUSTERED CHAR(1), ALL_NODES CHAR(1), PRIMARY KEY(POSTOFFICE_NAME, NODE_ID, QUEUE_NAME))
INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
Modified: trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml 2007-07-02 20:23:06 UTC (rev 2822)
@@ -108,7 +108,7 @@
<attribute name="CreateTablesOnStartup">true</attribute>
<attribute name="SqlProperties"><![CDATA[
-CREATE_POSTOFFICE_TABLE=CREATE TABLE JBM_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID INTEGER, QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT, CLUSTERED CHAR(1), ALL_NODES CHAR(1))
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JBM_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID INTEGER, QUEUE_NAME VARCHAR(255), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT, CLUSTERED CHAR(1), ALL_NODES CHAR(1), PRIMARY KEY(POSTOFFICE_NAME, NODE_ID, QUEUE_NAME))
INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
Modified: trunk/src/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/sybase-persistence-service.xml 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/src/etc/server/default/deploy/sybase-persistence-service.xml 2007-07-02 20:23:06 UTC (rev 2822)
@@ -113,7 +113,7 @@
<attribute name="CreateTablesOnStartup">true</attribute>
<attribute name="SqlProperties"><![CDATA[
-CREATE_POSTOFFICE_TABLE=CREATE TABLE JBM_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID SMALLINT, QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023) NULL, CHANNEL_ID INTEGER, CLSTERED CHAR(1), ALL_NODES CHAR(1))
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JBM_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID SMALLINT, QUEUE_NAME VARCHAR(255), COND VARCHAR(1023), SELECTOR VARCHAR(1023) NULL, CHANNEL_ID INTEGER, CLSTERED CHAR(1), ALL_NODES CHAR(1), PRIMARY KEY(POSTOFFICE_NAME, NODE_ID, QUEUE_NAME))
INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
Modified: trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -90,7 +90,7 @@
}
else
{
- log.trace(proxy.getMessage() + " has reached maximum delivery number, cancelling to server");
+ log.trace(proxy.getMessage() + " has reached maximum delivery number " + maxDeliveries +", cancelling to server");
}
}
Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -251,7 +251,7 @@
if (clusterPullConnectionFactoryName != null)
{
- clusterConnectionManager = new ClusterConnectionManager(useXAForMessagePull, serverPeerID, clusterPullConnectionFactoryName);
+ clusterConnectionManager = new ClusterConnectionManager(useXAForMessagePull, serverPeerID, clusterPullConnectionFactoryName, defaultPreserveOrdering);
clusterNotifier.registerListener(clusterConnectionManager);
}
Modified: trunk/src/main/org/jboss/jms/server/destination/QueueService.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/QueueService.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/src/main/org/jboss/jms/server/destination/QueueService.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -86,8 +86,6 @@
destination.getPageSize(),
destination.getDownCacheSize());
- queue.setPreserveOrdering(serverPeer.isDefaultPreserveOrdering());
-
queue.load();
// Must be done after load
@@ -107,8 +105,7 @@
true,
destination.getMaxSize(), null,
destination.getFullSize(), destination.getPageSize(),
- destination.getDownCacheSize(), destination.isClustered(),
- serverPeer.isDefaultPreserveOrdering());
+ destination.getDownCacheSize(), destination.isClustered());
po.addBinding(new Binding(queueCond, queue, false), false);
queue.activate();
Modified: trunk/src/main/org/jboss/jms/server/destination/TopicService.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/TopicService.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/src/main/org/jboss/jms/server/destination/TopicService.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -79,8 +79,6 @@
//instead we should never create queues inside the postoffice - only do it at deploy time
queue.setPagingParams(destination.getFullSize(), destination.getPageSize(), destination.getDownCacheSize());
- queue.setPreserveOrdering(serverPeer.isDefaultPreserveOrdering());
-
queue.load();
queue.activate();
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -109,6 +109,8 @@
private boolean remote;
+ private boolean preserveOrdering;
+
// Constructors ---------------------------------------------------------------------------------
ServerConsumerEndpoint(int id, Queue messageQueue, String queueName,
@@ -151,6 +153,8 @@
this.startStopLock = new Object();
+ this.preserveOrdering = sessionEndpoint.getConnectionEndpoint().getServerPeer().isDefaultPreserveOrdering();
+
if (dest.isTopic() && !messageQueue.isRecoverable())
{
// This is a consumer of a non durable topic subscription. We don't need to store
@@ -224,6 +228,18 @@
return delivery;
}
+
+ if (preserveOrdering && remote)
+ {
+ //If the header exists it means the message has already been sucked once - so reject.
+
+ if (ref.getMessage().getHeader(Message.CLUSTER_SUCKED) != null)
+ {
+ if (trace) { log.trace("Message has already been sucked once - not sucking again"); }
+
+ return null;
+ }
+ }
synchronized (startStopLock)
{
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -590,8 +590,7 @@
{
Queue coreQueue = new MessagingQueue(nodeId, dest.getName(),
idm.getID(), ms, pm, false, -1, null,
- fullSize, pageSize, downCacheSize, postOffice.isClustered(),
- sp.isDefaultPreserveOrdering());
+ fullSize, pageSize, downCacheSize, postOffice.isClustered());
Condition cond = new JMSCondition(true, dest.getName());
@@ -1210,7 +1209,7 @@
ServerConsumerEndpoint ep =
new ServerConsumerEndpoint(consumerID, binding.queue,
binding.queue.getName(), this, selectorString, false,
- dest, null, null, 0, 0, true);
+ dest, null, null, 0, -1, true);
ConsumerAdvised advised;
@@ -1224,7 +1223,7 @@
Dispatcher.instance.registerTarget(consumerID, advised);
ClientConsumerDelegate stub =
- new ClientConsumerDelegate(consumerID, prefetchSize, 0, 0);
+ new ClientConsumerDelegate(consumerID, prefetchSize, -1, 0);
synchronized (consumers)
{
@@ -1306,8 +1305,7 @@
mDest.getFullSize(),
mDest.getPageSize(),
mDest.getDownCacheSize(),
- mDest.isClustered(),
- sp.isDefaultPreserveOrdering());
+ mDest.isClustered());
JMSCondition topicCond = new JMSCondition(false, jmsDestination.getName());
@@ -1364,8 +1362,7 @@
mDest.getFullSize(),
mDest.getPageSize(),
mDest.getDownCacheSize(),
- mDest.isClustered(),
- sp.isDefaultPreserveOrdering());
+ mDest.isClustered());
// Durable subs must be bound on ALL nodes of the cluster (if clustered)
@@ -1452,8 +1449,7 @@
mDest.getFullSize(),
mDest.getPageSize(),
mDest.getDownCacheSize(),
- mDest.isClustered(),
- sp.isDefaultPreserveOrdering());
+ mDest.isClustered());
// Durable subs must be bound on ALL nodes of the cluster
Modified: trunk/src/main/org/jboss/messaging/core/contract/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/contract/PostOffice.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/src/main/org/jboss/messaging/core/contract/PostOffice.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -61,7 +61,7 @@
* @param allNodes Add this binding on ALL nodes?
* @throws Exception
*/
- void addBinding(Binding binding, boolean allNodes) throws Exception;
+ boolean addBinding(Binding binding, boolean allNodes) throws Exception;
/**
* Remove a binding from the post office
@@ -69,7 +69,7 @@
* @param allNodes Remove this binding from ALL node?
* @throws Throwable
*/
- void removeBinding(String queueName, boolean allNodes) throws Throwable;
+ Binding removeBinding(String queueName, boolean allNodes) throws Throwable;
/**
* Route a reference.
Modified: trunk/src/main/org/jboss/messaging/core/contract/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/contract/Queue.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/src/main/org/jboss/messaging/core/contract/Queue.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -56,10 +56,6 @@
int getDownCacheSize();
- boolean isPreserveOrdering();
-
- void setPreserveOrdering(boolean preserveOrdering);
-
boolean isClustered();
String getName();
Modified: trunk/src/main/org/jboss/messaging/core/impl/ClusterRoundRobinDistributor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/ClusterRoundRobinDistributor.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/src/main/org/jboss/messaging/core/impl/ClusterRoundRobinDistributor.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -27,7 +27,6 @@
import org.jboss.messaging.core.contract.Delivery;
import org.jboss.messaging.core.contract.DeliveryObserver;
import org.jboss.messaging.core.contract.Distributor;
-import org.jboss.messaging.core.contract.Message;
import org.jboss.messaging.core.contract.MessageReference;
import org.jboss.messaging.core.contract.Receiver;
import org.jboss.messaging.core.impl.tx.Transaction;
@@ -60,11 +59,9 @@
private Distributor remoteDistributor;
- private boolean preserveOrdering;
-
// Constructors ---------------------------------------------------------------------------------
- public ClusterRoundRobinDistributor(Distributor local, Distributor remote, boolean preserveOrdering)
+ public ClusterRoundRobinDistributor(Distributor local, Distributor remote)
{
localDistributor = local;
@@ -87,28 +84,10 @@
{
//If no local distributor takes the ref then we try the remote distributor
- if (preserveOrdering)
- {
- if (ref.getMessage().getHeader(Message.CLUSTER_SUCKED) != null)
- {
- //The message has already been sucked once - don't suck it again
-
- if (trace) { log.trace(this + " preserveOrdering is true and has already been sucked so not allowing message to be sucked again"); }
-
- return null;
- }
- else
- {
- //Add the header - so it doesn't get sucked more than once
-
- ref.getMessage().putHeader(Message.CLUSTER_SUCKED, new Integer(333));
- }
- }
-
if (trace) { log.trace(this + " trying with remote distributor"); }
del = remoteDistributor.handle(observer, ref, tx);
-
+
if (trace) { log.trace(this + " remote distributor returned " + del); }
}
@@ -171,14 +150,13 @@
{
return remoteDistributor.remove(r);
}
-
+
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
- // Private --------------------------------------------------------------------------------------
+ // Private --------------------------------------------------------------------------------------
-
// Inner classes --------------------------------------------------------------------------------
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -79,18 +79,15 @@
private boolean handleFlowControlForConsumers;
- private boolean preserveOrdering;
-
// Constructors --------------------------------------------------
public MessagingQueue(int nodeID, String name, long id, MessageStore ms, PersistenceManager pm,
boolean recoverable, int maxSize, Filter filter,
- int fullSize, int pageSize, int downCacheSize, boolean clustered,
- boolean preserveOrdering)
+ int fullSize, int pageSize, int downCacheSize, boolean clustered)
{
super(id, ms, pm, recoverable, maxSize, fullSize, pageSize, downCacheSize);
-
- setup(nodeID, name, filter, clustered, preserveOrdering);
+
+ setup(nodeID, name, filter, clustered);
}
/** This constructor is used when loading queue from storage - the paging params, maxSize and preserveOrdering don't matter
@@ -102,17 +99,16 @@
{
super(id, ms, pm, recoverable, -1, 100000, 2000, 2000); //paging params etc are actually ignored
- setup(nodeID, name, filter, clustered, false);
+ setup(nodeID, name, filter, clustered);
}
/* This constructor is only used in tests - should we remove it? */
public MessagingQueue(int nodeID, String name, long id, MessageStore ms, PersistenceManager pm,
- boolean recoverable, int maxSize, Filter filter, boolean clustered,
- boolean preserveOrdering)
+ boolean recoverable, int maxSize, Filter filter, boolean clustered)
{
super(id, ms, pm, recoverable, maxSize);
- setup(nodeID, name, filter, clustered, preserveOrdering);
+ setup(nodeID, name, filter, clustered);
}
/* Constructor for a remote queue representation in a cluster */
@@ -121,10 +117,10 @@
{
super(id, null, null, recoverable, -1);
- setup(nodeID, name, filter, clustered, false);
+ setup(nodeID, name, filter, clustered);
}
- private void setup(int nodeID, String name, Filter filter, boolean clustered, boolean preserveOrdering)
+ private void setup(int nodeID, String name, Filter filter, boolean clustered)
{
this.nodeID = nodeID;
@@ -134,13 +130,11 @@
this.clustered = clustered;
- this.preserveOrdering = preserveOrdering;
-
localDistributor = new DistributorWrapper(new RoundRobinDistributor());
remoteDistributor = new DistributorWrapper(new RoundRobinDistributor());
- distributor = new ClusterRoundRobinDistributor(localDistributor, remoteDistributor, preserveOrdering);
+ distributor = new ClusterRoundRobinDistributor(localDistributor, remoteDistributor);
suckers = new HashSet();
}
@@ -255,16 +249,6 @@
return downCacheSize;
}
- public boolean isPreserveOrdering()
- {
- return this.preserveOrdering;
- }
-
- public void setPreserveOrdering(boolean preserveOrdering)
- {
- this.preserveOrdering = preserveOrdering;
- }
-
// ChannelSupport overrides --------------------------------------
protected void deliverInternal()
Modified: trunk/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -107,7 +107,7 @@
if (pageSize >= fullSize)
{
- throw new IllegalArgumentException("pageSize must be less than full size");
+ throw new IllegalArgumentException("pageSize must be less than full size " + pageSize + ", " + fullSize);
}
if (downCacheSize > pageSize)
{
Modified: trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -74,8 +74,10 @@
private PostOffice postOffice;
+ private boolean preserveOrdering;
+
public ClusterConnectionManager(boolean xa, int nodeID,
- String connectionFactoryUniqueName)
+ String connectionFactoryUniqueName, boolean preserveOrdering)
{
connections = new HashMap();
@@ -85,6 +87,8 @@
this.connectionFactoryUniqueName = connectionFactoryUniqueName;
+ this.preserveOrdering = preserveOrdering;
+
if (trace) { log.trace("Created " + this); }
}
@@ -133,6 +137,11 @@
if (trace) { log.trace(this + " stopped"); }
}
+ public Map getAllConnections()
+ {
+ return connections;
+ }
+
/*
* We respond to two types of events -
*
@@ -407,7 +416,7 @@
Queue localQueue = binding.queue;
- MessageSucker sucker = new MessageSucker(localQueue, info.connection, localInfo.connection, xa);
+ MessageSucker sucker = new MessageSucker(localQueue, info.connection, localInfo.connection, xa, preserveOrdering);
info.addSucker(sucker);
Modified: trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -38,6 +38,7 @@
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.destination.JBossQueue;
+import org.jboss.jms.message.MessageProxy;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Queue;
import org.jboss.tm.TransactionManagerLocator;
@@ -78,12 +79,14 @@
private ConsumerDelegate consumer;
+ private boolean preserveOrdering;
+
public String toString()
{
return "MessageSucker:" + System.identityHashCode(this) + " queue:" + localQueue.getName();
}
- MessageSucker(Queue localQueue, JBossConnection sourceConnection, JBossConnection localConnection, boolean xa)
+ MessageSucker(Queue localQueue, JBossConnection sourceConnection, JBossConnection localConnection, boolean xa, boolean preserveOrdering)
{
this.localQueue = localQueue;
@@ -93,6 +96,8 @@
this.xa = xa;
+ this.preserveOrdering = preserveOrdering;
+
if (xa)
{
tm = TransactionManagerLocator.getInstance().locate();
@@ -252,6 +257,12 @@
if (trace) { log.trace("Started JTA transaction"); }
}
+ if (preserveOrdering)
+ {
+ //Add a header saying we have sucked the message
+ ((MessageProxy)msg).getMessage().putHeader(org.jboss.messaging.core.contract.Message.CLUSTER_SUCKED, "x");
+ }
+
producer.send(null, msg, -1, -1, Long.MIN_VALUE);
if (trace) { log.trace(this + " forwarded message to queue"); }
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -109,9 +109,7 @@
this.requestTarget = requestTarget;
this.groupListener = groupListener;
-
- this.viewExecutor = new QueuedExecutor(new LinkedQueue());
-
+
this.lock = new ReentrantWriterPreferenceReadWriteLock();
}
@@ -121,6 +119,8 @@
try
{
+ this.viewExecutor = new QueuedExecutor(new LinkedQueue());
+
this.controlChannel = jChannelFactory.createControlChannel();
this.dataChannel = jChannelFactory.createDataChannel();
@@ -182,7 +182,17 @@
dataChannel.close();
+ controlChannel = null;
+
+ dataChannel = null;
+
+ currentView = null;
+
+ viewExecutor = null;
+
started = false;
+
+ log.info("** group member shutdown");
}
finally
{
@@ -536,6 +546,9 @@
// same thread that delivered the view change and this is what we need to do in
// failover, for example.
+
+ log.info("**** got view change " + newView);
+
viewExecutor.execute(new HandleViewAcceptedRunnable(newView));
}
catch (InterruptedException e)
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MappingInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MappingInfo.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MappingInfo.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -64,8 +64,6 @@
private int downCacheSize;
- private boolean preserveOrdering;
-
private boolean allNodes;
MappingInfo()
@@ -95,7 +93,7 @@
MappingInfo(int nodeId, String queueName, String conditionText, String filterString,
long channelId, boolean recoverable, boolean clustered, boolean allNodes,
int fullSize, int pageSize, int downCacheSize,
- int maxSize, boolean preserveOrdering)
+ int maxSize)
{
this (nodeId, queueName, conditionText, filterString, channelId, recoverable, clustered, allNodes);
@@ -106,8 +104,6 @@
this.downCacheSize = downCacheSize;
this.maxSize = maxSize;
-
- this.preserveOrdering = preserveOrdering;
}
// Streamable implementation ---------------------------------------------------------------------
@@ -137,8 +133,6 @@
downCacheSize = in.readInt();
maxSize = in.readInt();
-
- preserveOrdering = in.readBoolean();
}
public void write(DataOutputStream out) throws Exception
@@ -166,8 +160,6 @@
out.writeInt(downCacheSize);
out.writeInt(maxSize);
-
- out.writeBoolean(preserveOrdering);
}
int getNodeId()
@@ -225,11 +217,6 @@
return downCacheSize;
}
- boolean isPreserveOrdering()
- {
- return preserveOrdering;
- }
-
int getMaxSize()
{
return maxSize;
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -248,14 +248,6 @@
lock = new ReentrantWriterPreferenceReadWriteLock();
- mappings = new HashMap();
-
- nameMaps = new HashMap();
-
- channelIDMap = new HashMap();
-
- nodeIDAddressMap = new ConcurrentHashMap();
-
waitForBindUnbindLock = new Object();
}
@@ -288,28 +280,29 @@
this.failoverMapper = failoverMapper;
this.clustered = true;
-
- replicatedData = new HashMap();
-
- failoverMap = new LinkedHashMap();
-
- leftSet = new ConcurrentHashSet();
-
+
groupMember = new GroupMember(groupName, stateTimeout, castTimeout, jChannelFactory, this, this);
this.supportsFailover = supportsFailover;
nbSupport = new NotificationBroadcasterSupport();
}
-
+
// MessagingComponent overrides -----------------------------------------------------------------
public void start() throws Exception
{
+ if (started)
+ {
+ throw new IllegalStateException(this + " is already started");
+ }
+
if (trace) { log.trace(this + " starting"); }
super.start();
+ init();
+
loadedBindings = getBindingsFromStorage();
if (clustered)
@@ -345,14 +338,13 @@
public synchronized void stop() throws Exception
{
+ if (!started)
+ {
+ throw new IllegalStateException(this + " is not started");
+ }
+
if (trace) { log.trace(this + " stopping"); }
-
- if (!started)
- {
- log.warn("Attempt to stop() but " + this + " is not started");
- return;
- }
-
+
super.stop();
if (clustered)
@@ -363,6 +355,8 @@
groupMember.stop();
}
+ deInit();
+
started = false;
log.debug(this + " stopped");
@@ -395,20 +389,27 @@
return officeName;
}
- public void addBinding(Binding binding, boolean allNodes) throws Exception
+ public boolean addBinding(Binding binding, boolean allNodes) throws Exception
{
- internalAddBinding(binding, allNodes, true);
+ if (allNodes && !binding.queue.isClustered())
+ {
+ throw new IllegalArgumentException("Cannot bind a non clustered queue on all nodes");
+ }
+
+ boolean added = internalAddBinding(binding, allNodes, true);
- if (allNodes && clustered && binding.queue.isClustered())
+ if (added && allNodes && clustered && binding.queue.isClustered())
{
//Now we must wait for all the bindings to appear in state
//This is necessary since the second bind in an all bind is sent asynchronously to avoid deadlock
waitForBindUnbind(binding.queue.getName(), true);
}
+
+ return added;
}
- public void removeBinding(String queueName, boolean allNodes) throws Throwable
+ public Binding removeBinding(String queueName, boolean allNodes) throws Throwable
{
Binding binding = internalRemoveBinding(queueName, allNodes, true);
@@ -419,6 +420,8 @@
waitForBindUnbind(queueName, false);
}
+
+ return binding;
}
public boolean route(MessageReference ref, Condition condition, Transaction tx) throws Exception
@@ -595,7 +598,7 @@
Condition condition = conditionFactory.createCondition(mapping.getConditionText());
- addBindingInMemory(new Binding(condition, queue, mapping.isAllNodes()));
+ addBindingInMemory(new Binding(condition, queue, false));
if (mapping.isAllNodes())
{
@@ -608,8 +611,7 @@
Queue queue2 = new MessagingQueue(thisNodeID, mapping.getQueueName(), channelID, ms, pm,
mapping.isRecoverable(), mapping.getMaxSize(), filter,
- mapping.getFullSize(), mapping.getPageSize(), mapping.getDownCacheSize(), true,
- mapping.isPreserveOrdering());
+ mapping.getFullSize(), mapping.getPageSize(), mapping.getDownCacheSize(), true);
Binding localBinding = new Binding(condition, queue2, true);
@@ -644,6 +646,8 @@
lock.readLock().acquire();
+ log.info(this + " getting state");
+
try
{
Iterator iter = nameMaps.values().iterator();
@@ -665,9 +669,21 @@
{
String filterString = queue.getFilter() == null ? null : queue.getFilter().getFilterString();
- MappingInfo mapping = new MappingInfo(queue.getNodeID(), queue.getName(), binding.condition.toText(),
- filterString, queue.getChannelID(), queue.isRecoverable(),
- true, binding.allNodes);
+ MappingInfo mapping;
+
+ if (binding.allNodes)
+ {
+ mapping = new MappingInfo(queue.getNodeID(), queue.getName(), binding.condition.toText(), filterString,
+ queue.getChannelID(), queue.isRecoverable(), true, true,
+ queue.getFullSize(), queue.getPageSize(), queue.getDownCacheSize(),
+ queue.getMaxSize());
+ }
+ else
+ {
+ mapping = new MappingInfo(queue.getNodeID(), queue.getName(), binding.condition.toText(),
+ filterString, queue.getChannelID(), queue.isRecoverable(),
+ true, false);
+ }
list.add(mapping);
}
}
@@ -786,6 +802,7 @@
Condition condition = conditionFactory.createCondition(mapping.getConditionText());
+ //addBindingInMemory(new Binding(condition, queue, mapping.isAllNodes()));
addBindingInMemory(new Binding(condition, queue, false));
if (allNodes)
@@ -802,8 +819,7 @@
Queue queue2 = new MessagingQueue(thisNodeID, mapping.getQueueName(), channelID, ms, pm,
mapping.isRecoverable(), mapping.getMaxSize(), filter,
- mapping.getFullSize(), mapping.getPageSize(), mapping.getDownCacheSize(), true,
- mapping.isPreserveOrdering());
+ mapping.getFullSize(), mapping.getPageSize(), mapping.getDownCacheSize(), true);
//We must cast back asynchronously to avoid deadlock
boolean added = internalAddBinding(new Binding(condition, queue2, true), false, false);
@@ -1052,9 +1068,9 @@
Map map = new LinkedHashMap();
map.put("CREATE_POSTOFFICE_TABLE",
"CREATE TABLE JBM_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID INTEGER," +
- "QUEUE_NAME VARCHAR(1023), CONDITION VARCHAR(1023), " +
+ "QUEUE_NAME VARCHAR(255), CONDITION VARCHAR(1023), " +
"SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT, " +
- "CLUSTERED CHAR(1), ALL_NODES CHAR(1))");
+ "CLUSTERED CHAR(1), ALL_NODES CHAR(1), PRIMARY KEY(POSTOFFICE_NAME, NODE_ID, QUEUE_NAME))");
return map;
}
@@ -1171,6 +1187,48 @@
// Private ------------------------------------------------------------------------------------
+ private void init()
+ {
+ mappings = new HashMap();
+
+ nameMaps = new HashMap();
+
+ channelIDMap = new HashMap();
+
+ nodeIDAddressMap = new ConcurrentHashMap();
+
+ if (clustered)
+ {
+ replicatedData = new HashMap();
+
+ failoverMap = new LinkedHashMap();
+
+ leftSet = new ConcurrentHashSet();
+ }
+ }
+
+ private void deInit()
+ {
+ mappings = null;
+
+ nameMaps = null;
+
+ channelIDMap = null;
+
+ nodeIDAddressMap = null;
+
+ if (clustered)
+ {
+ replicatedData = null;
+
+ failoverMap = null;
+
+ leftSet = null;
+ }
+
+ }
+
+
private void waitForBindUnbind(String queueName, boolean bind) throws Exception
{
if (trace) { log.trace(this + " waiting for " + (bind ? "bind" : "unbind") + " of "+ queueName + " on all nodes"); }
@@ -1236,7 +1294,9 @@
try
{
if (trace) { log.trace(this + " waiting for bind unbind lock"); }
+
waitForBindUnbindLock.wait(groupMember.getCastTimeout());
+
if (trace) { log.trace(this + " woke up"); }
}
catch (InterruptedException e)
@@ -1285,14 +1345,14 @@
}
//The binding might already exist - this could happen if the queue is bind all simultaneously from more than one node of the cluster
- boolean added = addBindingInMemory(binding);
-
+ boolean added = addBindingInMemory(binding);
+
if (added)
{
if (queue.isRecoverable())
{
// Need to write the mapping to the database
- insertBindingInStorage(condition, queue, allNodes);
+ insertBindingInStorage(condition, queue, binding.allNodes);
}
if (clustered && queue.isClustered())
@@ -1301,10 +1361,9 @@
MappingInfo info = new MappingInfo(thisNodeID, queue.getName(), condition.toText(), filterString, queue.getChannelID(),
queue.isRecoverable(), true,
- allNodes,
+ binding.allNodes,
queue.getFullSize(), queue.getPageSize(), queue.getDownCacheSize(),
- queue.getMaxSize(),
- queue.isPreserveOrdering());
+ queue.getMaxSize());
ClusterRequest request = new BindRequest(info, allNodes);
@@ -1383,12 +1442,19 @@
{
Map nameMap = (Map)iter.next();
- Binding binding = (Binding)nameMap.get(queueName);
-
- if (binding != null)
+ if (queueName != null)
{
- bindings.add(binding);
+ Binding binding = (Binding)nameMap.get(queueName);
+
+ if (binding != null)
+ {
+ bindings.add(binding);
+ }
}
+ else
+ {
+ bindings.addAll(nameMap.values());
+ }
}
return bindings;
@@ -1429,8 +1495,6 @@
if (trace) { log.trace(this + " considering queue " + queue); }
- //TODO optimise this
-
if (queue.getNodeID() == thisNodeID)
{
if (trace) { log.trace(this + " is a local queue"); }
@@ -1583,8 +1647,6 @@
if (nameMap == null)
{
- log.warn("Cannot find name maps for node " + nodeID);
-
return null;
}
@@ -1592,7 +1654,6 @@
if (binding == null)
{
- log.warn("Cannot find binding for queue name " + queueName);
return null;
}
@@ -1620,6 +1681,13 @@
throw new IllegalStateException("Cannot find queues in condition map for condition " + binding.condition);
}
+ Iterator i = queues.iterator();
+ while (i.hasNext())
+ {
+ Queue q = (Queue)i.next();
+ log.info("q:" + q);
+ }
+
boolean removed = queues.remove(binding.queue);
if (!removed)
@@ -1661,8 +1729,6 @@
if (nameMap != null && nameMap.containsKey(queue.getName()))
{
- log.warn("Name map for node " + nid + " already contains binding for queue " + queue.getName());
-
return false;
}
@@ -1849,8 +1915,7 @@
queue.isRecoverable(), true,
binding.allNodes,
queue.getFullSize(), queue.getPageSize(), queue.getDownCacheSize(),
- queue.getMaxSize(),
- queue.isPreserveOrdering());
+ queue.getMaxSize());
ClusterRequest request = new BindRequest(info, binding.allNodes);
@@ -2196,8 +2261,6 @@
throw new IllegalStateException("Queue " + queue.getName() + " is not clustered!");
}
- log.info("**** removing old queue with channel id " + queue.getChannelID());
-
//Remove from the in-memory map - no need to broadcast anything - they will get removed from other nodes in memory
//maps when the other nodes detect failure
removeBindingInMemory(binding.queue.getNodeID(), binding.queue.getName());
@@ -2219,8 +2282,6 @@
{
Binding b = (Binding)localNameMap.get(queue.getName());
localQueue = b.queue;
-
- log.info("Found a local queue with channel id " + localQueue.getChannelID());
}
if (localQueue != null)
Modified: trunk/tests/src/org/jboss/test/messaging/core/NonRecoverableMessagingQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/NonRecoverableMessagingQueueTest.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/tests/src/org/jboss/test/messaging/core/NonRecoverableMessagingQueueTest.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -52,7 +52,7 @@
{
super.setUp();
- queue = new MessagingQueue(1, "queue1", 1, ms, pm, false, -1, null, false, false);
+ queue = new MessagingQueue(1, "queue1", 1, ms, pm, false, -1, null, false);
queue.activate();
}
Modified: trunk/tests/src/org/jboss/test/messaging/core/RecoverableMessagingQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/RecoverableMessagingQueueTest.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/tests/src/org/jboss/test/messaging/core/RecoverableMessagingQueueTest.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -51,7 +51,7 @@
{
super.setUp();
- queue = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, false, false);
+ queue = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, false);
queue.activate();
}
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_2PCTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_2PCTest.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_2PCTest.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -62,10 +62,10 @@
public void testChannelShareNP_2PC() throws Throwable
{
- MessagingQueue queue1 = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false, false);
+ MessagingQueue queue1 = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false);
queue1.activate();
- MessagingQueue queue2 = new MessagingQueue(1, "queue2", 2, ms, pm, true, -1, null, 50, 10, 5, false, false);
+ MessagingQueue queue2 = new MessagingQueue(1, "queue2", 2, ms, pm, true, -1, null, 50, 10, 5, false);
queue2.activate();
Message[] msgs = new Message[150];
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -61,10 +61,10 @@
public void test1() throws Throwable
{
- MessagingQueue queue1 = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false, false);
+ MessagingQueue queue1 = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false);
queue1.activate();
- MessagingQueue queue2 = new MessagingQueue(1, "queue2", 2, ms, pm, true, -1, null, 50, 10, 5, false, false);
+ MessagingQueue queue2 = new MessagingQueue(1, "queue2", 2, ms, pm, true, -1, null, 50, 10, 5, false);
queue2.activate();
Message[] msgs = new Message[150];
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_TTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_TTest.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_TTest.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -62,10 +62,10 @@
public void testChannelShareNP_Transactional() throws Throwable
{
- MessagingQueue queue1 = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false, false);
+ MessagingQueue queue1 = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false);
queue1.activate();
- MessagingQueue queue2 = new MessagingQueue(1, "queue2", 2, ms, pm, true, -1, null, 50, 10, 5, false, false);
+ MessagingQueue queue2 = new MessagingQueue(1, "queue2", 2, ms, pm, true, -1, null, 50, 10, 5, false);
queue2.activate();
Message[] msgs = new Message[150];
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_2PCTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_2PCTest.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_2PCTest.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -57,10 +57,10 @@
public void test1() throws Throwable
{
- MessagingQueue queue1 = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false, false);
+ MessagingQueue queue1 = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false);
queue1.activate();
- MessagingQueue queue2 = new MessagingQueue(1, "queue2", 2, ms, pm, true, -1, null, 50, 10, 5, false, false);
+ MessagingQueue queue2 = new MessagingQueue(1, "queue2", 2, ms, pm, true, -1, null, 50, 10, 5, false);
queue2.activate();
Message[] msgs = new Message[150];
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_NTTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_NTTest.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_NTTest.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -61,10 +61,10 @@
public void test1() throws Throwable
{
- MessagingQueue queue1 = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false, false);
+ MessagingQueue queue1 = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false);
queue1.activate();
- MessagingQueue queue2 = new MessagingQueue(1, "queue2", 2, ms, pm, true, -1, null, 50, 10, 5, false, false);
+ MessagingQueue queue2 = new MessagingQueue(1, "queue2", 2, ms, pm, true, -1, null, 50, 10, 5, false);
queue2.activate();
Message[] msgs = new Message[150];
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_TTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_TTest.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_TTest.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -61,10 +61,10 @@
public void test1() throws Throwable
{
- MessagingQueue queue1 = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false, false);
+ MessagingQueue queue1 = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false);
queue1.activate();
- MessagingQueue queue2 = new MessagingQueue(1, "queue2", 2, ms, pm, true, -1, null, 50, 10, 5, false, false);
+ MessagingQueue queue2 = new MessagingQueue(1, "queue2", 2, ms, pm, true, -1, null, 50, 10, 5, false);
queue2.activate();
Message[] msgs = new Message[150];
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/PagingTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/PagingTest.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/PagingTest.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -61,7 +61,7 @@
public void testPaging() throws Exception
{
- MessagingQueue p = new MessagingQueue(1, "queue0", 1, ms, pm, true, -1, null, 100, 20, 10, false, false);
+ MessagingQueue p = new MessagingQueue(1, "queue0", 1, ms, pm, true, -1, null, 100, 20, 10, false);
p.activate();
CoreMessage m = null;
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_2PCTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_2PCTest.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_2PCTest.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -64,7 +64,7 @@
public void test1() throws Throwable
{
- MessagingQueue queue = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false, false);
+ MessagingQueue queue = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false);
queue.activate();
Message[] msgs = new Message[241];
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_NTTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_NTTest.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_NTTest.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -63,7 +63,7 @@
public void test1() throws Throwable
{
- MessagingQueue queue = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false, false);
+ MessagingQueue queue = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false);
queue.activate();
Message[] msgs = new Message[241];
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_TTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_TTest.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_TTest.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -64,7 +64,7 @@
public void test1() throws Throwable
{
- MessagingQueue queue = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false, false);
+ MessagingQueue queue = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false);
queue.activate();
Message[] msgs = new Message[241];
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_2PCTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_2PCTest.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_2PCTest.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -64,7 +64,7 @@
public void test1() throws Throwable
{
- MessagingQueue queue = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false, false);
+ MessagingQueue queue = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false);
queue.activate();
Message[] msgs = new Message[241];
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_NTTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_NTTest.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_NTTest.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -63,7 +63,7 @@
public void test1() throws Throwable
{
- MessagingQueue queue = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false, false);
+ MessagingQueue queue = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false);
queue.activate();
Message[] msgs = new Message[241];
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_TTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_TTest.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_TTest.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -64,7 +64,7 @@
public void test1() throws Throwable
{
- MessagingQueue queue = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false, false);
+ MessagingQueue queue = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false);
queue.activate();
Message[] msgs = new Message[241];
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -65,7 +65,7 @@
public void testRecoverableQueueCrash() throws Throwable
{
- MessagingQueue queue = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false, false);
+ MessagingQueue queue = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false);
queue.activate();
Message[] msgs = new Message[200];
@@ -127,7 +127,7 @@
tr.start();
- MessagingQueue queue2 = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false, false);
+ MessagingQueue queue2 = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, 100, 20, 10, false);
queue2.activate();
queue2.deactivate();
@@ -164,7 +164,7 @@
{
//Non recoverable queue - eg temporary queue
- MessagingQueue queue = new MessagingQueue(1, "queue1", 1, ms, pm, false, -1, null, 100, 20, 10, false, false);
+ MessagingQueue queue = new MessagingQueue(1, "queue1", 1, ms, pm, false, -1, null, 100, 20, 10, false);
queue.activate();
Message[] msgs = new Message[200];
@@ -226,7 +226,7 @@
tr = new TransactionRepository(pm, ms, idm);
tr.start();
- MessagingQueue queue2 = new MessagingQueue(1, "queue1", 1, ms, pm, false, -1, null, 100, 20, 10, false, false);
+ MessagingQueue queue2 = new MessagingQueue(1, "queue1", 1, ms, pm, false, -1, null, 100, 20, 10, false);
queue2.activate();
queue2.deactivate();
@@ -257,7 +257,7 @@
{
//Non recoverable queue - eg temporary queue
- MessagingQueue queue = new MessagingQueue(1, "queue1", 1, ms, pm, false, -1, null, 100, 20, 10, false, false);
+ MessagingQueue queue = new MessagingQueue(1, "queue1", 1, ms, pm, false, -1, null, 100, 20, 10, false);
queue.activate();
Message[] msgs = new Message[200];
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/ClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/ClusteredPostOfficeTest.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/ClusteredPostOfficeTest.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -23,16 +23,22 @@
import java.util.Collection;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.jboss.messaging.core.contract.Binding;
import org.jboss.messaging.core.contract.Condition;
+import org.jboss.messaging.core.contract.Message;
+import org.jboss.messaging.core.contract.MessageReference;
import org.jboss.messaging.core.contract.PostOffice;
import org.jboss.messaging.core.contract.Queue;
import org.jboss.messaging.core.impl.MessagingQueue;
import org.jboss.test.messaging.core.PostOfficeTestBase;
import org.jboss.test.messaging.core.SimpleCondition;
+import org.jboss.test.messaging.core.SimpleFilter;
+import org.jboss.test.messaging.core.SimpleReceiver;
+import org.jboss.test.messaging.util.CoreMessageFactory;
/**
*
@@ -187,33 +193,27 @@
// Start one office
office1 = createClusteredPostOffice(1, "testgroup");
-
- log.info("Created office1");
-
+
// Add a couple of queues
- Queue queue1 = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
+ Queue queue1 = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, false, -1, null, true);
queue1.activate();
Condition condition1 = new SimpleCondition("topic1");
- office1.addBinding(new Binding(condition1, queue1, false), false);
-
- log.info("Added binding1");
-
- Queue queue2 = new MessagingQueue(1, "sub2", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
+ boolean added = office1.addBinding(new Binding(condition1, queue1, false), false);
+ assertTrue(added);
+
+ Queue queue2 = new MessagingQueue(1, "sub2", channelIDManager.getID(), ms, pm, false, -1, null, true);
queue2.activate();
- office1.addBinding(new Binding(condition1, queue2, false), false);
-
- log.info("Added binding2");
-
+ added = office1.addBinding(new Binding(condition1, queue2, false), false);
+ assertTrue(added);
+
// Start another office - make sure it picks up the bindings from the first node
office2 = createClusteredPostOffice(2, "testgroup");
-
- log.info("Created office 2");
-
+
// Should return all queues
Collection queues = office2.getQueuesForCondition(condition1, false);
assertNotNull(queues);
@@ -224,10 +224,11 @@
// Add another queue on node 2
- Queue queue3 = new MessagingQueue(2, "sub3", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
+ Queue queue3 = new MessagingQueue(2, "sub3", channelIDManager.getID(), ms, pm, false, -1, null, true);
queue3.activate();
- office2.addBinding(new Binding(condition1, queue3, false), false);
+ added = office2.addBinding(new Binding(condition1, queue3, false), false);
+ assertTrue(added);
// Make sure both nodes pick it up
@@ -248,10 +249,11 @@
// Add another binding on node 2
- Queue queue4 = new MessagingQueue(2, "sub4", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
+ Queue queue4 = new MessagingQueue(2, "sub4", channelIDManager.getID(), ms, pm, false, -1, null, true);
queue4.activate();
- office2.addBinding(new Binding(condition1, queue4, false), false);
+ added = office2.addBinding(new Binding(condition1, queue4, false), false);
+ assertTrue(added);
// Make sure both nodes pick it up
@@ -272,10 +274,12 @@
assertTrue(queues.contains(queue4));
// Unbind binding 1 and binding 2
- office1.removeBinding(queue1.getName(), false);
+ Binding removed = office1.removeBinding(queue1.getName(), false);
+ assertNotNull(removed);
+
+ removed = office1.removeBinding(queue2.getName(), false);
+ assertNotNull(removed);
- office1.removeBinding(queue2.getName(), false);
-
// Make sure bindings are not longer available on either node
queues = office1.getQueuesForCondition(condition1, false);
@@ -305,10 +309,11 @@
// Add another binding on node 3
- Queue queue5 = new MessagingQueue(3, "sub5", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
+ Queue queue5 = new MessagingQueue(3, "sub5", channelIDManager.getID(), ms, pm, false, -1, null, true);
queue5.activate();
- office3.addBinding(new Binding(condition1, queue5, false), false);
+ added = office3.addBinding(new Binding(condition1, queue5, false), false);
+ assertTrue(added);
// Make sure all nodes pick it up
@@ -335,15 +340,17 @@
// Add a durable and a non durable binding on node 1
- Queue queue6 = new MessagingQueue(1, "sub6", channelIDManager.getID(), ms, pm, true, -1, null, true, false);
+ Queue queue6 = new MessagingQueue(1, "sub6", channelIDManager.getID(), ms, pm, true, -1, null, true);
queue6.activate();
- office1.addBinding(new Binding(condition1, queue6, false), false);
+ added = office1.addBinding(new Binding(condition1, queue6, false), false);
+ assertTrue(added);
- Queue queue7 = new MessagingQueue(1, "sub7", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
+ Queue queue7 = new MessagingQueue(1, "sub7", channelIDManager.getID(), ms, pm, false, -1, null, true);
queue7.activate();
- office1.addBinding(new Binding(condition1, queue7, false), false);
+ added = office1.addBinding(new Binding(condition1, queue7, false), false);
+ assertTrue(added);
// Make sure all nodes pick them up
@@ -375,7 +382,6 @@
assertTrue(queues.contains(queue6));
assertTrue(queues.contains(queue7));
- log.info("****** stopping office1");
// Stop office 1
office1.stop();
@@ -409,9 +415,9 @@
assertTrue(queues.contains(queue5));
// Restart office 1 and office 2
- office1 = createClusteredPostOffice(1, "testgroup");
+ office1.start();
- office2 = createClusteredPostOffice(2, "testgroup");
+ office2.start();
queues = office1.getQueuesForCondition(condition1, false);
assertNotNull(queues);
@@ -438,9 +444,9 @@
office3.stop();
// Start them all
- office1 = createClusteredPostOffice(1, "testgroup");
- office2 = createClusteredPostOffice(2, "testgroup");
- office3 = createClusteredPostOffice(3, "testgroup");
+ office1.start();
+ office2.start();
+ office3.start();
// Only the durable queue should survive
@@ -461,7 +467,8 @@
//Unbind it
- office1.removeBinding(queue6.getName(), false);
+ removed = office1.removeBinding(queue6.getName(), false);
+ assertNotNull(removed);
queues = office1.getQueuesForCondition(condition1, false);
assertNotNull(queues);
@@ -478,24 +485,27 @@
//Bind another few more clustered
- Queue queue8 = new MessagingQueue(1, "sub8", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
+ Queue queue8 = new MessagingQueue(1, "sub8", channelIDManager.getID(), ms, pm, false, -1, null, true);
queue8.activate();
- Queue queue9 = new MessagingQueue(2, "sub9", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
+ Queue queue9 = new MessagingQueue(2, "sub9", channelIDManager.getID(), ms, pm, false, -1, null, true);
queue9.activate();
- Queue queue10 = new MessagingQueue(2, "sub10", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
+ Queue queue10 = new MessagingQueue(2, "sub10", channelIDManager.getID(), ms, pm, false, -1, null, true);
queue10.activate();
//Bind on different conditions
- office1.addBinding(new Binding(condition1, queue8, false), false);
+ added = office1.addBinding(new Binding(condition1, queue8, false), false);
+ assertTrue(added);
- office2.addBinding(new Binding(condition1, queue9, false), false);
+ added = office2.addBinding(new Binding(condition1, queue9, false), false);
+ assertTrue(added);
Condition condition2 = new SimpleCondition("topic2");
- office2.addBinding(new Binding(condition2, queue10, false), false);
+ added = office2.addBinding(new Binding(condition2, queue10, false), false);
+ assertTrue(added);
queues = office1.getQueuesForCondition(condition1, false);
assertNotNull(queues);
@@ -517,15 +527,17 @@
//Now a couple of non clustered queues
- Queue queue11 = new MessagingQueue(1, "sub11", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ Queue queue11 = new MessagingQueue(1, "sub11", channelIDManager.getID(), ms, pm, false, -1, null, false);
queue11.activate();
- Queue queue12 = new MessagingQueue(2, "sub12", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ Queue queue12 = new MessagingQueue(2, "sub12", channelIDManager.getID(), ms, pm, false, -1, null, false);
queue12.activate();
- office1.addBinding(new Binding(condition1, queue11, false), false);
+ added = office1.addBinding(new Binding(condition1, queue11, false), false);
+ assertTrue(added);
- office2.addBinding(new Binding(condition1, queue12, false), false);
+ added = office2.addBinding(new Binding(condition1, queue12, false), false);
+ assertTrue(added);
queues = office1.getQueuesForCondition(condition1, false);
assertNotNull(queues);
@@ -571,205 +583,880 @@
}
}
+ /*
+ * Bind / Unbind all tests
+ *
+ * 1.
+ * a) queue is not known by cluster
+ * b) bind all
+ * c) verify all nodes get queue
+ * d) unbind - verify unbound from all nodes
+ * e) close down all nodes
+ * f) start all nodes
+ * g) verify queue is not known
+ *
+ * 2.
+ * a) queue is known by cluster
+ * b) bind all
+ * c) verify nothing changes on cluster
+ *
+ * 3
+ * a) start one node
+ * b) queue is not known to cluster
+ * c) bind all
+ * d) start other nodes
+ * d) verify other nodes pick it up
+ *
+ * 4
+ * a) start one node
+ * b) queue is not known to cluster
+ * c) bind all
+ * d) shutdown all nodes
+ * e) startup all nodes
+ * f) verify queue is on all nodes
+ *
+ * 5
+ * a) start one node
+ * b) queue is not known
+ * c) bind all
+ * d) shutdown node
+ * e) start other nodes
+ * f) verify queue is not known
+ * g) restart first node, verify queue is now known
+ *
+ * 6
+ *
+ * non durable bind all
+ * a) bind all non durable
+ * b) make sure is picked up by all nodes
+ * c) close down all nodes
+ * d) restart them all - make sure is not there
+ * e) bind again
+ * f) make sure is picked up
+ * g) take down one node
+ * h) bring it back up
+ * i) make sure it has quuee again
+ */
-
- public final void testClusteredBindUnbindAll() throws Throwable
+ public void testBindUnbindAll1() throws Throwable
{
+ /*
+ * 1.
+ * a) queue is not known by cluster
+ * b) bind all
+ * c) verify all nodes get queue
+ * d) unbind - verify unbound from all nodes
+ * e) close down all nodes
+ * f) start all nodes
+ * g) verify queue is not known
+ * */
+
PostOffice office1 = null;
PostOffice office2 = null;
PostOffice office3 = null;
try
- {
- // Start one office
-
- log.info("Creating office1");
-
+ {
office1 = createClusteredPostOffice(1, "testgroup");
-
- log.info("Created office1");
-
- Queue queue1 = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
+ office2 = createClusteredPostOffice(2, "testgroup");
+ office3 = createClusteredPostOffice(3, "testgroup");
+
+ //Durable
+ Queue queue1 = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, true, -1, null, true);
queue1.activate();
+
+ Condition condition1 = new SimpleCondition("topic1");
- Condition condition1 = new SimpleCondition("condition1");
+ //Add all binding
+ boolean added = office1.addBinding(new Binding(condition1, queue1, true), true);
+ assertTrue(added);
- office1.addBinding(new Binding(condition1, queue1, false), true);
+ Thread.sleep(1000);
+
+ Collection bindings = office1.getAllBindings();
+ assertGotAll(1, bindings, queue1.getName());
- Collection queues = office1.getQueuesForCondition(condition1, false);
-
- assertNotNull(queues);
+ bindings = office2.getAllBindings();
+ assertGotAll(2, bindings, queue1.getName());
- assertEquals(1, queues.size());
+ bindings = office3.getAllBindings();
+ assertGotAll(3, bindings, queue1.getName());
- assertTrue(queues.contains(queue1));
+ //Now unbind same node
- // Start another office -
+ Binding removed = office1.removeBinding(queue1.getName(), true);
+ assertNotNull(removed);
- log.info("creating office2");
- office2 = createClusteredPostOffice(2, "testgroup");
- log.info("created office2");
+ Thread.sleep(1000);
+ bindings = office1.getAllBindings();
+ assertTrue(bindings.isEmpty());
- Queue queue2 = new MessagingQueue(2, "sub2", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
+ bindings = office2.getAllBindings();
+ assertTrue(bindings.isEmpty());
+
+ bindings = office3.getAllBindings();
+ assertTrue(bindings.isEmpty());
+
+ //Bind again different node
+ Queue queue2 = new MessagingQueue(2, "sub2", channelIDManager.getID(), ms, pm, true, -1, null, true);
queue2.activate();
- office2.addBinding(new Binding(condition1, queue2, false), true);
+ added = office2.addBinding(new Binding(condition1, queue2, true), true);
+ assertTrue(added);
- queues = office1.getQueuesForCondition(condition1, false);
+ Thread.sleep(1000);
- assertNotNull(queues);
+ bindings = office1.getAllBindings();
+ assertGotAll(1, bindings, queue2.getName());
- assertEquals(4, queues.size());
+ bindings = office2.getAllBindings();
+ assertGotAll(2, bindings, queue2.getName());
- assertTrue(queues.contains(queue1));
- assertTrue(queues.contains(queue2));
- Iterator iter = queues.iterator();
+ bindings = office3.getAllBindings();
+ assertGotAll(3, bindings, queue2.getName());
- // TODO - when a new node joins the cluster it has to locally bind any queues previously bodun with all on other nodes.
+ //Close down all nodes
- while (iter.hasNext())
+ office1.stop();
+
+ dumpNodeIDView(office2);
+
+ office2.stop();
+
+ dumpNodeIDView(office3);
+
+ office3.stop();
+
+ //Start all nodes
+
+ office1.start();
+ office2.start();
+ office3.start();
+
+ Thread.sleep(1000);
+
+ //Verify the binding is there
+
+ bindings = office1.getAllBindings();
+ assertGotAll(1, bindings, queue2.getName());
+
+ bindings = office2.getAllBindings();
+ assertGotAll(2, bindings, queue2.getName());
+
+ bindings = office3.getAllBindings();
+ assertGotAll(3, bindings, queue2.getName());
+
+ //Unbind different node
+
+ removed = office3.removeBinding(queue2.getName(), true);
+ assertNotNull(removed);
+
+ Thread.sleep(1000);
+
+ bindings = office1.getAllBindings();
+ assertTrue(bindings.isEmpty());
+
+ bindings = office2.getAllBindings();
+ assertTrue(bindings.isEmpty());
+
+ bindings = office3.getAllBindings();
+ assertTrue(bindings.isEmpty());
+
+ if (checkNoBindingData())
{
- Queue queue = (Queue)iter.next();
-
- if (!queue.equals(queue1) && !queue.equals(queue2))
- {
- if (queue.getName().equals("sub1"))
- {
- assertEquals(2, queue.getNodeID());
- }
- else if (queue.getName().equals("sub2"))
- {
- assertEquals(1, queue.getNodeID());
- }
- else
- {
- fail("Invalid queue name " + queue.getName());
- }
- }
- }
+ fail("data still in database");
+ }
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ try
+ {
+ office1.stop();
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+
+ if (office2 != null)
+ {
+ try
+ {
+ office2.stop();
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+
+ if (office3 != null)
+ {
+ try
+ {
+ office3.stop();
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+ }
+ }
+
+ public void testBindUnbindAll2() throws Throwable
+ {
+ /*
+ * a) queue is known by cluster
+ * b) bind all
+ * c) verify nothing changes on cluster
+ */
+
+ PostOffice office1 = null;
+ PostOffice office2 = null;
+ PostOffice office3 = null;
+
+ try
+ {
+ office1 = createClusteredPostOffice(1, "testgroup");
+ office2 = createClusteredPostOffice(2, "testgroup");
+ office3 = createClusteredPostOffice(3, "testgroup");
+
+ //Durable
+ Queue queue1 = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, true, -1, null, true);
+ queue1.activate();
+
+ Condition condition1 = new SimpleCondition("topic1");
- queues = office2.getQueuesForCondition(condition1, false);
+ //Add all binding
+ boolean added = office1.addBinding(new Binding(condition1, queue1, true), true);
+ assertTrue(added);
- assertNotNull(queues);
+ Thread.sleep(1000);
+
+ Collection bindings = office1.getAllBindings();
+ assertGotAll(1, bindings, queue1.getName());
- assertEquals(4, queues.size());
+ bindings = office2.getAllBindings();
+ assertGotAll(2, bindings, queue1.getName());
- assertTrue(queues.contains(queue1));
- assertTrue(queues.contains(queue2));
- iter = queues.iterator();
+ bindings = office3.getAllBindings();
+ assertGotAll(3, bindings, queue1.getName());
- // TODO - when a new node joins the cluster it has to locally bind any queues previously bodun with all on other nodes.
+ //Bind again
+ added = office1.addBinding(new Binding(condition1, queue1, true), true);
+ assertFalse(added);
- while (iter.hasNext())
+ Thread.sleep(1000);
+
+ bindings = office1.getAllBindings();
+ assertGotAll(1, bindings, queue1.getName());
+
+ bindings = office2.getAllBindings();
+ assertGotAll(2, bindings, queue1.getName());
+
+ bindings = office3.getAllBindings();
+ assertGotAll(3, bindings, queue1.getName());
+
+ //Now unbind same node
+
+ Binding removed = office1.removeBinding(queue1.getName(), true);
+ assertNotNull(removed);
+
+ removed = office1.removeBinding(queue1.getName(), true);
+ assertNull(removed);
+
+ Thread.sleep(1000);
+
+ bindings = office1.getAllBindings();
+ assertTrue(bindings.isEmpty());
+
+ bindings = office2.getAllBindings();
+ assertTrue(bindings.isEmpty());
+
+ bindings = office3.getAllBindings();
+ assertTrue(bindings.isEmpty());
+
+ if (checkNoBindingData())
{
- Queue queue = (Queue)iter.next();
-
- if (!queue.equals(queue1) && !queue.equals(queue2))
- {
- if (queue.getName().equals("sub1"))
- {
- assertEquals(1, queue.getNodeID());
- }
- else if (queue.getName().equals("sub2"))
- {
- assertEquals(2, queue.getNodeID());
- }
- else
- {
- fail("Invalid queue name " + queue.getName());
- }
- }
- }
+ fail("data still in database");
+ }
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ try
+ {
+ office1.stop();
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+
+ if (office2 != null)
+ {
+ try
+ {
+ office2.stop();
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+
+ if (office3 != null)
+ {
+ try
+ {
+ office3.stop();
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+ }
+ }
+
+ public void testBindUnbindAll3() throws Throwable
+ {
+ /* a) start one node
+ * b) queue is not known to cluster
+ * c) bind all
+ * d) start other nodes
+ * d) verify other nodes pick it up
+ */
+
+ PostOffice office1 = null;
+ PostOffice office2 = null;
+ PostOffice office3 = null;
+
+ try
+ {
+ office1 = createClusteredPostOffice(1, "testgroup");
+
+ //Durable
+ Queue queue1 = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, true, -1, null, true);
+ queue1.activate();
+
+ Condition condition1 = new SimpleCondition("topic1");
+ //Add all binding
+ boolean added = office1.addBinding(new Binding(condition1, queue1, true), true);
+ assertTrue(added);
- office2.removeBinding("sub2", true);
+ Thread.sleep(1000);
+
+ office2 = createClusteredPostOffice(2, "testgroup");
+ office3 = createClusteredPostOffice(3, "testgroup");
+
+ Thread.sleep(1000);
- queues = office1.getQueuesForCondition(condition1, false);
+ Collection bindings = office1.getAllBindings();
+ assertGotAll(1, bindings, queue1.getName());
- assertNotNull(queues);
+ bindings = office2.getAllBindings();
+ assertGotAll(2, bindings, queue1.getName());
- assertEquals(2, queues.size());
+ bindings = office3.getAllBindings();
+ assertGotAll(3, bindings, queue1.getName());
+
+ //Unbind
- assertTrue(queues.contains(queue1));
- iter = queues.iterator();
+ Binding removed = office1.removeBinding(queue1.getName(), true);
+ assertNotNull(removed);
+ Thread.sleep(1000);
+
+ bindings = office1.getAllBindings();
+ assertTrue(bindings.isEmpty());
+
+ bindings = office2.getAllBindings();
+ assertTrue(bindings.isEmpty());
+
+ bindings = office3.getAllBindings();
+ assertTrue(bindings.isEmpty());
+
+ if (checkNoBindingData())
+ {
+ fail("data still in database");
+ }
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ try
+ {
+ office1.stop();
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
- while (iter.hasNext())
+ if (office2 != null)
+ {
+ try
+ {
+ office2.stop();
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+
+ if (office3 != null)
+ {
+ try
+ {
+ office3.stop();
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+ }
+ }
+
+ public void testBindUnbindAll4() throws Throwable
+ {
+ /* a) start one node
+ * b) queue is not known to cluster
+ * c) bind all
+ * d) shutdown all nodes
+ * e) startup all nodes
+ * f) verify queue is on all nodes
+ */
+
+ PostOffice office1 = null;
+ PostOffice office2 = null;
+ PostOffice office3 = null;
+
+ try
+ {
+ office1 = createClusteredPostOffice(1, "testgroup");
+
+ //Durable
+ Queue queue1 = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, true, -1, null, true);
+ queue1.activate();
+
+ Condition condition1 = new SimpleCondition("topic1");
+
+ //Add all binding
+ boolean added = office1.addBinding(new Binding(condition1, queue1, true), true);
+ assertTrue(added);
+
+ Thread.sleep(1000);
+
+ office1.stop();
+
+ //office1 = createClusteredPostOffice(1, "testgroup");
+ office1.start();
+ office2 = createClusteredPostOffice(2, "testgroup");
+ office3 = createClusteredPostOffice(3, "testgroup");
+
+ Thread.sleep(1000);
+
+ Collection bindings = office1.getAllBindings();
+ assertGotAll(1, bindings, queue1.getName());
+
+ bindings = office2.getAllBindings();
+ assertGotAll(2, bindings, queue1.getName());
+
+ bindings = office3.getAllBindings();
+ assertGotAll(3, bindings, queue1.getName());
+
+ //Now unbind same node
+
+ Binding removed = office1.removeBinding(queue1.getName(), true);
+ assertNotNull(removed);
+
+ Thread.sleep(1000);
+
+ bindings = office1.getAllBindings();
+ assertTrue(bindings.isEmpty());
+
+ bindings = office2.getAllBindings();
+ assertTrue(bindings.isEmpty());
+
+ bindings = office3.getAllBindings();
+ assertTrue(bindings.isEmpty());
+
+
+ if (checkNoBindingData())
{
- Queue queue = (Queue)iter.next();
-
- if (!queue.equals(queue1))
- {
- if (queue.getName().equals("sub1"))
- {
- assertEquals(2, queue.getNodeID());
- }
- else
- {
- fail("Invalid queue name " + queue.getName());
- }
- }
- }
+ fail("data still in database");
+ }
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ try
+ {
+ office1.stop();
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+
+ if (office2 != null)
+ {
+ try
+ {
+ office2.stop();
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+
+ if (office3 != null)
+ {
+ try
+ {
+ office3.stop();
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+ }
+ }
+
+ public void testBindUnbindAll5() throws Throwable
+ {
+ /*
+ * a) start one node
+ * b) queue is not known
+ * c) bind all
+ * d) shutdown node
+ * e) start other nodes
+ * f) verify queue is not known
+ * g) restart first node, verify queue is now known
+ * */
+
+ PostOffice office1 = null;
+ PostOffice office2 = null;
+ PostOffice office3 = null;
+
+ try
+ {
+ office1 = createClusteredPostOffice(1, "testgroup");
+
+ //Durable
+ Queue queue1 = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, true, -1, null, true);
+ queue1.activate();
+
+ Condition condition1 = new SimpleCondition("topic1");
- queues = office2.getQueuesForCondition(condition1, false);
+ //Add all binding
+ boolean added = office1.addBinding(new Binding(condition1, queue1, true), true);
+ assertTrue(added);
- assertNotNull(queues);
+ Thread.sleep(1000);
- assertEquals(2, queues.size());
+ office1.stop();
- assertTrue(queues.contains(queue1));
- iter = queues.iterator();
+ office2 = createClusteredPostOffice(2, "testgroup");
+ office3 = createClusteredPostOffice(3, "testgroup");
+ Collection bindings = office2.getAllBindings();
+ assertTrue(bindings.isEmpty());
+
+ bindings = office3.getAllBindings();
+ assertTrue(bindings.isEmpty());
+
+ office1.start();
+
+ Thread.sleep(1000);
+
+ bindings = office1.getAllBindings();
+ assertGotAll(1, bindings, queue1.getName());
+
+ bindings = office2.getAllBindings();
+ assertGotAll(2, bindings, queue1.getName());
+
+ bindings = office3.getAllBindings();
+ assertGotAll(3, bindings, queue1.getName());
+
+ //Now unbind same node
+
+ Binding removed = office1.removeBinding(queue1.getName(), true);
+ assertNotNull(removed);
- while (iter.hasNext())
- {
- Queue queue = (Queue)iter.next();
-
- if (!queue.equals(queue1))
- {
- if (queue.getName().equals("sub1"))
- {
- assertEquals(1, queue.getNodeID());
- }
- else
- {
- fail("Invalid queue name " + queue.getName());
- }
- }
- }
+ Thread.sleep(1000);
- office2.removeBinding("sub1", true);
+ bindings = office1.getAllBindings();
+ assertTrue(bindings.isEmpty());
- queues = office2.getQueuesForCondition(condition1, false);
+ bindings = office2.getAllBindings();
+ assertTrue(bindings.isEmpty());
- assertNotNull(queues);
+ bindings = office3.getAllBindings();
+ assertTrue(bindings.isEmpty());
- assertTrue(queues.isEmpty());
+ if (checkNoBindingData())
+ {
+ fail("data still in database");
+ }
}
finally
{
- if (office1 != null)
- {
- office1.stop();
- }
+ if (office1 != null)
+ {
+ try
+ {
+ office1.stop();
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+
+ if (office2 != null)
+ {
+ try
+ {
+ office2.stop();
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+
+ if (office3 != null)
+ {
+ try
+ {
+ office3.stop();
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+ }
+ }
+
+ public void testBindUnbindAll6() throws Throwable
+ {
+ /*
+ * 1.
+ * a) bind all non durable
+ * b) make sure is picked up by all nodes
+ * c) close down all nodes
+ * d) restart them all - make sure is not there
+ * e) bind again
+ * f) make sure is picked up
+ * g) take down one node
+ * h) bring it back up
+ * i) make sure it has quuee again
+ * */
+
+ PostOffice office1 = null;
+ PostOffice office2 = null;
+ PostOffice office3 = null;
+
+ try
+ {
+ office1 = createClusteredPostOffice(1, "testgroup");
+ office2 = createClusteredPostOffice(2, "testgroup");
+ office3 = createClusteredPostOffice(3, "testgroup");
+
+ //Durable
+ Queue queue1 = new MessagingQueue(1, "sub1", channelIDManager.getID(), ms, pm, false, -1, null, true);
+ queue1.activate();
+
+ Condition condition1 = new SimpleCondition("topic1");
- if (office2 != null)
- {
- office2.stop();
- }
+ //Add all binding
+ boolean added = office1.addBinding(new Binding(condition1, queue1, true), true);
+ assertTrue(added);
- if (office3 != null)
- {
- office3.stop();
- }
+ Thread.sleep(1000);
+
+ Collection bindings = office1.getAllBindings();
+ assertGotAll(1, bindings, queue1.getName());
+ bindings = office2.getAllBindings();
+ assertGotAll(2, bindings, queue1.getName());
+
+ bindings = office3.getAllBindings();
+ assertGotAll(3, bindings, queue1.getName());
+
+ office1.stop();
+ office2.stop();
+ office3.stop();
+
+ office1.start();
+ office2.start();
+ office3.start();
+
+ Thread.sleep(1000);
+
+ bindings = office1.getAllBindings();
+ assertTrue(bindings.isEmpty());
+
+ bindings = office2.getAllBindings();
+ assertTrue(bindings.isEmpty());
+
+ bindings = office3.getAllBindings();
+ assertTrue(bindings.isEmpty());
+
+ added = office1.addBinding(new Binding(condition1, queue1, true), true);
+ assertTrue(added);
+
+ Thread.sleep(1000);
+
+ bindings = office1.getAllBindings();
+ assertGotAll(1, bindings, queue1.getName());
+
+ bindings = office2.getAllBindings();
+ assertGotAll(2, bindings, queue1.getName());
+
+ bindings = office3.getAllBindings();
+ assertGotAll(3, bindings, queue1.getName());
+
+ office3.stop();
+
+ Thread.sleep(1000);
+
+ bindings = office1.getAllBindings();
+ assertEquals(2, bindings.size());
+
+ office3.start();
+
+ Thread.sleep(1000);
+
+ bindings = office1.getAllBindings();
+ assertGotAll(1, bindings, queue1.getName());
+
+ bindings = office2.getAllBindings();
+ assertGotAll(2, bindings, queue1.getName());
+
+ bindings = office3.getAllBindings();
+ assertGotAll(3, bindings, queue1.getName());
+
+
if (checkNoBindingData())
{
fail("data still in database");
- }
+ }
}
+ finally
+ {
+ if (office1 != null)
+ {
+ try
+ {
+ office1.stop();
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+
+ if (office2 != null)
+ {
+ try
+ {
+ office2.stop();
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+
+ if (office3 != null)
+ {
+ try
+ {
+ office3.stop();
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+ }
}
+
+ private void dumpNodeIDView(PostOffice postOffice)
+ {
+ Set view = postOffice.nodeIDView();
+
+ log.info("=== node id view ==");
+
+ Iterator iter = view.iterator();
+
+ while (iter.hasNext())
+ {
+ log.info("Node:" + iter.next());
+ }
+
+ log.info("==================");
+ }
+
+ private void assertGotAll(int nodeId, Collection bindings, String queueName)
+ {
+
+ log.info("============= dumping bindings ========");
+
+ Iterator iter = bindings.iterator();
+
+ while (iter.hasNext())
+ {
+ Binding binding = (Binding)iter.next();
+
+ log.info("Binding: " + binding);
+ }
+
+ log.info("========= end dump==========");
+
+ assertEquals(3, bindings.size());
+
+ iter = bindings.iterator();
+
+ boolean got1 = false;
+ boolean got2 = false;
+ boolean got3 = false;
+ while (iter.hasNext())
+ {
+ Binding binding = (Binding)iter.next();
+
+ log.info("binding node id " + binding.queue.getNodeID());
+
+ assertEquals(queueName, binding.queue.getName());
+ if (binding.queue.getNodeID() == nodeId)
+ {
+ assertTrue(binding.allNodes);
+ }
+ else
+ {
+ assertFalse(binding.allNodes);
+ }
+
+ if (binding.queue.getNodeID() == 1)
+ {
+ got1 = true;
+ }
+ if (binding.queue.getNodeID() == 2)
+ {
+ got2 = true;
+ }
+ if (binding.queue.getNodeID() == 3)
+ {
+ got3 = true;
+ }
+ }
+ assertTrue(got1 && got2 && got3);
+ }
+
+
+
//
// public final void testClusteredRoutePersistent() throws Throwable
// {
@@ -798,7 +1485,7 @@
//
// public void testClusteredPersistentRouteWithFilterNonRecoverable() throws Throwable
// {
-// this.clusteredRouteWithFilter(true, false);
+// this.clusteredRouteWithFilter(true);
// }
//
// public void testClusteredNonPersistentRouteWithFilterRecoverable() throws Throwable
@@ -813,7 +1500,7 @@
//
// public void testRouteSharedPointToPointQueuePersistentNonRecoverable() throws Throwable
// {
-// this.routeSharedQueue(true, false);
+// this.routeSharedQueue(true);
// }
//
// public void testRouteSharedPointToPointQueueNonPersistentNonRecoverable() throws Throwable
@@ -843,7 +1530,7 @@
//
// public void testRouteLocalQueuesPersistentNonRecoverable() throws Throwable
// {
-// this.routeLocalQueues(true, false);
+// this.routeLocalQueues(true);
// }
//
// public void testRouteLocalQueuesNonPersistentNonRecoverable() throws Throwable
@@ -862,10 +1549,6 @@
// }
- /*
- * We should allow the clustered bind of queues with the same queue name on different nodes of the
- * cluster
- */
public void testBindSameName() throws Throwable
{
PostOffice office1 = null;
@@ -878,47 +1561,40 @@
office2 = createClusteredPostOffice(2, "testgroup");
- Queue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
+ Queue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, true);
queue1.activate();
Condition condition1 = new SimpleCondition("queue1");
- office1.addBinding(new Binding(condition1, queue1, false), false);
+ boolean added = office1.addBinding(new Binding(condition1, queue1, false), false);
+ assertTrue(added);
- Queue queue2 = new MessagingQueue(2, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
+ Queue queue2 = new MessagingQueue(2, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, true);
queue2.activate();
- office2.addBinding(new Binding(condition1, queue2, false), false);
+ added = office2.addBinding(new Binding(condition1, queue2, false), false);
+ assertTrue(added);
- Queue queue3 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
+ Queue queue3 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, true);
queue3.activate();
- try
- {
- office1.addBinding(new Binding(condition1, queue3, false), false);
- fail();
- }
- catch (Exception e)
- {
- //Ok
- }
+ added = office1.addBinding(new Binding(condition1, queue3, false), false);
+ assertFalse(added);
- Queue queue4 = new MessagingQueue(2, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, true, false);
+ Queue queue4 = new MessagingQueue(2, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, true);
queue4.activate();
- try
- {
- office2.addBinding(new Binding(condition1, queue4, false), false);
- fail();
- }
- catch (Exception e)
- {
- //Ok
- }
+ added = office2.addBinding(new Binding(condition1, queue4, false), false);
+ assertFalse(added);
- office1.removeBinding(queue1.getName(), false);
+ Binding removed = office1.removeBinding("does not exist", false);
+ assertNull(removed);
+
+ removed = office1.removeBinding(queue1.getName(), false);
+ assertNotNull(removed);
- office2.removeBinding(queue2.getName(), false);
+ removed = office2.removeBinding(queue2.getName(), false);
+ assertNotNull(removed);
}
finally
{
@@ -933,6 +1609,7 @@
}
}
}
+
// Package protected ----------------------------------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/PostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/PostOfficeTest.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/PostOfficeTest.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -80,7 +80,7 @@
//Bind one durable
MessagingQueue queue1 =
- new MessagingQueue(1, "durableQueue", channelIDManager.getID(), ms, pm, true, -1, null, false, false);
+ new MessagingQueue(1, "durableQueue", channelIDManager.getID(), ms, pm, true, -1, null, false);
queue1.activate();
Condition condition1 = new SimpleCondition("condition1");
@@ -104,7 +104,7 @@
{
MessagingQueue queuexx =
- new MessagingQueue(777, "durableQueue", channelIDManager.getID(), ms, pm, true, -1, null, false, false);
+ new MessagingQueue(777, "durableQueue", channelIDManager.getID(), ms, pm, true, -1, null, false);
queuexx.activate();
office1.addBinding(new Binding(condition1, queuexx, false), false);
fail();
@@ -117,7 +117,7 @@
//Bind one non durable
MessagingQueue queue2 =
- new MessagingQueue(1, "nonDurableQueue", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ new MessagingQueue(1, "nonDurableQueue", channelIDManager.getID(), ms, pm, false, -1, null, false);
queue2.activate();
Condition condition2 = new SimpleCondition("condition2");
@@ -219,44 +219,44 @@
Condition condition1 = new SimpleCondition("condition1");
- MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false);
queue1.activate();
office.addBinding(new Binding(condition1, queue1, false), false);
- MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, false);
queue2.activate();
office.addBinding(new Binding(condition1, queue2, false), false);
- MessagingQueue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ MessagingQueue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, false);
queue3.activate();
office.addBinding(new Binding(condition1, queue3, false), false);
- MessagingQueue queue4 = new MessagingQueue(1, "queue4", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ MessagingQueue queue4 = new MessagingQueue(1, "queue4", channelIDManager.getID(), ms, pm, false, -1, null, false);
queue4.activate();
office.addBinding(new Binding(condition1, queue4, false), false);
- MessagingQueue queue5 = new MessagingQueue(1, "queue5", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ MessagingQueue queue5 = new MessagingQueue(1, "queue5", channelIDManager.getID(), ms, pm, false, -1, null, false);
queue5.activate();
Condition condition2 = new SimpleCondition("condition2");
office.addBinding(new Binding(condition2, queue5, false), false);
- MessagingQueue queue6 = new MessagingQueue(1, "queue6", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ MessagingQueue queue6 = new MessagingQueue(1, "queue6", channelIDManager.getID(), ms, pm, false, -1, null, false);
queue6.activate();
office.addBinding(new Binding(condition2, queue6, false), false);
- MessagingQueue queue7 = new MessagingQueue(1, "queue7", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ MessagingQueue queue7 = new MessagingQueue(1, "queue7", channelIDManager.getID(), ms, pm, false, -1, null, false);
queue7.activate();
office.addBinding(new Binding(condition2, queue7, false), false);
- MessagingQueue queue8 = new MessagingQueue(1, "queue8", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ MessagingQueue queue8 = new MessagingQueue(1, "queue8", channelIDManager.getID(), ms, pm, false, -1, null, false);
queue8.activate();
office.addBinding(new Binding(condition2, queue8, false), false);
@@ -341,15 +341,15 @@
Condition condition1 = new SimpleCondition("condition1");
- MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false);
office.addBinding(new Binding(condition1, queue1, false), false);
- MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, false);
office.addBinding(new Binding(condition1, queue2, false), false);
Condition condition2 = new SimpleCondition("condition2");
- MessagingQueue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ MessagingQueue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, false);
office.addBinding(new Binding(condition2, queue3, false), false);
Binding b1 = office.getBindingForQueueName("queue1");
@@ -419,15 +419,15 @@
Condition condition1 = new SimpleCondition("condition1");
- MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false);
office.addBinding(new Binding(condition1, queue1, false), false);
- MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, false);
office.addBinding(new Binding(condition1, queue2, false), false);
Condition condition2 = new SimpleCondition("condition2");
- MessagingQueue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ MessagingQueue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, false);
office.addBinding(new Binding(condition2, queue3, false), false);
Binding b1 = office.getBindingForChannelID(queue1.getChannelID());
@@ -527,22 +527,22 @@
Condition condition1 = new SimpleCondition("topic1");
- MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false);
queue1.activate();
postOffice.addBinding(new Binding(condition1, queue1, false), false);
- MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, false);
queue2.activate();
postOffice.addBinding(new Binding(condition1, queue2, false), false);
- MessagingQueue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ MessagingQueue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, false);
queue3.activate();
postOffice.addBinding(new Binding(condition1, queue3, false), false);
- MessagingQueue queue4 = new MessagingQueue(1, "queue4", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ MessagingQueue queue4 = new MessagingQueue(1, "queue4", channelIDManager.getID(), ms, pm, false, -1, null, false);
queue4.activate();
Condition condition2 = new SimpleCondition("topic2");
@@ -550,12 +550,12 @@
postOffice.addBinding(new Binding(condition2, queue4, false), false);
- MessagingQueue queue5 = new MessagingQueue(1, "queue5", channelIDManager.getID(), ms, pm, false,-1, null, false, false);
+ MessagingQueue queue5 = new MessagingQueue(1, "queue5", channelIDManager.getID(), ms, pm, false,-1, null, false);
queue5.activate();
postOffice.addBinding(new Binding(condition2, queue5, false), false);
- MessagingQueue queue6 = new MessagingQueue(1, "queue6", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ MessagingQueue queue6 = new MessagingQueue(1, "queue6", channelIDManager.getID(), ms, pm, false, -1, null, false);
queue6.activate();
postOffice.addBinding(new Binding(condition2, queue6, false), false);
@@ -683,7 +683,7 @@
{
postOffice = createNonClusteredPostOffice();
- MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false);
queue1.activate();
postOffice.addBinding(new Binding(new SimpleCondition("condition1"), queue1, false), false);
@@ -740,17 +740,17 @@
Condition condition1 = new SimpleCondition("topic1");
- MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, filter, false, false);
+ MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, filter, false);
queue1.activate();
postOffice.addBinding(new Binding(condition1, queue1, false), false);
- MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, false);
queue2.activate();
postOffice.addBinding(new Binding(condition1, queue2, false), false);
- MessagingQueue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ MessagingQueue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, false);
queue3.activate();
postOffice.addBinding(new Binding(condition1, queue3, false), false);
@@ -845,34 +845,34 @@
Condition condition1 = new SimpleCondition("topic1");
- MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false);
queue1.activate();
postOffice.addBinding(new Binding(condition1, queue1, false), false);
- MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, false, -1, null, false);
queue2.activate();
postOffice.addBinding(new Binding(condition1, queue2, false), false);
- MessagingQueue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ MessagingQueue queue3 = new MessagingQueue(1, "queue3", channelIDManager.getID(), ms, pm, false, -1, null, false);
queue3.activate();
postOffice.addBinding(new Binding(condition1, queue3, false), false);
- MessagingQueue queue4 = new MessagingQueue(1, "queue4", channelIDManager.getID(), ms, pm, true, -1, null, false, false);
+ MessagingQueue queue4 = new MessagingQueue(1, "queue4", channelIDManager.getID(), ms, pm, true, -1, null, false);
queue4.activate();
Condition condition2 = new SimpleCondition("topic2");
postOffice.addBinding(new Binding(condition2, queue4, false), false);
- MessagingQueue queue5 = new MessagingQueue(1, "queue5", channelIDManager.getID(), ms, pm, true, -1, null, false, false);
+ MessagingQueue queue5 = new MessagingQueue(1, "queue5", channelIDManager.getID(), ms, pm, true, -1, null, false);
queue5.activate();
postOffice.addBinding(new Binding(condition2, queue5, false), false);
- MessagingQueue queue6 = new MessagingQueue(1, "queue6", channelIDManager.getID(), ms, pm, true, -1, null, false, false);
+ MessagingQueue queue6 = new MessagingQueue(1, "queue6", channelIDManager.getID(), ms, pm, true, -1, null, false);
queue6.activate();
postOffice.addBinding(new Binding(condition2, queue6, false), false);
@@ -1023,12 +1023,12 @@
Condition condition1 = new SimpleCondition("topic1");
- MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false, false);
+ MessagingQueue queue1 = new MessagingQueue(1, "queue1", channelIDManager.getID(), ms, pm, false, -1, null, false);
queue1.activate();
postOffice.addBinding(new Binding(condition1, queue1, false), false);
- MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, true,-1, null, false, false);
+ MessagingQueue queue2 = new MessagingQueue(1, "queue2", channelIDManager.getID(), ms, pm, true,-1, null, false);
queue2.activate();
postOffice.addBinding(new Binding(condition1, queue2, false), false);
Added: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusterConnectionManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusterConnectionManagerTest.java (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusterConnectionManagerTest.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -0,0 +1,257 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.jms.clustering;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.ObjectName;
+import javax.naming.InitialContext;
+
+import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ *
+ * We test every combination of the order of deployment of connection factory, local and remote queue
+ *
+ * and verify message sucking still works
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: $</tt>2 Jul 2007
+ *
+ * $Id: $
+ *
+ */
+public class ClusterConnectionManagerTest extends ClusteringTestBase
+{
+
+ // Constants ------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ public ClusterConnectionManagerTest(String name)
+ {
+ super(name);
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ public void test1() throws Exception
+ {
+ deployCF();
+
+ deployLocal();
+
+ deployRemote();
+
+ suck();
+ }
+
+ public void test2() throws Exception
+ {
+ deployCF();
+
+ deployRemote();
+
+ deployLocal();
+
+ suck();
+ }
+
+ public void test3() throws Exception
+ {
+ deployRemote();
+
+ deployCF();
+
+ deployLocal();
+
+ suck();
+ }
+
+ public void test4() throws Exception
+ {
+ deployRemote();
+
+ deployLocal();
+
+ deployCF();
+
+ suck();
+ }
+
+ public void test5() throws Exception
+ {
+ deployLocal();
+
+ deployRemote();
+
+ deployCF();
+
+ suck();
+ }
+
+ public void test6() throws Exception
+ {
+ deployLocal();
+
+ deployCF();
+
+ deployRemote();
+
+ suck();
+ }
+
+ private void deployCF() throws Exception
+ {
+ String cfName =
+ (String)ServerManagement.getServer(1).getAttribute(ServerManagement.getServerPeerObjectName(), "ClusterPullConnectionFactoryName");
+
+ //Deploy cf on node 1
+ ServerManagement.deployConnectionFactory(cfName, null, 150);
+ }
+
+ private void deployLocal() throws Exception
+ {
+ ServerManagement.deployQueue("suckQueue", 1);
+ }
+
+ private void deployRemote() throws Exception
+ {
+ ServerManagement.deployQueue("suckQueue", 0);
+ }
+
+ private void suck() throws Exception
+ {
+ InitialContext ic0 = new InitialContext(ServerManagement.getJNDIEnvironment(0));
+
+ Queue queue0 = (Queue)ic0.lookup("/queue/suckQueue");
+
+ InitialContext ic1 = new InitialContext(ServerManagement.getJNDIEnvironment(1));
+
+ Queue queue1 = (Queue)ic1.lookup("/queue/suckQueue");
+
+ ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
+
+ ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
+
+ Connection conn0 = null;
+
+ Connection conn1 = null;
+
+ try
+ {
+ conn0 = cf0.createConnection();
+
+ //Send some messages on node 0
+
+ final int NUM_MESSAGES = 100;
+
+ Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess0.createProducer(queue0);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess0.createTextMessage("message" + i);
+
+ prod.send(tm);
+ }
+
+ //Consume them on node 1
+
+ conn1 = cf1.createConnection();
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons1 = sess1.createConsumer(queue1);
+
+ conn1.start();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons1.receive(5000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+ }
+ finally
+ {
+ if (conn0 != null)
+ {
+ conn0.close();
+ }
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+ }
+
+ }
+
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ nodeCount = 2;
+ super.setUp();
+
+ log.debug("setup done");
+
+ //undeploy CF
+
+ String cfName =
+ (String)ServerManagement.getServer(1).getAttribute(ServerManagement.getServerPeerObjectName(), "ClusterPullConnectionFactoryName");
+
+ //undeploy cf on node 1
+ ServerManagement.undeployConnectionFactory(new ObjectName(cfName));
+ }
+
+ protected void tearDown() throws Exception
+ {
+ ServerManagement.undeployQueue("suckQueue", 0);
+
+ ServerManagement.undeployQueue("suckQueue", 1);
+
+ super.tearDown();
+ }
+
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+
+}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -104,22 +104,16 @@
conn1 = cf.createConnection();
conn2 = cf.createConnection();
- log.info("Created connections");
-
checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- log.info("Created sessions");
-
MessageConsumer cons0 = sess0.createConsumer(queue[0]);
MessageConsumer cons1 = sess1.createConsumer(queue[1]);
MessageConsumer cons2 = sess2.createConsumer(queue[2]);
- log.info("Created consumers");
-
conn0.start();
conn1.start();
conn2.start();
@@ -139,8 +133,6 @@
prod0.send(tm);
}
- log.info("Sent messages");
-
for (int i = 0; i < NUM_MESSAGES; i++)
{
TextMessage tm = (TextMessage)cons0.receive(1000);
@@ -239,25 +231,19 @@
//Send more messages at node 0
- log.info("Sending more at node 0");
-
for (int i = 0; i < NUM_MESSAGES; i++)
{
TextMessage tm = sess0.createTextMessage("message2-" + i);
prod0.send(tm);
}
-
- log.info("Sent messages");
-
+
// consume them on node2
for (int i = 0; i < NUM_MESSAGES; i++)
{
TextMessage tm = (TextMessage)cons2.receive(1000);
-
- log.info("*** got message " + tm.getText());
-
+
assertNotNull(tm);
assertEquals("message2-" + i, tm.getText());
@@ -294,9 +280,7 @@
tm = (TextMessage)cons2.receive(1000);
if (tm != null)
- {
- log.info("*** got message " + tm.getText());
-
+ {
assertNotNull(tm);
msgs.add(tm.getText());
@@ -341,8 +325,6 @@
if (tm != null)
{
- log.info("*** got message " + tm.getText());
-
assertNotNull(tm);
msgs.add(tm.getText());
@@ -382,9 +364,7 @@
tm = (TextMessage)cons1.receive(1000);
if (tm != null)
- {
- log.info("*** got message " + tm.getText());
-
+ {
msgs.add(tm.getText());
count++;
@@ -398,9 +378,6 @@
if (tm != null)
{
- log.info("*** got message " + tm.getText());
-
-
msgs.add(tm.getText());
count++;
@@ -443,8 +420,6 @@
if (tm != null)
{
- log.info("*** got message " + tm.getText());
-
msgs.add(tm.getText());
count++;
@@ -458,9 +433,6 @@
if (tm != null)
{
-
- log.info("*** got message " + tm.getText());
-
msgs.add(tm.getText());
count++;
@@ -499,8 +471,6 @@
{
tm = (TextMessage)cons2.receive(1000);
- log.info("*** got message " + tm.getText());
-
assertNotNull(tm);
assertEquals("message5-" + i, tm.getText());
@@ -520,8 +490,6 @@
{
tm = (TextMessage)cons1.receive(1000);
- log.info("*** got message " + tm.getText());
-
assertNotNull(tm);
assertEquals("message5-" + i, tm.getText());
@@ -537,14 +505,11 @@
{
tm = (TextMessage)cons0.receive(1000);
- log.info("*** got message " + tm.getText());
-
assertNotNull(tm);
assertEquals("message5-" + i, tm.getText());
- }
+ }
-
}
finally
{
Added: trunk/tests/src/org/jboss/test/messaging/jms/clustering/PreserveOrderingTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/PreserveOrderingTest.java (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/PreserveOrderingTest.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -0,0 +1,411 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms.clustering;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.ObjectName;
+
+import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
+import org.jboss.test.messaging.tools.jmx.ServiceAttributeOverrides;
+
+/**
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: $</tt>2 Jul 2007
+ *
+ * $Id: $
+ *
+ */
+public class PreserveOrderingTest extends ClusteringTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public PreserveOrderingTest(String name)
+ {
+ super(name);
+ }
+
+ // Public --------------------------------------------------------
+
+ public void testPreserveOrderingQueuePersistent() throws Exception
+ {
+ preserveOrderingQueue(true);
+ }
+
+ public void testPreserveOrderingQueueNonPersistent() throws Exception
+ {
+ preserveOrderingQueue(false);
+ }
+
+ public void testPreserveOrderingTopicPersistent() throws Exception
+ {
+ preserveOrderingDurableSub(true);
+ }
+
+ public void testPreserveOrderingTopicNonPersistent() throws Exception
+ {
+ preserveOrderingDurableSub(false);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ nodeCount = 3;
+
+ overrides = new ServiceAttributeOverrides();
+
+ overrides.put(new ObjectName("jboss.messaging:service=ServerPeer"), "DefaultPreserveOrdering", "true");
+
+ super.setUp();
+
+ log.debug("setup done");
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ protected void preserveOrderingQueue(boolean persistent) throws Exception
+ {
+ Connection conn0 = null;
+ Connection conn1 = null;
+ Connection conn2 = null;
+
+ try
+ {
+ //This will create 3 different connection on 3 different nodes, since
+ //the cf is clustered
+ conn0 = cf.createConnection();
+ conn1 = cf.createConnection();
+ conn2 = cf.createConnection();
+
+ log.info("Created connections");
+
+ checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
+
+ Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Send at node 0
+
+ MessageProducer prod0 = sess0.createProducer(queue[0]);
+
+ prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess0.createTextMessage("message" + i);
+
+ prod0.send(tm);
+ }
+
+ log.info("Sent messages");
+
+ //Consume them on node1, but dont ack
+
+ Session sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer cons1 = sess1.createConsumer(queue[1]);
+ conn1.start();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons1.receive(5000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ //Now close sess1- this will cancel the messages back to the queue
+
+ sess1.close();
+
+ //Now try and consume them back on node 0 - this should fail since we shouldn't be allowed to consume them back on node0
+
+ MessageConsumer cons0 = sess0.createConsumer(queue[0]);
+
+ conn0.start();
+
+ Message m = cons0.receive(5000);
+
+ assertNull(m);
+
+ //Now try and consume them on node 2 - this should be fail too
+
+ Session sess2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer cons2 = sess2.createConsumer(queue[2]);
+ conn2.start();
+
+ m = cons2.receive(5000);
+
+ assertNull(m);
+
+ //Finish them off on node 1
+
+ sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ cons1 = sess1.createConsumer(queue[1]);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons1.receive(5000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+
+ if (i == NUM_MESSAGES - 1)
+ {
+ tm.acknowledge();
+ }
+ }
+ }
+ finally
+ {
+ if (conn0 != null)
+ {
+ conn0.close();
+ }
+
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+ }
+
+ protected void preserveOrderingDurableSub(boolean persistent) throws Exception
+ {
+ Connection conn0 = null;
+ Connection conn1 = null;
+ Connection conn2 = null;
+
+ try
+ {
+ //This will create 3 different connection on 3 different nodes, since
+ //the cf is clustered
+ conn0 = cf.createConnection();
+ conn1 = cf.createConnection();
+ conn2 = cf.createConnection();
+ conn0.setClientID("cl1");
+ conn1.setClientID("cl1");
+ conn2.setClientID("cl1");
+
+ log.info("Created connections");
+
+ checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
+
+ Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer sub0_1 = sess0.createDurableSubscriber(topic[0], "sub1");
+
+ MessageConsumer sub0_2 = sess0.createDurableSubscriber(topic[0], "sub2");
+
+ sub0_1.close();
+
+ sub0_2.close();
+
+
+ Session sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ MessageConsumer sub1_1 = sess1.createDurableSubscriber(topic[1], "sub1");
+
+ MessageConsumer sub1_2 = sess1.createDurableSubscriber(topic[1], "sub2");
+
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer sub2_1 = sess2.createDurableSubscriber(topic[2], "sub1");
+
+ MessageConsumer sub2_2 = sess2.createDurableSubscriber(topic[2], "sub2");
+
+ sub2_1.close();
+
+ sub2_2.close();
+
+ sess2.close();
+
+
+ // Send at node 0
+
+ MessageProducer prod0 = sess0.createProducer(topic[0]);
+
+ prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess0.createTextMessage("message" + i);
+
+ prod0.send(tm);
+ }
+
+ log.info("Sent messages");
+
+ //Consume them on node1, but dont ack
+
+ conn1.start();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)sub1_1.receive(5000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)sub1_2.receive(5000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ //Now close sess1- this will cancel the messages back to the queue
+
+ sess1.close();
+
+ //Now try and consume them back on node 0 - this should fail
+
+ sub0_1 = sess0.createDurableSubscriber(topic[0], "sub1");
+
+ sub0_2 = sess0.createDurableSubscriber(topic[0], "sub2");
+
+ conn0.start();
+
+ Message m = sub0_1.receive(5000);
+
+ assertNull(m);
+
+ m = sub0_2.receive(5000);
+
+ assertNull(m);
+
+ sess0.close();
+
+ //Now try and consume them on node 2 - this should be fail too
+
+ sess2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ sub2_1 = sess2.createDurableSubscriber(topic[2], "sub1");
+
+ sub2_2 = sess2.createDurableSubscriber(topic[2], "sub2");
+
+ conn2.start();
+
+ m = sub2_1.receive(5000);
+
+ assertNull(m);
+
+ m = sub2_2.receive(5000);
+
+ assertNull(m);
+
+ sess2.close();
+
+ //Finish them off on node 1
+
+ sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ sub1_1 = sess1.createDurableSubscriber(topic[1], "sub1");
+
+ sub1_2 = sess1.createDurableSubscriber(topic[1], "sub2");
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)sub1_1.receive(5000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+
+ if (i == NUM_MESSAGES - 1)
+ {
+ tm.acknowledge();
+ }
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)sub1_2.receive(5000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+
+ if (i == NUM_MESSAGES - 1)
+ {
+ tm.acknowledge();
+ }
+ }
+
+
+ }
+ finally
+ {
+ if (conn0 != null)
+ {
+ conn0.close();
+ }
+
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -39,6 +39,7 @@
import org.jboss.jms.client.state.ConnectionState;
import org.jboss.test.messaging.MessagingTestCase;
import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.jmx.ServiceAttributeOverrides;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
@@ -63,6 +64,8 @@
protected Context[] ic;
protected Queue queue[];
protected Topic topic[];
+
+ protected ServiceAttributeOverrides overrides;
// No need to have multiple conncetion factories since a clustered connection factory will create
// connections in a round robin fashion on different servers.
@@ -100,7 +103,7 @@
// make sure all servers are created and started; make sure that database is zapped
// ONLY for the first server, the others rely on values they expect to find in shared
// tables; don't clear the database for those.
- ServerManagement.start(i, config, i == 0);
+ ServerManagement.start(i, config, overrides, i == 0);
ServerManagement.deployQueue("testDistributedQueue", i);
ServerManagement.deployTopic("testDistributedTopic", i);
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2007-07-02 05:14:07 UTC (rev 2821)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2007-07-02 20:23:06 UTC (rev 2822)
@@ -751,13 +751,17 @@
config += "<attribute name=\"SupportsFailover\">" + supportsFailover + "</attribute>";
config += "<attribute name=\"SupportsLoadBalancing\">" + supportsLoadBalancing + "</attribute>";
- config += "<attribute name=\"JNDIBindings\"><bindings>";
-
- for(int i = 0; i < jndiBindings.length; i++)
+ if (jndiBindings != null)
{
- config += "<binding>" + jndiBindings[i] + "</binding>\n";
+ config += "<attribute name=\"JNDIBindings\"><bindings>";
+
+ for (int i = 0; i < jndiBindings.length; i++)
+ {
+ config += "<binding>" + jndiBindings[i] + "</binding>\n";
+ }
+ config += "</bindings></attribute>";
}
- config += "</bindings></attribute></mbean>";
+ config += "</mbean>";
MBeanConfigurationElement mc = new MBeanConfigurationElement(XMLUtil.stringToElement(config));
ObjectName on = sc.registerAndConfigureService(mc);
More information about the jboss-cvs-commits
mailing list