[jboss-cvs] JBoss Messaging SVN: r1561 - in branches/Branch_1_0_CP1: src/etc/server/default/deploy src/etc/xmdesc src/main/org/jboss/jms/server src/main/org/jboss/jms/server/destination src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/plugin src/main/org/jboss/messaging/core/tx tests/src/org/jboss/test/messaging/jms

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Nov 14 09:08:51 EST 2006


Author: timfox
Date: 2006-11-14 09:08:40 -0500 (Tue, 14 Nov 2006)
New Revision: 1561

Added:
   branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/DLQTest.java
Modified:
   branches/Branch_1_0_CP1/src/etc/server/default/deploy/messaging-service.xml
   branches/Branch_1_0_CP1/src/etc/xmdesc/ServerPeer-xmbean.xml
   branches/Branch_1_0_CP1/src/etc/xmdesc/Topic-xmbean.xml
   branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/ServerPeer.java
   branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
   branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/plugin/JDBCChannelMapper.java
   branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/tx/Transaction.java
   branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/tx/TransactionRepository.java
Log:
Interim patch commit


Modified: branches/Branch_1_0_CP1/src/etc/server/default/deploy/messaging-service.xml
===================================================================
--- branches/Branch_1_0_CP1/src/etc/server/default/deploy/messaging-service.xml	2006-11-14 13:07:50 UTC (rev 1560)
+++ branches/Branch_1_0_CP1/src/etc/server/default/deploy/messaging-service.xml	2006-11-14 14:08:40 UTC (rev 1561)
@@ -32,6 +32,8 @@
             <role name="guest" read="true" write="true" create="true"/>
         </security>
       </attribute>
+      <attribute name="MaxDeliveryAttempts">10</attribute>
+      <attribute name="DLQName">DLQ</attribute>
    </mbean>
 
    <!-- Plug-ins -->

Modified: branches/Branch_1_0_CP1/src/etc/xmdesc/ServerPeer-xmbean.xml
===================================================================
--- branches/Branch_1_0_CP1/src/etc/xmdesc/ServerPeer-xmbean.xml	2006-11-14 13:07:50 UTC (rev 1560)
+++ branches/Branch_1_0_CP1/src/etc/xmdesc/ServerPeer-xmbean.xml	2006-11-14 14:08:40 UTC (rev 1561)
@@ -139,6 +139,18 @@
       <name>QueuedExecutorPoolSize</name>
       <type>int</type>
    </attribute>
+   
+   <attribute access="read-write" getMethod="getMaxDeliveryAttempts" setMethod="setMaxDeliveryAttempts">
+      <description>The maximum delivery attempts for destinations</description>
+      <name>MaxDeliveryAttempts</name>
+      <type>int</type>
+   </attribute>  
+   
+   <attribute access="read-write" getMethod="getDLQName" setMethod="setDLQName">
+      <description>The JNDI name of the DLQ</description>
+      <name>DLQName</name>
+      <type>java.lang.String</type>
+   </attribute>     
 
    <!-- Managed operations -->
 

Modified: branches/Branch_1_0_CP1/src/etc/xmdesc/Topic-xmbean.xml
===================================================================
--- branches/Branch_1_0_CP1/src/etc/xmdesc/Topic-xmbean.xml	2006-11-14 13:07:50 UTC (rev 1560)
+++ branches/Branch_1_0_CP1/src/etc/xmdesc/Topic-xmbean.xml	2006-11-14 14:08:40 UTC (rev 1561)
@@ -87,7 +87,7 @@
       <name>DownCacheSize</name>
       <type>int</type>
    </attribute>
-
+   
    <!-- Managed operations -->
 
    <operation>

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/ServerPeer.java	2006-11-14 13:07:50 UTC (rev 1560)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/ServerPeer.java	2006-11-14 14:08:40 UTC (rev 1561)
@@ -35,6 +35,8 @@
 import javax.naming.NamingException;
 
 import org.jboss.aop.AspectXmlLoader;
+import org.jboss.jms.destination.JBossDestination;
+import org.jboss.jms.destination.JBossQueue;
 import org.jboss.jms.server.connectionfactory.ConnectionFactoryJNDIMapper;
 import org.jboss.jms.server.connectionmanager.SimpleConnectionManager;
 import org.jboss.jms.server.connectormanager.SimpleConnectorManager;
@@ -45,6 +47,7 @@
 import org.jboss.jms.server.security.SecurityMetadataStore;
 import org.jboss.jms.util.ExceptionUtil;
 import org.jboss.logging.Logger;
+import org.jboss.messaging.core.local.CoreDestination;
 import org.jboss.messaging.core.memory.MemoryManager;
 import org.jboss.messaging.core.memory.SimpleMemoryManager;
 import org.jboss.messaging.core.plugin.IdManager;
@@ -99,6 +102,10 @@
    protected boolean started;
 
    protected int objectIDSequence = Integer.MIN_VALUE + 1;
+   
+   private int maxDeliveryAttempts = 10;
+   
+   private String dlqName;
 
    // wired components
 
@@ -154,7 +161,7 @@
 
       version = Version.instance();
 
-      started = false;
+      started = false;            
    }
 
    // ServiceMBeanSupport overrides ---------------------------------
@@ -163,6 +170,9 @@
    {
       try
       {
+         
+         log.info("starting serverpeer");
+         
          if (started)
          {
             return;
@@ -219,7 +229,7 @@
          initializeRemoting(mbeanServer);
    
          createRecoverable();
-   
+         
          started = true;
    
          log.info("JBoss Messaging " + getVersion().getProviderVersion() + " server [" +
@@ -272,6 +282,26 @@
    }
 
    // JMX Attributes ------------------------------------------------
+   
+   public String getDLQName()
+   {
+      return dlqName;
+   }
+   
+   public void setDLQName(String dlqName)
+   {
+      this.dlqName = dlqName;
+   }
+   
+   public int getMaxDeliveryAttempts()
+   {
+      return maxDeliveryAttempts;
+   }
+   
+   public void setMaxDeliveryAttempts(int attempts)
+   {
+      this.maxDeliveryAttempts = attempts;
+   }
 
    public ObjectName getPersistenceManager()
    {
@@ -485,6 +515,34 @@
    }
 
    // Public --------------------------------------------------------
+   
+   public CoreDestination getDLQ() throws Exception
+   {
+      if (dlqName == null)
+      {
+         //No DLQ name specified so there is no DLQ
+         return null;
+      }
+      
+      CoreDestination dlq = channelMapper.getCoreDestination(new JBossQueue(dlqName));
+      
+//      if (dlq == null)
+//      {
+//         //DLQ not deployed - so deploy default one
+//         log.info("DLQ not deployed so deploying default one");
+//         
+//         createDestinationDefault(true, dlqName, null);
+//         
+//         dlq = channelMapper.getCoreDestination(new JBossQueue(dlqName));
+//         
+//         if (dlq == null)
+//         {
+//            throw new IllegalStateException("Cannot find dlq!");
+//         }         
+//      }
+      
+      return dlq;
+   }
 
    public boolean isDeployed(boolean isQueue, String name)
    {

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java	2006-11-14 13:07:50 UTC (rev 1560)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java	2006-11-14 14:08:40 UTC (rev 1561)
@@ -84,7 +84,7 @@
 
    // Down-cache size
    private int downCacheSize = DOWN_CACHE_SIZE;
-
+   
    // Constructors --------------------------------------------------
 
    public DestinationServiceSupport(boolean createdProgrammatically)
@@ -110,10 +110,10 @@
             throw new IllegalStateException( "The " + (isQueue() ? "queue" : "topic") + " " +
                                              "name was not properly set in the service's" +
                                              "ObjectName");
-         }
+         }                  
    
          ServerPeer serverPeer = (ServerPeer)server.getAttribute(serverPeerObjectName, "Instance");
-
+         
          dm = serverPeer.getDestinationManager();
          sm = serverPeer.getSecurityManager();
          cm = serverPeer.getChannelMapperDelegate();
@@ -156,7 +156,7 @@
    }
 
    // JMX managed attributes ----------------------------------------
-
+   
    public String getJNDIName()
    {
       return jndiName;

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-11-14 13:07:50 UTC (rev 1560)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-11-14 14:08:40 UTC (rev 1561)
@@ -53,8 +53,10 @@
 import org.jboss.messaging.core.Routable;
 import org.jboss.messaging.core.SimpleDelivery;
 import org.jboss.messaging.core.SingleReceiverDelivery;
+import org.jboss.messaging.core.local.CoreDestination;
 import org.jboss.messaging.core.tx.Transaction;
 import org.jboss.messaging.core.tx.TransactionException;
+import org.jboss.messaging.core.tx.TransactionRepository;
 import org.jboss.messaging.core.tx.TxCallback;
 import org.jboss.messaging.util.Future;
 
@@ -79,10 +81,8 @@
 
    private static final Logger log = Logger.getLogger(ServerConsumerEndpoint.class);
 
-   // Static --------------------------------------------------------
+   // Static --------------------------------------------------------  
 
-   private static final int MAX_DELIVERY_ATTEMPTS = 10;
-
    // Attributes ----------------------------------------------------
 
    private boolean trace = log.isTraceEnabled();
@@ -123,13 +123,20 @@
    private Object lock;
 
    private Map deliveries;
+   
+   private int maxDeliveryAttempts;
+   
+   private CoreDestination dlq;
+   
+   private TransactionRepository tr;
 
    // Constructors --------------------------------------------------
 
    protected ServerConsumerEndpoint(int id, Channel channel,
                                     ServerSessionEndpoint sessionEndpoint,
                                     String selector, boolean noLocal, JBossDestination dest,
-                                    int prefetchSize)
+                                    int prefetchSize, int maxDeliveryAttempts,
+                                    CoreDestination dlq, TransactionRepository tr)
                                     throws InvalidSelectorException
    {
       if (trace) { log.trace("constructing consumer endpoint " + id); }
@@ -138,6 +145,9 @@
       this.channel = channel;
       this.sessionEndpoint = sessionEndpoint;
       this.prefetchSize = prefetchSize;
+      this.maxDeliveryAttempts = maxDeliveryAttempts;
+      this.dlq = dlq;
+      this.tr = tr;
 
       // We always created with clientConsumerFull = true. This prevents the SCD sending messages to
       // the client before the client has fully finished creating the MessageCallbackHandler.
@@ -251,7 +261,14 @@
             return delivery;
          }
             
-         checkDeliveryCount(delivery);
+         try
+         {
+            checkDeliveryCount(delivery);
+         }
+         catch (Throwable t)
+         {
+            log.error("Failed to check delivery count", t);
+         }
          
          if (delivery.isDone())
          {
@@ -689,23 +706,37 @@
       }
    }
      
-   private void checkDeliveryCount(SimpleDelivery del)
+   private void checkDeliveryCount(SimpleDelivery del) throws Throwable
    {
-      //TODO - We need to put the message in a DLQ
-      // For now we just ack it otherwise the message will keep being retried and we'll never get
-      // anywhere
-      if (del.getReference().getDeliveryCount() > MAX_DELIVERY_ATTEMPTS)
+      if (del.getReference().getDeliveryCount() > maxDeliveryAttempts)
       {
-         log.warn(del.getReference() + " has exceed maximum delivery attempts and will be removed");
+         log.info(del.getReference() + " has exceed maximum delivery attempts and will be sent to the DLQ");
          
-         try
+         if (dlq != null)
+         {                
+            Transaction tx = tr.createTransaction();
+            
+            try
+            {         
+               dlq.handle(null, del.getReference(), tx);
+               
+               del.acknowledge(tx);
+                        
+               tx.commit(); 
+            }
+            catch (Throwable t)
+            {
+               tx.rollback();
+               
+               throw t;
+            }    
+         }
+         else
          {
+            log.info("Cannot send to DLQ since DLQ has not been deployed! The message will be removed");
+            
             del.acknowledge(null);
          }
-         catch (Throwable t)
-         {
-            log.error("Failed to acknowledge delivery", t);
-         }
       }                 
    }
    

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-11-14 13:07:50 UTC (rev 1560)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-11-14 14:08:40 UTC (rev 1561)
@@ -57,6 +57,7 @@
 import org.jboss.messaging.core.memory.MemoryManager;
 import org.jboss.messaging.core.plugin.contract.MessageStore;
 import org.jboss.messaging.core.plugin.contract.PersistenceManager;
+import org.jboss.messaging.core.tx.TransactionRepository;
 
 /**
  * Concrete implementation of SessionEndpoint.
@@ -92,10 +93,14 @@
    private PersistenceManager pm;
    private MessageStore ms;
    private MemoryManager mm;
+   private CoreDestination dlq;
+   private TransactionRepository tr;
+   private int maxDeliveryAttempts;
    
    // Constructors --------------------------------------------------
 
    protected ServerSessionEndpoint(int sessionID, ServerConnectionEndpoint connectionEndpoint)
+      throws Exception
    {
       this.sessionID = sessionID;
       
@@ -110,6 +115,10 @@
 
       consumers = new HashMap();
 		browsers = new HashMap();  
+      
+      dlq = sp.getDLQ();
+      tr = sp.getTxRepository();
+      maxDeliveryAttempts = sp.getMaxDeliveryAttempts();
    }
    
    // SessionDelegate implementation --------------------------------
@@ -252,7 +261,8 @@
          ServerConsumerEndpoint ep =
             new ServerConsumerEndpoint(consumerID,
                                        subscription == null ? (Channel)coreDestination : subscription,
-                                       this, selector, noLocal, jmsDestination, prefetchSize);
+                                       this, selector, noLocal, jmsDestination, prefetchSize,
+                                       maxDeliveryAttempts, dlq, tr);
           
          JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
             

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/plugin/JDBCChannelMapper.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/plugin/JDBCChannelMapper.java	2006-11-14 13:07:50 UTC (rev 1560)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/plugin/JDBCChannelMapper.java	2006-11-14 14:08:40 UTC (rev 1561)
@@ -246,9 +246,7 @@
    public JBossDestination getJBossDestination(long coreDestinationId)
    {
       return (JBossDestination)idMap.get(new Long(coreDestinationId));
-   }
-   
-   
+   }      
     
    public void deployCoreDestination(boolean isQueue, 
                                      String destName,

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/tx/Transaction.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/tx/Transaction.java	2006-11-14 13:07:50 UTC (rev 1560)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/tx/Transaction.java	2006-11-14 14:08:40 UTC (rev 1561)
@@ -209,7 +209,7 @@
       
       keyedCallbackMap = null;
       
-      if (transactionRepository!=null)
+      if (transactionRepository != null)
       {
     	  transactionRepository.deleteTransaction(this);
       }

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/tx/TransactionRepository.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/tx/TransactionRepository.java	2006-11-14 13:07:50 UTC (rev 1560)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/tx/TransactionRepository.java	2006-11-14 14:08:40 UTC (rev 1561)
@@ -140,22 +140,17 @@
 	   final Xid id = transaction.getXid();
 	   final int state = transaction.getState();
 	   
-	   if (id==null)
+	   if (id == null)
 	   {
-		   Exception ex = new Exception();
-		   log.warn("DeleteTransaction was called for non XA transaction",ex);
-		   return;
+		   throw new IllegalArgumentException("DeleteTransaction was called for non XA transaction");
 	   }
 
-	   if (state!=Transaction.STATE_COMMITTED && state!=Transaction.STATE_ROLLEDBACK)
+	   if (state != Transaction.STATE_COMMITTED && state != Transaction.STATE_ROLLEDBACK)
 	   {
 		   throw new TransactionException("Transaction with xid " + id + " can't be removed as it's not yet commited or rolledback: (Current state is " + Transaction.stateToString(state));
 	   }
 	   
-	   globalToLocalMap.remove(id);
-	   
-	   
-	   
+	   globalToLocalMap.remove(id);  
    }
    
    public Transaction createTransaction(Xid xid) throws Exception

Added: branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/DLQTest.java
===================================================================
--- branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/DLQTest.java	2006-11-14 13:07:50 UTC (rev 1560)
+++ branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/DLQTest.java	2006-11-14 14:08:40 UTC (rev 1561)
@@ -0,0 +1,225 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms;
+
+import java.util.Enumeration;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.InitialContext;
+import javax.naming.NameNotFoundException;
+
+import org.jboss.jms.destination.JBossQueue;
+import org.jboss.messaging.core.local.CoreDestination;
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ * A DLQTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class DLQTest extends MessagingTestCase
+{
+   protected InitialContext ic;
+   
+   protected ConnectionFactory cf;
+   
+   protected Queue queue;
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      
+      ServerManagement.start("all");
+      
+      ic = new InitialContext(ServerManagement.getJNDIEnvironment());
+      
+      cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+      
+      ServerManagement.deployQueue("Queue");
+      
+      queue = (Queue)ic.lookup("/queue/Queue");
+      
+   }
+
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+      
+      ServerManagement.undeployQueue("Queue");
+      
+      if (ic != null) ic.close();
+   }
+
+   public DLQTest(String name)
+   {
+      super(name);
+   }
+   
+   public void testDLQAlreadyDeployed() throws Exception
+   {
+      if (ServerManagement.isRemote())
+      {
+         //This test can only run in local mode
+         return;
+      }
+      
+      ServerManagement.deployQueue("DLQ");
+      
+      CoreDestination dlq = ServerManagement.getServer().getServerPeer().getDLQ();
+      
+      assertNotNull(dlq);
+      
+      InitialContext ic = null;
+      
+      try      
+      {
+         ic = new InitialContext(ServerManagement.getJNDIEnvironment());
+            
+         JBossQueue q = (JBossQueue)ic.lookup("/queue/DLQ");
+         
+         assertNotNull(q);
+         
+         assertEquals("DLQ", q.getName());
+      }
+      finally
+      {      
+         if (ic != null) ic.close();
+         
+         ServerManagement.undeployQueue("DLQ");
+      }            
+   }
+   
+   public void testDLQNotAlreadyDeployed() throws Exception
+   {
+      if (ServerManagement.isRemote())
+      {
+         //This test can only run in local mode
+         return;
+      }
+      
+      CoreDestination dlq = ServerManagement.getServer().getServerPeer().getDLQ();
+      
+      assertNull(dlq);
+      
+      InitialContext ic = null;
+      
+      try      
+      {
+         ic = new InitialContext(ServerManagement.getJNDIEnvironment());
+            
+         try
+         {
+            JBossQueue q = (JBossQueue)ic.lookup("/queue/DLQ");
+            
+            fail();
+         }
+         catch (NameNotFoundException e)
+         {
+            //Ok
+         }
+      }
+      finally
+      {      
+         if (ic != null) ic.close();
+      }            
+   }
+   
+   public void testSendToDLQWithMessageListener() throws Exception
+   {
+      Connection conn = null;
+      
+      ServerManagement.deployQueue("DLQ");
+      
+      Queue dlq = (Queue)ic.lookup("/queue/DLQ");
+                 
+      try
+      {                  
+         conn = cf.createConnection();
+         
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageProducer prod = sess.createProducer(queue);
+         
+         for (int i = 0; i < 10; i++)
+         {
+            TextMessage tm = sess.createTextMessage("Message:" + i);
+            
+            prod.send(tm);
+         }
+         
+         MessageConsumer cons = sess.createConsumer(queue);
+         
+         cons.setMessageListener(new FailingMessageListener());
+         
+         conn.start();
+         
+         Thread.sleep(4000);
+         
+         QueueBrowser browser = sess.createBrowser(dlq);
+         
+         Enumeration enumeration = browser.getEnumeration();
+         
+         int i = 0;
+         while (enumeration.hasMoreElements())
+         {
+            TextMessage tm = (TextMessage)enumeration.nextElement();
+            
+            assertEquals("message:" + i++, tm.getText());
+         }
+         
+         log.info("YUP THAT WORKED");
+         
+      }
+      finally
+      {
+         ServerManagement.undeployQueue("DLQ");
+         
+         if (conn != null) conn.close();
+      }
+   }
+   
+   
+   class FailingMessageListener implements MessageListener
+   {
+
+      public void onMessage(Message msg)
+      {
+         throw new RuntimeException("Your mum!");
+      }
+      
+   }
+
+}




More information about the jboss-cvs-commits mailing list