[jboss-cvs] JBoss Messaging SVN: r2859 - in trunk: src/main/org/jboss/messaging/core/impl/clusterconnection and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jul 9 11:35:01 EDT 2007
Author: timfox
Date: 2007-07-09 11:35:01 -0400 (Mon, 09 Jul 2007)
New Revision: 2859
Modified:
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
trunk/tests/build.xml
trunk/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java
trunk/tests/src/org/jboss/test/messaging/jms/XATestBase.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/PreserveOrderingTest.java
Log:
Fixed some more tests
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-07-09 01:12:36 UTC (rev 2858)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-07-09 15:35:01 UTC (rev 2859)
@@ -67,7 +67,6 @@
import org.jboss.messaging.util.Util;
import org.jboss.remoting.Client;
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
-import org.jboss.util.id.GUID;
/**
* Concrete implementation of ConnectionEndpoint.
@@ -264,8 +263,6 @@
log.debug("created and registered " + ep);
- log.info("*********** CREATING SESSION WITH ID:" + sessionID);
-
ClientSessionDelegate d = new ClientSessionDelegate(sessionID, dupsOKBatchSize);
log.debug("created " + d);
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-07-09 01:12:36 UTC (rev 2858)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-07-09 15:35:01 UTC (rev 2859)
@@ -36,6 +36,7 @@
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
+import javax.jms.TextMessage;
import org.jboss.aop.AspectManager;
import org.jboss.jms.client.delegate.ClientBrowserDelegate;
@@ -1009,7 +1010,7 @@
if (trace) { log.trace("Collected " + map.size() + " deliveries"); }
}
- public void replicateDeliveryResponseReceived(long deliveryID) throws Exception
+ public synchronized void replicateDeliveryResponseReceived(long deliveryID) throws Exception
{
//We look up the delivery in the list and actually perform the delivery
@@ -1061,7 +1062,7 @@
{
toDeliver.take();
- performDelivery(dr.del.getReference(), deliveryID, dr.getConsumer());
+ performDelivery(dr.del.getReference(), dr.deliveryID, dr.getConsumer());
delivered = true;
@@ -1086,8 +1087,6 @@
*/
void waitForDeliveriesFromConsumer(String consumerID) throws Exception
{
- log.info("Waiting for deliveries for consumer " + consumerID);
-
long toWait = CLOSE_TIMEOUT;
boolean wait;
@@ -1136,14 +1135,11 @@
while (toDeliver.take() != null) {}
log.warn("Timed out waiting for response to arrive");
- }
-
-
+ }
}
- log.info("Done Waiting for deliveries for consumer " + consumerID);
}
- void handleDelivery(Delivery delivery, ServerConsumerEndpoint consumer) throws Exception
+ synchronized void handleDelivery(Delivery delivery, ServerConsumerEndpoint consumer) throws Exception
{
long deliveryId = -1;
@@ -1151,12 +1147,13 @@
DeliveryRecord rec = null;
+ deliveryId = deliveryIdSequence.increment();
+
//TODO can't we combine flags isRetainDeliveries and isReplicating - surely they're mutually exclusive?
if (consumer.isRetainDeliveries())
- {
+ {
// Add a delivery
- deliveryId = deliveryIdSequence.increment();
-
+
rec = new DeliveryRecord(delivery, consumer, deliveryId);
deliveries.put(new Long(deliveryId), rec);
Modified: trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2007-07-09 01:12:36 UTC (rev 2858)
+++ trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2007-07-09 15:35:01 UTC (rev 2859)
@@ -268,8 +268,18 @@
((MessageProxy)msg).getMessage().putHeader(org.jboss.messaging.core.contract.Message.CLUSTER_SUCKED, "x");
}
- producer.send(null, msg, -1, -1, Long.MIN_VALUE);
+ long timeToLive = msg.getJMSExpiration();
+ if (timeToLive != 0)
+ {
+ timeToLive -= System.currentTimeMillis();
+ if (timeToLive <= 0)
+ {
+ timeToLive = 1; //Should have already expired - set to 1 so it expires when it is consumed or delivered
+ }
+ }
+ producer.send(null, msg, msg.getJMSDeliveryMode(), msg.getJMSPriority(), timeToLive);
+
if (trace) { log.trace(this + " forwarded message to queue"); }
if (startTx)
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-07-09 01:12:36 UTC (rev 2858)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-07-09 15:35:01 UTC (rev 2859)
@@ -2638,35 +2638,40 @@
//FIXME - this is ugly
//Find a better way of getting the sessions
-
- Collection sessions = serverPeer.getSessions();
-
- Iterator iter2 = sessions.iterator();
-
- while (iter2.hasNext())
- {
- ServerSessionEndpoint session = (ServerSessionEndpoint)iter2.next();
+ //We shouldn't know abou the server peer
+
+ if (serverPeer != null)
+ {
- session.collectDeliveries(deliveries, firstNode);
- }
-
- if (!firstNode)
- {
- PostOfficeAddressInfo info = (PostOfficeAddressInfo)nodeIDAddressMap.get(new Integer(failoverNodeID));
-
- if (info == null)
- {
- throw new IllegalStateException("Cannot find address for failover node " + failoverNodeID);
- }
+ Collection sessions = serverPeer.getSessions();
- ClusterRequest request = new AddAllReplicatedDeliveriesMessage(thisNodeID, deliveries);
+ Iterator iter2 = sessions.iterator();
- //send sync
+ while (iter2.hasNext())
+ {
+ ServerSessionEndpoint session = (ServerSessionEndpoint)iter2.next();
+
+ session.collectDeliveries(deliveries, firstNode);
+ }
- groupMember.unicastControl(request, info.getControlChannelAddress(), true);
-
- if (trace) { log.trace("Sent AddAllReplicatedDeliveriesMessage"); }
- }
+ if (!firstNode)
+ {
+ PostOfficeAddressInfo info = (PostOfficeAddressInfo)nodeIDAddressMap.get(new Integer(failoverNodeID));
+
+ if (info == null)
+ {
+ throw new IllegalStateException("Cannot find address for failover node " + failoverNodeID);
+ }
+
+ ClusterRequest request = new AddAllReplicatedDeliveriesMessage(thisNodeID, deliveries);
+
+ //send sync
+
+ groupMember.unicastControl(request, info.getControlChannelAddress(), true);
+
+ if (trace) { log.trace("Sent AddAllReplicatedDeliveriesMessage"); }
+ }
+ }
}
}
finally
Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml 2007-07-09 01:12:36 UTC (rev 2858)
+++ trunk/tests/build.xml 2007-07-09 15:35:01 UTC (rev 2859)
@@ -106,9 +106,9 @@
<property name="junit.haltonfailure" value="false"/>
<property name="junit.fork" value="true"/>
<property name="junit.includeantruntime" value="true"/>
- <property name="junit.timeout" value="1800000"/>
- <property name="clustering.junit.timeout" value="1800000"/>
- <property name="stress.timeout" value="1800000"/>
+ <property name="junit.timeout" value="1200000"/>
+ <property name="clustering.junit.timeout" value="1200000"/>
+ <property name="stress.timeout" value="1200000"/>
<property name="junit.showoutput" value="true"/>
<property name="junit.jvm" value=""/>
Modified: trunk/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java 2007-07-09 01:12:36 UTC (rev 2858)
+++ trunk/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java 2007-07-09 15:35:01 UTC (rev 2859)
@@ -773,7 +773,9 @@
ServerManagement.deployQueue("QA");
ServerManagement.deployQueue("QB");
-
+
+ cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");
+
conn3 = cf.createXAConnection();
XASession sess3 = conn3.createXASession();
@@ -1042,6 +1044,8 @@
ServerManagement.deployQueue("QB");
+ cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");
+
conn3 = cf.createXAConnection();
XASession sess3 = conn3.createXASession();
@@ -1608,6 +1612,8 @@
//Now recover
+ cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");
+
conn3 = cf.createXAConnection();
XASession sess3 = conn3.createXASession();
@@ -1911,6 +1917,7 @@
ServerManagement.deployQueue("TXQ");
+ cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");
//Now recover
@@ -2390,6 +2397,7 @@
ServerManagement.deployTopic("TXTOPIC");
+ cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");
conn3 = cf.createXAConnection();
@@ -2608,6 +2616,8 @@
ServerManagement.deployQueue("TXQ");
//Try and recover
+
+ cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");
XAResource res = cf.createXAConnection().createXASession().getXAResource();
@@ -2795,6 +2805,7 @@
ServerManagement.deployQueue("QA");
+ cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");
XAResource res = cf.createXAConnection().createXASession().getXAResource();
@@ -2954,6 +2965,7 @@
ServerManagement.deployQueue("Queue");
+ cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");
XAResource res = cf.createXAConnection().createXASession().getXAResource();
@@ -3095,8 +3107,8 @@
ServerManagement.deployQueue("Queue");
+ cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");
-
XAResource res = cf.createXAConnection().createXASession().getXAResource();
Xid[] xids = res.recover(XAResource.TMSTARTRSCAN);
@@ -3243,7 +3255,8 @@
ServerManagement.deployQueue("QB");
-
+ cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");
+
XAResource res = cf.createXAConnection().createXASession().getXAResource();
Xid[] xids = res.recover(XAResource.TMSTARTRSCAN);
Modified: trunk/tests/src/org/jboss/test/messaging/jms/XATestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/XATestBase.java 2007-07-09 01:12:36 UTC (rev 2858)
+++ trunk/tests/src/org/jboss/test/messaging/jms/XATestBase.java 2007-07-09 15:35:01 UTC (rev 2859)
@@ -1225,8 +1225,13 @@
ServerManagement.startServerPeer();
ServerManagement.deployQueue("Queue");
+
+ cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");
+
+ conn1.close();
+
+ conn1 = cf.createXAConnection();
-
XAResource res = cf.createXAConnection().createXASession().getXAResource();
Xid[] xids = res.recover(XAResource.TMSTARTRSCAN);
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java 2007-07-09 01:12:36 UTC (rev 2858)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java 2007-07-09 15:35:01 UTC (rev 2859)
@@ -64,6 +64,16 @@
}
// Public --------------------------------------------------------
+
+ public void testMessagePropertiesPreservedOnSuckPersistent() throws Exception
+ {
+ this.messagePropertiesPreservedOnSuck(true);
+ }
+
+ public void testMessagePropertiesPreservedOnSuckNonPersistent() throws Exception
+ {
+ this.messagePropertiesPreservedOnSuck(false);
+ }
public void testClusteredQueueNonPersistent() throws Exception
{
@@ -83,9 +93,8 @@
public void testLocalPersistent() throws Exception
{
localQueue(true);
- }
+ }
-
public void testWithConnectionsOnAllNodesClientAck() throws Exception
{
JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
@@ -252,11 +261,123 @@
}
}
}
+
+ public void testMixedSuck() throws Exception
+ {
+ Connection conn0 = null;
+ Connection conn1 = null;
+ Connection conn2 = null;
- // Package protected ---------------------------------------------
+ try
+ {
- // Protected -----------------------------------------------------
+ conn0 = this.createConnectionOnServer(cf, 0);
+ conn1 = this.createConnectionOnServer(cf, 1);
+ conn2 = this.createConnectionOnServer(cf, 2);
+
+ checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
+ Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons2 = sess2.createConsumer(queue[2]);
+
+ conn0.start();
+ conn2.start();
+
+ final int NUM_MESSAGES = 300;
+
+
+ // Send at node 0
+
+ MessageProducer prod0 = sess0.createProducer(queue[0]);
+
+ MessageProducer prod2 = sess2.createProducer(queue[2]);
+
+ //Send more messages at node 0 and node 2
+
+ boolean persistent = false;
+ for (int i = 0; i < NUM_MESSAGES / 2 ; i++)
+ {
+ TextMessage tm = sess0.createTextMessage("message4-" + i);
+
+ prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ prod0.send(tm);
+
+ persistent = !persistent;
+ }
+
+ for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess2.createTextMessage("message4-" + i);
+
+ prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ prod2.send(tm);
+
+ persistent = !persistent;
+ }
+
+ //consume them on node 2 - we will get messages from both nodes so the order is undefined
+
+ Set msgs = new HashSet();
+
+ TextMessage tm = null;
+
+ do
+ {
+ tm = (TextMessage)cons2.receive(1000);
+
+ if (tm != null)
+ {
+ msgs.add(tm.getText());
+ }
+ }
+ while (tm != null);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ assertTrue(msgs.contains("message4-" + i));
+ }
+
+ assertEquals(NUM_MESSAGES, msgs.size());
+
+ cons2.close();
+
+ sess2.close();
+
+ sess2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ cons2 = sess2.createConsumer(queue[2]);
+
+ Message msg = cons2.receive(5000);
+
+ assertNull(msg);
+ }
+ finally
+ {
+ if (conn0 != null)
+ {
+ conn0.close();
+ }
+
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+ }
+
+ // Package private ---------------------------------------------
+
+ // protected ----------------------------------------------------
+
protected void setUp() throws Exception
{
nodeCount = 3;
@@ -271,7 +392,10 @@
super.tearDown();
}
- protected void clusteredQueue(boolean persistent) throws Exception
+ // private -----------------------------------------------------
+
+
+ private void clusteredQueue(boolean persistent) throws Exception
{
Connection conn0 = null;
Connection conn1 = null;
@@ -309,7 +433,7 @@
for (int i = 0; i < NUM_MESSAGES; i++)
{
- TextMessage tm = sess0.createTextMessage("message" + i);
+ TextMessage tm = sess0.createTextMessage("message0-" + i);
prod0.send(tm);
}
@@ -320,7 +444,7 @@
assertNotNull(tm);
- assertEquals("message" + i, tm.getText());
+ assertEquals("message0-" + i, tm.getText());
}
Message m = cons0.receive(2000);
@@ -343,7 +467,7 @@
for (int i = 0; i < NUM_MESSAGES; i++)
{
- TextMessage tm = sess1.createTextMessage("message" + i);
+ TextMessage tm = sess1.createTextMessage("message1-" + i);
prod1.send(tm);
}
@@ -354,7 +478,7 @@
assertNotNull(tm);
- assertEquals("message" + i, tm.getText());
+ assertEquals("message1-" + i, tm.getText());
}
m = cons0.receive(2000);
@@ -377,7 +501,7 @@
for (int i = 0; i < NUM_MESSAGES; i++)
{
- TextMessage tm = sess2.createTextMessage("message" + i);
+ TextMessage tm = sess2.createTextMessage("message2-" + i);
prod2.send(tm);
}
@@ -388,7 +512,7 @@
assertNotNull(tm);
- assertEquals("message" + i, tm.getText());
+ assertEquals("message2-" + i, tm.getText());
}
m = cons0.receive(2000);
@@ -414,7 +538,7 @@
for (int i = 0; i < NUM_MESSAGES; i++)
{
- TextMessage tm = sess0.createTextMessage("message2-" + i);
+ TextMessage tm = sess0.createTextMessage("message3-" + i);
prod0.send(tm);
}
@@ -427,7 +551,7 @@
assertNotNull(tm);
- assertEquals("message2-" + i, tm.getText());
+ assertEquals("message3-" + i, tm.getText());
}
m = cons2.receive(2000);
@@ -438,14 +562,14 @@
for (int i = 0; i < NUM_MESSAGES / 2; i++)
{
- TextMessage tm = sess0.createTextMessage("message3-" + i);
+ TextMessage tm = sess0.createTextMessage("message4-" + i);
prod0.send(tm);
}
for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
{
- TextMessage tm = sess1.createTextMessage("message3-" + i);
+ TextMessage tm = sess2.createTextMessage("message4-" + i);
prod2.send(tm);
}
@@ -462,8 +586,6 @@
if (tm != null)
{
- assertNotNull(tm);
-
msgs.add(tm.getText());
}
}
@@ -471,9 +593,13 @@
for (int i = 0; i < NUM_MESSAGES; i++)
{
- assertTrue(msgs.contains("message3-" + i));
+ assertTrue(msgs.contains("message4-" + i));
}
+ assertEquals(NUM_MESSAGES, msgs.size());
+
+ msgs.clear();
+
// Now repeat but this time creating the consumer after send
cons2.close();
@@ -482,14 +608,14 @@
for (int i = 0; i < NUM_MESSAGES / 2; i++)
{
- tm = sess0.createTextMessage("message3-" + i);
+ tm = sess0.createTextMessage("message5-" + i);
prod0.send(tm);
}
for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
{
- tm = sess1.createTextMessage("message3-" + i);
+ tm = sess1.createTextMessage("message5-" + i);
prod2.send(tm);
}
@@ -506,8 +632,6 @@
if (tm != null)
{
- assertNotNull(tm);
-
msgs.add(tm.getText());
}
}
@@ -515,10 +639,14 @@
for (int i = 0; i < NUM_MESSAGES; i++)
{
- assertTrue(msgs.contains("message3-" + i));
+ assertTrue(msgs.contains("message5-" + i));
}
+ assertEquals(NUM_MESSAGES, msgs.size());
+ msgs.clear();
+
+
//Now send messages at node 0 - but consume from node 1 AND node 2
//order is undefined
@@ -531,7 +659,7 @@
for (int i = 0; i < NUM_MESSAGES; i++)
{
- tm = sess0.createTextMessage("message4-" + i);
+ tm = sess0.createTextMessage("message6-" + i);
prod0.send(tm);
}
@@ -568,11 +696,13 @@
for (int i = 0; i < NUM_MESSAGES; i++)
{
- assertTrue(msgs.contains("message4-" + i));
+ assertTrue(msgs.contains("message6-" + i));
}
assertEquals(NUM_MESSAGES, count);
+ msgs.clear();
+
//as above but start consumers AFTER sending
cons1.close();
@@ -581,7 +711,7 @@
for (int i = 0; i < NUM_MESSAGES; i++)
{
- tm = sess0.createTextMessage("message4-" + i);
+ tm = sess0.createTextMessage("message7-" + i);
prod0.send(tm);
}
@@ -623,12 +753,14 @@
for (int i = 0; i < NUM_MESSAGES; i++)
{
- assertTrue(msgs.contains("message4-" + i));
+ assertTrue(msgs.contains("message7-" + i));
}
assertEquals(NUM_MESSAGES, count);
+ msgs.clear();
+
// Now send message on node 0, consume on node2, then cancel, consume on node1, cancel, consume on node 0
cons1.close();
@@ -643,7 +775,7 @@
for (int i = 0; i < NUM_MESSAGES; i++)
{
- tm = sess0.createTextMessage("message5-" + i);
+ tm = sess0.createTextMessage("message8-" + i);
prod0.send(tm);
}
@@ -654,7 +786,7 @@
assertNotNull(tm);
- assertEquals("message5-" + i, tm.getText());
+ assertEquals("message8-" + i, tm.getText());
}
sess2.close(); // messages should go back on queue
@@ -673,7 +805,7 @@
assertNotNull(tm);
- assertEquals("message5-" + i, tm.getText());
+ assertEquals("message8-" + i, tm.getText());
}
sess1.close(); // messages should go back on queue
@@ -688,9 +820,124 @@
assertNotNull(tm);
- assertEquals("message5-" + i, tm.getText());
- }
+ assertEquals("message8-" + i, tm.getText());
+ }
+
+ Message msg = cons0.receive(5000);
+
+ assertNull(msg);
+ }
+ finally
+ {
+ if (conn0 != null)
+ {
+ conn0.close();
+ }
+
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+ }
+
+ private void messagePropertiesPreservedOnSuck(boolean persistent) throws Exception
+ {
+ Connection conn0 = null;
+ Connection conn1 = null;
+ Connection conn2 = null;
+
+ try
+ {
+
+ conn0 = this.createConnectionOnServer(cf, 0);
+ conn1 = this.createConnectionOnServer(cf, 1);
+ conn2 = this.createConnectionOnServer(cf, 2);
+
+ checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
+
+ Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons2 = sess2.createConsumer(queue[2]);
+
+ conn0.start();
+ conn2.start();
+
+ // Send at node 0
+
+ MessageProducer prod0 = sess0.createProducer(queue[0]);
+
+ prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+
+
+ TextMessage tm = sess0.createTextMessage("blahmessage");
+
+ prod0.setPriority(7);
+
+ prod0.setTimeToLive(1 * 60 * 60 * 1000);
+
+ prod0.send(tm);
+
+ long expiration = tm.getJMSExpiration();
+
+ assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
+
+
+
+ tm = (TextMessage)cons2.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("blahmessage", tm.getText());
+
+ assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
+
+ assertEquals(7, tm.getJMSPriority());
+
+ assertTrue(Math.abs(expiration - tm.getJMSExpiration()) < 100);
+ Message m = cons2.receive(5000);
+
+ assertNull(m);
+
+
+ //Now do one with expiration = 0
+
+
+ tm = sess0.createTextMessage("blahmessage2");
+
+ prod0.setPriority(7);
+
+ prod0.setTimeToLive(0);
+
+ prod0.send(tm);
+
+ assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
+
+
+
+ tm = (TextMessage)cons2.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("blahmessage2", tm.getText());
+
+ assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
+
+ assertEquals(7, tm.getJMSPriority());
+
+ assertEquals(0, tm.getJMSExpiration());
+
+ m = cons2.receive(5000);
+
+ assertNull(m);
}
finally
{
@@ -713,7 +960,7 @@
/* Check that non clustered queues behave properly when deployed on a cluster */
- protected void localQueue(boolean persistent) throws Exception
+ private void localQueue(boolean persistent) throws Exception
{
Connection conn0 = null;
Connection conn1 = null;
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java 2007-07-09 01:12:36 UTC (rev 2858)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java 2007-07-09 15:35:01 UTC (rev 2859)
@@ -260,6 +260,8 @@
conn.close();
conn = null;
+ fail("this test is BS - it doesn't check it receives all the messages");
+
Iterator iter = list.msgs.iterator();
count = 0;
@@ -274,7 +276,7 @@
count++;
}
-
+
if (list.failed)
{
fail();
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/PreserveOrderingTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/PreserveOrderingTest.java 2007-07-09 01:12:36 UTC (rev 2858)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/PreserveOrderingTest.java 2007-07-09 15:35:01 UTC (rev 2859)
@@ -380,9 +380,7 @@
{
tm.acknowledge();
}
- }
-
-
+ }
}
finally
{
More information about the jboss-cvs-commits
mailing list