[jboss-svn-commits] JBL Code SVN: r7514 - labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Nov 9 13:50:29 EST 2006


Author: estebanschifman
Date: 2006-11-09 13:50:27 -0500 (Thu, 09 Nov 2006)
New Revision: 7514

Modified:
   labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
Log:
Implement receiver part of JmsCourier

Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java	2006-11-09 18:41:32 UTC (rev 7513)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java	2006-11-09 18:50:27 UTC (rev 7514)
@@ -22,6 +22,8 @@
 
 package org.jboss.internal.soa.esb.couriers;
 
+import java.io.IOException;
+import java.io.Serializable;
 import java.util.List;
 
 import javax.jms.Connection;
@@ -39,6 +41,7 @@
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.naming.Context;
+import javax.xml.parsers.ParserConfigurationException;
 
 import org.apache.log4j.Logger;
 import org.jboss.soa.esb.addressing.eprs.JMSEpr;
@@ -47,6 +50,7 @@
 import org.jboss.soa.esb.helpers.KeyValuePair;
 import org.jboss.soa.esb.message.Message;
 import org.jboss.soa.esb.util.Util;
+import org.xml.sax.SAXException;
 
 public class JmsCourier implements PickUpCourier, DeliverCourier 
 {
@@ -71,11 +75,9 @@
 		_epr		= epr;
 		_sleepForRetries	= 3000;
 
-		try 
-		{
-			_messageProperties = Util.propertiesFromSelector(_epr.getMessageSelector());
-		}
-		catch (Exception e) { throw new CourierException(e); }
+		if (! _isReceiver)
+			try { _messageProperties = Util.propertiesFromSelector(_epr.getMessageSelector()); }
+			catch (Exception e) { throw new CourierException(e); }
 		
     } //________________________________
 
@@ -84,22 +86,23 @@
 		if (null != _messageProducer)
 			try { _messageProducer.close(); }
 			catch (JMSException e) {/*  OK do nothing  */ }
-		_messageProducer = null;
 		
 		if (null != _messageConsumer)
 			try { _messageConsumer.close(); }
 			catch (JMSException e) {/*  OK do nothing  */ }
-		_messageConsumer = null;
 		
 		if (null != _jmsSession)
 			try { _jmsSession.close(); }
 			catch (JMSException e) {/*  OK do nothing  */ }
-		_jmsSession = null;
 		
 		if (null != _jmsConnection)
 			try { _jmsConnection.close(); }
 			catch (JMSException e) {/*  OK do nothing  */ }
-		_jmsConnection = null;
+
+		_messageProducer 	= null;
+		_messageConsumer 	= null;
+		_jmsSession 		= null;
+		_jmsConnection 		= null;
     } //________________________________
 
 	/**
@@ -119,7 +122,7 @@
 			try {  createMessageProducer();}
 			catch (Exception e) {throw new CourierException(e); }
 
-			while (null!=_messageProducer)
+		while (null!=_messageProducer)
 		{
 			try 
 			{
@@ -130,20 +133,12 @@
 				sendMessage(msg);
 				return true;
 			}
-			catch (JMSException e)	{ jmsRetryDeliver(e); }
+			catch (JMSException e)	{ jmsConnectRetry(e); }
 			catch (Exception e)		{ throw new CourierException(e);}
 		}
 		return false;
 	} //________________________________
 	
-	public Message pickUp(long millis) throws CourierException 
-	{
-		if (! _isReceiver)
-			throw new CourierException("This is an outgoing-only Courier");
-		// TODO Auto-generated method stub
-		return null;
-	}
-    
 	/**
 	 * send/publish a javax.jms.ObjectMessage (that will contain the serialized ESB Message)
 	 * @param jmsMessage
@@ -156,17 +151,24 @@
     		_messageProducer.send(jmsMessage);
     } //________________________________
     
-    private void jmsRetryDeliver(Exception exc)
+    private void jmsConnectRetry(Exception exc)
     {
     	_logger.error("JMS error.  Attempting JMS reconnect.", exc);
         _jmsConnection	= null;
         _jmsSession		= null;
 		_messageProducer= null;
+		_messageConsumer= null;
 
 		for (int i1 = 0; i1 < 5; i1++)
 		{
     		// try to reconnect to the queue
-    		try { createMessageProducer(); } 
+    		try 
+    		{ 
+    			if (_isReceiver)
+    				createMessageConsumer();
+    			else
+    				createMessageProducer(); 
+    		} 
     		catch (Exception e)
     		{
     			_logger.error("Reconnecting to JMS", e);
@@ -182,7 +184,6 @@
     
     protected void createMessageProducer() throws Exception
     {
-
         String sJndiType = _epr.getJndiType();
         if (Util.isNullString(sJndiType))
         	sJndiType = "jboss";
@@ -206,7 +207,7 @@
             QueueSession qSess = qConn.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);
             _jmsConnection	 = qConn;
             _jmsSession		 = qSess;
-            _messageProducer = qSess.createProducer(queue);
+            _messageProducer = qSess.createSender(queue);
         }
         else if(JMSEpr.TOPIC_TYPE.equals(sType))
         {
@@ -223,7 +224,107 @@
                 
     } //________________________________
 
-    protected boolean			_isReceiver;	  
+	public Message pickUp(long millis) throws CourierException 
+	{
+		if (! _isReceiver)
+			throw new CourierException("This is an outgoing-only Courier");
+		if (millis < 1)
+			throw new IllegalArgumentException("Timeout millis must be > 0");
+		if (null==_messageConsumer)
+			try {  createMessageConsumer();}
+			catch (Exception e) {throw new CourierException(e); }
+		
+	    javax.jms.Message jmsMessage = null;
+		while (null!=_messageConsumer)
+		{
+			try 
+			{ 
+            	jmsMessage = _messageConsumer.receive(millis);
+            	break;
+			}
+			catch (JMSException e)	{ jmsConnectRetry(e); }
+			catch (Exception e)		{ throw new CourierException(e);}
+		}
+    	if (null == jmsMessage)
+        	return null;
+
+        if (!(jmsMessage instanceof ObjectMessage))
+        {
+        	_logger.error("Unsupported JMS message type: " + jmsMessage.getClass().getName());
+        	return null;
+        }
+        try
+        {
+        	Serializable obj = (Serializable)((ObjectMessage)jmsMessage).getObject();
+        	return Util.deserialize(obj);
+        } 
+        catch (JMSException e1)
+        { _logger.error("Failed to read Serialized Object from JMS message.", e1);
+          return null;
+        }
+        catch (ClassCastException e2)
+        { _logger.error("Object in JMS message is not a org.jboss.soa.esb.message.Message", e2);
+        }
+        catch (IOException e3)
+        { _logger.error("Object in JMS message is not a Serializeable", e3);
+        }
+        catch (ParserConfigurationException e4)
+        { _logger.error("Object in JMS message has invalid XML", e4);
+        }
+        catch (SAXException e5)
+        { _logger.error("Object in JMS message has invalid XML", e5);
+        }
+    	return null;
+    } //________________________________
+
+    protected void createMessageConsumer() throws Exception
+    {
+        String sJndiType = _epr.getJndiType();
+        if (Util.isNullString(sJndiType))
+        	sJndiType = "jboss";
+        String sJndiURL = _epr.getJndiURL();
+        if (Util.isNullString(sJndiURL))
+        	sJndiURL = "localhost";
+        Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiURL);
+
+        String sFactoryClass = _epr.getConnectionFactory();
+        if (Util.isNullString(sFactoryClass))
+        	sFactoryClass = "ConnectionFactory";
+        
+        Object tmp = oJndiCtx.lookup(sFactoryClass);
+        
+        String sType = _epr.getDestinationType();
+        if (JMSEpr.QUEUE_TYPE.equals(sType))
+        {
+            javax.jms.Queue queue = (javax.jms.Queue) oJndiCtx.lookup(_epr.getDestinationName());
+            QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
+            QueueConnection qConn = qcf.createQueueConnection();
+            QueueSession qSess = qConn.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);
+            _jmsConnection	 = qConn;
+            _jmsSession		 = qSess;
+            _messageConsumer = qSess.createReceiver(queue,_epr.getMessageSelector());
+            qConn.start();
+        }
+        else if(JMSEpr.TOPIC_TYPE.equals(sType))
+        {
+            Topic topic = (Topic) oJndiCtx.lookup(_epr.getDestinationName());
+            TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
+            TopicConnection tConn = tcf.createTopicConnection();
+            TopicSession tSess = tConn.createTopicSession(false,TopicSession.AUTO_ACKNOWLEDGE);
+            _jmsConnection	 = tConn;
+            _jmsSession		 = tSess;
+            _messageConsumer = tSess.createConsumer(topic,_epr.getMessageSelector());
+            tConn.start();
+        }
+        else
+        	throw new CourierException("Unknown destination type");
+
+                
+    } //________________________________
+
+			long 				_sleepForRetries = 3000;   //  milliseconds
+
+	protected boolean			_isReceiver;	  
     protected JMSEpr			_epr;
     protected Logger			_logger = Logger.getLogger(JmsCourier.class);
     protected String 			_messageSelector;
@@ -231,6 +332,5 @@
     protected Session 			_jmsSession;
     protected MessageProducer 	_messageProducer;
     protected MessageConsumer	_messageConsumer;
-    protected long 				_sleepForRetries = 3000;   //  milliseconds
     protected List<KeyValuePair> _messageProperties;
 }




More information about the jboss-svn-commits mailing list