[jboss-svn-commits] JBL Code SVN: r5121 - labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Mon Jul 17 08:29:03 EDT 2006


Author: estebanschifman
Date: 2006-07-17 08:29:02 -0400 (Mon, 17 Jul 2006)
New Revision: 5121

Added:
   labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
Log:
First draft of a configurable JMS listener

Added: labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
===================================================================
--- labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java	2006-07-17 12:28:30 UTC (rev 5120)
+++ labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java	2006-07-17 12:29:02 UTC (rev 5121)
@@ -0,0 +1,506 @@
+package org.jboss.soa.esb.listeners;
+
+import java.io.*;
+import java.lang.reflect.*;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Observer;
+import java.util.Observable;
+
+import org.apache.log4j.*;
+
+import javax.naming.*;
+import javax.jms.*;
+
+import org.jboss.soa.esb.services.*;
+import org.jboss.soa.esb.util.*;
+import org.jboss.soa.esb.common.*;
+import org.jboss.soa.esb.helpers.*;
+import org.jboss.soa.esb.notification.*;
+import org.jboss.soa.esb.parameters.*;
+import org.jboss.soa.esb.processors.*;
+
+public class JmsQueueListener
+{  
+  // You can override this value at constructor time of your
+  // derived class after calling super(String)
+  protected int	m_iDfltReloadMillis= 180000 // default interval between
+  											 // parameter reloading
+  ;
+  
+  public static final String PARM_ACTION_CLASS		= "actionClass";
+
+  public static final String PARM_RELOAD_LTCY 		= "parmsReloadSecs";
+
+  public static final String PARM_TOPIC_CONN_FACT	= "topicConnFactoryClass";
+  public static final String PARM_QUIESCE_TOPIC		= "quiesceTopic";
+  public static final String PARM_QUIESCE_SELECTOR	= "quiesceSelector";
+
+  public static final String LISTEN_QUEUE_CONN_FACT	= "queueConnFactoryClass";
+  public static final String LISTEN_QUEUE			= "listenQueue";
+  public static final String LISTEN_MSG_SELECTOR	= "listenMsgSelector";
+
+  protected ThreadGroup m_oThrGrp = Thread.currentThread().getThreadGroup();
+  protected Map<String,GroupOfChilds> m_omChildPrc
+  		= new HashMap<String,GroupOfChilds>();
+	
+  protected ParamsRepository m_oParmRepos;
+  protected Name	 	m_oParmsName;
+  protected Logger		m_oLogger;
+
+  protected DomElement	m_oParms;
+  protected boolean		m_bEndRequested;
+	
+  protected TopicConnection m_oTopicConn;
+  protected TopicSession 	m_oSession;
+  protected Topic 			m_oTopic;
+  protected TopicSubscriber m_oTopicSubs;
+  
+  protected long			m_lNextReload;
+  
+  
+
+  protected JmsQueueListener(String p_sParamsUid) throws Exception
+  {
+    m_oLogger = EsbUtil.getDefaultLogger(this.getClass());
+
+    String sFactoryClass = EsbSysProps.getParamsReposFactoryClass();
+    m_oParmRepos = ParamsReposUtil.reposFromFactory(sFactoryClass,null);
+    m_oParmsName = m_oParmRepos.nameFromString(p_sParamsUid);
+  } //__________________________________
+  
+  protected void runUntilEndRequested() throws Exception
+  {	while (! m_bEndRequested)
+  	{ try 
+  	  {	String sMsg = (null == m_oParms) 
+  				? "Initial Parameter loading" : "Reloading Params";
+  	    m_oLogger.info(formatLogMsg(sMsg));
+  		m_oParms	= m_oParmRepos.getElement(m_oParmsName); 
+  	  }
+  	  catch (Exception e)
+  	  {	
+  	    StringBuilder sb = new StringBuilder ("Problems loading params ")
+  			.append(m_oParmsName)
+  			.append((null==m_oParms)? " exiting..." : "continuing to use cached params") 
+  			;
+  	  	m_oLogger.error(formatLogMsg(sb.toString()));
+  	  	if (null==m_oParms)
+  	  		throw e;
+  	  }
+  	  quiesceTopicSubscribe();
+	  executeOneCycle();
+  	}
+  } //__________________________________
+  
+  private void executeOneCycle() throws Exception
+  {
+    String sAtt = m_oParms.getAttr(PARM_RELOAD_LTCY);
+    m_lNextReload = System.currentTimeMillis()
+        + ( (null != sAtt)
+        	? (1000 * Integer.parseInt(sAtt)) 
+            : m_iDfltReloadMillis
+           );
+
+    DomElement[] oaParms = m_oParms.getAllElemChildren();
+
+    boolean bFirst = true;
+    while (System.currentTimeMillis() <= m_lNextReload)
+    {
+      for (DomElement oCurr : oaParms)
+      {
+        oneScan(oCurr, bFirst);
+      }
+      if (waitForQuiesce(1000))
+      { m_bEndRequested = true;
+        return;
+      }
+      bFirst = false;
+    }
+  } //_________________________________________
+
+  protected String formatLogMsg(String p_s)
+  {	return new StringBuilder("Processor '")
+    	.append(getClass().getSimpleName()).append("'  <")
+    	.append(p_s).append(">")
+    	.toString();
+  } //__________________________________
+  
+  private final void quiesceTopicSubscribe() throws JMSException, NamingException
+  {
+    try
+    {
+      m_oTopicConn	= null;
+      m_oSession	= null;
+      m_oTopic		= null;
+      m_oTopicSubs	= null;
+      
+      String sStopTopic	= m_oParms.getAttr(PARM_QUIESCE_TOPIC);
+      if (EsbUtil.isNullString(sStopTopic))
+    	  return;
+      String sFactClass	= m_oParms.getAttr(PARM_TOPIC_CONN_FACT);
+      if (EsbUtil.isNullString(sFactClass))
+    	  sFactClass = "ConnectionFactory";
+
+      String sJndiType = EsbSysProps.getJndiServerType();
+      String sJndiURL = EsbSysProps.getJndiServerURL();
+      Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiURL);
+      Object tmp = oJndiCtx.lookup(sFactClass);
+      TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
+
+      m_oTopicConn = tcf.createTopicConnection();
+      m_oTopic = (Topic) oJndiCtx.lookup(sStopTopic);
+      m_oSession = m_oTopicConn.createTopicSession
+      		(false,TopicSession.AUTO_ACKNOWLEDGE);
+      m_oTopicConn.start();
+
+      String sSelector	= m_oParms.getAttr(PARM_QUIESCE_SELECTOR);
+      if (EsbUtil.isNullString(sSelector))
+    	  sSelector = "processor='"+getClass().getSimpleName()+"'";
+      m_oTopicSubs = m_oSession.createSubscriber(m_oTopic, sSelector,true);
+    }
+    catch (Exception e)
+    { m_oLogger.error("Problems connecting to JMS. ",e);
+    }
+
+  } //_________________________________________
+
+  protected boolean waitForQuiesce(long p_lMillis) throws Exception
+  {
+    try
+    { boolean bSleep = (null== m_oTopicSubs);
+      Object oMsg = (bSleep) ? null : secureQuiesceReceive(p_lMillis);
+    
+      if (null!=oMsg)
+        { m_oLogger.info("Starting Quiesce of "
+        		+getClass().getSimpleName());
+          return true;
+        }
+      if (bSleep)
+    	  Thread.sleep(p_lMillis);
+      return false;
+
+    }
+    catch (Exception e)
+    { m_oLogger.error("Problems with waitForQuiesce. ",e);
+      Thread.sleep(p_lMillis);
+      return false;
+    }
+  } //_________________________________________
+
+  private Message secureQuiesceReceive(long p_lMillis) throws Exception
+  {
+    while (true)
+    try
+    { return (null==m_oTopicSubs) ? null : m_oTopicSubs.receive(p_lMillis); }
+    catch (JMSException e)
+    {
+      // put here your recovery code
+      return null;
+    }
+
+  } //_________________________________________
+
+
+  protected void oneScan(DomElement p_oP, boolean p_bFirst) throws Exception
+  {
+    String sPrcName = p_oP.getName();
+    if (!m_omChildPrc.containsKey(sPrcName))
+    {
+      ThreadGroup[] oaCh = new ThreadGroup[m_oThrGrp.activeGroupCount()];
+      int iMax = m_oThrGrp.enumerate(oaCh);
+
+      ThreadGroup oThG = null;
+      for (int i1 = 0; null == oThG && i1 < iMax; i1++)
+      {	if (m_oThrGrp.getName().equals(sPrcName))
+          oThG = oaCh[i1];
+      }
+      if (null == oThG)
+        oThG = new ThreadGroup(sPrcName);
+      m_omChildPrc.put(sPrcName, newChildGroup(oThG));
+    }
+    GroupOfChilds oChildGrp = m_omChildPrc.get(sPrcName);
+
+    if (null == oChildGrp)				return;
+    if (p_bFirst)
+      oChildGrp.m_bError = false;
+
+    try
+    {
+      oChildGrp.execute(p_oP);
+    }
+    catch (Exception e)
+    {
+      oChildGrp.m_bError = true;
+      m_oLogger.error(formatLogMsg("GroupOfChilds.execute"), e);
+    }
+  } //_________________________________________
+
+  protected GroupOfChilds newChildGroup(ThreadGroup pThG) throws Exception
+  	{ return new GroupOfChilds(pThG); }
+
+  protected class GroupOfChilds implements Observer
+  {
+	public static final String PARM_MAX_THREADS = "maxThreads";
+	
+    protected ThreadGroup	m_oThrGrp;
+    protected boolean 		m_bError = false;
+
+    protected Class 		m_oExecClass;
+    protected DomElement 	m_oChParms;
+
+    protected int 			m_iQthr = 0, m_iMaxThr;
+    protected StringBuilder m_sb;
+    protected int 			m_iSbIni;
+
+    protected QueueConnection m_oQconn;
+    protected QueueSession	m_oQsess;
+    protected Queue			m_oQueue;
+    
+    protected GroupOfChilds(ThreadGroup p_oThrGrp) throws Exception
+    {
+      m_oThrGrp = p_oThrGrp;
+      m_sb = new StringBuilder("GroupOfThreads ")
+          .append(m_oThrGrp.getName()).append(" : ");
+      m_iSbIni = m_sb.length();
+    } //________________________________
+
+    public void update(Observable p_oObs, Object p_oUsrObj)
+    {
+      if (p_oUsrObj instanceof Integer)
+      {
+        updQthreads( ( (Integer) p_oUsrObj).intValue());
+      }
+    } //________________________________
+
+    private synchronized void updQthreads(int p_i)
+    {
+      m_iQthr += p_i;
+    } //________________________________
+
+    private void execute(DomElement p_oP) throws Exception
+    {
+      m_sb.setLength(m_iSbIni);
+      if (m_bError)
+      {
+        m_oLogger.warn(m_sb.append(" Skipping execution due to previous errors")
+                       .toString());
+        return;
+      }
+      checkParms(p_oP);
+      doYourJob	(p_oP);
+    } //________________________________
+
+    protected void setMaxThreads(DomElement p_oP,int p_iMax)
+    {
+      String sAtt = p_oP.getAttr(PARM_MAX_THREADS);
+      m_iMaxThr = (null == sAtt) ? 1 : Integer.parseInt(sAtt);
+      m_iMaxThr = (m_iMaxThr < 1) ? 1 
+      			: (m_iMaxThr > p_iMax) ? p_iMax
+      			: m_iMaxThr;
+
+    } //________________________________
+
+    protected Class checkActionClass(String p_sName) throws Exception
+    {
+  	Class oCls;
+  	try	{ oCls = Class.forName(p_sName); }
+  	catch (ClassNotFoundException e)
+  	{ throw new Exception(formatLogMsg("Class "+p_sName
+  			+" not found in classpath"));
+  	}
+  	try	{ oCls.getConstructor(new Class[] {DomElement.class}); }
+  	catch (NoSuchMethodException eN)
+  	{ throw new Exception(formatLogMsg("No appropriate constructor "
+  			+p_sName+"(DomElement) found for class "));
+  	}
+  	return oCls;
+    } //_________________________________________
+
+    protected String obtainAtt(DomElement p_oP, String p_sAtt, String p_sDefault)
+		throws Exception
+	{
+	  String sVal	= p_oP.getAttr(p_sAtt);
+	  if (null==sVal)
+		  sVal	= p_sDefault;
+	  if (EsbUtil.isNullString(sVal) && (null==p_sDefault))
+		  throw new Exception(formatLogMsg("Missing or invalid <"+p_sAtt+"> attribute"));
+	
+	  return sVal;
+	} //________________________________
+	
+	protected void checkParms(DomElement p_oP) throws Exception
+	{	
+    	m_sb.setLength(m_iSbIni);
+    	m_oChParms	= p_oP.cloneObj();
+    	m_oChParms.rmvChildsByName(EsbAbstractProcessor.PARMS_THIS_INSTANCE);
+		setMaxThreads(p_oP,1);
+		
+		obtainAtt(p_oP,LISTEN_QUEUE,null);		
+	} //________________________________
+
+	  protected final void obtainQueue() throws JMSException, NamingException
+	  {
+	    try
+	    {
+	      m_oQconn		= null;
+	      m_oQsess		= null;
+	      m_oQueue		= null;
+	      
+	      String sQueue	= m_oParms.getAttr(LISTEN_QUEUE);
+	      String sFactClass	= m_oParms.getAttr(LISTEN_QUEUE_CONN_FACT);
+	      if (EsbUtil.isNullString(sFactClass))
+	    	  sFactClass = "ConnectionFactory";
+
+	      String sJndiType = EsbSysProps.getJndiServerType();
+	      String sJndiURL = EsbSysProps.getJndiServerURL();
+	      Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiURL);
+	      Object tmp = oJndiCtx.lookup(sFactClass);
+	      QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
+
+	      m_oQconn = qcf.createQueueConnection();
+	      m_oQueue = (Queue) oJndiCtx.lookup(sQueue);
+	      m_oQsess = m_oQconn.createQueueSession
+	      		(false,TopicSession.AUTO_ACKNOWLEDGE);
+	      m_oQconn.start();
+
+	    }
+	    catch (Exception e)
+	    { m_oLogger.error("Problems connecting to JMS. ",e);
+	    }
+
+	  } //_________________________________________
+
+	protected void doYourJob(DomElement p_oP) throws Exception
+	{
+		while(System.currentTimeMillis() < m_lNextReload)
+        {
+		  if (m_iQthr >= m_iMaxThr)
+		  {	m_oLogger.info(m_sb.append("Waiting for available threads").toString());
+          	Thread.sleep(5000);
+          	continue;
+		  }
+	      String sSelector	= m_oParms.getAttr(LISTEN_MSG_SELECTOR);
+		  MessageConsumer oReader = m_oQsess.createReceiver(m_oQueue, sSelector);
+	      Message oMsg = (null==oReader) ? null
+	    		  : oReader.receiveNoWait();
+	      if (null==oMsg)
+	      {	Thread.sleep(500);
+	    	continue;
+	      }
+
+		  MsgChildProcess oNew = getMsgChildProcess(this,oMsg);
+	      new Thread(m_oThrGrp,oNew).start();
+	        // Wait a little bit, so thread count will be updated
+	        // at some point in the past, this sleep was indispensable
+	        // new thread control classes in Java 5 might have solved the problem
+	        Thread.sleep(500);
+        }
+	} //________________________________
+	
+	protected MsgChildProcess getMsgChildProcess
+		(GroupOfChilds pDad, Message pMsg) throws Exception
+		{ return new MsgChildProcess (pDad,pMsg); }
+	
+  } //______________________________________________________
+  
+  protected class MsgChildProcess extends Observable implements Runnable
+  { 
+	protected GroupOfChilds m_oParent;  // you can always go there for common stuff
+	protected Message		m_oMsg;
+
+	public MsgChildProcess(GroupOfChilds p_oGrp, Message p_oMsg)
+  		throws Exception
+  	{
+      m_oParent	= p_oGrp;
+      this.addObserver(m_oParent);
+      setChanged();
+      // add 1 to child thread count
+      notifyObservers(new Integer(1));
+  	} //__________________________________
+
+	 public void run()
+	 {
+       Exception oAbend = null;
+	   try
+	   {
+		   Constructor oCnst = m_oParent.m_oExecClass
+		   		.getConstructor	(new Class[]  {DomElement.class});
+		   DomElement oParms = m_oParent.m_oChParms.cloneObj();
+		   Object oInst = oCnst.newInstance (new Object[] {oParms});
+	       ((EsbMsgProcessor)oInst).processMessage(m_oMsg);
+	   }
+	   catch (Exception e) 
+	   { m_oLogger.error("run() FAILED",e);
+       	 oAbend = e;
+	   }
+	   
+	   if (null==oAbend)
+		   notifyOK();
+	   else
+		   notifyError(oAbend);
+
+	   setChanged();
+	   // decrease child thread count in parent group
+	   notifyObservers(new Integer(-1));
+	  } //______________________________
+	 
+	  public void notifyOK()
+	  { try
+	    { 
+		  String sNotif = getOkNotifContent();
+		  for (DomElement oCurr : m_oParms.getElementChildren(NotificationList.ELEMENT))
+	      { NotificationList oNL = new NotificationList(oCurr);
+	        if (! oNL.isOK())    continue;
+	        getNotifHandler().sendNotifications(oCurr,sNotif);
+	      }
+	    }
+	    catch (Exception e) {}
+	  } //__________________________________
+
+	  public void notifyError(Exception p_e)
+	  { 
+		String sNotif = getErrorNotifContent();
+		ByteArrayOutputStream oBO = new ByteArrayOutputStream();
+	    PrintStream oPS = new PrintStream(oBO);
+	    try
+	    { oPS.println(sNotif);
+	      if (null != p_e) p_e.printStackTrace(oPS);
+	      oPS.close();
+
+	      String sMsg = oBO.toString();
+	      for (DomElement oCurr : m_oParms.getElementChildren(NotificationList.ELEMENT))
+	      { NotificationList oNL = new NotificationList(oCurr);
+	        if (! oNL.isErr())    continue;
+	        getNotifHandler().sendNotifications(oNL,sMsg);
+	      }
+	    }
+	    catch (Exception e) { }
+	  } //__________________________________
+
+	  protected InotificationHandler getNotifHandler()
+	  {
+		try {	return NotificationHandlerFactory.getNotifHandler
+		  			("remote"
+		  			,EsbSysProps.getJndiServerType()
+		  			,EsbSysProps.getJndiServerURL()
+		  			);
+			}
+		catch (Exception e)
+			{	m_oLogger.error(formatLogMsg("Notification FAILED"),e);
+				return null;
+			}
+	  } //______________________________
+	  
+	  // These methods to be overriden by you own derived class
+	  protected String getOkNotifContent()
+	  {
+		  return "Success";
+	  }
+	  protected String getErrorNotifContent()
+	  {
+		  return "FAILURE";
+	  }
+
+  } //______________________________________________________
+
+} //____________________________________________________________________________




More information about the jboss-svn-commits mailing list