[jboss-cvs] JBoss Messaging SVN: r8426 - in branches/Branch_1_4: src/main/org/jboss/jms/server/endpoint and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Aug 25 19:51:25 EDT 2011


Author: gaohoward
Date: 2011-08-25 19:51:25 -0400 (Thu, 25 Aug 2011)
New Revision: 8426

Modified:
   branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java
   branches/Branch_1_4/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringTestBase.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java
Log:
JBMESSAGING-1844


Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-08-23 03:08:54 UTC (rev 8425)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-08-25 23:51:25 UTC (rev 8426)
@@ -212,12 +212,19 @@
                                     SessionDelegate connectionConsumerSession,
                                     boolean shouldAck)
       throws JMSException
-   {      
-      if (checkExpiredOrReachedMaxdeliveries(m, connectionConsumerSession!=null?connectionConsumerSession:sess, maxDeliveries, shouldAck))
+   {
+      if (isSucker)
       {
-         //Message has been cancelled
-         return;
+         log.trace("We don't check expiry and max delivery for a sucker");
       }
+      else
+      {
+         if (checkExpiredOrReachedMaxdeliveries(m, connectionConsumerSession!=null?connectionConsumerSession:sess, maxDeliveries, shouldAck))
+         {
+            //Message has been cancelled
+            return;
+         }
+      }
       
       DeliveryInfo deliveryInfo =
          new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck, m.getSource());
@@ -324,6 +331,8 @@
    //JBMESSAGING-1876
    private long minTimeoutProcessTime;
 
+   private boolean isSucker = false;
+
    public int getBufferSize()
    {
       return buffer.size();
@@ -1313,6 +1322,12 @@
          mainLock.notifyAll();
       }
    }
+
+   public void setSucker(boolean b)
+   {
+      isSucker  = b;
+   }
+
 }
 
 

Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2011-08-23 03:08:54 UTC (rev 8425)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2011-08-25 23:51:25 UTC (rev 8426)
@@ -232,8 +232,8 @@
 
          return null;
       }
-
-      if (ref.getMessage().isExpired())
+      
+      if ((!remote) && ref.getMessage().isExpired())
       {
          SimpleDelivery delivery = new SimpleDelivery(observer, ref, true, false);
 

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2011-08-23 03:08:54 UTC (rev 8425)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2011-08-25 23:51:25 UTC (rev 8426)
@@ -131,6 +131,8 @@
 		consumer = (ClientConsumerDelegate)sourcedel.createConsumerDelegate(jbq, null, false, null, false, false);
 		
 		clientConsumer = ((ConsumerState)consumer.getState()).getClientConsumer();
+		
+		clientConsumer.setSucker(true);
 								
 		consumer.setMessageListener(this);		
 		

Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringTestBase.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringTestBase.java	2011-08-23 03:08:54 UTC (rev 8425)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringTestBase.java	2011-08-25 23:51:25 UTC (rev 8426)
@@ -64,6 +64,7 @@
 
    protected static InitialContext[] ic;
    protected static Queue queue[];
+   protected static Queue defaultExpiryQueue[];
    protected static Topic topic[];
 
    // No need to have multiple connection factories since a clustered connection factory will create
@@ -76,6 +77,8 @@
    
    protected Long overrideNodeStateRefreshInterval = 500l;
    protected boolean withNewFailoverModel = false;
+   
+   protected boolean deployDefaultExpiryQueue = false;
 
    protected static ServiceAttributeOverrides currentOverrides;
 
@@ -132,6 +135,8 @@
       queue = new Queue[nodeCount];
 
       topic = new Topic[nodeCount];
+      
+      defaultExpiryQueue = new Queue[nodeCount];
 
       for (int i = 0; i < nodeCount; i++)
       {
@@ -150,6 +155,11 @@
          queue[i] = (Queue)ic[i].lookup("queue/testDistributedQueue");
 
          topic[i] = (Topic)ic[i].lookup("topic/testDistributedTopic");
+         
+         if (this.deployDefaultExpiryQueue)
+         {
+            defaultExpiryQueue[i] = (Queue)ic[i].lookup("queue/ExpiryQueue");
+         }
       }
 
       cf = (JBossConnectionFactory)ic[0].lookup("/ClusteredConnectionFactory");
@@ -239,6 +249,11 @@
       ServerManagement.start(serverNumber, config, attributes, cleanDatabase);
 
       log.info("deploying queue on node " + serverNumber);
+      
+      if (deployDefaultExpiryQueue)
+      {
+         ServerManagement.deployQueue("ExpiryQueue", serverNumber);
+      }
       ServerManagement.deployQueue("testDistributedQueue", serverNumber);
       ServerManagement.deployTopic("testDistributedTopic", serverNumber);
    }

Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java	2011-08-23 03:08:54 UTC (rev 8425)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java	2011-08-25 23:51:25 UTC (rev 8426)
@@ -387,7 +387,64 @@
          }
       }
    }
+   
+   //JBMESSAGING-1844
+   public void testSuckExpired() throws Exception
+   {
+      Connection conn0 = null;
+      Connection conn1 = null;
 
+      try
+      {
+         conn0 = this.createConnectionOnServer(cf, 0);
+         conn1 = this.createConnectionOnServer(cf, 1);
+
+         Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         //send 20 messages with a short time-to-live (500ms) so that they expire
+         final int NUM_MESSAGES = 20;
+
+         MessageProducer prod0 = sess0.createProducer(queue[0]);
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess0.createTextMessage("expired-message" + i);
+            prod0.setTimeToLive(500);
+            prod0.send(tm);
+         }
+         
+         Thread.sleep(1000);
+         
+         //The messages have now expired. When we try to receive them from node1,
+         //they should be moved to node1's expiry queue instead of being delivered.
+         MessageConsumer cons1 = sess1.createConsumer(queue[1]);
+         conn1.start();
+         
+         Message m = cons1.receive(1000);
+         assertNull(m);
+         
+         //now receive all of them from node1's expiry queue
+         MessageConsumer cons2 = sess1.createConsumer(defaultExpiryQueue[1]);
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            Message rm = cons2.receive(5000);
+            assertNotNull(rm);
+         }
+      }
+      finally
+      {
+         if (conn0 != null)
+         {
+            conn0.close();
+         }
+
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+      }
+   }
+
    //https://jira.jboss.org/browse/JBMESSAGING-1822
    //send 1000 messages to node 0, receive on node 1 as such
    // 1. when receiving, kill node1
@@ -522,7 +579,7 @@
    protected void setUp() throws Exception
    {
       nodeCount = 3;
-
+      deployDefaultExpiryQueue = true;
       super.setUp();
    }
 



More information about the jboss-cvs-commits mailing list