[jboss-cvs] JBoss Messaging SVN: r5412 - in trunk: tests/src/org/jboss/messaging/tests/integration/queue and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Nov 21 04:52:35 EST 2008


Author: ataylor
Date: 2008-11-21 04:52:35 -0500 (Fri, 21 Nov 2008)
New Revision: 5412

Modified:
   trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/queue/DeadLetterAddressTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java
Log:
added tests for dlq and expiry and no binding check

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java	2008-11-21 07:13:04 UTC (rev 5411)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java	2008-11-21 09:52:35 UTC (rev 5412)
@@ -39,6 +39,8 @@
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
 import org.jboss.messaging.util.SimpleString;
 
+import java.util.List;
+
 /**
  * Implementation of a MessageReference
  *
@@ -165,7 +167,15 @@
       SimpleString deadLetterAddress = queueSettingsRepository.getMatch(queue.getName().toString()).getDeadLetterAddress();
       if (deadLetterAddress != null)
       {
-         move(deadLetterAddress, persistenceManager, postOffice, false);
+         List<Binding> bindingList = postOffice.getBindingsForAddress(deadLetterAddress);
+         if(bindingList == null || bindingList.size() == 0)
+         {
+             log.warn("Message has exceeded max delivery attempts. No bindings for Dead Letter Address " + deadLetterAddress + " so dropping it");
+         }
+         else
+         {
+            move(deadLetterAddress, persistenceManager, postOffice, false);
+         }
       }
       else
       {
@@ -181,11 +191,19 @@
                       final PostOffice postOffice,
                       final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
    {
-      SimpleString expiryQueue = queueSettingsRepository.getMatch(queue.getName().toString()).getExpiryAddress();
+      SimpleString expiryAddress = queueSettingsRepository.getMatch(queue.getName().toString()).getExpiryAddress();
 
-      if (expiryQueue != null)
+      if (expiryAddress != null)
       {
-         move(expiryQueue, persistenceManager, postOffice, true);
+         List<Binding> bindingList = postOffice.getBindingsForAddress(expiryAddress);
+         if(bindingList == null || bindingList.size() == 0)
+         {
+             log.warn("Message has expired. No bindings for Expiry Address " + expiryAddress + " so dropping it");
+         }
+         else
+         {
+            move(expiryAddress, persistenceManager, postOffice, true);
+         }
       }
       else
       {

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/queue/DeadLetterAddressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/queue/DeadLetterAddressTest.java	2008-11-21 07:13:04 UTC (rev 5411)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/queue/DeadLetterAddressTest.java	2008-11-21 09:52:35 UTC (rev 5412)
@@ -35,10 +35,14 @@
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.impl.XidImpl;
+import org.jboss.messaging.core.message.impl.MessageImpl;
 import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.jms.client.JBossMessage;
 
 import javax.transaction.xa.Xid;
 import javax.transaction.xa.XAResource;
+import java.util.Map;
+import java.util.HashMap;
 
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
@@ -151,6 +155,85 @@
       clientConsumer.close();
    }
 
+   public void testHeadersSet() throws Exception
+   {
+      final int MAX_DELIVERIES = 16;
+      final int NUM_MESSAGES = 5;
+      Xid xid = new XidImpl("bq".getBytes(), 0, "gt".getBytes());
+      SimpleString dla = new SimpleString("DLA");
+      SimpleString qName = new SimpleString("q1");
+      QueueSettings queueSettings = new QueueSettings();
+      queueSettings.setMaxDeliveryAttempts(MAX_DELIVERIES);
+      queueSettings.setDeadLetterAddress(dla);
+      messagingService.getServer().getQueueSettingsRepository().addMatch(qName.toString(), queueSettings);
+      SimpleString dlq = new SimpleString("DLQ1");
+      clientSession.createQueue(dla, dlq, null, false, false, false);
+      clientSession.createQueue(qName, qName, null, false, false, false);
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      ClientSession sendSession = sessionFactory.createSession(false, true, true);
+      ClientProducer producer = sendSession.createProducer(qName);
+      Map<String, Long> origIds = new HashMap<String, Long>();
+
+      for (int i = 0; i < NUM_MESSAGES; i++)
+      {
+         ClientMessage tm = createTextMessage("Message:" + i, clientSession);
+         producer.send(tm);
+      }
+
+      ClientConsumer clientConsumer = clientSession.createConsumer(qName);
+      clientSession.start();
+
+      for (int i = 0; i < MAX_DELIVERIES; i++)
+      {
+         clientSession.start(xid, XAResource.TMNOFLAGS);
+         for (int j = 0; j < NUM_MESSAGES; j++)
+         {
+            ClientMessage tm = clientConsumer.receive(1000);
+
+            assertNotNull(tm);
+            tm.acknowledge();
+            if(i == 0)
+            {
+               origIds.put("Message:" + j, tm.getMessageID());
+            }
+            assertEquals("Message:" + j, tm.getBody().getString());
+         }
+         clientSession.end(xid, XAResource.TMSUCCESS);
+         clientSession.rollback(xid);
+      }
+
+      assertEquals(messagingService.getServer().getPostOffice().getBinding(qName).getQueue().getMessageCount(), 0);
+      ClientMessage m = clientConsumer.receive(1000);
+      assertNull(m);
+      //All the messages should now be in the DLQ
+
+      ClientConsumer cc3 = clientSession.createConsumer(dlq);
+
+      for (int i = 0; i < NUM_MESSAGES; i++)
+      {
+         ClientMessage tm = cc3.receive(1000);
+
+         assertNotNull(tm);
+
+         String text = tm.getBody().getString();
+         assertEquals("Message:" + i, text);
+
+         // Check the headers
+         SimpleString origDest =
+               (SimpleString) tm.getProperty(MessageImpl.HDR_ORIGIN_QUEUE);
+
+         Long origMessageId =
+               (Long) tm.getProperty(MessageImpl.HDR_ORIG_MESSAGE_ID);
+
+         assertEquals(qName, origDest);
+
+         Long origId = origIds.get(text);
+
+         assertEquals(origId, origMessageId);
+      }
+
+   }
+
    @Override
    protected void setUp() throws Exception
    {

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java	2008-11-21 07:13:04 UTC (rev 5411)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java	2008-11-21 09:52:35 UTC (rev 5412)
@@ -34,9 +34,17 @@
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.transaction.impl.XidImpl;
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ACTUAL_EXPIRY_TIME;
 import org.jboss.messaging.util.SimpleString;
 
+import javax.transaction.xa.Xid;
+import javax.transaction.xa.XAResource;
+import java.util.Map;
+import java.util.HashMap;
 
+
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  */
@@ -128,6 +136,53 @@
       clientConsumer.close();
    }
 
+    public void testHeadersSet() throws Exception
+   {
+      final int NUM_MESSAGES = 5;
+      SimpleString ea = new SimpleString("DLA");
+      SimpleString qName = new SimpleString("q1");
+      QueueSettings queueSettings = new QueueSettings();
+      queueSettings.setExpiryAddress(ea);
+      messagingService.getServer().getQueueSettingsRepository().addMatch(qName.toString(), queueSettings);
+      SimpleString eq = new SimpleString("EA1");
+      clientSession.createQueue(ea, eq, null, false, false, false);
+      clientSession.createQueue(qName, qName, null, false, false, false);
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      ClientSession sendSession = sessionFactory.createSession(false, true, true);
+      ClientProducer producer = sendSession.createProducer(qName);
+
+         long expiration = System.currentTimeMillis();
+      for (int i = 0; i < NUM_MESSAGES; i++)
+      {
+         ClientMessage tm = createTextMessage("Message:" + i, clientSession);
+         tm.setExpiration(expiration);
+         producer.send(tm);
+      }
+
+      ClientConsumer clientConsumer = clientSession.createConsumer(qName);
+      clientSession.start();
+      ClientMessage m = clientConsumer.receive(1000);
+      assertNull(m);
+      //All the messages should now be in the EQ
+
+      ClientConsumer cc3 = clientSession.createConsumer(eq);
+
+      for (int i = 0; i < NUM_MESSAGES; i++)
+      {
+         ClientMessage tm = cc3.receive(1000);
+
+         assertNotNull(tm);
+
+         String text = tm.getBody().getString();
+         assertEquals("Message:" + i, text);
+
+         // Check the headers
+         Long actualExpiryTime = (Long) tm.getProperty(HDR_ACTUAL_EXPIRY_TIME);
+         assertTrue(actualExpiryTime >= expiration);
+      }
+
+   }
+
    @Override
    protected void setUp() throws Exception
    {




More information about the jboss-cvs-commits mailing list