[jboss-svn-commits] JBL Code SVN: r7209 - in labs/jbossesb/trunk/product/core/listeners/src/org/jboss: internal/soa/esb/couriers soa/esb/actions soa/esb/couriers soa/esb/listeners/message

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Sun Oct 29 21:47:10 EST 2006


Author: daniel.brum at jboss.com
Date: 2006-10-29 21:47:06 -0500 (Sun, 29 Oct 2006)
New Revision: 7209

Modified:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ActionUtils.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/couriers/CourierFactory.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/EsbListenerController.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/JmsQueueListener.java
Log:


Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java	2006-10-30 02:46:42 UTC (rev 7208)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java	2006-10-30 02:47:06 UTC (rev 7209)
@@ -22,19 +22,31 @@
 
 package org.jboss.internal.soa.esb.couriers;
 
-import java.util.*;
-import javax.jms.*;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
 import javax.naming.Context;
 
 import org.apache.log4j.Logger;
-import org.jboss.soa.esb.addressing.EPR;
 import org.jboss.soa.esb.addressing.eprs.JMSEpr;
 import org.jboss.soa.esb.couriers.Courier;
 import org.jboss.soa.esb.couriers.CourierException;
 import org.jboss.soa.esb.helpers.AppServerContext;
+import org.jboss.soa.esb.helpers.KeyValuePair;
 import org.jboss.soa.esb.message.Message;
 import org.jboss.soa.esb.util.Util;
-import org.jboss.soa.esb.helpers.*;
 
 public class JmsCourier implements Courier 
 {
@@ -46,8 +58,9 @@
 	 * package protected constructor - Objects of Courier should only be instantiated by the Factory
 	 * @param epr
 	 */
-	public JmsCourier(EPR epr) throws CourierException
+	public JmsCourier(JMSEpr epr) throws CourierException
 	{
+		_epr	= epr;
 		_sleepForRetries	= 3000;
 
 		try 
@@ -74,7 +87,10 @@
 			try 
 			{
 				// obtain Serializable version of arg0 and package it in a jms ObjectMessage
-				sendMessage(_jmsSession.createObjectMessage(Util.serialize(message)));
+				ObjectMessage msg = _jmsSession.createObjectMessage(Util.serialize(message));
+				for (KeyValuePair kvp : _messageProperties)
+					msg.setStringProperty(kvp.getKey(), kvp.getValue());
+				sendMessage(msg);
 				return true;
 			}
 			catch (JMSException e)	{ jmsRetry(e); }

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ActionUtils.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ActionUtils.java	2006-10-30 02:46:42 UTC (rev 7208)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ActionUtils.java	2006-10-30 02:47:06 UTC (rev 7209)
@@ -22,6 +22,7 @@
 
 package org.jboss.soa.esb.actions;
 
+import org.apache.log4j.Logger;
 import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.message.Body;
 import org.jboss.soa.esb.message.Message;
@@ -64,7 +65,9 @@
     {
     	Body body = message.getBody();
     	body.remove(BEFORE_ACTION);
-    	body.add(BEFORE_ACTION,body.get(CURRENT_OBJECT));
+    	Object obj = body.get(CURRENT_OBJECT);
+    	if (null != obj)
+    		body.add(BEFORE_ACTION,obj);
     }
     /**
      * Put current object in standard spot within message
@@ -92,7 +95,7 @@
     {
     	ConfigTree oRet = (ConfigTree)message.getBody().remove(CONFIG_TREE);
     	if (null!=tree)
-    		message.getBody().add(CONFIG_TREE,tree);
+    		message.getBody().add(CONFIG_TREE,tree.toXml());
     	return oRet;
     }	
     /**
@@ -104,7 +107,9 @@
      */
     public static ConfigTree getConfigTree(Message message)
     {
-    	ConfigTree oRet = (ConfigTree)message.getBody().get(CONFIG_TREE);
+    	ConfigTree oRet =null; 
+    	try { oRet = ConfigTree.fromXml((String)message.getBody().get(CONFIG_TREE)); }
+    	catch(Exception e)  { _logger.error("Problems getting ConfigTree from Message",e); }
     	return (null==oRet)?new ConfigTree("mock"):oRet;
     }
     
@@ -121,5 +126,5 @@
     	}
     	return null;
     }
-    
+    private static final Logger _logger = Logger.getLogger(ConfigTree.class);
 }

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/couriers/CourierFactory.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/couriers/CourierFactory.java	2006-10-30 02:46:42 UTC (rev 7208)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/couriers/CourierFactory.java	2006-10-30 02:47:06 UTC (rev 7209)
@@ -44,7 +44,7 @@
 		{
 			String address = epr.getAddr().getAddress();
 			if (address.startsWith(JMSEpr.JMS_PROTOCOL))
-				return new JmsCourier(epr);
+				return new JmsCourier((JMSEpr)epr);
 		}
 		catch (URISyntaxException e) { throw new CourierException(e); }
 		throw new CourierException("Unknown protocol");

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java	2006-10-30 02:46:42 UTC (rev 7208)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java	2006-10-30 02:47:06 UTC (rev 7209)
@@ -30,7 +30,7 @@
     public ActionProcessingPipeline(Message message)
     	throws ConfigurationException
     {
-    	if (null==_message)
+    	if (null==message)
     		throw new IllegalArgumentException("Message must be not null");
         _message	= message;
         _config		= ActionUtils.getConfigTree(_message);

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/EsbListenerController.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/EsbListenerController.java	2006-10-30 02:46:42 UTC (rev 7208)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/EsbListenerController.java	2006-10-30 02:47:06 UTC (rev 7209)
@@ -32,9 +32,8 @@
 import org.apache.log4j.Logger;
 import org.jboss.internal.soa.esb.command.CommandQueue;
 import org.jboss.internal.soa.esb.command.CommandQueueException;
-import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.EPR;
 import org.jboss.soa.esb.addressing.util.EPRManager;
-import org.jboss.soa.esb.addressing.EPR;
 import org.jboss.soa.esb.common.Environment;
 import org.jboss.soa.esb.common.ModulePropertyManager;
 import org.jboss.soa.esb.helpers.ConfigTree;
@@ -258,12 +257,7 @@
 		String sEndT = p_oP.getAttribute(PARM_END_TIME);
 		m_lEndTime = (null == sEndT) ? Long.MAX_VALUE : s_oDateParse.parse(
 				sEndT).getTime();
-
-        // Read and initialise the action definitions...
-        ConfigTree actionConfig = p_oP.getFirstChild("Actions");
-        if(actionConfig == null) {
-            throw new ConfigurationException("No 'Actions' configuration.");
-        }        
+          
 	} // ________________________________
 
     /**
@@ -342,7 +336,8 @@
 
 		// Close the command queue...
 		try {
-			commandQueue.close();
+			if (null != commandQueue)
+				commandQueue.close();
 		} catch (CommandQueueException e) {
 			_logger.error("Error closing Command Queue.", e);
 		}
@@ -374,8 +369,8 @@
 			// No command queue nor topic - Just sleep until time
 			// exhausted, or thread interrupted
 			try {
-				if (lToGo > 0)
-					Thread.sleep(lToGo);
+				while ((lToGo=millisToWait()) > 0)
+					Thread.sleep(500);
 			} catch (InterruptedException e) {
 				m_lEndTime = 0; // mark as end requested and return
 			}
@@ -466,6 +461,11 @@
 		}
 	} // ________________________________
 
+	public void requestEnd() {
+		m_bEndRequested=true;
+		m_lEndTime = 0;
+	}
+	
 	/**
 	 * Accessor to determine if execution time is expired or shutdown requested
 	 * 
@@ -512,7 +512,7 @@
 		return (endNotRequested() && !timeToReload());
 	} // ________________________________
 	
-	private static EPRManager getEprManager()
+	public static EPRManager getEprManager()
 	{
 		PropertyManager manager = ModulePropertyManager.getPropertyManager(ModulePropertyManager.CORE_MODULE);
 		String sDir = manager.getProperty(Environment.REGISTRY_FILE_HELPER_DIR,".");	

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/JmsQueueListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/JmsQueueListener.java	2006-10-30 02:46:42 UTC (rev 7208)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/JmsQueueListener.java	2006-10-30 02:47:06 UTC (rev 7209)
@@ -22,6 +22,9 @@
 
 package org.jboss.soa.esb.listeners.message;
 
+import java.io.IOException;
+import java.io.Serializable;
+
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.ObjectMessage;
@@ -31,6 +34,7 @@
 import javax.jms.QueueSession;
 import javax.jms.TopicSession;
 import javax.naming.Context;
+import javax.xml.parsers.ParserConfigurationException;
 
 import org.apache.log4j.Logger;
 
@@ -38,8 +42,10 @@
 import org.jboss.soa.esb.helpers.AppServerContext;
 import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.util.Util;
 import org.jboss.soa.esb.actions.ActionUtils;
 import org.jboss.soa.esb.addressing.eprs.JMSEpr;
+import org.xml.sax.SAXException;
 
 /**
  * Esb Message aware JMS queue listener.
@@ -82,7 +88,8 @@
 
     	while (_controller.continueLooping())
         {
-        	org.jboss.soa.esb.message.Message message = receiveEsbMessage(_controller.millisToWait());
+    		long lwait = _controller.millisToWait();
+    		org.jboss.soa.esb.message.Message message = (lwait > 0 ) ? receiveEsbMessage(100) : null;
         	if (null!=message)
         	{	
         		ActionUtils.setConfigTree(message,_config);
@@ -98,7 +105,6 @@
         		new Thread(chain).start();
         	}
         }
-
     	_controller.unRegister(_eprName);
     	cleanup();
     } // _______________________________
@@ -143,6 +149,7 @@
         	_oQsess = _oQconn.createQueueSession(false,TopicSession.AUTO_ACKNOWLEDGE);
         	_oQconn.start();
         	_receiver = _oQsess.createReceiver(_oQueue, _sSelector);
+        	return;
         }
         catch (javax.naming.NamingException e)	{ thrown = e; }
         catch (JMSException e) 					{ thrown = e; }
@@ -153,7 +160,10 @@
     protected org.jboss.soa.esb.message.Message receiveEsbMessage(long millis)
     {
     	javax.jms.Message jmsMessage = null;
-        try {	jmsMessage = _receiver.receive(millis); }
+        try 
+		{	
+        	jmsMessage = _receiver.receive(millis);
+		}
         catch (JMSException oJ)
         {
         	_logger.error("JMS error on receive.  Attempting JMS Destination reconnect.", oJ);
@@ -181,7 +191,8 @@
         }
         try
         {
-        	return (org.jboss.soa.esb.message.Message)((ObjectMessage)jmsMessage).getObject();
+        	Serializable obj = (Serializable)((ObjectMessage)jmsMessage).getObject();
+        	return Util.deserialize(obj);
         } 
         catch (JMSException e1)
         { _logger.error("Failed to read Serialized Object from JMS message.", e1);
@@ -190,6 +201,15 @@
         catch (ClassCastException e2)
         { _logger.error("Object in JMS message is not a org.jboss.soa.esb.message.Message", e2);
         }
+        catch (IOException e3)
+        { _logger.error("Object in JMS message is not a Serializeable", e3);
+        }
+        catch (ParserConfigurationException e4)
+        { _logger.error("Object in JMS message has invalid XML", e4);
+        }
+        catch (SAXException e5)
+        { _logger.error("Object in JMS message has invalid XML", e5);
+        }
         return null;
     }
     




More information about the jboss-svn-commits mailing list