[jboss-cvs] JBoss Messaging SVN: r5412 - in trunk: tests/src/org/jboss/messaging/tests/integration/queue and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Nov 21 04:52:35 EST 2008
Author: ataylor
Date: 2008-11-21 04:52:35 -0500 (Fri, 21 Nov 2008)
New Revision: 5412
Modified:
trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/queue/DeadLetterAddressTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java
Log:
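Added header tests for the dead letter and expiry addresses, and a check that logs a warning instead of moving the message when the dead letter or expiry address has no bindings.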
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-11-21 07:13:04 UTC (rev 5411)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-11-21 09:52:35 UTC (rev 5412)
@@ -39,6 +39,8 @@
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
import org.jboss.messaging.util.SimpleString;
+import java.util.List;
+
/**
* Implementation of a MessageReference
*
@@ -165,7 +167,15 @@
SimpleString deadLetterAddress = queueSettingsRepository.getMatch(queue.getName().toString()).getDeadLetterAddress();
if (deadLetterAddress != null)
{
- move(deadLetterAddress, persistenceManager, postOffice, false);
+ List<Binding> bindingList = postOffice.getBindingsForAddress(deadLetterAddress);
+ if(bindingList == null || bindingList.size() == 0)
+ {
+ log.warn("Message has exceeded max delivery attempts. No bindings for Dead Letter Address " + deadLetterAddress + " so dropping it");
+ }
+ else
+ {
+ move(deadLetterAddress, persistenceManager, postOffice, false);
+ }
}
else
{
@@ -181,11 +191,19 @@
final PostOffice postOffice,
final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
{
- SimpleString expiryQueue = queueSettingsRepository.getMatch(queue.getName().toString()).getExpiryAddress();
+ SimpleString expiryAddress = queueSettingsRepository.getMatch(queue.getName().toString()).getExpiryAddress();
- if (expiryQueue != null)
+ if (expiryAddress != null)
{
- move(expiryQueue, persistenceManager, postOffice, true);
+ List<Binding> bindingList = postOffice.getBindingsForAddress(expiryAddress);
+ if(bindingList == null || bindingList.size() == 0)
+ {
+ log.warn("Message has expired. No bindings for Expiry Address " + expiryAddress + " so dropping it");
+ }
+ else
+ {
+ move(expiryAddress, persistenceManager, postOffice, true);
+ }
}
else
{
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/queue/DeadLetterAddressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/queue/DeadLetterAddressTest.java 2008-11-21 07:13:04 UTC (rev 5411)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/queue/DeadLetterAddressTest.java 2008-11-21 09:52:35 UTC (rev 5412)
@@ -35,10 +35,14 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.impl.XidImpl;
+import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.jms.client.JBossMessage;
import javax.transaction.xa.Xid;
import javax.transaction.xa.XAResource;
+import java.util.Map;
+import java.util.HashMap;
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
@@ -151,6 +155,85 @@
clientConsumer.close();
}
+   /**
+    * Checks the headers on messages that end up on the dead letter queue.
+    * <p>
+    * Sends NUM_MESSAGES messages to q1, then receives, acknowledges and rolls them back
+    * MAX_DELIVERIES times inside an XA transaction branch. Afterwards q1 must be empty and each
+    * message read from DLQ1 must carry HDR_ORIGIN_QUEUE equal to q1 and HDR_ORIG_MESSAGE_ID
+    * equal to the message id recorded on the first delivery.
+    *
+    * @throws Exception if any messaging operation fails
+    */
+   public void testHeadersSet() throws Exception
+   {
+      final int MAX_DELIVERIES = 16;
+      final int NUM_MESSAGES = 5;
+      Xid xid = new XidImpl("bq".getBytes(), 0, "gt".getBytes());
+      SimpleString dla = new SimpleString("DLA");
+      SimpleString qName = new SimpleString("q1");
+      QueueSettings queueSettings = new QueueSettings();
+      queueSettings.setMaxDeliveryAttempts(MAX_DELIVERIES);
+      queueSettings.setDeadLetterAddress(dla);
+      messagingService.getServer().getQueueSettingsRepository().addMatch(qName.toString(), queueSettings);
+      SimpleString dlq = new SimpleString("DLQ1");
+      clientSession.createQueue(dla, dlq, null, false, false, false);
+      clientSession.createQueue(qName, qName, null, false, false, false);
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      ClientSession sendSession = sessionFactory.createSession(false, true, true);
+      ClientProducer producer = sendSession.createProducer(qName);
+      // Message body -> message id observed on the first delivery
+      Map<String, Long> origIds = new HashMap<String, Long>();
+
+      for (int i = 0; i < NUM_MESSAGES; i++)
+      {
+         ClientMessage tm = createTextMessage("Message:" + i, clientSession);
+         producer.send(tm);
+      }
+
+      ClientConsumer clientConsumer = clientSession.createConsumer(qName);
+      clientSession.start();
+
+      // Each pass delivers every message once and then rolls the branch back
+      for (int i = 0; i < MAX_DELIVERIES; i++)
+      {
+         clientSession.start(xid, XAResource.TMNOFLAGS);
+         for (int j = 0; j < NUM_MESSAGES; j++)
+         {
+            ClientMessage tm = clientConsumer.receive(1000);
+
+            assertNotNull(tm);
+            tm.acknowledge();
+            if (i == 0)
+            {
+               origIds.put("Message:" + j, tm.getMessageID());
+            }
+            assertEquals("Message:" + j, tm.getBody().getString());
+         }
+         clientSession.end(xid, XAResource.TMSUCCESS);
+         clientSession.rollback(xid);
+      }
+
+      // Expected value first, so a failure reports expected/actual the right way round
+      assertEquals(0, messagingService.getServer().getPostOffice().getBinding(qName).getQueue().getMessageCount());
+      ClientMessage m = clientConsumer.receive(1000);
+      assertNull(m);
+      //All the messages should now be in the DLQ
+
+      ClientConsumer cc3 = clientSession.createConsumer(dlq);
+
+      for (int i = 0; i < NUM_MESSAGES; i++)
+      {
+         ClientMessage tm = cc3.receive(1000);
+
+         assertNotNull(tm);
+
+         String text = tm.getBody().getString();
+         assertEquals("Message:" + i, text);
+
+         // Check the headers
+         SimpleString origDest =
+            (SimpleString) tm.getProperty(MessageImpl.HDR_ORIGIN_QUEUE);
+
+         Long origMessageId =
+            (Long) tm.getProperty(MessageImpl.HDR_ORIG_MESSAGE_ID);
+
+         assertEquals(qName, origDest);
+
+         Long origId = origIds.get(text);
+
+         assertEquals(origId, origMessageId);
+      }
+
+      // Release what this test opened itself; clientSession belongs to the fixture
+      cc3.close();
+      clientConsumer.close();
+      sendSession.close();
+   }
+
@Override
protected void setUp() throws Exception
{
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java 2008-11-21 07:13:04 UTC (rev 5411)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java 2008-11-21 09:52:35 UTC (rev 5412)
@@ -34,9 +34,17 @@
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.transaction.impl.XidImpl;
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ACTUAL_EXPIRY_TIME;
import org.jboss.messaging.util.SimpleString;
+import javax.transaction.xa.Xid;
+import javax.transaction.xa.XAResource;
+import java.util.Map;
+import java.util.HashMap;
+
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*/
@@ -128,6 +136,53 @@
clientConsumer.close();
}
+   /**
+    * Checks the headers on messages that end up on the expiry queue.
+    * <p>
+    * Sends NUM_MESSAGES messages to q1 whose expiration is the time taken just before sending,
+    * verifies that nothing can be consumed from q1, then reads the messages from EA1 and checks
+    * that HDR_ACTUAL_EXPIRY_TIME is present and not earlier than the expiration that was set.
+    *
+    * @throws Exception if any messaging operation fails
+    */
+   public void testHeadersSet() throws Exception
+   {
+      final int NUM_MESSAGES = 5;
+      // NOTE(review): the expiry address is named "DLA", presumably carried over from the
+      // dead letter test; the name itself does not matter to this test.
+      SimpleString ea = new SimpleString("DLA");
+      SimpleString qName = new SimpleString("q1");
+      QueueSettings queueSettings = new QueueSettings();
+      queueSettings.setExpiryAddress(ea);
+      messagingService.getServer().getQueueSettingsRepository().addMatch(qName.toString(), queueSettings);
+      SimpleString eq = new SimpleString("EA1");
+      clientSession.createQueue(ea, eq, null, false, false, false);
+      clientSession.createQueue(qName, qName, null, false, false, false);
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      ClientSession sendSession = sessionFactory.createSession(false, true, true);
+      ClientProducer producer = sendSession.createProducer(qName);
+
+      // Every message gets the same expiration, taken before the first send
+      long expiration = System.currentTimeMillis();
+      for (int i = 0; i < NUM_MESSAGES; i++)
+      {
+         ClientMessage tm = createTextMessage("Message:" + i, clientSession);
+         tm.setExpiration(expiration);
+         producer.send(tm);
+      }
+
+      ClientConsumer clientConsumer = clientSession.createConsumer(qName);
+      clientSession.start();
+      ClientMessage m = clientConsumer.receive(1000);
+      assertNull(m);
+      //All the messages should now be in the EQ
+
+      ClientConsumer cc3 = clientSession.createConsumer(eq);
+
+      for (int i = 0; i < NUM_MESSAGES; i++)
+      {
+         ClientMessage tm = cc3.receive(1000);
+
+         assertNotNull(tm);
+
+         String text = tm.getBody().getString();
+         assertEquals("Message:" + i, text);
+
+         // Check the headers
+         Long actualExpiryTime = (Long) tm.getProperty(HDR_ACTUAL_EXPIRY_TIME);
+         // Fail with an assertion rather than a NullPointerException on unboxing
+         assertNotNull(actualExpiryTime);
+         assertTrue(actualExpiryTime >= expiration);
+      }
+
+      // Release what this test opened itself; clientSession belongs to the fixture
+      cc3.close();
+      clientConsumer.close();
+      sendSession.close();
+   }
+
@Override
protected void setUp() throws Exception
{
More information about the jboss-cvs-commits
mailing list