[jboss-cvs] JBoss Messaging SVN: r3186 - in trunk: tests/src/org/jboss/test/messaging/jms and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Oct 9 14:34:46 EDT 2007


Author: timfox
Date: 2007-10-09 14:34:46 -0400 (Tue, 09 Oct 2007)
New Revision: 3186

Modified:
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-1100


Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-10-09 16:15:11 UTC (rev 3185)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-10-09 18:34:46 UTC (rev 3186)
@@ -264,6 +264,31 @@
             return delivery;
          }
          
+         if (noLocal)
+         {
+            String conId = ((JBossMessage) message).getConnectionID();
+
+            if (trace) { log.trace("message connection id: " + conId + " current connection connection id: " + sessionEndpoint.getConnectionEndpoint().getConnectionID()); }
+
+            if (sessionEndpoint.getConnectionEndpoint().getConnectionID().equals(conId))
+            {
+            	if (trace) { log.trace("Message from local connection so rejecting"); }
+            	
+            	try
+             	{
+             		delivery.acknowledge(null);
+             	}
+             	catch (Throwable t)
+             	{
+             		log.error("Failed to acknowledge delivery", t);
+             		
+             		return null;
+             	}
+             	
+             	return delivery;
+            }            
+         }
+                  
          if (slow)
          {
          	//If this is a slow consumer, we do not want to do any message buffering, so we immediately
@@ -307,20 +332,7 @@
             if (trace) { log.trace("message selector " + (accept ? "accepts " : "DOES NOT accept ") + "the message"); }
          }
       }
-
-      if (accept)
-      {
-         if (noLocal)
-         {
-            String conId = ((JBossMessage) msg).getConnectionID();
-
-            if (trace) { log.trace("message connection id: " + conId + " current connection connection id: " + sessionEndpoint.getConnectionEndpoint().getConnectionID()); }
-
-            accept = !sessionEndpoint.getConnectionEndpoint().getConnectionID().equals(conId);
-
-            if (trace) { log.trace("accepting? " + accept); }
-         }
-      }
+      
       return accept;
    }
 

Modified: trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2007-10-09 16:15:11 UTC (rev 3185)
+++ trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2007-10-09 18:34:46 UTC (rev 3186)
@@ -2794,6 +2794,44 @@
          }
       }
    }
+   
+   public void testNoLocalMemoryExhaustion() throws Exception
+   {
+   	Connection conn = null;
+   	
+   	try
+   	{
+   		conn = cf.createConnection();
+   		
+   		Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+   		
+   		MessageProducer prod = sess.createProducer(topic1);
+   		
+   		MessageConsumer cons = sess.createConsumer(topic1, null, true);
+   		
+   		final int numMessages = 100;
+   		
+   		for (int i = 0; i < numMessages; i++)
+   		{
+   			prod.send(sess.createMessage());
+   		}
+   		
+   		conn.start();
+   		
+   		Message msg = cons.receive(3000);
+   		
+   		assertNull(msg);
+   		
+   		checkEmpty(topic1);
+   	}
+   	finally
+   	{
+   		if (conn != null)
+   		{
+   			conn.close();
+   		}
+   	}
+   }
 
    /*
     *




More information about the jboss-cvs-commits mailing list