[jboss-svn-commits] JBL Code SVN: r10475 - in labs/jbossesb/branches/JBESB_4_0_MP1/product/core: rosetta/src/org/jboss/soa/esb/couriers and 1 other directory.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Fri Mar 23 09:54:17 EDT 2007
Author: kevin.conner at jboss.com
Date: 2007-03-23 09:54:17 -0400 (Fri, 23 Mar 2007)
New Revision: 10475
Added:
labs/jbossesb/branches/JBESB_4_0_MP1/product/core/rosetta/src/org/jboss/soa/esb/couriers/CourierCollection.java
Modified:
labs/jbossesb/branches/JBESB_4_0_MP1/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
Log:
Rewrite handling of couriers
Modified: labs/jbossesb/branches/JBESB_4_0_MP1/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_0_MP1/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java 2007-03-23 13:52:54 UTC (rev 10474)
+++ labs/jbossesb/branches/JBESB_4_0_MP1/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java 2007-03-23 13:54:17 UTC (rev 10475)
@@ -28,11 +28,10 @@
import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.MalformedEPRException;
import org.jboss.soa.esb.addressing.eprs.JMSEpr;
-import org.jboss.soa.esb.couriers.Courier;
+import org.jboss.soa.esb.couriers.CourierCollection;
import org.jboss.soa.esb.couriers.CourierException;
-import org.jboss.soa.esb.couriers.CourierFactory;
-import org.jboss.soa.esb.couriers.CourierUtil;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.helpers.NamingContext;
import org.jboss.soa.esb.listeners.ListenerTagNames;
@@ -88,16 +87,30 @@
protected void doInitialise()
throws ManagedLifecycleException
{
+ final Collection<EPR> targetEprs;
try
{
- _targetEprs = RegistryUtil.getEprs(_targetServiceCategory, _targetServiceName);
- if (null == _targetEprs || _targetEprs.size() < 1)
+ targetEprs = RegistryUtil.getEprs(_targetServiceCategory, _targetServiceName);
+ if (null == targetEprs || targetEprs.size() < 1)
throw new ManagedLifecycleException("EPR <" + _targetServiceName + "> not found in registry");
}
catch (final RegistryException re)
{
throw new ManagedLifecycleException("Unexpected registry exception", re);
}
+
+ try
+ {
+ couriers = new CourierCollection(targetEprs) ;
+ }
+ catch (final CourierException ce)
+ {
+ throw new ManagedLifecycleException("Unexpected courier exception", ce);
+ }
+ catch (final MalformedEPRException mepre)
+ {
+ throw new ManagedLifecycleException("Invalid EPR specified", mepre);
+ }
if (_serviceName != null)
{
@@ -158,69 +171,41 @@
if (null != msgIn)
try
{
- Object obj = _processMethod.invoke(_composer,
+ final Object obj ;
+ try
+ {
+ obj = _processMethod.invoke(_composer,
new Object[]{msgIn});
- if (null == obj)
+ }
+ catch (InvocationTargetException e)
{
- _logger.warn("Action class method <" + _processMethod
- .getName() + "> returned a null object");
- continue;
+ _logger.error("Problems invoking method <" + _processMethod
+ .getName() + ">", e);
+ continue ;
}
- // try to deliver the composed message, using the
- // appropriate courier
- // to the target service
- try
+ catch (IllegalAccessException e)
{
- boolean bSent = false;
- for (EPR current : _targetEprs)
- {
- _courier = CourierFactory.getCourier(current);
- try
- {
- if (_courier
- .deliver((org.jboss.soa.esb.message.Message) obj))
- {
- bSent = true;
- break;
- }
- }
- finally
- {
- CourierUtil.cleanCourier(_courier);
- }
- }
- if (!bSent)
- {
- String text = "Target service <" + _targetServiceCategory + "," + _targetServiceName + "> is not registered";
- throw new Exception(text);
- }
+ _logger.error("Problems invoking method <" + _processMethod
+ .getName() + ">", e);
+ continue ;
}
- catch (ClassCastException e)
+
+ if (null == obj)
{
- _logger.error("Action class method <" + _processMethod
- .getName() + "> returned a non Message object",
- e);
+ _logger.warn("Action class method <" + _processMethod
+ .getName() + "> returned a null object");
continue;
}
- catch (CourierException e)
+ if (!(obj instanceof Message))
{
- String text = (null != _courier) ? "Courier <" + _courier
- .getClass().getName() + ".deliver(Message) FAILED" : "NULL courier can't deliver Message";
- _logger.error(text, e);
- continue;
+ _logger.error("Action class method <" + _processMethod
+ .getName() + "> returned a non Message object" +
+ obj.getClass().getName()) ;
+ continue;
}
- continue;
+
+ couriers.deliver((Message)obj) ;
}
- catch (InvocationTargetException e)
- {
- _logger.error("Problems invoking method <" + _processMethod
- .getName() + ">", e);
- }
- catch (IllegalAccessException e)
- {
- _logger.error("Problems invoking method <" + _processMethod
- .getName() + ">", e);
- }
catch (Exception e)
{
_logger.error("Unexpected problem", e);
@@ -362,10 +347,17 @@
_pool = JmsConnectionPoolContainer.getPool(sJndiURL, sJndiContextFactory, sJndiPkgPrefix, sFactClass, JMSEpr.QUEUE_TYPE);
+ try
+ {
+ _queueSession = _pool.getQueueSession();
+ }
+ catch (final NamingException ne)
+ {
+ throw new ConfigurationException("Naming exception while obtaining Queue session", ne) ;
+ }
try
{
- _queueSession = _pool.getQueueSession();
_queue = (Queue) oJndiCtx.lookup(_queueName);
}
catch (NamingException ne)
@@ -393,27 +385,40 @@
*/
protected javax.jms.Message receiveOne()
{
- while (isRunning())
+ javax.jms.Message ret = null ;
+ try
+ {
+ if (_messageReceiver != null)
+ {
+ ret = _messageReceiver.receive(200);
+ }
+ }
+ catch (JMSException oJ)
+ {
+ _logger.error("JMS error on receive. Attempting JMS Destination reconnect.", oJ);
try
{
- javax.jms.Message ret = _messageReceiver.receive(200);
- if (null != ret) return ret;
+ _messageReceiver.close() ;
+ _pool.releaseSession(_queueSession) ;
}
- catch (JMSException oJ)
+ // try to reconnect to the queue
+ catch (Exception e)
{
- _logger.error("JMS error on receive. Attempting JMS Destination reconnect.", oJ);
- try
- {
- prepareMessageReceiver();
- }
- // try to reconnect to the queue
- catch (Exception e)
- {
- _logger.error("Reconnecting to Queue", e);
- waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, _sleepForRetries);
- }
+ _logger.error("Error cleaning up", e);
}
- return null;
+
+ try
+ {
+ prepareMessageReceiver();
+ }
+ // try to reconnect to the queue
+ catch (Exception e)
+ {
+ _logger.error("Reconnecting to Queue", e);
+ waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, _sleepForRetries);
+ }
+ }
+ return ret ;
} // ________________________________
/**
@@ -504,7 +509,7 @@
protected EPR _myEpr;
- protected Collection<EPR> _targetEprs;
+ protected CourierCollection couriers ;
protected String _composerName;
@@ -514,7 +519,5 @@
protected Method _processMethod;
- protected Courier _courier;
-
protected JmsConnectionPool _pool;
}
Added: labs/jbossesb/branches/JBESB_4_0_MP1/product/core/rosetta/src/org/jboss/soa/esb/couriers/CourierCollection.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_0_MP1/product/core/rosetta/src/org/jboss/soa/esb/couriers/CourierCollection.java (rev 0)
+++ labs/jbossesb/branches/JBESB_4_0_MP1/product/core/rosetta/src/org/jboss/soa/esb/couriers/CourierCollection.java 2007-03-23 13:54:17 UTC (rev 10475)
@@ -0,0 +1,117 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.couriers;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.MalformedEPRException;
+import org.jboss.soa.esb.message.Message;
+/**
+ * Handle delivery of a message to a collection of Couriers.
+ *
+ * @author kevin
+ */
+public class CourierCollection
+{
+    /**
+     * The couriers, one per EPR, or null when no EPRs were supplied.
+     */
+    private final Courier[] couriers ;
+
+    /**
+     * Construct the courier collection, obtaining one courier per EPR.
+     * @param eprs The collection of courier eprs, may be null or empty.
+     * @throws MalformedEPRException Presumably propagated from CourierFactory.getCourier.
+     * @throws CourierException If no courier is returned for one of the EPRs.
+     */
+    public CourierCollection(final Collection<EPR> eprs)
+        throws MalformedEPRException, CourierException
+    {
+        if ((eprs != null) && (eprs.size() > 0))
+        {
+            final ArrayList<Courier> courierList = new ArrayList<Courier>() ;
+            try
+            {
+                for (EPR epr: eprs)
+                {
+                    final Courier courier = CourierFactory.getCourier(epr) ;
+                    if (courier == null)
+                    {
+                        throw new CourierException("Null courier returned for EPR: " + epr) ;
+                    }
+                    courierList.add(courier) ;
+                }
+                couriers = courierList.toArray(new Courier[courierList.size()]) ;
+                courierList.clear() ; // success: nothing left for the finally block to clean
+            }
+            finally
+            {
+                for(Courier courier: courierList) // non-empty only if construction failed
+                {
+                    CourierUtil.cleanCourier(courier) ;
+                }
+            }
+        }
+        else
+        {
+            couriers = null ;
+        }
+    }
+
+    /**
+     * Deliver the message to every courier; any value a courier returns is ignored.
+     * @param message The message to deliver.
+     * @return true if there are couriers to deliver to, false if there are none.
+     * @throws CourierException For processing errors.
+     * @throws MalformedEPRException For configuration errors.
+     */
+    public boolean deliver(final Message message)
+        throws CourierException, MalformedEPRException
+    {
+        if (couriers == null)
+        {
+            return false ;
+        }
+        for(Courier courier: couriers)
+        {
+            courier.deliver(message) ;
+        }
+        return true ;
+    }
+
+    /**
+     * Cleanup and release all couriers; does nothing when there are none.
+     */
+    public void cleanup()
+    {
+        if (couriers != null)
+        {
+            for(Courier courier: couriers)
+            {
+                CourierUtil.cleanCourier(courier) ;
+            }
+        }
+    }
+}
Property changes on: labs/jbossesb/branches/JBESB_4_0_MP1/product/core/rosetta/src/org/jboss/soa/esb/couriers/CourierCollection.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
More information about the jboss-svn-commits
mailing list