[jboss-svn-commits] JBL Code SVN: r7209 - in labs/jbossesb/trunk/product/core/listeners/src/org/jboss: internal/soa/esb/couriers soa/esb/actions soa/esb/couriers soa/esb/listeners/message
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Sun Oct 29 21:47:10 EST 2006
Author: daniel.brum at jboss.com
Date: 2006-10-29 21:47:06 -0500 (Sun, 29 Oct 2006)
New Revision: 7209
Modified:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ActionUtils.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/couriers/CourierFactory.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/EsbListenerController.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/JmsQueueListener.java
Log:
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java 2006-10-30 02:46:42 UTC (rev 7208)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java 2006-10-30 02:47:06 UTC (rev 7209)
@@ -22,19 +22,31 @@
package org.jboss.internal.soa.esb.couriers;
-import java.util.*;
-import javax.jms.*;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
import javax.naming.Context;
import org.apache.log4j.Logger;
-import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.eprs.JMSEpr;
import org.jboss.soa.esb.couriers.Courier;
import org.jboss.soa.esb.couriers.CourierException;
import org.jboss.soa.esb.helpers.AppServerContext;
+import org.jboss.soa.esb.helpers.KeyValuePair;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.util.Util;
-import org.jboss.soa.esb.helpers.*;
public class JmsCourier implements Courier
{
@@ -46,8 +58,9 @@
* package protected constructor - Objects of Courier should only be instantiated by the Factory
* @param epr
*/
- public JmsCourier(EPR epr) throws CourierException
+ public JmsCourier(JMSEpr epr) throws CourierException
{
+ _epr = epr;
_sleepForRetries = 3000;
try
@@ -74,7 +87,10 @@
try
{
// obtain Serializable version of arg0 and package it in a jms ObjectMessage
- sendMessage(_jmsSession.createObjectMessage(Util.serialize(message)));
+ ObjectMessage msg = _jmsSession.createObjectMessage(Util.serialize(message));
+ for (KeyValuePair kvp : _messageProperties)
+ msg.setStringProperty(kvp.getKey(), kvp.getValue());
+ sendMessage(msg);
return true;
}
catch (JMSException e) { jmsRetry(e); }
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ActionUtils.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ActionUtils.java 2006-10-30 02:46:42 UTC (rev 7208)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ActionUtils.java 2006-10-30 02:47:06 UTC (rev 7209)
@@ -22,6 +22,7 @@
package org.jboss.soa.esb.actions;
+import org.apache.log4j.Logger;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.message.Body;
import org.jboss.soa.esb.message.Message;
@@ -64,7 +65,9 @@
{
Body body = message.getBody();
body.remove(BEFORE_ACTION);
- body.add(BEFORE_ACTION,body.get(CURRENT_OBJECT));
+ Object obj = body.get(CURRENT_OBJECT);
+ if (null != obj)
+ body.add(BEFORE_ACTION,obj);
}
/**
* Put current object in standard spot within message
@@ -92,7 +95,7 @@
{
ConfigTree oRet = (ConfigTree)message.getBody().remove(CONFIG_TREE);
if (null!=tree)
- message.getBody().add(CONFIG_TREE,tree);
+ message.getBody().add(CONFIG_TREE,tree.toXml());
return oRet;
}
/**
@@ -104,7 +107,9 @@
*/
public static ConfigTree getConfigTree(Message message)
{
- ConfigTree oRet = (ConfigTree)message.getBody().get(CONFIG_TREE);
+ ConfigTree oRet =null;
+ try { oRet = ConfigTree.fromXml((String)message.getBody().get(CONFIG_TREE)); }
+ catch(Exception e) { _logger.error("Problems getting ConfigTree from Message",e); }
return (null==oRet)?new ConfigTree("mock"):oRet;
}
@@ -121,5 +126,5 @@
}
return null;
}
-
+ private static final Logger _logger = Logger.getLogger(ConfigTree.class);
}
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/couriers/CourierFactory.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/couriers/CourierFactory.java 2006-10-30 02:46:42 UTC (rev 7208)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/couriers/CourierFactory.java 2006-10-30 02:47:06 UTC (rev 7209)
@@ -44,7 +44,7 @@
{
String address = epr.getAddr().getAddress();
if (address.startsWith(JMSEpr.JMS_PROTOCOL))
- return new JmsCourier(epr);
+ return new JmsCourier((JMSEpr)epr);
}
catch (URISyntaxException e) { throw new CourierException(e); }
throw new CourierException("Unknown protocol");
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java 2006-10-30 02:46:42 UTC (rev 7208)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java 2006-10-30 02:47:06 UTC (rev 7209)
@@ -30,7 +30,7 @@
public ActionProcessingPipeline(Message message)
throws ConfigurationException
{
- if (null==_message)
+ if (null==message)
throw new IllegalArgumentException("Message must be not null");
_message = message;
_config = ActionUtils.getConfigTree(_message);
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/EsbListenerController.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/EsbListenerController.java 2006-10-30 02:46:42 UTC (rev 7208)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/EsbListenerController.java 2006-10-30 02:47:06 UTC (rev 7209)
@@ -32,9 +32,8 @@
import org.apache.log4j.Logger;
import org.jboss.internal.soa.esb.command.CommandQueue;
import org.jboss.internal.soa.esb.command.CommandQueueException;
-import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.util.EPRManager;
-import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.common.ModulePropertyManager;
import org.jboss.soa.esb.helpers.ConfigTree;
@@ -258,12 +257,7 @@
String sEndT = p_oP.getAttribute(PARM_END_TIME);
m_lEndTime = (null == sEndT) ? Long.MAX_VALUE : s_oDateParse.parse(
sEndT).getTime();
-
- // Read and initialise the action definitions...
- ConfigTree actionConfig = p_oP.getFirstChild("Actions");
- if(actionConfig == null) {
- throw new ConfigurationException("No 'Actions' configuration.");
- }
+
} // ________________________________
/**
@@ -342,7 +336,8 @@
// Close the command queue...
try {
- commandQueue.close();
+ if (null != commandQueue)
+ commandQueue.close();
} catch (CommandQueueException e) {
_logger.error("Error closing Command Queue.", e);
}
@@ -374,8 +369,8 @@
// No command queue nor topic - Just sleep until time
// exhausted, or thread interrupted
try {
- if (lToGo > 0)
- Thread.sleep(lToGo);
+ while ((lToGo=millisToWait()) > 0)
+ Thread.sleep(500);
} catch (InterruptedException e) {
m_lEndTime = 0; // mark as end requested and return
}
@@ -466,6 +461,11 @@
}
} // ________________________________
+ public void requestEnd() {
+ m_bEndRequested=true;
+ m_lEndTime = 0;
+ }
+
/**
* Accessor to determine if execution time is expired or shutdown requested
*
@@ -512,7 +512,7 @@
return (endNotRequested() && !timeToReload());
} // ________________________________
- private static EPRManager getEprManager()
+ public static EPRManager getEprManager()
{
PropertyManager manager = ModulePropertyManager.getPropertyManager(ModulePropertyManager.CORE_MODULE);
String sDir = manager.getProperty(Environment.REGISTRY_FILE_HELPER_DIR,".");
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/JmsQueueListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/JmsQueueListener.java 2006-10-30 02:46:42 UTC (rev 7208)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/JmsQueueListener.java 2006-10-30 02:47:06 UTC (rev 7209)
@@ -22,6 +22,9 @@
package org.jboss.soa.esb.listeners.message;
+import java.io.IOException;
+import java.io.Serializable;
+
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
@@ -31,6 +34,7 @@
import javax.jms.QueueSession;
import javax.jms.TopicSession;
import javax.naming.Context;
+import javax.xml.parsers.ParserConfigurationException;
import org.apache.log4j.Logger;
@@ -38,8 +42,10 @@
import org.jboss.soa.esb.helpers.AppServerContext;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.util.Util;
import org.jboss.soa.esb.actions.ActionUtils;
import org.jboss.soa.esb.addressing.eprs.JMSEpr;
+import org.xml.sax.SAXException;
/**
* Esb Message aware JMS queue listener.
@@ -82,7 +88,8 @@
while (_controller.continueLooping())
{
- org.jboss.soa.esb.message.Message message = receiveEsbMessage(_controller.millisToWait());
+ long lwait = _controller.millisToWait();
+ org.jboss.soa.esb.message.Message message = (lwait > 0 ) ? receiveEsbMessage(100) : null;
if (null!=message)
{
ActionUtils.setConfigTree(message,_config);
@@ -98,7 +105,6 @@
new Thread(chain).start();
}
}
-
_controller.unRegister(_eprName);
cleanup();
} // _______________________________
@@ -143,6 +149,7 @@
_oQsess = _oQconn.createQueueSession(false,TopicSession.AUTO_ACKNOWLEDGE);
_oQconn.start();
_receiver = _oQsess.createReceiver(_oQueue, _sSelector);
+ return;
}
catch (javax.naming.NamingException e) { thrown = e; }
catch (JMSException e) { thrown = e; }
@@ -153,7 +160,10 @@
protected org.jboss.soa.esb.message.Message receiveEsbMessage(long millis)
{
javax.jms.Message jmsMessage = null;
- try { jmsMessage = _receiver.receive(millis); }
+ try
+ {
+ jmsMessage = _receiver.receive(millis);
+ }
catch (JMSException oJ)
{
_logger.error("JMS error on receive. Attempting JMS Destination reconnect.", oJ);
@@ -181,7 +191,8 @@
}
try
{
- return (org.jboss.soa.esb.message.Message)((ObjectMessage)jmsMessage).getObject();
+ Serializable obj = (Serializable)((ObjectMessage)jmsMessage).getObject();
+ return Util.deserialize(obj);
}
catch (JMSException e1)
{ _logger.error("Failed to read Serialized Object from JMS message.", e1);
@@ -190,6 +201,15 @@
catch (ClassCastException e2)
{ _logger.error("Object in JMS message is not a org.jboss.soa.esb.message.Message", e2);
}
+ catch (IOException e3)
+ { _logger.error("Object in JMS message is not a Serializeable", e3);
+ }
+ catch (ParserConfigurationException e4)
+ { _logger.error("Object in JMS message has invalid XML", e4);
+ }
+ catch (SAXException e5)
+ { _logger.error("Object in JMS message has invalid XML", e5);
+ }
return null;
}
More information about the jboss-svn-commits mailing list