[jboss-cvs] JBoss Messaging SVN: r3337 - in branches/Branch_Stable: src/etc/server/default/deploy and 9 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Nov 16 07:22:59 EST 2007
Author: timfox
Date: 2007-11-16 07:22:58 -0500 (Fri, 16 Nov 2007)
New Revision: 3337
Added:
branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java
Removed:
branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueDontUseXATest.java
branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTestBase.java
branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueUseXATest.java
Modified:
branches/Branch_Stable/docs/userguide/en/modules/c_configuration.xml
branches/Branch_Stable/docs/userguide/en/modules/configuration.xml
branches/Branch_Stable/src/etc/server/default/deploy/messaging-service.xml
branches/Branch_Stable/src/etc/server/default/deploy/mssql-persistence-service.xml
branches/Branch_Stable/src/etc/server/default/deploy/mysql-persistence-service.xml
branches/Branch_Stable/src/etc/server/default/deploy/oracle-persistence-service.xml
branches/Branch_Stable/src/etc/server/default/deploy/postgresql-persistence-service.xml
branches/Branch_Stable/src/etc/server/default/deploy/sybase-persistence-service.xml
branches/Branch_Stable/src/etc/xmdesc/ServerPeer-xmbean.xml
branches/Branch_Stable/src/main/org/jboss/jms/client/container/ProducerAspect.java
branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/DeliveryObserver.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/Message.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/Queue.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
branches/Branch_Stable/tests/src/org/jboss/test/messaging/core/SimpleChannel.java
branches/Branch_Stable/tests/src/org/jboss/test/messaging/core/SimpleDeliveryObserver.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1153
Modified: branches/Branch_Stable/docs/userguide/en/modules/c_configuration.xml
===================================================================
--- branches/Branch_Stable/docs/userguide/en/modules/c_configuration.xml 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/docs/userguide/en/modules/c_configuration.xml 2007-11-16 12:22:58 UTC (rev 3337)
@@ -1,53 +1,93 @@
-<?xml version="1.0" encoding="UTF-8"?>
+<?xml version="1.0" encoding="UTF8"?>
<chapter id="c_configuration">
<title>JBoss Messaging Clustering Notes</title>
- <para>JBoss Messaging clustering should work out of the box in most cases
- with no configuration changes. It is however crucial that every node is
- assigned a unique server id, as specified in the installation guide.</para>
- <para>Every node deployed must have a unique id, including those in a
- particular LAN cluster, and also those only linked by mesage
- bridges.</para>
- <para>JBoss Messaging clusters JMS queues and topics transparently across
- the cluster. Messages sent to a distributed queue or topic on one node are
- consumable on other nodes. To designate that a particular destination is
- clustered simply set the clustered attribute in the destination deployment
- descriptor to true.</para>
- <para>JBoss Messaging balances messages between nodes, catering for faster
- or slower consumers to efficiently balance processing load across the
- cluster.</para>
- <para>JBoss Messaging durable subscrtiptions can also be clustered. This
- means multiple subscribers can consume from the same durable subscription
- from different nodes of the cluster. A durable subscription will be
- clustered if it's topic is clustered</para>
- <para>JBoss Messaging also supports clustered temporary topics and queues.
- All temporary topics and queues will be clustered if the post office is
- clustered</para>
- <para>If you don't want your nodes to participate in a cluster, or only
- have one non clustered server you can set the clustered attribute on the
- postoffice to false</para>
- <para>If you wish to apply strict JMS ordering to messages, such that a
- particular JMS consumer consumes messages in the same order as they were
- produced by a particular producer, you can set the DefaultPreserveOrdering
- attribute in the server peer to true. By default this is false. The
- side-effect of setting this to true is that messages cannot be distributed
- as freely around the cluster</para>
- <para>When pulling reliable messages from one node to another, JBoss
- Messaging can use client acnowledgement or an XA transaction. The default
- is client acknowledgement. Using XA transactions is a fairly heavyweight
- operation but ensures absolute once and only once delivery.</para>
- <para>If the call to send a persistent message to a persistent destination
- returns successfully with no exception, then you can be sure that the
- message was persisted. However if the call doesn't return successfully e.g.
- if an exception is thrown, then you *can't be sure the message wasn't
- persisted*. Since the failure might have occurred after persisting the
- message but before writing the response to the caller. This is a common
- attribute of any RPC type call: You can't tell by the call not returning
- that the call didn't actually succeed. Whether it's a web services call, an
- HTTP get request, an ejb invocation the same applies. The trick is to code
- your application so your operations are *idempotent* - i.e. they can be
- repeated without getting the system into an inconsistent state. With a
- message system you can do this on the application level, by checking for
- duplicate messages, and discarding them if they arrive. Duplicate checking
- is a very powerful technique that can remove the need for XA transactions
- in many cases.</para>
+
+ <section id="c_conf.serverpeerid">
+ <title>Unique server peer id</title>
+ <para>JBoss Messaging clustering should work out of the box in most cases
+ with no configuration changes. It is however crucial that every node is
+ assigned a unique server id, as specified in the installation guide.</para>
+ <para>Every node deployed must have a unique id, including those in a
+ particular LAN cluster, and also those only linked by mesage
+ bridges.</para>
+ </section>
+
+ <section id="c_conf.clustereddests">
+ <title>Clustered destinations</title>
+ <para>JBoss Messaging clusters JMS queues and topics transparently across
+ the cluster. Messages sent to a distributed queue or topic on one node are
+ consumable on other nodes. To designate that a particular destination is
+ clustered simply set the clustered attribute in the destination deployment
+ descriptor to true.</para>
+ <para>JBoss Messaging balances messages between nodes, catering for faster
+ or slower consumers to efficiently balance processing load across the
+ cluster.</para>
+ </section>
+
+ <section id="c_conf.clustereddursubs">
+ <title>Clustered durable subs</title>
+ <para>JBoss Messaging durable subscriptions can also be clustered. This
+ means multiple subscribers can consume from the same durable subscription
+ from different nodes of the cluster. A durable subscription will be
+ clustered if it's topic is clustered</para>
+ </section>
+
+ <section id="c_conf.clusteredtempdest">
+ <title>Clustered temporary destinations</title>
+ <para>JBoss Messaging also supports clustered temporary topics and queues.
+ All temporary topics and queues will be clustered if the post office is
+ clustered</para>
+ </section>
+
+ <section id="c_conf.nonclusteredserver">
+ <title>Non clustered servers</title>
+ <para>If you don't want your nodes to participate in a cluster, or only
+ have one non clustered server you can set the clustered attribute on the
+ postoffice to false</para>
+ </section>
+
+
+ <section id="c_conf.orderingincluster">
+ <title>Message ordering in the cluster</title>
+ <para>If you wish to apply strict JMS ordering to messages, such that a
+ particular JMS consumer consumes messages in the same order as they were
+ produced by a particular producer, you can set the DefaultPreserveOrdering
+ attribute in the server peer to true. By default this is false. The
+ sideeffect of setting this to true is that messages cannot be distributed
+ as freely around the cluster</para>
+ </section>
+
+
+ <section id="c_conf.idempotentops">
+ <title>Idempotent operations</title>
+ <para>If the call to send a persistent message to a persistent destination
+ returns successfully with no exception, then you can be sure that the
+ message was persisted. However if the call doesn't return successfully e.g.
+ if an exception is thrown, then you *can't be sure the message wasn't
+ persisted*. Since the failure might have occurred after persisting the
+ message but before writing the response to the caller. This is a common
+ attribute of any RPC type call: You can't tell by the call not returning
+ that the call didn't actually succeed. Whether it's a web services call, an
+ HTTP get request, an ejb invocation the same applies. The trick is to code
+ your application so your operations are *idempotent* i.e. they can be
+ repeated without getting the system into an inconsistent state. With a
+ message system you can do this on the application level, by checking for
+ duplicate messages, and discarding them if they arrive. Duplicate checking
+ is a very powerful technique that can remove the need for XA transactions
+ in many cases.</para>
+ </section>
+
+
+ <section id="c_conf.clusteredcfs">
+ <title>Clustered connection factories</title>
+ <para>If the supportsLoadBalancing attribute of the connection factory is set to true then consecutive create connection attempts will round robin between available servers. The first node to try is chosen randomly</para>
+ <para>If the supportsFailover attribute of the connection factory is set to true then automatic failover is enabled.
+ This will automatically failover from one server to another, transparently to the user, in case of failure.</para>
+ <para>If automatic failover is not required or you wish to do manual failover (JBoss MQ style) this can be set to false, and you can supply a standard JMS ExceptionListener on the connection which will be called in case of
+ connection failure. You would then need to manually close the connection, lookup a new connection factory from
+ HA JNDI and recreate the connection.</para>
+
+ </section>
+
+
</chapter>
\ No newline at end of file
Modified: branches/Branch_Stable/docs/userguide/en/modules/configuration.xml
===================================================================
--- branches/Branch_Stable/docs/userguide/en/modules/configuration.xml 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/docs/userguide/en/modules/configuration.xml 2007-11-16 12:22:58 UTC (rev 3337)
@@ -105,10 +105,6 @@
<attribute name="ClusterPullConnectionFactoryName">jboss.messaging.connectionfactory:service=ClusterPullConnectionFactory</attribute>
- <!-- Use XA when pulling persistent messages from a remote node to this one. -->
-
- <attribute name="UseXAForMessagePull">false</attribute>
-
<!-- When redistributing messages in the cluster. Do we need to preserve the order of messages received
by a particular consumer from a particular producer? -->
@@ -271,13 +267,6 @@
messages between nodes. You will not normally need to change
this.</para>
</section>
- <section id="conf.serverpeer.attributes.usexaformessagepull">
- <title>UseXAForMessagePull</title>
- <para>If true, then move a reliable message from one node to
- another in an XA transaction. Relaxing this gives better
- performance at the expense of some reliability. See the cluster
- configurations section for more details. Default is false.</para>
- </section>
<section id="conf.serverpeer.attributes.defaultpreserveordering">
<title>DefaultPreserveOrdering</title>
<para>If true, then strict JMS ordering is preserved in the
Modified: branches/Branch_Stable/src/etc/server/default/deploy/messaging-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/messaging-service.xml 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/etc/server/default/deploy/messaging-service.xml 2007-11-16 12:22:58 UTC (rev 3337)
@@ -81,10 +81,6 @@
<attribute name="ClusterPullConnectionFactoryName">jboss.messaging.connectionfactory:service=ClusterPullConnectionFactory</attribute>
- <!-- Use XA when pulling persistent messages from a remote node to this one. -->
-
- <attribute name="UseXAForMessagePull">false</attribute>
-
<!-- When redistributing messages in the cluster. Do we need to preserve the order of messages received
by a particular consumer from a particular producer? -->
Modified: branches/Branch_Stable/src/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/mssql-persistence-service.xml 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/etc/server/default/deploy/mssql-persistence-service.xml 2007-11-16 12:22:58 UTC (rev 3337)
@@ -61,7 +61,8 @@
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
SELECT_EXISTS_REF_MESSAGE_ID=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE MESSAGE_ID = ?
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
- UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
+ UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
+ MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
Modified: branches/Branch_Stable/src/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/mysql-persistence-service.xml 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/etc/server/default/deploy/mysql-persistence-service.xml 2007-11-16 12:22:58 UTC (rev 3337)
@@ -62,6 +62,7 @@
SELECT_EXISTS_REF_MESSAGE_ID=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE MESSAGE_ID = ?
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
+ MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
Modified: branches/Branch_Stable/src/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/oracle-persistence-service.xml 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/etc/server/default/deploy/oracle-persistence-service.xml 2007-11-16 12:22:58 UTC (rev 3337)
@@ -66,6 +66,7 @@
SELECT_EXISTS_REF_MESSAGE_ID=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE MESSAGE_ID = ?
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
+ MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
Modified: branches/Branch_Stable/src/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/postgresql-persistence-service.xml 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/etc/server/default/deploy/postgresql-persistence-service.xml 2007-11-16 12:22:58 UTC (rev 3337)
@@ -62,6 +62,7 @@
SELECT_EXISTS_REF_MESSAGE_ID=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE MESSAGE_ID = ?
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
+ MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
Modified: branches/Branch_Stable/src/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/sybase-persistence-service.xml 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/etc/server/default/deploy/sybase-persistence-service.xml 2007-11-16 12:22:58 UTC (rev 3337)
@@ -67,6 +67,7 @@
SELECT_EXISTS_REF_MESSAGE_ID=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE MESSAGE_ID = ?
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
+ MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
Modified: branches/Branch_Stable/src/etc/xmdesc/ServerPeer-xmbean.xml
===================================================================
--- branches/Branch_Stable/src/etc/xmdesc/ServerPeer-xmbean.xml 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/etc/xmdesc/ServerPeer-xmbean.xml 2007-11-16 12:22:58 UTC (rev 3337)
@@ -181,13 +181,7 @@
<name>ClusterPullConnectionFactoryName</name>
<type>java.lang.String</type>
</attribute>
-
- <attribute access="read-write" getMethod="isUseXAForMessagePull" setMethod="setUseXAForMessagePull">
- <description>When pulling persistent messages from a remote durable queue to a local one, should XA be used?</description>
- <name>UseXAForMessagePull</name>
- <type>boolean</type>
- </attribute>
-
+
<attribute access="read-write" getMethod="isDefaultPreserveOrdering" setMethod="setDefaultPreserveOrdering">
<description>When pulling messages do we need to preserve the ordering of messages consumed from a particular producer, for a particular consumer?</description>
<name>DefaultPreserveOrdering</name>
Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/container/ProducerAspect.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/container/ProducerAspect.java 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/container/ProducerAspect.java 2007-11-16 12:22:58 UTC (rev 3337)
@@ -89,9 +89,7 @@
long timeToLive = ((Long)args[4]).longValue();
boolean keepID = args.length>5? ((Boolean)args[5]).booleanValue() : false;
-
- String keptId = null;
-
+
// configure the message for sending, using attributes stored as metadata
ProducerState producerState = getProducerState(mi);
@@ -175,7 +173,7 @@
// Generate the message id
ConnectionState connectionState = (ConnectionState)sessionState.getParent();
- long id = 0;
+ long id = -1;
JBossMessage messageToSend;
boolean foreign = false;
@@ -229,14 +227,13 @@
// get the actual message
MessageProxy proxy = (MessageProxy)m;
+ m.setJMSDestination(destination);
+
if (keepID)
{
- keptId = m.getJMSMessageID();
+ id = proxy.getMessage().getMessageID();
}
-
- m.setJMSDestination(destination);
-
//The following line executed on the proxy should cause a copy to occur
//if it is necessary
proxy.setJMSMessageID(null);
@@ -249,15 +246,13 @@
// Set the new id
- id = connectionState.getIdGenerator().getId((ConnectionDelegate)connectionState.getDelegate());
- messageToSend.setMessageId(id);
-
- if (keptId != null)
+ if (!keepID)
{
- messageToSend.setJMSMessageID(keptId);
+ id = connectionState.getIdGenerator().getId((ConnectionDelegate)connectionState.getDelegate());
}
-
-
+
+ messageToSend.setMessageId(id);
+
// This only really used for BytesMessages and StreamMessages to reset their state
messageToSend.doBeforeSend();
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2007-11-16 12:22:58 UTC (rev 3337)
@@ -342,7 +342,13 @@
{
((ClientClusteredConnectionFactoryDelegate)del).setFailoverMap(failoverMap);
}
- }
+ }
+
+ //Note we don't rebind at this point - we just update the maps.
+ //When a node joins or leaves, we first get the join/leave notification
+ //Then we'll get a subsequent connection factory deploy/undeploy
+ //Even when a node crashes we'll get this since the postoffice ensures replication removes
+ //are called in this event
}
else if ((notification.type == ClusterNotification.TYPE_REPLICATOR_PUT || notification.type == ClusterNotification.TYPE_REPLICATOR_REMOVE) &&
(notification.data instanceof String) && ((String)notification.data).startsWith(Replicator.CF_PREFIX))
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-11-16 12:22:58 UTC (rev 3337)
@@ -57,6 +57,7 @@
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Binding;
import org.jboss.messaging.core.contract.Delivery;
+import org.jboss.messaging.core.contract.Message;
import org.jboss.messaging.core.contract.MessageReference;
import org.jboss.messaging.core.contract.MessageStore;
import org.jboss.messaging.core.contract.PostOffice;
@@ -698,7 +699,7 @@
if (dest.isDirect())
{
//Route directly to queue - temp kludge for clustering
-
+
Binding binding = postOffice.getBindingForQueueName(dest.getName());
if (binding == null)
@@ -708,7 +709,9 @@
Queue queue = binding.queue;
- Delivery del = queue.handle(null, ref, tx);
+ Long scid = (Long)ref.getMessage().removeHeader(Message.SOURCE_CHANNEL_ID);
+
+ Delivery del = queue.handleMove(ref, scid.longValue());
if (del == null)
{
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-11-16 12:22:58 UTC (rev 3337)
@@ -438,6 +438,11 @@
// Package protected ----------------------------------------------------------------------------
+ boolean isRemote()
+ {
+ return this.remote;
+ }
+
boolean isReplicating()
{
return replicating;
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-11-16 12:22:58 UTC (rev 3337)
@@ -1768,8 +1768,16 @@
return false;
}
- rec.del.acknowledge(null);
-
+ if (rec.getConsumer().isRemote())
+ {
+ //Optimisation for shared DB - we don't ack in DB - we move
+ rec.del.getObserver().acknowledgeNoPersist(rec.del);
+ }
+ else
+ {
+ rec.del.acknowledge(null);
+ }
+
//Now replicate the ack
if (rec.replicating && replicating)
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/DeliveryObserver.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/DeliveryObserver.java 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/DeliveryObserver.java 2007-11-16 12:22:58 UTC (rev 3337)
@@ -32,6 +32,8 @@
public interface DeliveryObserver
{
void acknowledge(Delivery d, Transaction tx) throws Throwable;
+
+ void acknowledgeNoPersist(Delivery d) throws Throwable;
void cancel(Delivery d) throws Throwable;
}
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/Message.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/Message.java 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/Message.java 2007-11-16 12:22:58 UTC (rev 3337)
@@ -43,8 +43,15 @@
* The header is checked when sucking messages and if order preservation is true then the message is not accepted.
* This is a basic way of ensuring message order is preserved.
*/
- public static final String CLUSTER_SUCKED = "CLUSTER_SUCKED";
+ public static final String CLUSTER_SUCKED = "SUCKED";
+ /**
+ * This header is set on a message when it is sucked from one node to another.
+ * If the header exists on the destination node, and the message is persistent, the message
+ * will be moved from one channel to the other by doing a simple database update
+ */
+ public static final String SOURCE_CHANNEL_ID = "SCID";
+
/**
* @return The unique id of the message
*/
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/PersistenceManager.java 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/PersistenceManager.java 2007-11-16 12:22:58 UTC (rev 3337)
@@ -40,6 +40,8 @@
public interface PersistenceManager extends MessagingComponent
{
void addReference(long channelID, MessageReference ref, Transaction tx) throws Exception;
+
+ void moveReference(long sourceChannelID, long destChannelID, MessageReference ref) throws Exception;
void removeReference(long channelID, MessageReference ref, Transaction tx) throws Exception;
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/Queue.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/Queue.java 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/Queue.java 2007-11-16 12:22:58 UTC (rev 3337)
@@ -91,4 +91,7 @@
Map getRecoveryArea();
int getRecoveryMapSize();
+
+ //Optimisation for shared database
+ Delivery handleMove(MessageReference ref, long sourceChannelID);
}
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2007-11-16 12:22:58 UTC (rev 3337)
@@ -35,8 +35,8 @@
import org.jboss.messaging.core.contract.DeliveryObserver;
import org.jboss.messaging.core.contract.Distributor;
import org.jboss.messaging.core.contract.Filter;
+import org.jboss.messaging.core.contract.Message;
import org.jboss.messaging.core.contract.MessageReference;
-import org.jboss.messaging.core.contract.MessageStore;
import org.jboss.messaging.core.contract.PersistenceManager;
import org.jboss.messaging.core.impl.tx.Transaction;
import org.jboss.messaging.core.impl.tx.TransactionException;
@@ -129,7 +129,62 @@
}
// Receiver implementation ----------------------------------------------------------------------
+
+ //Optimisation
+ public Delivery handleMove(MessageReference ref, long sourceChannelID)
+ {
+ if (!isActive())
+ {
+ if (trace) { log.trace(this + " is not active, returning null delivery for " + ref); }
+
+ return null;
+ }
+ checkClosed();
+
+ if (trace) { log.trace(this + " moving ref " + ref + " from channel " + sourceChannelID); }
+
+ if (maxSize != -1 && getMessageCount() >= maxSize)
+ {
+ //Have reached maximum size - will drop message
+
+ log.warn(this + " has reached maximum size, " + ref + " will be dropped");
+
+ return null;
+ }
+
+ // Each channel has its own copy of the reference
+ ref = ref.copy();
+
+ try
+ {
+ if (ref.getMessage().isReliable() && recoverable)
+ {
+ // Reliable message in a recoverable state - also add to db
+ if (trace) { log.trace(this + " adding " + ref + " to database non-transactionally"); }
+
+ pm.moveReference(sourceChannelID, channelID, ref);
+ }
+
+ synchronized (lock)
+ {
+ addReferenceInMemory(ref);
+
+ deliverInternal();
+ }
+
+ messagesAdded.increment();
+ }
+ catch (Throwable t)
+ {
+ log.error("Failed to handle message", t);
+
+ return null;
+ }
+
+ return new SimpleDelivery(this, ref, true, false);
+ }
+
public Delivery handle(DeliveryObserver sender, MessageReference ref, Transaction tx)
{
if (!isActive())
@@ -141,7 +196,78 @@
checkClosed();
- return handleInternal(sender, ref, tx, true);
+ if (ref == null)
+ {
+ return null;
+ }
+
+ if (trace) { log.trace(this + " handles " + ref + (tx == null ? " non-transactionally" : " in transaction: " + tx)); }
+
+ if (maxSize != -1 && getMessageCount() >= maxSize)
+ {
+ //Have reached maximum size - will drop message
+
+ log.warn(this + " has reached maximum size, " + ref + " will be dropped");
+
+ return null;
+ }
+
+ // Each channel has its own copy of the reference
+ ref = ref.copy();
+
+ try
+ {
+ if (tx == null)
+ {
+ if (ref.getMessage().isReliable() && recoverable)
+ {
+ // Reliable message in a recoverable state - also add to db
+ if (trace) { log.trace(this + " adding " + ref + " to database non-transactionally"); }
+
+ pm.addReference(channelID, ref, null);
+ }
+
+ // If the ref has a scheduled delivery time then we don't add to the in memory queue
+ // instead we create a timeout, so when that time comes delivery will attempted directly
+
+ if (!checkAndSchedule(ref))
+ {
+ synchronized (lock)
+ {
+ addReferenceInMemory(ref);
+
+ deliverInternal();
+ }
+ }
+ }
+ else
+ {
+ if (trace) { log.trace(this + " adding " + ref + " to state " + (tx == null ? "non-transactionally" : "in transaction: " + tx)); }
+
+ // add to post commit callback
+ getCallback(tx).addRef(ref);
+
+ if (trace) { log.trace(this + " added transactionally " + ref + " in memory"); }
+
+ if (ref.getMessage().isReliable() && recoverable)
+ {
+ // Reliable message in a recoverable state - also add to db
+ if (trace) { log.trace(this + " adding " + ref + (tx == null ? " to database non-transactionally" : " in transaction: " + tx)); }
+
+ pm.addReference(channelID, ref, tx);
+ }
+ }
+
+ messagesAdded.increment();
+ }
+ catch (Throwable t)
+ {
+ log.error("Failed to handle message", t);
+
+ return null;
+ }
+
+ return new SimpleDelivery(this, ref, true, false);
}
// DeliveryObserver implementation --------------------------------------------------------------
@@ -152,6 +278,11 @@
acknowledgeInternal(d, tx, true);
}
+
+ public void acknowledgeNoPersist(Delivery d) throws Throwable
+ {
+ acknowledgeInternal(d, null, false);
+ }
public void cancel(Delivery del) throws Throwable
{
@@ -580,86 +711,7 @@
return false;
}
-
- protected Delivery handleInternal(DeliveryObserver sender, MessageReference ref,
- Transaction tx, boolean persist)
- {
- if (ref == null)
- {
- return null;
- }
-
- if (trace) { log.trace(this + " handles " + ref + (tx == null ? " non-transactionally" : " in transaction: " + tx)); }
-
- if (maxSize != -1 && getMessageCount() >= maxSize)
- {
- //Have reached maximum size - will drop message
- log.warn(this + " has reached maximum size, " + ref + " will be dropped");
-
- return null;
- }
-
- // Each channel has its own copy of the reference
- ref = ref.copy();
-
- try
- {
- if (tx == null)
- {
- if (persist && ref.getMessage().isReliable() && recoverable)
- {
- // Reliable message in a recoverable state - also add to db
- if (trace) { log.trace(this + " adding " + ref + " to database non-transactionally"); }
-
- // TODO - this db access could safely be done outside the event loop
- pm.addReference(channelID, ref, null);
- }
-
- // If the ref has a scheduled delivery time then we don't add to the in memory queue
- // instead we create a timeout, so when that time comes delivery will attempted directly
-
- if (!checkAndSchedule(ref))
- {
- synchronized (lock)
- {
- addReferenceInMemory(ref);
-
- deliverInternal();
- }
- }
- }
- else
- {
- if (trace) { log.trace(this + " adding " + ref + " to state " + (tx == null ? "non-transactionally" : "in transaction: " + tx)); }
-
- // add to post commit callback
- getCallback(tx).addRef(ref);
-
- if (trace) { log.trace(this + " added transactionally " + ref + " in memory"); }
-
- if (persist && ref.getMessage().isReliable() && recoverable)
- {
- // Reliable message in a recoverable state - also add to db
- if (trace) { log.trace(this + " adding " + ref + (tx == null ? " to database non-transactionally" : " in transaction: " + tx)); }
-
- pm.addReference(channelID, ref, tx);
- }
- }
-
- messagesAdded.increment();
- }
- catch (Throwable t)
- {
- log.error("Failed to handle message", t);
-
- return null;
- }
-
- return new SimpleDelivery(this, ref, true, false);
- }
-
-
protected boolean checkAndSchedule(MessageReference ref)
{
if (ref.getScheduledDeliveryTime() > System.currentTimeMillis())
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-11-16 12:22:58 UTC (rev 3337)
@@ -1282,6 +1282,8 @@
public void addReference(final long channelID, final MessageReference ref,
final Transaction tx) throws Exception
{
+ if (trace) { log.trace("Adding reference " + ref + " in channel " + channelID + " tx " + tx); }
+
class AddReferenceRunner extends JDBCTxRunner2
{
public Object doTransaction() throws Exception
@@ -1351,7 +1353,45 @@
new AddReferenceRunner().executeWithRetry();
}
}
+
+ public void moveReference(final long sourceChannelID, final long destChannelID, final MessageReference ref)
+ throws Exception
+ {
+ if (trace) { log.trace("Moving reference " + ref + " from " + sourceChannelID + " to " + destChannelID); }
+
+ class MoveReferenceRunner extends JDBCTxRunner2
+ {
+ public Object doTransaction() throws Exception
+ {
+ PreparedStatement psReference = null;
+
+ try
+ {
+ psReference = conn.prepareStatement(getSQLStatement("MOVE_REFERENCE"));
+
+ psReference.setLong(1, destChannelID);
+ psReference.setLong(2, sourceChannelID);
+ psReference.setLong(3, ref.getMessage().getMessageID());
+ int rows = psReference.executeUpdate();
+
+ if (trace)
+ {
+ log.trace("Updated " + rows + " rows");
+ }
+
+ return null;
+ }
+ finally
+ {
+ closeStatement(psReference);
+ }
+ }
+ }
+
+ new MoveReferenceRunner().executeWithRetry();
+ }
+
public void updateDeliveryCount(final long channelID,
final MessageReference ref) throws Exception
{
@@ -1394,6 +1434,8 @@
public void removeReference(final long channelID,
final MessageReference ref, final Transaction tx) throws Exception
{
+ if (trace) { log.trace("Removing reference " + ref + " in channel " + channelID + " tx " + tx); }
+
class RemoveReferenceRunner extends JDBCTxRunner2
{
public Object doTransaction() throws Exception
@@ -2376,7 +2418,6 @@
map.put("LOAD_REFS",
"SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' "
+ "AND CHANNEL_ID = ? ORDER BY ORD");
-
map.put("UPDATE_REFS_NOT_PAGED",
"UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?");
map.put("SELECT_MIN_MAX_PAGE_ORD",
@@ -2387,6 +2428,8 @@
"UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?");
map.put("UPDATE_CHANNEL_ID",
"UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?");
+ map.put("MOVE_REFERENCE",
+ "UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?");
// Message
map.put("LOAD_MESSAGES",
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2007-11-16 12:22:58 UTC (rev 3337)
@@ -470,7 +470,25 @@
if (localQueue.isClustered())
{
- MessageSucker sucker = new MessageSucker(localQueue, info.connection, localInfo.connection, xa, preserveOrdering);
+ //Find channel id for remote queue - we need this for doing shared DB optimisation
+ Collection coll = this.postOffice.getAllBindingsForQueueName(queueName);
+ Iterator iter = coll.iterator();
+ long sourceChannelID = -1;
+ while (iter.hasNext())
+ {
+ Binding b = (Binding)iter.next();
+ if (b.queue.getNodeID() == nodeID)
+ {
+ sourceChannelID = b.queue.getChannelID();
+ }
+ }
+ if (sourceChannelID == -1)
+ {
+ throw new IllegalArgumentException("Cannot find source channel id");
+ }
+
+ MessageSucker sucker = new MessageSucker(localQueue, info.connection, localInfo.connection,
+ xa, preserveOrdering, sourceChannelID);
info.addSucker(sucker);
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2007-11-16 12:22:58 UTC (rev 3337)
@@ -80,13 +80,15 @@
private boolean preserveOrdering;
+ private long sourceChannelID;
+
public String toString()
{
return "MessageSucker:" + System.identityHashCode(this) + " queue:" + localQueue.getName();
}
MessageSucker(Queue localQueue, JBossConnection sourceConnection, JBossConnection localConnection,
- boolean xa, boolean preserveOrdering)
+ boolean xa, boolean preserveOrdering, long sourceChannelID)
{
if (trace) { log.trace("Creating message sucker, localQueue:" + localQueue + " xa:" + xa + " preserveOrdering:" + preserveOrdering); }
@@ -96,10 +98,16 @@
this.localConnection = localConnection;
- this.xa = xa;
+ //this.xa = xa;
+ //XA is currently disabled for message sucking - this is because JBM 1.4.0 uses shared database so XA is
+ //unnecesary - we can move the ref from one channel to another with a database update
+ this.xa = false;
+
this.preserveOrdering = preserveOrdering;
+ this.sourceChannelID = sourceChannelID;
+
if (xa)
{
tm = TransactionManagerLocator.getInstance().locate();
@@ -125,8 +133,7 @@
JBossSession sess = (JBossSession)sourceConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
sourceSession = (SessionDelegate)sess.getDelegate();
-
-
+
sess = (JBossSession)localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
localSession = (SessionDelegate)sess.getDelegate();
@@ -244,6 +251,9 @@
try
{
+ /*
+ Commented out until JBM 2.0
+
boolean startTx = xa && msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT;
if (startTx)
@@ -263,12 +273,18 @@
if (trace) { log.trace("Started JTA transaction"); }
}
+ org.jboss.messaging.core.contract.Message coreMessage = ((MessageProxy)msg).getMessage();
+
if (preserveOrdering)
{
//Add a header saying we have sucked the message
- ((MessageProxy)msg).getMessage().putHeader(org.jboss.messaging.core.contract.Message.CLUSTER_SUCKED, "x");
+ coreMessage.putHeader(org.jboss.messaging.core.contract.Message.CLUSTER_SUCKED, "x");
}
+ //Add a header with the node id of the node we sucked from - this is used on the sending end to do
+ //the move optimisation
+ coreMessage.putHeader(org.jboss.messaging.core.contract.Message.SOURCE_CHANNEL_ID, sourceChannelID);
+
long timeToLive = msg.getJMSExpiration();
if (timeToLive != 0)
{
@@ -301,6 +317,39 @@
if (trace) { log.trace("Acknowledged message"); }
}
+ */
+
+ org.jboss.messaging.core.contract.Message coreMessage = ((MessageProxy)msg).getMessage();
+
+ if (preserveOrdering)
+ {
+ //Add a header saying we have sucked the message
+ coreMessage.putHeader(org.jboss.messaging.core.contract.Message.CLUSTER_SUCKED, "x");
+ }
+
+ //Add a header with the node id of the node we sucked from - this is used on the sending end to do
+ //the move optimisation
+ coreMessage.putHeader(org.jboss.messaging.core.contract.Message.SOURCE_CHANNEL_ID, sourceChannelID);
+
+ long timeToLive = msg.getJMSExpiration();
+ if (timeToLive != 0)
+ {
+ timeToLive -= System.currentTimeMillis();
+ if (timeToLive <= 0)
+ {
+ timeToLive = 1; //Should have already expired - set to 1 so it expires when it is consumed or delivered
+ }
+ }
+
+ //First we ack it - this ack only occurs in memory even if it is a persistent message
+ msg.acknowledge();
+
+ if (trace) { log.trace("Acknowledged message"); }
+
+ //Then we send - this causes the ref to be moved (SQL UPDATE) in the database
+ producer.send(null, msg, msg.getJMSDeliveryMode(), msg.getJMSPriority(), timeToLive, true);
+
+ if (trace) { log.trace(this + " forwarded message to queue"); }
}
catch (Exception e)
{
Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/core/SimpleChannel.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/core/SimpleChannel.java 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/core/SimpleChannel.java 2007-11-16 12:22:58 UTC (rev 3337)
@@ -113,7 +113,14 @@
{
throw new NotYetImplementedException();
}
+
+ public void acknowledgeNoPersist(Delivery d) throws Throwable
+ {
+ throw new NotYetImplementedException();
+ }
+
+
public void cancel(Delivery d) throws Exception
{
throw new NotYetImplementedException();
Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/core/SimpleDeliveryObserver.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/core/SimpleDeliveryObserver.java 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/core/SimpleDeliveryObserver.java 2007-11-16 12:22:58 UTC (rev 3337)
@@ -56,6 +56,11 @@
notifyAll();
}
}
+
+ public void acknowledgeNoPersist(Delivery d) throws Throwable
+ {
+ // TODO Auto-generated method stub
+ }
public synchronized void cancel(Delivery d)
{
Deleted: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueDontUseXATest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueDontUseXATest.java 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueDontUseXATest.java 2007-11-16 12:22:58 UTC (rev 3337)
@@ -1,73 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.messaging.jms.clustering;
-
-import org.jboss.test.messaging.tools.container.ServiceAttributeOverrides;
-import org.jboss.test.messaging.tools.container.ServiceContainer;
-
-
-/**
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: $</tt>10 Jul 2007
- *
- * $Id: $
- *
- */
-public class DistributedQueueDontUseXATest extends DistributedQueueTestBase
-{
-
- // Constants -----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public DistributedQueueDontUseXATest(String name)
- {
- super(name);
- }
-
- // Public --------------------------------------------------------
-
- // Package private ---------------------------------------------
-
- // protected ----------------------------------------------------
-
- protected void setUp() throws Exception
- {
- this.overrides = new ServiceAttributeOverrides();
-
- overrides.put(ServiceContainer.SERVER_PEER_OBJECT_NAME, "UseXAForMessagePull", "false");
-
- super.setUp();
- }
-
- // private -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Copied: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java (from rev 3301, branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTestBase.java)
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java (rev 0)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java 2007-11-16 12:22:58 UTC (rev 3337)
@@ -0,0 +1,1199 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms.clustering;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ *
+ * A DistributedQueueTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 2796 $</tt>
+ *
+ * $Id: DistributedDestinationsTest.java 2796 2007-06-25 22:24:41Z timfox $
+ *
+ */
+public class DistributedQueueTest extends ClusteringTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public DistributedQueueTest(String name)
+ {
+ super(name);
+ }
+
+ // Public --------------------------------------------------------
+
+ public void testMessagePropertiesPreservedOnSuckPersistent() throws Exception
+ {
+ this.messagePropertiesPreservedOnSuck(true);
+ }
+
+ public void testMessagePropertiesPreservedOnSuckNonPersistent() throws Exception
+ {
+ this.messagePropertiesPreservedOnSuck(false);
+ }
+
+ public void testClusteredQueueNonPersistent() throws Exception
+ {
+ clusteredQueue(false);
+ }
+
+ public void testClusteredQueuePersistent() throws Exception
+ {
+ clusteredQueue(true);
+ }
+
+ public void testLocalNonPersistent() throws Exception
+ {
+ localQueue(false);
+ }
+
+ public void testLocalPersistent() throws Exception
+ {
+ localQueue(true);
+ }
+
+ public void testWithConnectionsOnAllNodesClientAck() throws Exception
+ {
+ Connection conn0 = createConnectionOnServer(cf, 0);
+
+ Connection conn1 = createConnectionOnServer(cf, 1);
+
+ Connection conn2 = createConnectionOnServer(cf, 2);
+
+ try
+ {
+ conn0.start();
+
+ conn1.start();
+
+ conn2.start();
+
+ //Send a load of messages on node 0
+
+ Session sess0_1 = conn0.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ MessageConsumer cons0_1 = sess0_1.createConsumer(queue[0]);
+
+ MessageProducer prod0 = sess0_1.createProducer(queue[0]);
+
+ Set msgIds = new HashSet();
+
+ final int numMessages = 60;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage tm = sess0_1.createTextMessage("message-" + i);
+
+ prod0.send(tm);
+ }
+
+ TextMessage tm0_1 = null;
+
+ for (int i = 0; i < numMessages / 6; i++)
+ {
+ tm0_1 = (TextMessage)cons0_1.receive(5000);
+
+ assertNotNull(tm0_1);
+
+ msgIds.add(tm0_1.getText());
+ }
+
+ tm0_1.acknowledge();
+
+ cons0_1.close();
+
+ Session sess0_2 = conn0.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ MessageConsumer cons0_2 = sess0_2.createConsumer(queue[0]);
+
+ TextMessage tm0_2 = null;
+
+ for (int i = 0; i < numMessages / 6; i++)
+ {
+ tm0_2 = (TextMessage)cons0_2.receive(5000);
+
+ assertNotNull(tm0_2);
+
+ msgIds.add(tm0_2.getText());
+ }
+
+ tm0_2.acknowledge();
+
+ cons0_2.close();
+
+
+ //Two on node 1
+
+ Session sess1_1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ MessageConsumer cons1_1 = sess1_1.createConsumer(queue[1]);
+
+ TextMessage tm1_1 = null;
+
+ for (int i = 0; i < numMessages / 6; i++)
+ {
+ tm1_1 = (TextMessage)cons1_1.receive(5000);
+
+ assertNotNull(tm1_1);
+
+ msgIds.add(tm1_1.getText());
+ }
+
+ tm1_1.acknowledge();
+
+ cons1_1.close();
+
+ Session sess1_2 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ MessageConsumer cons1_2 = sess1_2.createConsumer(queue[1]);
+
+ TextMessage tm1_2 = null;
+
+ for (int i = 0; i < numMessages / 6; i++)
+ {
+ tm1_2 = (TextMessage)cons1_2.receive(5000);
+
+ assertNotNull(tm1_2);
+
+ msgIds.add(tm1_2.getText());
+ }
+
+ tm1_2.acknowledge();
+
+ cons1_2.close();
+
+
+ //Two on node 2
+
+ Session sess2_1 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ MessageConsumer cons2_1 = sess2_1.createConsumer(queue[2]);
+
+ TextMessage tm2_1 = null;
+
+ for (int i = 0; i < numMessages / 6; i++)
+ {
+ tm2_1 = (TextMessage)cons2_1.receive(5000);
+
+ assertNotNull(tm2_1);
+
+ msgIds.add(tm2_1.getText());
+ }
+
+ tm2_1.acknowledge();
+
+ cons2_1.close();
+
+ Session sess2_2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ MessageConsumer cons2_2 = sess2_2.createConsumer(queue[2]);
+
+ TextMessage tm2_2 = null;
+
+ for (int i = 0; i < numMessages / 6; i++)
+ {
+ tm2_2 = (TextMessage)cons2_2.receive(5000);
+
+ assertNotNull(tm2_2);
+
+ msgIds.add(tm2_2.getText());
+ }
+
+ tm2_2.acknowledge();
+
+ cons2_2.close();
+
+ assertEquals(numMessages, msgIds.size());
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ assertTrue(msgIds.contains("message-" + i));
+ }
+ }
+ finally
+ {
+ if (conn0 != null)
+ {
+ conn0.close();
+ }
+
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+ }
+
+ public void testMixedSuck() throws Exception
+ {
+ Connection conn0 = null;
+ Connection conn1 = null;
+ Connection conn2 = null;
+
+ try
+ {
+
+ conn0 = this.createConnectionOnServer(cf, 0);
+ conn1 = this.createConnectionOnServer(cf, 1);
+ conn2 = this.createConnectionOnServer(cf, 2);
+
+ checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
+
+ Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons2 = sess2.createConsumer(queue[2]);
+
+ conn0.start();
+ conn2.start();
+
+ final int NUM_MESSAGES = 300;
+
+
+ // Send at node 0
+
+ MessageProducer prod0 = sess0.createProducer(queue[0]);
+
+ MessageProducer prod2 = sess2.createProducer(queue[2]);
+
+ //Send more messages at node 0 and node 2
+
+ boolean persistent = false;
+ for (int i = 0; i < NUM_MESSAGES / 2 ; i++)
+ {
+ TextMessage tm = sess0.createTextMessage("message4-" + i);
+
+ prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ prod0.send(tm);
+
+ persistent = !persistent;
+ }
+
+ for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess2.createTextMessage("message4-" + i);
+
+ prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ prod2.send(tm);
+
+ persistent = !persistent;
+ }
+
+ //consume them on node 2 - we will get messages from both nodes so the order is undefined
+
+ Set msgs = new HashSet();
+
+ TextMessage tm = null;
+
+ do
+ {
+ tm = (TextMessage)cons2.receive(5000);
+
+ if (tm != null)
+ {
+ msgs.add(tm.getText());
+ }
+ }
+ while (tm != null);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ assertTrue(msgs.contains("message4-" + i));
+ }
+
+ assertEquals(NUM_MESSAGES, msgs.size());
+
+ cons2.close();
+
+ sess2.close();
+
+ sess2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ cons2 = sess2.createConsumer(queue[2]);
+
+ Message msg = cons2.receive(5000);
+
+ assertNull(msg);
+ }
+ finally
+ {
+ if (conn0 != null)
+ {
+ conn0.close();
+ }
+
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+ }
+
+ // Package private ---------------------------------------------
+
+ // protected ----------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ nodeCount = 3;
+
+ super.setUp();
+ }
+
+ // private -----------------------------------------------------
+
+
+ private void clusteredQueue(boolean persistent) throws Exception
+ {
+ Connection conn0 = null;
+ Connection conn1 = null;
+ Connection conn2 = null;
+
+ try
+ {
+ //This will create 3 different connection on 3 different nodes, since
+ //the cf is clustered
+ conn0 = this.createConnectionOnServer(cf, 0);
+ conn1 = this.createConnectionOnServer(cf, 1);
+ conn2 = this.createConnectionOnServer(cf, 2);
+
+ checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
+
+ Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons0 = sess0.createConsumer(queue[0]);
+ MessageConsumer cons1 = sess1.createConsumer(queue[1]);
+ MessageConsumer cons2 = sess2.createConsumer(queue[2]);
+
+ conn0.start();
+ conn1.start();
+ conn2.start();
+
+ // Send at node 0
+
+ MessageProducer prod0 = sess0.createProducer(queue[0]);
+
+ prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess0.createTextMessage("message0-" + i);
+
+ prod0.send(tm);
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons0.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message0-" + i, tm.getText());
+ }
+
+ Message m = cons0.receive(2000);
+
+ assertNull(m);
+
+ m = cons1.receive(2000);
+
+ assertNull(m);
+
+ m = cons2.receive(2000);
+
+ assertNull(m);
+
+ // Send at node 1
+
+ MessageProducer prod1 = sess1.createProducer(queue[1]);
+
+ prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess1.createTextMessage("message1-" + i);
+
+ prod1.send(tm);
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons1.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message1-" + i, tm.getText());
+ }
+
+ m = cons0.receive(2000);
+
+ assertNull(m);
+
+ m = cons1.receive(2000);
+
+ assertNull(m);
+
+ m = cons2.receive(2000);
+
+ assertNull(m);
+
+ // Send at node 2
+
+ MessageProducer prod2 = sess2.createProducer(queue[2]);
+
+ prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess2.createTextMessage("message2-" + i);
+
+ prod2.send(tm);
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons2.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message2-" + i, tm.getText());
+ }
+
+ m = cons0.receive(2000);
+
+ assertNull(m);
+
+ m = cons1.receive(2000);
+
+ assertNull(m);
+
+ m = cons2.receive(2000);
+
+ assertNull(m);
+
+
+ //Now close the consumers at node 0 and node 1
+
+ cons0.close();
+
+ cons1.close();
+
+ //Send more messages at node 0
+
+ String messageIdCorrelate[] = new String[NUM_MESSAGES];
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess0.createTextMessage("message3-" + i);
+
+ prod0.send(tm);
+
+ messageIdCorrelate[i] = tm.getJMSMessageID();
+
+ log.info("SetID[" + i + "]=" + tm.getJMSMessageID());
+
+ }
+
+ // consume them on node2
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons2.receive(1000);
+
+ assertNotNull(tm);
+ assertEquals(messageIdCorrelate[i], tm.getJMSMessageID());
+
+ assertEquals("message3-" + i, tm.getText());
+ }
+
+ m = cons2.receive(2000);
+
+ assertNull(m);
+
+ //Send more messages at node 0 and node 1
+
+ for (int i = 0; i < NUM_MESSAGES / 2; i++)
+ {
+ TextMessage tm = sess0.createTextMessage("message4-" + i);
+
+ prod0.send(tm);
+ }
+
+ for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess2.createTextMessage("message4-" + i);
+
+ prod2.send(tm);
+ }
+
+ //consume them on node 2 - we will get messages from both nodes so the order is undefined
+
+ Set msgs = new HashSet();
+
+ TextMessage tm = null;
+
+ do
+ {
+ tm = (TextMessage)cons2.receive(1000);
+
+ if (tm != null)
+ {
+ msgs.add(tm.getText());
+ }
+ }
+ while (tm != null);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ assertTrue(msgs.contains("message4-" + i));
+ }
+
+ assertEquals(NUM_MESSAGES, msgs.size());
+
+ msgs.clear();
+
+ // Now repeat but this time creating the consumer after send
+
+ cons2.close();
+
+ // Send more messages at node 0 and node 1
+
+ for (int i = 0; i < NUM_MESSAGES / 2; i++)
+ {
+ tm = sess0.createTextMessage("message5-" + i);
+
+ prod0.send(tm);
+ }
+
+ for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
+ {
+ tm = sess1.createTextMessage("message5-" + i);
+
+ prod2.send(tm);
+ }
+
+ cons2 = sess2.createConsumer(queue[2]);
+
+ //consume them on node 2 - we will get messages from both nodes so the order is undefined
+
+ msgs = new HashSet();
+
+ do
+ {
+ tm = (TextMessage)cons2.receive(1000);
+
+ if (tm != null)
+ {
+ msgs.add(tm.getText());
+ }
+ }
+ while (tm != null);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ assertTrue(msgs.contains("message5-" + i));
+ }
+
+ assertEquals(NUM_MESSAGES, msgs.size());
+
+ msgs.clear();
+
+
+ //Now send messages at node 0 - but consume from node 1 AND node 2
+
+ //order is undefined
+
+ cons2.close();
+
+ cons1 = sess1.createConsumer(queue[1]);
+
+ cons2 = sess2.createConsumer(queue[2]);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ tm = sess0.createTextMessage("message6-" + i);
+
+ prod0.send(tm);
+ }
+
+ msgs = new HashSet();
+
+ int count = 0;
+
+ do
+ {
+ tm = (TextMessage)cons1.receive(1000);
+
+ if (tm != null)
+ {
+ msgs.add(tm.getText());
+
+ count++;
+ }
+ }
+ while (tm != null);
+
+ do
+ {
+ tm = (TextMessage)cons2.receive(1000);
+
+ if (tm != null)
+ {
+ msgs.add(tm.getText());
+
+ count++;
+ }
+ }
+ while (tm != null);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ assertTrue(msgs.contains("message6-" + i));
+ }
+
+ assertEquals(NUM_MESSAGES, count);
+
+ msgs.clear();
+
+ //as above but start consumers AFTER sending
+
+ cons1.close();
+
+ cons2.close();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ tm = sess0.createTextMessage("message7-" + i);
+
+ prod0.send(tm);
+ }
+
+ cons1 = sess1.createConsumer(queue[1]);
+
+ cons2 = sess2.createConsumer(queue[2]);
+
+
+ msgs = new HashSet();
+
+ count = 0;
+
+ do
+ {
+ tm = (TextMessage)cons1.receive(1000);
+
+ if (tm != null)
+ {
+ msgs.add(tm.getText());
+
+ count++;
+ }
+ }
+ while (tm != null);
+
+ do
+ {
+ tm = (TextMessage)cons2.receive(1000);
+
+ if (tm != null)
+ {
+ msgs.add(tm.getText());
+
+ count++;
+ }
+ }
+ while (tm != null);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ assertTrue(msgs.contains("message7-" + i));
+ }
+
+ assertEquals(NUM_MESSAGES, count);
+
+ msgs.clear();
+
+
+ // Now send message on node 0, consume on node2, then cancel, consume on node1, cancel, consume on node 0
+
+ cons1.close();
+
+ cons2.close();
+
+ sess2.close();
+
+ sess2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ cons2 = sess2.createConsumer(queue[2]);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ tm = sess0.createTextMessage("message8-" + i);
+
+ prod0.send(tm);
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ tm = (TextMessage)cons2.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message8-" + i, tm.getText());
+ }
+
+ sess2.close(); // messages should go back on queue
+
+ //Now try on node 1
+
+ sess1.close();
+
+ sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ cons1 = sess1.createConsumer(queue[1]);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ tm = (TextMessage)cons1.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message8-" + i, tm.getText());
+ }
+
+ sess1.close(); // messages should go back on queue
+
+ //Now try on node 0
+
+ cons0 = sess0.createConsumer(queue[0]);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ tm = (TextMessage)cons0.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message8-" + i, tm.getText());
+ }
+
+ Message msg = cons0.receive(5000);
+
+ assertNull(msg);
+ }
+ finally
+ {
+ if (conn0 != null)
+ {
+ conn0.close();
+ }
+
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+ }
+
+ private void messagePropertiesPreservedOnSuck(boolean persistent) throws Exception
+ {
+ Connection conn0 = null;
+ Connection conn1 = null;
+ Connection conn2 = null;
+
+ try
+ {
+
+ conn0 = this.createConnectionOnServer(cf, 0);
+ conn1 = this.createConnectionOnServer(cf, 1);
+ conn2 = this.createConnectionOnServer(cf, 2);
+
+ checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
+
+ Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons2 = sess2.createConsumer(queue[2]);
+
+ conn0.start();
+ conn2.start();
+
+ // Send at node 0
+
+ MessageProducer prod0 = sess0.createProducer(queue[0]);
+
+ prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+
+
+ TextMessage tm = sess0.createTextMessage("blahmessage");
+
+ prod0.setPriority(7);
+
+ prod0.setTimeToLive(1 * 60 * 60 * 1000);
+
+ prod0.send(tm);
+
+ long expiration = tm.getJMSExpiration();
+
+ assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
+
+
+
+ tm = (TextMessage)cons2.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("blahmessage", tm.getText());
+
+ assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
+
+ assertEquals(7, tm.getJMSPriority());
+
+ assertTrue(Math.abs(expiration - tm.getJMSExpiration()) < 100);
+
+ Message m = cons2.receive(5000);
+
+ assertNull(m);
+
+
+ //Now do one with expiration = 0
+
+
+ tm = sess0.createTextMessage("blahmessage2");
+
+ prod0.setPriority(7);
+
+ prod0.setTimeToLive(0);
+
+ prod0.send(tm);
+
+ assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
+
+
+
+ tm = (TextMessage)cons2.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("blahmessage2", tm.getText());
+
+ assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
+
+ assertEquals(7, tm.getJMSPriority());
+
+ assertEquals(0, tm.getJMSExpiration());
+
+ m = cons2.receive(5000);
+
+ assertNull(m);
+ }
+ finally
+ {
+ if (conn0 != null)
+ {
+ conn0.close();
+ }
+
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+ }
+
+
+ /* Check that non clustered queues behave properly when deployed on a cluster */
+ private void localQueue(boolean persistent) throws Exception
+ {
+ Connection conn0 = null;
+ Connection conn1 = null;
+ Connection conn2 = null;
+
+ //Deploy three non clustered queues with same name on different nodes
+
+ try
+ {
+ ServerManagement.deployQueue("nonClusteredQueue", "nonClusteredQueue", 200000, 2000, 2000, 0, false);
+
+ ServerManagement.deployQueue("nonClusteredQueue", "nonClusteredQueue", 200000, 2000, 2000, 1, false);
+
+ ServerManagement.deployQueue("nonClusteredQueue", "nonClusteredQueue", 200000, 2000, 2000, 2, false);
+
+ Queue queue0 = (Queue)ic[0].lookup("/nonClusteredQueue");
+ Queue queue1 = (Queue)ic[1].lookup("/nonClusteredQueue");
+ Queue queue2 = (Queue)ic[2].lookup("/nonClusteredQueue");
+
+ //This will create 3 different connection on 3 different nodes, since
+ //the cf is clustered
+ conn0 = this.createConnectionOnServer(cf, 0);
+ conn1 = this.createConnectionOnServer(cf, 1);
+ conn2 = this.createConnectionOnServer(cf, 2);
+
+ checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
+
+ Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn0.start();
+ conn1.start();
+ conn2.start();
+
+ // ==============
+ // Send at node 0
+
+ MessageProducer prod0 = sess0.createProducer(queue0);
+
+ prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess0.createTextMessage("message" + i);
+
+ prod0.send(tm);
+ }
+
+ // Try and consume at node 1
+
+ MessageConsumer cons1 = sess1.createConsumer(queue1);
+
+ Message m = cons1.receive(2000);
+
+ assertNull(m);
+
+ cons1.close();
+
+ //And at node 2
+
+ MessageConsumer cons2 = sess2.createConsumer(queue2);
+
+ m = cons2.receive(2000);
+
+ assertNull(m);
+
+ cons2.close();
+
+ // Now consume at node 0
+
+ MessageConsumer cons0 = sess0.createConsumer(queue0);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons0.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ m = cons0.receive(2000);
+
+ assertNull(m);
+
+ cons0.close();
+
+ // ==============
+ // Send at node 1
+
+ MessageProducer prod1 = sess1.createProducer(queue1);
+
+ prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess1.createTextMessage("message" + i);
+
+ prod1.send(tm);
+ }
+
+ // Try and consume at node 0
+
+ cons0 = sess0.createConsumer(queue0);
+
+ m = cons0.receive(2000);
+
+ assertNull(m);
+
+ cons0.close();
+
+ //And at node 2
+
+ cons2 = sess2.createConsumer(queue2);
+
+ m = cons2.receive(2000);
+
+ assertNull(m);
+
+ cons2.close();
+
+ // Now consume at node 1
+
+ cons1 = sess1.createConsumer(queue1);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons1.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ m = cons1.receive(2000);
+
+ assertNull(m);
+
+ cons1.close();
+
+ // ==============
+ // Send at node 2
+
+ MessageProducer prod2 = sess2.createProducer(queue2);
+
+ prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess2.createTextMessage("message" + i);
+
+ prod2.send(tm);
+ }
+
+ // Try and consume at node 0
+
+ cons0 = sess0.createConsumer(queue0);
+
+ m = cons0.receive(2000);
+
+ assertNull(m);
+
+ cons0.close();
+
+ //And at node 1
+
+ cons1 = sess1.createConsumer(queue1);
+
+ m = cons1.receive(2000);
+
+ assertNull(m);
+
+ cons1.close();
+
+ // Now consume at node 2
+
+ cons2 = sess2.createConsumer(queue2);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons2.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ m = cons2.receive(2000);
+
+ assertNull(m);
+
+ cons2.close();
+
+ }
+ finally
+ {
+ if (conn0 != null)
+ {
+ conn0.close();
+ }
+
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+
+ ServerManagement.undeployQueue("nonClusteredQueue", 0);
+
+ ServerManagement.undeployQueue("nonClusteredQueue", 1);
+
+ ServerManagement.undeployQueue("nonClusteredQueue", 2);
+ }
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Deleted: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTestBase.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTestBase.java 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTestBase.java 2007-11-16 12:22:58 UTC (rev 3337)
@@ -1,1202 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.messaging.jms.clustering;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.jboss.test.messaging.tools.ServerManagement;
-import org.jboss.jms.message.JBossMessage;
-import org.jboss.jms.message.TextMessageProxy;
-
-
-/**
- *
- * A DistributedQueueTest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 2796 $</tt>
- *
- * $Id: DistributedDestinationsTest.java 2796 2007-06-25 22:24:41Z timfox $
- *
- */
-public abstract class DistributedQueueTestBase extends ClusteringTestBase
-{
-
- // Constants -----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public DistributedQueueTestBase(String name)
- {
- super(name);
- }
-
- // Public --------------------------------------------------------
-
- public void testMessagePropertiesPreservedOnSuckPersistent() throws Exception
- {
- this.messagePropertiesPreservedOnSuck(true);
- }
-
- public void testMessagePropertiesPreservedOnSuckNonPersistent() throws Exception
- {
- this.messagePropertiesPreservedOnSuck(false);
- }
-
- public void testClusteredQueueNonPersistent() throws Exception
- {
- clusteredQueue(false);
- }
-
- public void testClusteredQueuePersistent() throws Exception
- {
- clusteredQueue(true);
- }
-
- public void testLocalNonPersistent() throws Exception
- {
- localQueue(false);
- }
-
- public void testLocalPersistent() throws Exception
- {
- localQueue(true);
- }
-
- public void testWithConnectionsOnAllNodesClientAck() throws Exception
- {
- Connection conn0 = createConnectionOnServer(cf, 0);
-
- Connection conn1 = createConnectionOnServer(cf, 1);
-
- Connection conn2 = createConnectionOnServer(cf, 2);
-
- try
- {
- conn0.start();
-
- conn1.start();
-
- conn2.start();
-
- //Send a load of messages on node 0
-
- Session sess0_1 = conn0.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- MessageConsumer cons0_1 = sess0_1.createConsumer(queue[0]);
-
- MessageProducer prod0 = sess0_1.createProducer(queue[0]);
-
- Set msgIds = new HashSet();
-
- final int numMessages = 60;
-
- for (int i = 0; i < numMessages; i++)
- {
- TextMessage tm = sess0_1.createTextMessage("message-" + i);
-
- prod0.send(tm);
- }
-
- TextMessage tm0_1 = null;
-
- for (int i = 0; i < numMessages / 6; i++)
- {
- tm0_1 = (TextMessage)cons0_1.receive(5000);
-
- assertNotNull(tm0_1);
-
- msgIds.add(tm0_1.getText());
- }
-
- tm0_1.acknowledge();
-
- cons0_1.close();
-
- Session sess0_2 = conn0.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- MessageConsumer cons0_2 = sess0_2.createConsumer(queue[0]);
-
- TextMessage tm0_2 = null;
-
- for (int i = 0; i < numMessages / 6; i++)
- {
- tm0_2 = (TextMessage)cons0_2.receive(5000);
-
- assertNotNull(tm0_2);
-
- msgIds.add(tm0_2.getText());
- }
-
- tm0_2.acknowledge();
-
- cons0_2.close();
-
-
- //Two on node 1
-
- Session sess1_1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- MessageConsumer cons1_1 = sess1_1.createConsumer(queue[1]);
-
- TextMessage tm1_1 = null;
-
- for (int i = 0; i < numMessages / 6; i++)
- {
- tm1_1 = (TextMessage)cons1_1.receive(5000);
-
- assertNotNull(tm1_1);
-
- msgIds.add(tm1_1.getText());
- }
-
- tm1_1.acknowledge();
-
- cons1_1.close();
-
- Session sess1_2 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- MessageConsumer cons1_2 = sess1_2.createConsumer(queue[1]);
-
- TextMessage tm1_2 = null;
-
- for (int i = 0; i < numMessages / 6; i++)
- {
- tm1_2 = (TextMessage)cons1_2.receive(5000);
-
- assertNotNull(tm1_2);
-
- msgIds.add(tm1_2.getText());
- }
-
- tm1_2.acknowledge();
-
- cons1_2.close();
-
-
- //Two on node 2
-
- Session sess2_1 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- MessageConsumer cons2_1 = sess2_1.createConsumer(queue[2]);
-
- TextMessage tm2_1 = null;
-
- for (int i = 0; i < numMessages / 6; i++)
- {
- tm2_1 = (TextMessage)cons2_1.receive(5000);
-
- assertNotNull(tm2_1);
-
- msgIds.add(tm2_1.getText());
- }
-
- tm2_1.acknowledge();
-
- cons2_1.close();
-
- Session sess2_2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- MessageConsumer cons2_2 = sess2_2.createConsumer(queue[2]);
-
- TextMessage tm2_2 = null;
-
- for (int i = 0; i < numMessages / 6; i++)
- {
- tm2_2 = (TextMessage)cons2_2.receive(5000);
-
- assertNotNull(tm2_2);
-
- msgIds.add(tm2_2.getText());
- }
-
- tm2_2.acknowledge();
-
- cons2_2.close();
-
- assertEquals(numMessages, msgIds.size());
-
- for (int i = 0; i < numMessages; i++)
- {
- assertTrue(msgIds.contains("message-" + i));
- }
- }
- finally
- {
- if (conn0 != null)
- {
- conn0.close();
- }
-
- if (conn1 != null)
- {
- conn1.close();
- }
-
- if (conn2 != null)
- {
- conn2.close();
- }
- }
- }
-
- public void testMixedSuck() throws Exception
- {
- Connection conn0 = null;
- Connection conn1 = null;
- Connection conn2 = null;
-
- try
- {
-
- conn0 = this.createConnectionOnServer(cf, 0);
- conn1 = this.createConnectionOnServer(cf, 1);
- conn2 = this.createConnectionOnServer(cf, 2);
-
- checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
-
- Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons2 = sess2.createConsumer(queue[2]);
-
- conn0.start();
- conn2.start();
-
- final int NUM_MESSAGES = 300;
-
-
- // Send at node 0
-
- MessageProducer prod0 = sess0.createProducer(queue[0]);
-
- MessageProducer prod2 = sess2.createProducer(queue[2]);
-
- //Send more messages at node 0 and node 2
-
- boolean persistent = false;
- for (int i = 0; i < NUM_MESSAGES / 2 ; i++)
- {
- TextMessage tm = sess0.createTextMessage("message4-" + i);
-
- prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- prod0.send(tm);
-
- persistent = !persistent;
- }
-
- for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess2.createTextMessage("message4-" + i);
-
- prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- prod2.send(tm);
-
- persistent = !persistent;
- }
-
- //consume them on node 2 - we will get messages from both nodes so the order is undefined
-
- Set msgs = new HashSet();
-
- TextMessage tm = null;
-
- do
- {
- tm = (TextMessage)cons2.receive(5000);
-
- if (tm != null)
- {
- msgs.add(tm.getText());
- }
- }
- while (tm != null);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- assertTrue(msgs.contains("message4-" + i));
- }
-
- assertEquals(NUM_MESSAGES, msgs.size());
-
- cons2.close();
-
- sess2.close();
-
- sess2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- cons2 = sess2.createConsumer(queue[2]);
-
- Message msg = cons2.receive(5000);
-
- assertNull(msg);
- }
- finally
- {
- if (conn0 != null)
- {
- conn0.close();
- }
-
- if (conn1 != null)
- {
- conn1.close();
- }
-
- if (conn2 != null)
- {
- conn2.close();
- }
- }
- }
-
- // Package private ---------------------------------------------
-
- // protected ----------------------------------------------------
-
- protected void setUp() throws Exception
- {
- nodeCount = 3;
-
- super.setUp();
- }
-
- // private -----------------------------------------------------
-
-
- private void clusteredQueue(boolean persistent) throws Exception
- {
- Connection conn0 = null;
- Connection conn1 = null;
- Connection conn2 = null;
-
- try
- {
- //This will create 3 different connection on 3 different nodes, since
- //the cf is clustered
- conn0 = this.createConnectionOnServer(cf, 0);
- conn1 = this.createConnectionOnServer(cf, 1);
- conn2 = this.createConnectionOnServer(cf, 2);
-
- checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
-
- Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons0 = sess0.createConsumer(queue[0]);
- MessageConsumer cons1 = sess1.createConsumer(queue[1]);
- MessageConsumer cons2 = sess2.createConsumer(queue[2]);
-
- conn0.start();
- conn1.start();
- conn2.start();
-
- // Send at node 0
-
- MessageProducer prod0 = sess0.createProducer(queue[0]);
-
- prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- final int NUM_MESSAGES = 100;
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess0.createTextMessage("message0-" + i);
-
- prod0.send(tm);
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons0.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message0-" + i, tm.getText());
- }
-
- Message m = cons0.receive(2000);
-
- assertNull(m);
-
- m = cons1.receive(2000);
-
- assertNull(m);
-
- m = cons2.receive(2000);
-
- assertNull(m);
-
- // Send at node 1
-
- MessageProducer prod1 = sess1.createProducer(queue[1]);
-
- prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess1.createTextMessage("message1-" + i);
-
- prod1.send(tm);
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons1.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message1-" + i, tm.getText());
- }
-
- m = cons0.receive(2000);
-
- assertNull(m);
-
- m = cons1.receive(2000);
-
- assertNull(m);
-
- m = cons2.receive(2000);
-
- assertNull(m);
-
- // Send at node 2
-
- MessageProducer prod2 = sess2.createProducer(queue[2]);
-
- prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess2.createTextMessage("message2-" + i);
-
- prod2.send(tm);
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons2.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message2-" + i, tm.getText());
- }
-
- m = cons0.receive(2000);
-
- assertNull(m);
-
- m = cons1.receive(2000);
-
- assertNull(m);
-
- m = cons2.receive(2000);
-
- assertNull(m);
-
-
- //Now close the consumers at node 0 and node 1
-
- cons0.close();
-
- cons1.close();
-
- //Send more messages at node 0
-
- String messageIdCorrelate[] = new String[NUM_MESSAGES];
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess0.createTextMessage("message3-" + i);
-
- prod0.send(tm);
-
- messageIdCorrelate[i] = tm.getJMSMessageID();
-
- log.info("SetID[" + i + "]=" + tm.getJMSMessageID());
-
- }
-
- // consume them on node2
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons2.receive(1000);
-
- assertNotNull(tm);
- assertEquals(messageIdCorrelate[i], tm.getJMSMessageID());
-
- assertEquals("message3-" + i, tm.getText());
- }
-
- m = cons2.receive(2000);
-
- assertNull(m);
-
- //Send more messages at node 0 and node 1
-
- for (int i = 0; i < NUM_MESSAGES / 2; i++)
- {
- TextMessage tm = sess0.createTextMessage("message4-" + i);
-
- prod0.send(tm);
- }
-
- for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess2.createTextMessage("message4-" + i);
-
- prod2.send(tm);
- }
-
- //consume them on node 2 - we will get messages from both nodes so the order is undefined
-
- Set msgs = new HashSet();
-
- TextMessage tm = null;
-
- do
- {
- tm = (TextMessage)cons2.receive(1000);
-
- if (tm != null)
- {
- msgs.add(tm.getText());
- }
- }
- while (tm != null);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- assertTrue(msgs.contains("message4-" + i));
- }
-
- assertEquals(NUM_MESSAGES, msgs.size());
-
- msgs.clear();
-
- // Now repeat but this time creating the consumer after send
-
- cons2.close();
-
- // Send more messages at node 0 and node 1
-
- for (int i = 0; i < NUM_MESSAGES / 2; i++)
- {
- tm = sess0.createTextMessage("message5-" + i);
-
- prod0.send(tm);
- }
-
- for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
- {
- tm = sess1.createTextMessage("message5-" + i);
-
- prod2.send(tm);
- }
-
- cons2 = sess2.createConsumer(queue[2]);
-
- //consume them on node 2 - we will get messages from both nodes so the order is undefined
-
- msgs = new HashSet();
-
- do
- {
- tm = (TextMessage)cons2.receive(1000);
-
- if (tm != null)
- {
- msgs.add(tm.getText());
- }
- }
- while (tm != null);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- assertTrue(msgs.contains("message5-" + i));
- }
-
- assertEquals(NUM_MESSAGES, msgs.size());
-
- msgs.clear();
-
-
- //Now send messages at node 0 - but consume from node 1 AND node 2
-
- //order is undefined
-
- cons2.close();
-
- cons1 = sess1.createConsumer(queue[1]);
-
- cons2 = sess2.createConsumer(queue[2]);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- tm = sess0.createTextMessage("message6-" + i);
-
- prod0.send(tm);
- }
-
- msgs = new HashSet();
-
- int count = 0;
-
- do
- {
- tm = (TextMessage)cons1.receive(1000);
-
- if (tm != null)
- {
- msgs.add(tm.getText());
-
- count++;
- }
- }
- while (tm != null);
-
- do
- {
- tm = (TextMessage)cons2.receive(1000);
-
- if (tm != null)
- {
- msgs.add(tm.getText());
-
- count++;
- }
- }
- while (tm != null);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- assertTrue(msgs.contains("message6-" + i));
- }
-
- assertEquals(NUM_MESSAGES, count);
-
- msgs.clear();
-
- //as above but start consumers AFTER sending
-
- cons1.close();
-
- cons2.close();
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- tm = sess0.createTextMessage("message7-" + i);
-
- prod0.send(tm);
- }
-
- cons1 = sess1.createConsumer(queue[1]);
-
- cons2 = sess2.createConsumer(queue[2]);
-
-
- msgs = new HashSet();
-
- count = 0;
-
- do
- {
- tm = (TextMessage)cons1.receive(1000);
-
- if (tm != null)
- {
- msgs.add(tm.getText());
-
- count++;
- }
- }
- while (tm != null);
-
- do
- {
- tm = (TextMessage)cons2.receive(1000);
-
- if (tm != null)
- {
- msgs.add(tm.getText());
-
- count++;
- }
- }
- while (tm != null);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- assertTrue(msgs.contains("message7-" + i));
- }
-
- assertEquals(NUM_MESSAGES, count);
-
- msgs.clear();
-
-
- // Now send message on node 0, consume on node2, then cancel, consume on node1, cancel, consume on node 0
-
- cons1.close();
-
- cons2.close();
-
- sess2.close();
-
- sess2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- cons2 = sess2.createConsumer(queue[2]);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- tm = sess0.createTextMessage("message8-" + i);
-
- prod0.send(tm);
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- tm = (TextMessage)cons2.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message8-" + i, tm.getText());
- }
-
- sess2.close(); // messages should go back on queue
-
- //Now try on node 1
-
- sess1.close();
-
- sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- cons1 = sess1.createConsumer(queue[1]);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- tm = (TextMessage)cons1.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message8-" + i, tm.getText());
- }
-
- sess1.close(); // messages should go back on queue
-
- //Now try on node 0
-
- cons0 = sess0.createConsumer(queue[0]);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- tm = (TextMessage)cons0.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message8-" + i, tm.getText());
- }
-
- Message msg = cons0.receive(5000);
-
- assertNull(msg);
- }
- finally
- {
- if (conn0 != null)
- {
- conn0.close();
- }
-
- if (conn1 != null)
- {
- conn1.close();
- }
-
- if (conn2 != null)
- {
- conn2.close();
- }
- }
- }
-
- private void messagePropertiesPreservedOnSuck(boolean persistent) throws Exception
- {
- Connection conn0 = null;
- Connection conn1 = null;
- Connection conn2 = null;
-
- try
- {
-
- conn0 = this.createConnectionOnServer(cf, 0);
- conn1 = this.createConnectionOnServer(cf, 1);
- conn2 = this.createConnectionOnServer(cf, 2);
-
- checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
-
- Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons2 = sess2.createConsumer(queue[2]);
-
- conn0.start();
- conn2.start();
-
- // Send at node 0
-
- MessageProducer prod0 = sess0.createProducer(queue[0]);
-
- prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
-
-
- TextMessage tm = sess0.createTextMessage("blahmessage");
-
- prod0.setPriority(7);
-
- prod0.setTimeToLive(1 * 60 * 60 * 1000);
-
- prod0.send(tm);
-
- long expiration = tm.getJMSExpiration();
-
- assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
-
-
-
- tm = (TextMessage)cons2.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("blahmessage", tm.getText());
-
- assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
-
- assertEquals(7, tm.getJMSPriority());
-
- assertTrue(Math.abs(expiration - tm.getJMSExpiration()) < 100);
-
- Message m = cons2.receive(5000);
-
- assertNull(m);
-
-
- //Now do one with expiration = 0
-
-
- tm = sess0.createTextMessage("blahmessage2");
-
- prod0.setPriority(7);
-
- prod0.setTimeToLive(0);
-
- prod0.send(tm);
-
- assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
-
-
-
- tm = (TextMessage)cons2.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("blahmessage2", tm.getText());
-
- assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
-
- assertEquals(7, tm.getJMSPriority());
-
- assertEquals(0, tm.getJMSExpiration());
-
- m = cons2.receive(5000);
-
- assertNull(m);
- }
- finally
- {
- if (conn0 != null)
- {
- conn0.close();
- }
-
- if (conn1 != null)
- {
- conn1.close();
- }
-
- if (conn2 != null)
- {
- conn2.close();
- }
- }
- }
-
-
- /* Check that non clustered queues behave properly when deployed on a cluster */
- private void localQueue(boolean persistent) throws Exception
- {
- Connection conn0 = null;
- Connection conn1 = null;
- Connection conn2 = null;
-
- //Deploy three non clustered queues with same name on different nodes
-
- try
- {
- ServerManagement.deployQueue("nonClusteredQueue", "nonClusteredQueue", 200000, 2000, 2000, 0, false);
-
- ServerManagement.deployQueue("nonClusteredQueue", "nonClusteredQueue", 200000, 2000, 2000, 1, false);
-
- ServerManagement.deployQueue("nonClusteredQueue", "nonClusteredQueue", 200000, 2000, 2000, 2, false);
-
- Queue queue0 = (Queue)ic[0].lookup("/nonClusteredQueue");
- Queue queue1 = (Queue)ic[1].lookup("/nonClusteredQueue");
- Queue queue2 = (Queue)ic[2].lookup("/nonClusteredQueue");
-
- //This will create 3 different connection on 3 different nodes, since
- //the cf is clustered
- conn0 = this.createConnectionOnServer(cf, 0);
- conn1 = this.createConnectionOnServer(cf, 1);
- conn2 = this.createConnectionOnServer(cf, 2);
-
- checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
-
- Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- conn0.start();
- conn1.start();
- conn2.start();
-
- // ==============
- // Send at node 0
-
- MessageProducer prod0 = sess0.createProducer(queue0);
-
- prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- final int NUM_MESSAGES = 100;
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess0.createTextMessage("message" + i);
-
- prod0.send(tm);
- }
-
- // Try and consume at node 1
-
- MessageConsumer cons1 = sess1.createConsumer(queue1);
-
- Message m = cons1.receive(2000);
-
- assertNull(m);
-
- cons1.close();
-
- //And at node 2
-
- MessageConsumer cons2 = sess2.createConsumer(queue2);
-
- m = cons2.receive(2000);
-
- assertNull(m);
-
- cons2.close();
-
- // Now consume at node 0
-
- MessageConsumer cons0 = sess0.createConsumer(queue0);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons0.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- m = cons0.receive(2000);
-
- assertNull(m);
-
- cons0.close();
-
- // ==============
- // Send at node 1
-
- MessageProducer prod1 = sess1.createProducer(queue1);
-
- prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess1.createTextMessage("message" + i);
-
- prod1.send(tm);
- }
-
- // Try and consume at node 0
-
- cons0 = sess0.createConsumer(queue0);
-
- m = cons0.receive(2000);
-
- assertNull(m);
-
- cons0.close();
-
- //And at node 2
-
- cons2 = sess2.createConsumer(queue2);
-
- m = cons2.receive(2000);
-
- assertNull(m);
-
- cons2.close();
-
- // Now consume at node 1
-
- cons1 = sess1.createConsumer(queue1);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons1.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- m = cons1.receive(2000);
-
- assertNull(m);
-
- cons1.close();
-
- // ==============
- // Send at node 2
-
- MessageProducer prod2 = sess2.createProducer(queue2);
-
- prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess2.createTextMessage("message" + i);
-
- prod2.send(tm);
- }
-
- // Try and consume at node 0
-
- cons0 = sess0.createConsumer(queue0);
-
- m = cons0.receive(2000);
-
- assertNull(m);
-
- cons0.close();
-
- //And at node 1
-
- cons1 = sess1.createConsumer(queue1);
-
- m = cons1.receive(2000);
-
- assertNull(m);
-
- cons1.close();
-
- // Now consume at node 2
-
- cons2 = sess2.createConsumer(queue2);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons2.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- m = cons2.receive(2000);
-
- assertNull(m);
-
- cons2.close();
-
- }
- finally
- {
- if (conn0 != null)
- {
- conn0.close();
- }
-
- if (conn1 != null)
- {
- conn1.close();
- }
-
- if (conn2 != null)
- {
- conn2.close();
- }
-
- ServerManagement.undeployQueue("nonClusteredQueue", 0);
-
- ServerManagement.undeployQueue("nonClusteredQueue", 1);
-
- ServerManagement.undeployQueue("nonClusteredQueue", 2);
- }
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Deleted: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueUseXATest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueUseXATest.java 2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueUseXATest.java 2007-11-16 12:22:58 UTC (rev 3337)
@@ -1,73 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.messaging.jms.clustering;
-
-import org.jboss.test.messaging.tools.container.ServiceAttributeOverrides;
-import org.jboss.test.messaging.tools.container.ServiceContainer;
-
-/**
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: $</tt>10 Jul 2007
- *
- * $Id: $
- *
- */
-public class DistributedQueueUseXATest extends DistributedQueueTestBase
-{
-
- // Constants -----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public DistributedQueueUseXATest(String name)
- {
- super(name);
- }
-
- // Public --------------------------------------------------------
-
-
- // Package private ---------------------------------------------
-
- // protected ----------------------------------------------------
-
- protected void setUp() throws Exception
- {
- this.overrides = new ServiceAttributeOverrides();
-
- overrides.put(ServiceContainer.SERVER_PEER_OBJECT_NAME, "UseXAForMessagePull", "true");
-
- super.setUp();
- }
-
- // private -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
More information about the jboss-cvs-commits
mailing list