[jboss-cvs] JBoss Messaging SVN: r2859 - in trunk: src/main/org/jboss/messaging/core/impl/clusterconnection and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jul 9 11:35:01 EDT 2007


Author: timfox
Date: 2007-07-09 11:35:01 -0400 (Mon, 09 Jul 2007)
New Revision: 2859

Modified:
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   trunk/tests/build.xml
   trunk/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/XATestBase.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/PreserveOrderingTest.java
Log:
Fixed some more tests


Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-07-09 01:12:36 UTC (rev 2858)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-07-09 15:35:01 UTC (rev 2859)
@@ -67,7 +67,6 @@
 import org.jboss.messaging.util.Util;
 import org.jboss.remoting.Client;
 import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
-import org.jboss.util.id.GUID;
 
 /**
  * Concrete implementation of ConnectionEndpoint.
@@ -264,8 +263,6 @@
 
          log.debug("created and registered " + ep);
          
-         log.info("*********** CREATING SESSION WITH ID:" + sessionID);
-
          ClientSessionDelegate d = new ClientSessionDelegate(sessionID, dupsOKBatchSize);
 
          log.debug("created " + d);

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-07-09 01:12:36 UTC (rev 2858)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-07-09 15:35:01 UTC (rev 2859)
@@ -36,6 +36,7 @@
 import javax.jms.IllegalStateException;
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
+import javax.jms.TextMessage;
 
 import org.jboss.aop.AspectManager;
 import org.jboss.jms.client.delegate.ClientBrowserDelegate;
@@ -1009,7 +1010,7 @@
    	if (trace) { log.trace("Collected " + map.size() + " deliveries"); }
    }
    
-   public void replicateDeliveryResponseReceived(long deliveryID) throws Exception
+   public synchronized void replicateDeliveryResponseReceived(long deliveryID) throws Exception
    {
    	//We look up the delivery in the list and actually perform the delivery
    	
@@ -1061,7 +1062,7 @@
    		{
    			toDeliver.take();
    			
-   			performDelivery(dr.del.getReference(), deliveryID, dr.getConsumer()); 
+   			performDelivery(dr.del.getReference(), dr.deliveryID, dr.getConsumer()); 
    			
    			delivered = true;
    	   	
@@ -1086,8 +1087,6 @@
     */
    void waitForDeliveriesFromConsumer(String consumerID) throws Exception
    {   
-   	log.info("Waiting for deliveries for consumer " + consumerID);
-   	
 		long toWait = CLOSE_TIMEOUT;
 		
 		boolean wait;
@@ -1136,14 +1135,11 @@
    			while (toDeliver.take() != null) {}
    			
    			log.warn("Timed out waiting for response to arrive");
-   		}
-   		
-   		
+   		}   		   		
    	}
-   	log.info("Done Waiting for deliveries for consumer " + consumerID);
    }
    
-   void handleDelivery(Delivery delivery, ServerConsumerEndpoint consumer) throws Exception
+   synchronized void handleDelivery(Delivery delivery, ServerConsumerEndpoint consumer) throws Exception
    {
    	 long deliveryId = -1;
    	 
@@ -1151,12 +1147,13 @@
    	 
    	 DeliveryRecord rec = null;
    	 
+   	 deliveryId = deliveryIdSequence.increment();   	 
+   	 
    	 //TODO can't we combine flags isRetainDeliveries and isReplicating - surely they're mutually exclusive?
        if (consumer.isRetainDeliveries())
-       {
+       {      	 
       	 // Add a delivery
-      	 deliveryId = deliveryIdSequence.increment();
-      	 
+
       	 rec = new DeliveryRecord(delivery, consumer, deliveryId);
           
           deliveries.put(new Long(deliveryId), rec);

Modified: trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2007-07-09 01:12:36 UTC (rev 2858)
+++ trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2007-07-09 15:35:01 UTC (rev 2859)
@@ -268,8 +268,18 @@
 				((MessageProxy)msg).getMessage().putHeader(org.jboss.messaging.core.contract.Message.CLUSTER_SUCKED, "x");
 			}
 			
-			producer.send(null, msg, -1, -1, Long.MIN_VALUE);
+			long timeToLive = msg.getJMSExpiration();
+			if (timeToLive != 0)
+			{
+				timeToLive -=  System.currentTimeMillis();
+				if (timeToLive <= 0)
+				{
+					timeToLive = 1; //Should have already expired - set to 1 so it expires when it is consumed or delivered
+				}
+			}
 			
+			producer.send(null, msg, msg.getJMSDeliveryMode(), msg.getJMSPriority(), timeToLive);
+			
 			if (trace) { log.trace(this + " forwarded message to queue"); }
 
 			if (startTx)

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-07-09 01:12:36 UTC (rev 2858)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-07-09 15:35:01 UTC (rev 2859)
@@ -2638,35 +2638,40 @@
 	   		
 				//FIXME - this is ugly
 				//Find a better way of getting the sessions
-				
-				Collection sessions = serverPeer.getSessions();
-				
-				Iterator iter2 = sessions.iterator();
-				
-				while (iter2.hasNext())
-				{
-					ServerSessionEndpoint session = (ServerSessionEndpoint)iter2.next();
+	   		//We shouldn't know about the server peer
+	   		
+	   		if (serverPeer != null)
+	   		{
 					
-					session.collectDeliveries(deliveries, firstNode);				
-				}   				  
-				
-				if (!firstNode)
-				{			
-		   		PostOfficeAddressInfo info = (PostOfficeAddressInfo)nodeIDAddressMap.get(new Integer(failoverNodeID));
-		   		
-		   		if (info == null)
-		   		{
-		   			throw new IllegalStateException("Cannot find address for failover node " + failoverNodeID);
-		   		}		   		
+					Collection sessions = serverPeer.getSessions();
 					
-					ClusterRequest request = new AddAllReplicatedDeliveriesMessage(thisNodeID, deliveries);
+					Iterator iter2 = sessions.iterator();
 					
-					//send sync
+					while (iter2.hasNext())
+					{
+						ServerSessionEndpoint session = (ServerSessionEndpoint)iter2.next();
+						
+						session.collectDeliveries(deliveries, firstNode);				
+					}   				  
 					
-					groupMember.unicastControl(request, info.getControlChannelAddress(), true);
-		   		
-		   		if (trace) { log.trace("Sent AddAllReplicatedDeliveriesMessage"); }
-				}
+					if (!firstNode)
+					{			
+			   		PostOfficeAddressInfo info = (PostOfficeAddressInfo)nodeIDAddressMap.get(new Integer(failoverNodeID));
+			   		
+			   		if (info == null)
+			   		{
+			   			throw new IllegalStateException("Cannot find address for failover node " + failoverNodeID);
+			   		}		   		
+						
+						ClusterRequest request = new AddAllReplicatedDeliveriesMessage(thisNodeID, deliveries);
+						
+						//send sync
+						
+						groupMember.unicastControl(request, info.getControlChannelAddress(), true);
+			   		
+			   		if (trace) { log.trace("Sent AddAllReplicatedDeliveriesMessage"); }
+					}
+	   		}
 	   	}
    	}
    	finally

Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml	2007-07-09 01:12:36 UTC (rev 2858)
+++ trunk/tests/build.xml	2007-07-09 15:35:01 UTC (rev 2859)
@@ -106,9 +106,9 @@
    <property name="junit.haltonfailure" value="false"/>
    <property name="junit.fork" value="true"/>
    <property name="junit.includeantruntime" value="true"/>
-   <property name="junit.timeout" value="1800000"/>
-   <property name="clustering.junit.timeout" value="1800000"/>
-   <property name="stress.timeout" value="1800000"/>
+   <property name="junit.timeout" value="1200000"/>
+   <property name="clustering.junit.timeout" value="1200000"/>
+   <property name="stress.timeout" value="1200000"/>
 
    <property name="junit.showoutput" value="true"/>
    <property name="junit.jvm" value=""/>

Modified: trunk/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java	2007-07-09 01:12:36 UTC (rev 2858)
+++ trunk/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java	2007-07-09 15:35:01 UTC (rev 2859)
@@ -773,7 +773,9 @@
          ServerManagement.deployQueue("QA");
          
          ServerManagement.deployQueue("QB");
-                           
+         
+         cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");
+                  
          conn3 = cf.createXAConnection();
          
          XASession sess3 = conn3.createXASession();
@@ -1042,6 +1044,8 @@
          
          ServerManagement.deployQueue("QB");
                            
+         cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");         
+         
          conn3 = cf.createXAConnection();
          
          XASession sess3 = conn3.createXASession();
@@ -1608,6 +1612,8 @@
          
          //Now recover
          
+         cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");         
+         
          conn3 = cf.createXAConnection();
          
          XASession sess3 = conn3.createXASession();
@@ -1911,6 +1917,7 @@
          
          ServerManagement.deployQueue("TXQ");
          
+         cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");         
          
          //Now recover
          
@@ -2390,6 +2397,7 @@
          
          ServerManagement.deployTopic("TXTOPIC");
          
+         cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");         
                            
          conn3 = cf.createXAConnection();
          
@@ -2608,6 +2616,8 @@
          ServerManagement.deployQueue("TXQ");
    
          //Try and recover
+         
+         cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");         
    
          XAResource res = cf.createXAConnection().createXASession().getXAResource();
    
@@ -2795,6 +2805,7 @@
    
          ServerManagement.deployQueue("QA");
    
+         cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");         
    
          XAResource res = cf.createXAConnection().createXASession().getXAResource();
    
@@ -2954,6 +2965,7 @@
    
          ServerManagement.deployQueue("Queue");
    
+         cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");         
    
          XAResource res = cf.createXAConnection().createXASession().getXAResource();
    
@@ -3095,8 +3107,8 @@
    
          ServerManagement.deployQueue("Queue");
    
+         cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");         
    
-   
          XAResource res = cf.createXAConnection().createXASession().getXAResource();
    
          Xid[] xids = res.recover(XAResource.TMSTARTRSCAN);
@@ -3243,7 +3255,8 @@
          ServerManagement.deployQueue("QB");
    
    
-   
+         cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");
+         
          XAResource res = cf.createXAConnection().createXASession().getXAResource();
    
          Xid[] xids = res.recover(XAResource.TMSTARTRSCAN);

Modified: trunk/tests/src/org/jboss/test/messaging/jms/XATestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/XATestBase.java	2007-07-09 01:12:36 UTC (rev 2858)
+++ trunk/tests/src/org/jboss/test/messaging/jms/XATestBase.java	2007-07-09 15:35:01 UTC (rev 2859)
@@ -1225,8 +1225,13 @@
          ServerManagement.startServerPeer();
 
          ServerManagement.deployQueue("Queue");
+         
+         cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");
+         
+         conn1.close();
+         
+         conn1 = cf.createXAConnection();
 
-
          XAResource res = cf.createXAConnection().createXASession().getXAResource();
 
          Xid[] xids = res.recover(XAResource.TMSTARTRSCAN);

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java	2007-07-09 01:12:36 UTC (rev 2858)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java	2007-07-09 15:35:01 UTC (rev 2859)
@@ -64,6 +64,16 @@
    }
 
    // Public --------------------------------------------------------
+   
+   public void testMessagePropertiesPreservedOnSuckPersistent() throws Exception
+   {
+   	this.messagePropertiesPreservedOnSuck(true);
+   }
+   
+   public void testMessagePropertiesPreservedOnSuckNonPersistent() throws Exception
+   {
+   	this.messagePropertiesPreservedOnSuck(false);
+   }
 
    public void testClusteredQueueNonPersistent() throws Exception
    {
@@ -83,9 +93,8 @@
    public void testLocalPersistent() throws Exception
    {
       localQueue(true);
-   }
+   }   
    
-   
    public void testWithConnectionsOnAllNodesClientAck() throws Exception
    {
    	JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
@@ -252,11 +261,123 @@
          }
       }
    }
+   
+   public void testMixedSuck() throws Exception
+   {
+      Connection conn0 = null;
+      Connection conn1 = null;
+      Connection conn2 = null;
 
-   // Package protected ---------------------------------------------
+      try
+      {
 
-   // Protected -----------------------------------------------------
+         conn0 = this.createConnectionOnServer(cf, 0);
+         conn1 = this.createConnectionOnServer(cf, 1);
+         conn2 = this.createConnectionOnServer(cf, 2);
+         
+         checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
 
+         Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer cons2 = sess2.createConsumer(queue[2]);
+         
+         conn0.start();
+         conn2.start();
+
+         final int NUM_MESSAGES = 300;
+
+         
+         // Send at node 0
+
+         MessageProducer prod0 = sess0.createProducer(queue[0]);
+
+         MessageProducer prod2 = sess2.createProducer(queue[2]);
+
+         //Send more messages at node 0 and node 2
+         
+         boolean persistent = false;
+         for (int i = 0; i < NUM_MESSAGES / 2 ; i++)
+         {
+            TextMessage tm = sess0.createTextMessage("message4-" + i);
+
+            prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+            
+            prod0.send(tm);
+            
+            persistent = !persistent;
+         }
+         
+         for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess2.createTextMessage("message4-" + i);
+
+            prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+                        
+            prod2.send(tm);
+            
+            persistent = !persistent;
+         }
+         
+         //consume them on node 2 - we will get messages from both nodes so the order is undefined
+         
+         Set msgs = new HashSet();
+         
+         TextMessage tm = null;
+         
+         do
+         {
+            tm = (TextMessage)cons2.receive(1000);
+            
+            if (tm != null)
+            {                     
+	            msgs.add(tm.getText());
+            }
+         }           
+         while (tm != null);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+         	assertTrue(msgs.contains("message4-" + i));
+         }
+         
+         assertEquals(NUM_MESSAGES, msgs.size());
+                
+         cons2.close();
+         
+         sess2.close();
+         
+         sess2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         
+         cons2 = sess2.createConsumer(queue[2]);
+                  
+         Message msg = cons2.receive(5000);
+         
+         assertNull(msg);                            
+      }
+      finally
+      {
+         if (conn0 != null)
+         {
+            conn0.close();
+         }
+
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+   }
+
+   // Package private ---------------------------------------------
+   
+   // protected ----------------------------------------------------
+   
    protected void setUp() throws Exception
    {
       nodeCount = 3;
@@ -271,7 +392,10 @@
       super.tearDown();
    }
 
-   protected void clusteredQueue(boolean persistent) throws Exception
+   // private -----------------------------------------------------
+
+
+   private void clusteredQueue(boolean persistent) throws Exception
    {
       Connection conn0 = null;
       Connection conn1 = null;
@@ -309,7 +433,7 @@
 
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
-            TextMessage tm = sess0.createTextMessage("message" + i);
+            TextMessage tm = sess0.createTextMessage("message0-" + i);
 
             prod0.send(tm);
          }
@@ -320,7 +444,7 @@
 
             assertNotNull(tm);
             
-            assertEquals("message" + i, tm.getText());
+            assertEquals("message0-" + i, tm.getText());
          }                 
 
          Message m = cons0.receive(2000);
@@ -343,7 +467,7 @@
 
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
-            TextMessage tm = sess1.createTextMessage("message" + i);
+            TextMessage tm = sess1.createTextMessage("message1-" + i);
 
             prod1.send(tm);
          }
@@ -354,7 +478,7 @@
 
             assertNotNull(tm);
 
-            assertEquals("message" + i, tm.getText());
+            assertEquals("message1-" + i, tm.getText());
          }
 
          m = cons0.receive(2000);
@@ -377,7 +501,7 @@
 
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
-            TextMessage tm = sess2.createTextMessage("message" + i);
+            TextMessage tm = sess2.createTextMessage("message2-" + i);
 
             prod2.send(tm);
          }
@@ -388,7 +512,7 @@
 
             assertNotNull(tm);
 
-            assertEquals("message" + i, tm.getText());
+            assertEquals("message2-" + i, tm.getText());
          }
 
          m = cons0.receive(2000);
@@ -414,7 +538,7 @@
          
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
-            TextMessage tm = sess0.createTextMessage("message2-" + i);
+            TextMessage tm = sess0.createTextMessage("message3-" + i);
 
             prod0.send(tm);
          }
@@ -427,7 +551,7 @@
                   
             assertNotNull(tm);
             
-            assertEquals("message2-" + i, tm.getText());
+            assertEquals("message3-" + i, tm.getText());
          }                 
 
          m = cons2.receive(2000);
@@ -438,14 +562,14 @@
          
          for (int i = 0; i < NUM_MESSAGES / 2; i++)
          {
-            TextMessage tm = sess0.createTextMessage("message3-" + i);
+            TextMessage tm = sess0.createTextMessage("message4-" + i);
 
             prod0.send(tm);
          }
          
          for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
          {
-            TextMessage tm = sess1.createTextMessage("message3-" + i);
+            TextMessage tm = sess2.createTextMessage("message4-" + i);
 
             prod2.send(tm);
          }
@@ -462,8 +586,6 @@
             
             if (tm != null)
             {                     
-	            assertNotNull(tm);
-	            
 	            msgs.add(tm.getText());
             }
          }           
@@ -471,9 +593,13 @@
 
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
-         	assertTrue(msgs.contains("message3-" + i));
+         	assertTrue(msgs.contains("message4-" + i));
          }
          
+         assertEquals(NUM_MESSAGES, msgs.size());
+         
+         msgs.clear();
+         
          // Now repeat but this time creating the consumer after send
          
          cons2.close();
@@ -482,14 +608,14 @@
          
          for (int i = 0; i < NUM_MESSAGES / 2; i++)
          {
-            tm = sess0.createTextMessage("message3-" + i);
+            tm = sess0.createTextMessage("message5-" + i);
 
             prod0.send(tm);
          }
          
          for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
          {
-            tm = sess1.createTextMessage("message3-" + i);
+            tm = sess1.createTextMessage("message5-" + i);
 
             prod2.send(tm);
          }
@@ -506,8 +632,6 @@
             
             if (tm != null)
             {            
-	            assertNotNull(tm);
-	            
 	            msgs.add(tm.getText());
             }
          }     
@@ -515,10 +639,14 @@
 
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
-         	assertTrue(msgs.contains("message3-" + i));
+         	assertTrue(msgs.contains("message5-" + i));
          }
          
+         assertEquals(NUM_MESSAGES, msgs.size());
          
+         msgs.clear();
+         
+         
          //Now send messages at node 0 - but consume from node 1 AND node 2
          
          //order is undefined
@@ -531,7 +659,7 @@
          
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
-            tm = sess0.createTextMessage("message4-" + i);
+            tm = sess0.createTextMessage("message6-" + i);
 
             prod0.send(tm);
          }
@@ -568,11 +696,13 @@
          
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
-         	assertTrue(msgs.contains("message4-" + i));
+         	assertTrue(msgs.contains("message6-" + i));
          }
          
          assertEquals(NUM_MESSAGES, count);
          
+         msgs.clear();
+         
          //as above but start consumers AFTER sending
          
          cons1.close();
@@ -581,7 +711,7 @@
          
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
-            tm = sess0.createTextMessage("message4-" + i);
+            tm = sess0.createTextMessage("message7-" + i);
 
             prod0.send(tm);
          }
@@ -623,12 +753,14 @@
          
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
-         	assertTrue(msgs.contains("message4-" + i));
+         	assertTrue(msgs.contains("message7-" + i));
          }
          
          assertEquals(NUM_MESSAGES, count);         
          
+         msgs.clear();
          
+         
          // Now send message on node 0, consume on node2, then cancel, consume on node1, cancel, consume on node 0
          
          cons1.close();
@@ -643,7 +775,7 @@
          
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
-            tm = sess0.createTextMessage("message5-" + i);
+            tm = sess0.createTextMessage("message8-" + i);
 
             prod0.send(tm);
          }
@@ -654,7 +786,7 @@
             
             assertNotNull(tm);
                
-            assertEquals("message5-" + i, tm.getText());
+            assertEquals("message8-" + i, tm.getText());
          } 
          
          sess2.close(); // messages should go back on queue
@@ -673,7 +805,7 @@
             
             assertNotNull(tm);
                
-            assertEquals("message5-" + i, tm.getText());
+            assertEquals("message8-" + i, tm.getText());
          } 
          
          sess1.close(); // messages should go back on queue
@@ -688,9 +820,124 @@
             
             assertNotNull(tm);
                
-            assertEquals("message5-" + i, tm.getText());
-         }                  
+            assertEquals("message8-" + i, tm.getText());
+         }     
+         
+         Message msg = cons0.receive(5000);
+         
+         assertNull(msg);                          
+      }
+      finally
+      {
+         if (conn0 != null)
+         {
+            conn0.close();
+         }
+
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+   }
+   
+   private void messagePropertiesPreservedOnSuck(boolean persistent) throws Exception
+   {
+      Connection conn0 = null;
+      Connection conn1 = null;
+      Connection conn2 = null;
+
+      try
+      {
+
+         conn0 = this.createConnectionOnServer(cf, 0);
+         conn1 = this.createConnectionOnServer(cf, 1);
+         conn2 = this.createConnectionOnServer(cf, 2);
+         
+         checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
+
+         Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer cons2 = sess2.createConsumer(queue[2]);
+         
+         conn0.start();
+         conn2.start();
+
+         // Send at node 0
+
+         MessageProducer prod0 = sess0.createProducer(queue[0]);
+
+         prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+         
+
+
+         TextMessage tm = sess0.createTextMessage("blahmessage");
+            
+         prod0.setPriority(7);
+         
+         prod0.setTimeToLive(1 * 60 * 60 * 1000);
+
+         prod0.send(tm);
+         
+         long expiration = tm.getJMSExpiration();
+         
+         assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
+         
+                         
+
+         tm = (TextMessage)cons2.receive(1000);
+         
+         assertNotNull(tm);
+         
+         assertEquals("blahmessage", tm.getText());
+
+         assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
+         
+         assertEquals(7, tm.getJMSPriority());
+        
+         assertTrue(Math.abs(expiration - tm.getJMSExpiration()) < 100);
                   
+         Message m = cons2.receive(5000);
+         
+         assertNull(m);
+         
+         
+         //Now do one with expiration = 0
+         
+         
+         tm = sess0.createTextMessage("blahmessage2");
+         
+         prod0.setPriority(7);
+         
+         prod0.setTimeToLive(0);
+
+         prod0.send(tm);
+         
+         assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
+         
+                         
+
+         tm = (TextMessage)cons2.receive(1000);
+         
+         assertNotNull(tm);
+         
+         assertEquals("blahmessage2", tm.getText());
+
+         assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
+         
+         assertEquals(7, tm.getJMSPriority());
+        
+         assertEquals(0, tm.getJMSExpiration());
+                  
+         m = cons2.receive(5000);
+         
+         assertNull(m);                          
       }
       finally
       {
@@ -713,7 +960,7 @@
    
    
    /* Check that non clustered queues behave properly when deployed on a cluster */
-   protected void localQueue(boolean persistent) throws Exception
+   private void localQueue(boolean persistent) throws Exception
    {
    	Connection conn0 = null;
       Connection conn1 = null;

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java	2007-07-09 01:12:36 UTC (rev 2858)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java	2007-07-09 15:35:01 UTC (rev 2859)
@@ -260,6 +260,8 @@
          conn.close();
          conn = null;
          
+         fail("this test is BS - it doesn't check it receives all the messages");
+         
          Iterator iter = list.msgs.iterator();
          
          count = 0;
@@ -274,7 +276,7 @@
             
             count++;
          }
-         
+             
          if (list.failed)
          {
             fail();

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/PreserveOrderingTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/PreserveOrderingTest.java	2007-07-09 01:12:36 UTC (rev 2858)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/PreserveOrderingTest.java	2007-07-09 15:35:01 UTC (rev 2859)
@@ -380,9 +380,7 @@
             {
             	tm.acknowledge();
             }
-         }     
-         
-     
+         }                   
       }
       finally
       {




More information about the jboss-cvs-commits mailing list