[jboss-cvs] JBoss Messaging SVN: r3615 - trunk/src/main/org/jboss/jms/client.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jan 22 11:42:21 EST 2008
Author: timfox
Date: 2008-01-22 11:42:21 -0500 (Tue, 22 Jan 2008)
New Revision: 3615
Added:
trunk/src/main/org/jboss/jms/client/MessageHandler.java
Log:
Added missing file
Added: trunk/src/main/org/jboss/jms/client/MessageHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/MessageHandler.java (rev 0)
+++ trunk/src/main/org/jboss/jms/client/MessageHandler.java 2008-01-22 16:42:21 UTC (rev 3615)
@@ -0,0 +1,162 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.client;
+
+import javax.jms.JMSException;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.jboss.jms.client.api.ClientSession;
+import org.jboss.jms.client.impl.Cancel;
+import org.jboss.jms.client.impl.CancelImpl;
+import org.jboss.jms.client.impl.DeliveryInfo;
+import org.jboss.jms.message.JBossMessage;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.util.Logger;
+
+/**
+ *
+ * A MessageHandler
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class MessageHandler
+{
+ private static final Logger log = Logger.getLogger(MessageHandler.class);
+
+ private static final boolean trace = log.isTraceEnabled();
+
+ // This is static so it can be called by the asf layer too
+ public static void callOnMessage(ClientSession sess,
+ MessageListener listener,
+ String consumerID,
+ boolean isConnectionConsumer,
+ JBossMessage m,
+ int ackMode,
+ int maxDeliveries,
+ ClientSession connectionConsumerSession,
+ boolean shouldAck)
+ throws JMSException
+ {
+ if (checkExpiredOrReachedMaxdeliveries(m, connectionConsumerSession!=null?connectionConsumerSession:sess, maxDeliveries, shouldAck))
+ {
+ // Message has been cancelled
+ return;
+ }
+
+ DeliveryInfo deliveryInfo =
+ new DeliveryInfo(m, consumerID, connectionConsumerSession, shouldAck);
+
+ m.incDeliveryCount();
+
+ // If this is the callback-handler for a connection consumer we don't want to acknowledge or
+ // add anything to the tx for this session.
+ if (!isConnectionConsumer)
+ {
+ // We need to call preDeliver, deliver the message then call postDeliver - this is because
+ // it is legal to call session.recover(), or session.rollback() from within the onMessage()
+ // method in which case the last message needs to be delivered so it needs to know about it
+ sess.preDeliver(deliveryInfo);
+ }
+
+ try
+ {
+ if (trace) { log.trace("calling listener's onMessage(" + m + ")"); }
+
+ listener.onMessage(m);
+
+ if (trace) { log.trace("listener's onMessage() finished"); }
+ }
+ catch (RuntimeException e)
+ {
+ log.error("RuntimeException was thrown from onMessage, " + m.getJMSMessageID() + " will be redelivered", e);
+
+ // See JMS 1.1 spec 4.5.2
+
+ if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE)
+ {
+ sess.recover();
+ }
+ }
+
+ // If this is the callback-handler for a connection consumer we don't want to acknowledge
+ //or add anything to the tx for this session
+ if (!isConnectionConsumer)
+ {
+ if (trace) { log.trace("Calling postDeliver"); }
+
+ sess.postDeliver();
+
+ if (trace) { log.trace("Called postDeliver"); }
+ }
+ }
+
+
+ public static boolean checkExpiredOrReachedMaxdeliveries(JBossMessage jbm,
+ ClientSession del,
+ int maxDeliveries, boolean shouldCancel)
+ {
+ Message msg = jbm.getCoreMessage();
+
+ boolean expired = msg.isExpired();
+
+ boolean reachedMaxDeliveries = jbm.getDeliveryCount() == maxDeliveries;
+
+ if (expired || reachedMaxDeliveries)
+ {
+ if (trace)
+ {
+ if (expired)
+ {
+ log.trace(msg + " has expired, cancelling to server");
+ }
+ else
+ {
+ log.trace(msg + " has reached maximum delivery number " + maxDeliveries +", cancelling to server");
+ }
+ }
+
+ if (shouldCancel)
+ {
+ final Cancel cancel = new CancelImpl(jbm.getDeliveryId(), jbm.getDeliveryCount(),
+ expired, reachedMaxDeliveries);
+ try
+ {
+ del.cancelDelivery(cancel);
+ }
+ catch (JMSException e)
+ {
+ log.error("Failed to cancel delivery", e);
+ }
+ }
+
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+
+}
More information about the jboss-cvs-commits
mailing list