[jboss-cvs] JBoss Messaging SVN: r1561 - in branches/Branch_1_0_CP1: src/etc/server/default/deploy src/etc/xmdesc src/main/org/jboss/jms/server src/main/org/jboss/jms/server/destination src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/plugin src/main/org/jboss/messaging/core/tx tests/src/org/jboss/test/messaging/jms
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Nov 14 09:08:51 EST 2006
Author: timfox
Date: 2006-11-14 09:08:40 -0500 (Tue, 14 Nov 2006)
New Revision: 1561
Added:
branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/DLQTest.java
Modified:
branches/Branch_1_0_CP1/src/etc/server/default/deploy/messaging-service.xml
branches/Branch_1_0_CP1/src/etc/xmdesc/ServerPeer-xmbean.xml
branches/Branch_1_0_CP1/src/etc/xmdesc/Topic-xmbean.xml
branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/ServerPeer.java
branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/plugin/JDBCChannelMapper.java
branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/tx/Transaction.java
branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/tx/TransactionRepository.java
Log:
Interim patch commit
Modified: branches/Branch_1_0_CP1/src/etc/server/default/deploy/messaging-service.xml
===================================================================
--- branches/Branch_1_0_CP1/src/etc/server/default/deploy/messaging-service.xml 2006-11-14 13:07:50 UTC (rev 1560)
+++ branches/Branch_1_0_CP1/src/etc/server/default/deploy/messaging-service.xml 2006-11-14 14:08:40 UTC (rev 1561)
@@ -32,6 +32,8 @@
<role name="guest" read="true" write="true" create="true"/>
</security>
</attribute>
+ <attribute name="MaxDeliveryAttempts">10</attribute>
+ <attribute name="DLQName">DLQ</attribute>
</mbean>
<!-- Plug-ins -->
Modified: branches/Branch_1_0_CP1/src/etc/xmdesc/ServerPeer-xmbean.xml
===================================================================
--- branches/Branch_1_0_CP1/src/etc/xmdesc/ServerPeer-xmbean.xml 2006-11-14 13:07:50 UTC (rev 1560)
+++ branches/Branch_1_0_CP1/src/etc/xmdesc/ServerPeer-xmbean.xml 2006-11-14 14:08:40 UTC (rev 1561)
@@ -139,6 +139,18 @@
<name>QueuedExecutorPoolSize</name>
<type>int</type>
</attribute>
+
+ <attribute access="read-write" getMethod="getMaxDeliveryAttempts" setMethod="setMaxDeliveryAttempts">
+ <description>The maximum delivery attempts for destinations</description>
+ <name>MaxDeliveryAttempts</name>
+ <type>int</type>
+ </attribute>
+
+ <attribute access="read-write" getMethod="getDLQName" setMethod="setDLQName">
+ <description>The JNDI name of the DLQ</description>
+ <name>DLQName</name>
+ <type>java.lang.String</type>
+ </attribute>
<!-- Managed operations -->
Modified: branches/Branch_1_0_CP1/src/etc/xmdesc/Topic-xmbean.xml
===================================================================
--- branches/Branch_1_0_CP1/src/etc/xmdesc/Topic-xmbean.xml 2006-11-14 13:07:50 UTC (rev 1560)
+++ branches/Branch_1_0_CP1/src/etc/xmdesc/Topic-xmbean.xml 2006-11-14 14:08:40 UTC (rev 1561)
@@ -87,7 +87,7 @@
<name>DownCacheSize</name>
<type>int</type>
</attribute>
-
+
<!-- Managed operations -->
<operation>
Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/ServerPeer.java 2006-11-14 13:07:50 UTC (rev 1560)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/ServerPeer.java 2006-11-14 14:08:40 UTC (rev 1561)
@@ -35,6 +35,8 @@
import javax.naming.NamingException;
import org.jboss.aop.AspectXmlLoader;
+import org.jboss.jms.destination.JBossDestination;
+import org.jboss.jms.destination.JBossQueue;
import org.jboss.jms.server.connectionfactory.ConnectionFactoryJNDIMapper;
import org.jboss.jms.server.connectionmanager.SimpleConnectionManager;
import org.jboss.jms.server.connectormanager.SimpleConnectorManager;
@@ -45,6 +47,7 @@
import org.jboss.jms.server.security.SecurityMetadataStore;
import org.jboss.jms.util.ExceptionUtil;
import org.jboss.logging.Logger;
+import org.jboss.messaging.core.local.CoreDestination;
import org.jboss.messaging.core.memory.MemoryManager;
import org.jboss.messaging.core.memory.SimpleMemoryManager;
import org.jboss.messaging.core.plugin.IdManager;
@@ -99,6 +102,10 @@
protected boolean started;
protected int objectIDSequence = Integer.MIN_VALUE + 1;
+
+ private int maxDeliveryAttempts = 10;
+
+ private String dlqName;
// wired components
@@ -154,7 +161,7 @@
version = Version.instance();
- started = false;
+ started = false;
}
// ServiceMBeanSupport overrides ---------------------------------
@@ -163,6 +170,9 @@
{
try
{
+
+ log.info("starting serverpeer");
+
if (started)
{
return;
@@ -219,7 +229,7 @@
initializeRemoting(mbeanServer);
createRecoverable();
-
+
started = true;
log.info("JBoss Messaging " + getVersion().getProviderVersion() + " server [" +
@@ -272,6 +282,26 @@
}
// JMX Attributes ------------------------------------------------
+
+ public String getDLQName()
+ {
+ return dlqName;
+ }
+
+ public void setDLQName(String dlqName)
+ {
+ this.dlqName = dlqName;
+ }
+
+ public int getMaxDeliveryAttempts()
+ {
+ return maxDeliveryAttempts;
+ }
+
+ public void setMaxDeliveryAttempts(int attempts)
+ {
+ this.maxDeliveryAttempts = attempts;
+ }
public ObjectName getPersistenceManager()
{
@@ -485,6 +515,34 @@
}
// Public --------------------------------------------------------
+
+ public CoreDestination getDLQ() throws Exception
+ {
+ if (dlqName == null)
+ {
+ //No DLQ name specified so there is no DLQ
+ return null;
+ }
+
+ CoreDestination dlq = channelMapper.getCoreDestination(new JBossQueue(dlqName));
+
+// if (dlq == null)
+// {
+// //DLQ not deployed - so deploy default one
+// log.info("DLQ not deployed so deploying default one");
+//
+// createDestinationDefault(true, dlqName, null);
+//
+// dlq = channelMapper.getCoreDestination(new JBossQueue(dlqName));
+//
+// if (dlq == null)
+// {
+// throw new IllegalStateException("Cannot find dlq!");
+// }
+// }
+
+ return dlq;
+ }
public boolean isDeployed(boolean isQueue, String name)
{
Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java 2006-11-14 13:07:50 UTC (rev 1560)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java 2006-11-14 14:08:40 UTC (rev 1561)
@@ -84,7 +84,7 @@
// Down-cache size
private int downCacheSize = DOWN_CACHE_SIZE;
-
+
// Constructors --------------------------------------------------
public DestinationServiceSupport(boolean createdProgrammatically)
@@ -110,10 +110,10 @@
throw new IllegalStateException( "The " + (isQueue() ? "queue" : "topic") + " " +
"name was not properly set in the service's" +
"ObjectName");
- }
+ }
ServerPeer serverPeer = (ServerPeer)server.getAttribute(serverPeerObjectName, "Instance");
-
+
dm = serverPeer.getDestinationManager();
sm = serverPeer.getSecurityManager();
cm = serverPeer.getChannelMapperDelegate();
@@ -156,7 +156,7 @@
}
// JMX managed attributes ----------------------------------------
-
+
public String getJNDIName()
{
return jndiName;
Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-11-14 13:07:50 UTC (rev 1560)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-11-14 14:08:40 UTC (rev 1561)
@@ -53,8 +53,10 @@
import org.jboss.messaging.core.Routable;
import org.jboss.messaging.core.SimpleDelivery;
import org.jboss.messaging.core.SingleReceiverDelivery;
+import org.jboss.messaging.core.local.CoreDestination;
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.messaging.core.tx.TransactionException;
+import org.jboss.messaging.core.tx.TransactionRepository;
import org.jboss.messaging.core.tx.TxCallback;
import org.jboss.messaging.util.Future;
@@ -79,10 +81,8 @@
private static final Logger log = Logger.getLogger(ServerConsumerEndpoint.class);
- // Static --------------------------------------------------------
+ // Static --------------------------------------------------------
- private static final int MAX_DELIVERY_ATTEMPTS = 10;
-
// Attributes ----------------------------------------------------
private boolean trace = log.isTraceEnabled();
@@ -123,13 +123,20 @@
private Object lock;
private Map deliveries;
+
+ private int maxDeliveryAttempts;
+
+ private CoreDestination dlq;
+
+ private TransactionRepository tr;
// Constructors --------------------------------------------------
protected ServerConsumerEndpoint(int id, Channel channel,
ServerSessionEndpoint sessionEndpoint,
String selector, boolean noLocal, JBossDestination dest,
- int prefetchSize)
+ int prefetchSize, int maxDeliveryAttempts,
+ CoreDestination dlq, TransactionRepository tr)
throws InvalidSelectorException
{
if (trace) { log.trace("constructing consumer endpoint " + id); }
@@ -138,6 +145,9 @@
this.channel = channel;
this.sessionEndpoint = sessionEndpoint;
this.prefetchSize = prefetchSize;
+ this.maxDeliveryAttempts = maxDeliveryAttempts;
+ this.dlq = dlq;
+ this.tr = tr;
// We always created with clientConsumerFull = true. This prevents the SCD sending messages to
// the client before the client has fully finished creating the MessageCallbackHandler.
@@ -251,7 +261,14 @@
return delivery;
}
- checkDeliveryCount(delivery);
+ try
+ {
+ checkDeliveryCount(delivery);
+ }
+ catch (Throwable t)
+ {
+ log.error("Failed to check delivery count", t);
+ }
if (delivery.isDone())
{
@@ -689,23 +706,37 @@
}
}
- private void checkDeliveryCount(SimpleDelivery del)
+ private void checkDeliveryCount(SimpleDelivery del) throws Throwable
{
- //TODO - We need to put the message in a DLQ
- // For now we just ack it otherwise the message will keep being retried and we'll never get
- // anywhere
- if (del.getReference().getDeliveryCount() > MAX_DELIVERY_ATTEMPTS)
+ if (del.getReference().getDeliveryCount() > maxDeliveryAttempts)
{
- log.warn(del.getReference() + " has exceed maximum delivery attempts and will be removed");
+ log.info(del.getReference() + " has exceed maximum delivery attempts and will be sent to the DLQ");
- try
+ if (dlq != null)
+ {
+ Transaction tx = tr.createTransaction();
+
+ try
+ {
+ dlq.handle(null, del.getReference(), tx);
+
+ del.acknowledge(tx);
+
+ tx.commit();
+ }
+ catch (Throwable t)
+ {
+ tx.rollback();
+
+ throw t;
+ }
+ }
+ else
{
+ log.info("Cannot send to DLQ since DLQ has not been deployed! The message will be removed");
+
del.acknowledge(null);
}
- catch (Throwable t)
- {
- log.error("Failed to acknowledge delivery", t);
- }
}
}
Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-11-14 13:07:50 UTC (rev 1560)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-11-14 14:08:40 UTC (rev 1561)
@@ -57,6 +57,7 @@
import org.jboss.messaging.core.memory.MemoryManager;
import org.jboss.messaging.core.plugin.contract.MessageStore;
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
+import org.jboss.messaging.core.tx.TransactionRepository;
/**
* Concrete implementation of SessionEndpoint.
@@ -92,10 +93,14 @@
private PersistenceManager pm;
private MessageStore ms;
private MemoryManager mm;
+ private CoreDestination dlq;
+ private TransactionRepository tr;
+ private int maxDeliveryAttempts;
// Constructors --------------------------------------------------
protected ServerSessionEndpoint(int sessionID, ServerConnectionEndpoint connectionEndpoint)
+ throws Exception
{
this.sessionID = sessionID;
@@ -110,6 +115,10 @@
consumers = new HashMap();
browsers = new HashMap();
+
+ dlq = sp.getDLQ();
+ tr = sp.getTxRepository();
+ maxDeliveryAttempts = sp.getMaxDeliveryAttempts();
}
// SessionDelegate implementation --------------------------------
@@ -252,7 +261,8 @@
ServerConsumerEndpoint ep =
new ServerConsumerEndpoint(consumerID,
subscription == null ? (Channel)coreDestination : subscription,
- this, selector, noLocal, jmsDestination, prefetchSize);
+ this, selector, noLocal, jmsDestination, prefetchSize,
+ maxDeliveryAttempts, dlq, tr);
JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/plugin/JDBCChannelMapper.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/plugin/JDBCChannelMapper.java 2006-11-14 13:07:50 UTC (rev 1560)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/plugin/JDBCChannelMapper.java 2006-11-14 14:08:40 UTC (rev 1561)
@@ -246,9 +246,7 @@
public JBossDestination getJBossDestination(long coreDestinationId)
{
return (JBossDestination)idMap.get(new Long(coreDestinationId));
- }
-
-
+ }
public void deployCoreDestination(boolean isQueue,
String destName,
Modified: branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/tx/Transaction.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/tx/Transaction.java 2006-11-14 13:07:50 UTC (rev 1560)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/tx/Transaction.java 2006-11-14 14:08:40 UTC (rev 1561)
@@ -209,7 +209,7 @@
keyedCallbackMap = null;
- if (transactionRepository!=null)
+ if (transactionRepository != null)
{
transactionRepository.deleteTransaction(this);
}
Modified: branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/tx/TransactionRepository.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/tx/TransactionRepository.java 2006-11-14 13:07:50 UTC (rev 1560)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/tx/TransactionRepository.java 2006-11-14 14:08:40 UTC (rev 1561)
@@ -140,22 +140,17 @@
final Xid id = transaction.getXid();
final int state = transaction.getState();
- if (id==null)
+ if (id == null)
{
- Exception ex = new Exception();
- log.warn("DeleteTransaction was called for non XA transaction",ex);
- return;
+ throw new IllegalArgumentException("DeleteTransaction was called for non XA transaction");
}
- if (state!=Transaction.STATE_COMMITTED && state!=Transaction.STATE_ROLLEDBACK)
+ if (state != Transaction.STATE_COMMITTED && state != Transaction.STATE_ROLLEDBACK)
{
throw new TransactionException("Transaction with xid " + id + " can't be removed as it's not yet commited or rolledback: (Current state is " + Transaction.stateToString(state));
}
- globalToLocalMap.remove(id);
-
-
-
+ globalToLocalMap.remove(id);
}
public Transaction createTransaction(Xid xid) throws Exception
Added: branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/DLQTest.java
===================================================================
--- branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/DLQTest.java 2006-11-14 13:07:50 UTC (rev 1560)
+++ branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/DLQTest.java 2006-11-14 14:08:40 UTC (rev 1561)
@@ -0,0 +1,225 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms;
+
+import java.util.Enumeration;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.InitialContext;
+import javax.naming.NameNotFoundException;
+
+import org.jboss.jms.destination.JBossQueue;
+import org.jboss.messaging.core.local.CoreDestination;
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ * A DLQTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class DLQTest extends MessagingTestCase
+{
+ protected InitialContext ic;
+
+ protected ConnectionFactory cf;
+
+ protected Queue queue;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ ServerManagement.start("all");
+
+ ic = new InitialContext(ServerManagement.getJNDIEnvironment());
+
+ cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+
+ ServerManagement.deployQueue("Queue");
+
+ queue = (Queue)ic.lookup("/queue/Queue");
+
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+
+ ServerManagement.undeployQueue("Queue");
+
+ if (ic != null) ic.close();
+ }
+
+ public DLQTest(String name)
+ {
+ super(name);
+ }
+
+ public void testDLQAlreadyDeployed() throws Exception
+ {
+ if (ServerManagement.isRemote())
+ {
+ //This test can only run in local mode
+ return;
+ }
+
+ ServerManagement.deployQueue("DLQ");
+
+ CoreDestination dlq = ServerManagement.getServer().getServerPeer().getDLQ();
+
+ assertNotNull(dlq);
+
+ InitialContext ic = null;
+
+ try
+ {
+ ic = new InitialContext(ServerManagement.getJNDIEnvironment());
+
+ JBossQueue q = (JBossQueue)ic.lookup("/queue/DLQ");
+
+ assertNotNull(q);
+
+ assertEquals("DLQ", q.getName());
+ }
+ finally
+ {
+ if (ic != null) ic.close();
+
+ ServerManagement.undeployQueue("DLQ");
+ }
+ }
+
+ public void testDLQNotAlreadyDeployed() throws Exception
+ {
+ if (ServerManagement.isRemote())
+ {
+ //This test can only run in local mode
+ return;
+ }
+
+ CoreDestination dlq = ServerManagement.getServer().getServerPeer().getDLQ();
+
+ assertNull(dlq);
+
+ InitialContext ic = null;
+
+ try
+ {
+ ic = new InitialContext(ServerManagement.getJNDIEnvironment());
+
+ try
+ {
+ JBossQueue q = (JBossQueue)ic.lookup("/queue/DLQ");
+
+ fail();
+ }
+ catch (NameNotFoundException e)
+ {
+ //Ok
+ }
+ }
+ finally
+ {
+ if (ic != null) ic.close();
+ }
+ }
+
+ public void testSendToDLQWithMessageListener() throws Exception
+ {
+ Connection conn = null;
+
+ ServerManagement.deployQueue("DLQ");
+
+ Queue dlq = (Queue)ic.lookup("/queue/DLQ");
+
+ try
+ {
+ conn = cf.createConnection();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue);
+
+ for (int i = 0; i < 10; i++)
+ {
+ TextMessage tm = sess.createTextMessage("Message:" + i);
+
+ prod.send(tm);
+ }
+
+ MessageConsumer cons = sess.createConsumer(queue);
+
+ cons.setMessageListener(new FailingMessageListener());
+
+ conn.start();
+
+ Thread.sleep(4000);
+
+ QueueBrowser browser = sess.createBrowser(dlq);
+
+ Enumeration enumeration = browser.getEnumeration();
+
+ int i = 0;
+ while (enumeration.hasMoreElements())
+ {
+ TextMessage tm = (TextMessage)enumeration.nextElement();
+
+ assertEquals("message:" + i++, tm.getText());
+ }
+
+ log.info("YUP THAT WORKED");
+
+ }
+ finally
+ {
+ ServerManagement.undeployQueue("DLQ");
+
+ if (conn != null) conn.close();
+ }
+ }
+
+
+ class FailingMessageListener implements MessageListener
+ {
+
+ public void onMessage(Message msg)
+ {
+ throw new RuntimeException("Your mum!");
+ }
+
+ }
+
+}
More information about the jboss-cvs-commits
mailing list