[jboss-svn-commits] JBL Code SVN: r5121 - labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Mon Jul 17 08:29:03 EDT 2006
Author: estebanschifman
Date: 2006-07-17 08:29:02 -0400 (Mon, 17 Jul 2006)
New Revision: 5121
Added:
labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
Log:
First draft of a configurable JMS listener
Added: labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
===================================================================
--- labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java 2006-07-17 12:28:30 UTC (rev 5120)
+++ labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java 2006-07-17 12:29:02 UTC (rev 5121)
@@ -0,0 +1,506 @@
+package org.jboss.soa.esb.listeners;
+
+import java.io.*;
+import java.lang.reflect.*;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Observer;
+import java.util.Observable;
+
+import org.apache.log4j.*;
+
+import javax.naming.*;
+import javax.jms.*;
+
+import org.jboss.soa.esb.services.*;
+import org.jboss.soa.esb.util.*;
+import org.jboss.soa.esb.common.*;
+import org.jboss.soa.esb.helpers.*;
+import org.jboss.soa.esb.notification.*;
+import org.jboss.soa.esb.parameters.*;
+import org.jboss.soa.esb.processors.*;
+
+public class JmsQueueListener
+{
+ // You can override this value at constructor time of your
+ // derived class after calling super(String)
+ // (value is in milliseconds; executeOneCycle uses it when the
+ // PARM_RELOAD_LTCY attribute is absent)
+ protected int m_iDfltReloadMillis= 180000 // default interval between
+ // parameter reloading
+ ;
+
+ // Attribute name designating the action class
+ public static final String PARM_ACTION_CLASS = "actionClass";
+
+ // Attribute name: interval between parameter reloads, in seconds
+ // (executeOneCycle multiplies the value by 1000)
+ public static final String PARM_RELOAD_LTCY = "parmsReloadSecs";
+
+ // Attribute names read by quiesceTopicSubscribe(): JNDI name of the topic
+ // connection factory, JNDI name of the quiesce topic, and message selector
+ public static final String PARM_TOPIC_CONN_FACT = "topicConnFactoryClass";
+ public static final String PARM_QUIESCE_TOPIC = "quiesceTopic";
+ public static final String PARM_QUIESCE_SELECTOR = "quiesceSelector";
+
+ // Attribute names for the listened queue: JNDI name of the queue
+ // connection factory, JNDI name of the queue, and message selector
+ public static final String LISTEN_QUEUE_CONN_FACT = "queueConnFactoryClass";
+ public static final String LISTEN_QUEUE = "listenQueue";
+ public static final String LISTEN_MSG_SELECTOR = "listenMsgSelector";
+
+ // Thread group of the thread that instantiates this listener
+ protected ThreadGroup m_oThrGrp = Thread.currentThread().getThreadGroup();
+ // Child groups, keyed by the name of the parameter element they serve (see oneScan)
+ protected Map<String,GroupOfChilds> m_omChildPrc
+ = new HashMap<String,GroupOfChilds>();
+
+ // Repository the parameter tree is loaded from, and the name of that tree in it
+ protected ParamsRepository m_oParmRepos;
+ protected Name m_oParmsName;
+ protected Logger m_oLogger;
+
+ // Most recently loaded parameter tree; null until the first successful load
+ protected DomElement m_oParms;
+ // Set to true by executeOneCycle once a quiesce message has been received
+ protected boolean m_bEndRequested;
+
+ // JMS objects of the quiesce topic subscription (rebuilt by quiesceTopicSubscribe)
+ protected TopicConnection m_oTopicConn;
+ protected TopicSession m_oSession;
+ protected Topic m_oTopic;
+ protected TopicSubscriber m_oTopicSubs;
+
+ // System.currentTimeMillis() value after which the current cycle ends
+ // and parameters are reloaded
+ protected long m_lNextReload;
+
+
+
+ /**
+  * Sets up the logger and the parameter repository, and resolves the
+  * repository name under which this listener's parameters are stored.
+  *
+  * @param p_sParamsUid string form of the parameter tree's name in the repository
+  * @throws Exception if the repository cannot be obtained or the name cannot be built
+  */
+ protected JmsQueueListener(String p_sParamsUid) throws Exception
+ {
+     m_oLogger = EsbUtil.getDefaultLogger(getClass());
+
+     // The repository implementation is chosen by a system-level property
+     m_oParmRepos = ParamsReposUtil.reposFromFactory
+         (EsbSysProps.getParamsReposFactoryClass(), null);
+     m_oParmsName = m_oParmRepos.nameFromString(p_sParamsUid);
+ } //__________________________________
+
+ /**
+  * Main loop: (re)loads the parameter tree, re-subscribes to the quiesce
+  * topic and runs one processing cycle, until an end has been requested.
+  * A failed reload is tolerated when a previously loaded tree is available;
+  * a failed initial load is rethrown to the caller.
+  *
+  * @throws Exception if the initial parameter load fails, or a cycle fails
+  */
+ protected void runUntilEndRequested() throws Exception
+ {
+     while (! m_bEndRequested)
+     {
+         try
+         {
+             String sMsg = (null == m_oParms)
+                 ? "Initial Parameter loading" : "Reloading Params";
+             m_oLogger.info(formatLogMsg(sMsg));
+             m_oParms = m_oParmRepos.getElement(m_oParmsName);
+         }
+         catch (Exception e)
+         {
+             // Both suffixes start with a blank so they do not run into the params name
+             StringBuilder sb = new StringBuilder ("Problems loading params ")
+                 .append(m_oParmsName)
+                 .append((null==m_oParms)
+                     ? " exiting..." : " continuing to use cached params");
+             // Log the cause too: the message alone does not say why loading failed
+             m_oLogger.error(formatLogMsg(sb.toString()), e);
+             if (null==m_oParms)
+                 throw e;
+         }
+         quiesceTopicSubscribe();
+         executeOneCycle();
+     }
+ } //__________________________________
+
+ /**
+  * Runs one cycle: computes the time of the next parameter reload, then
+  * repeatedly scans every child element of the parameter tree until that
+  * time is reached or a quiesce message arrives (in which case
+  * m_bEndRequested is set).
+  *
+  * @throws Exception propagated from oneScan or waitForQuiesce
+  */
+ private void executeOneCycle() throws Exception
+ {
+     long lInterval = m_iDfltReloadMillis;
+     String sAtt = m_oParms.getAttr(PARM_RELOAD_LTCY);
+     if (null != sAtt)
+     {
+         try
+         {
+             // long arithmetic: "1000 * int" silently overflows for large values
+             lInterval = 1000L * Integer.parseInt(sAtt);
+         }
+         catch (NumberFormatException e)
+         {
+             // A malformed attribute should not bring the listener down
+             m_oLogger.warn(formatLogMsg("Invalid <"+PARM_RELOAD_LTCY+"> attribute '"
+                 +sAtt+"' - using default of "+m_iDfltReloadMillis+" millis"));
+         }
+     }
+     m_lNextReload = System.currentTimeMillis() + lInterval;
+
+     DomElement[] oaParms = m_oParms.getAllElemChildren();
+
+     // bFirst lets oneScan clear each group's error flag at the start of a cycle
+     boolean bFirst = true;
+     while (System.currentTimeMillis() <= m_lNextReload)
+     {
+         for (DomElement oCurr : oaParms)
+         {
+             oneScan(oCurr, bFirst);
+         }
+         if (waitForQuiesce(1000))
+         {
+             m_bEndRequested = true;
+             return;
+         }
+         bFirst = false;
+     }
+ } //_________________________________________
+
+ protected String formatLogMsg(String p_s)
+ { return new StringBuilder("Processor '")
+ .append(getClass().getSimpleName()).append("' <")
+ .append(p_s).append(">")
+ .toString();
+ } //__________________________________
+
+ private final void quiesceTopicSubscribe() throws JMSException, NamingException
+ {
+ try
+ {
+ m_oTopicConn = null;
+ m_oSession = null;
+ m_oTopic = null;
+ m_oTopicSubs = null;
+
+ String sStopTopic = m_oParms.getAttr(PARM_QUIESCE_TOPIC);
+ if (EsbUtil.isNullString(sStopTopic))
+ return;
+ String sFactClass = m_oParms.getAttr(PARM_TOPIC_CONN_FACT);
+ if (EsbUtil.isNullString(sFactClass))
+ sFactClass = "ConnectionFactory";
+
+ String sJndiType = EsbSysProps.getJndiServerType();
+ String sJndiURL = EsbSysProps.getJndiServerURL();
+ Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiURL);
+ Object tmp = oJndiCtx.lookup(sFactClass);
+ TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
+
+ m_oTopicConn = tcf.createTopicConnection();
+ m_oTopic = (Topic) oJndiCtx.lookup(sStopTopic);
+ m_oSession = m_oTopicConn.createTopicSession
+ (false,TopicSession.AUTO_ACKNOWLEDGE);
+ m_oTopicConn.start();
+
+ String sSelector = m_oParms.getAttr(PARM_QUIESCE_SELECTOR);
+ if (EsbUtil.isNullString(sSelector))
+ sSelector = "processor='"+getClass().getSimpleName()+"'";
+ m_oTopicSubs = m_oSession.createSubscriber(m_oTopic, sSelector,true);
+ }
+ catch (Exception e)
+ { m_oLogger.error("Problems connecting to JMS. ",e);
+ }
+
+ } //_________________________________________
+
+ /**
+  * Waits up to the given time for a quiesce message.
+  * Without a quiesce subscriber this simply sleeps for that time.
+  * Any failure is logged, followed by a sleep of the same duration.
+  *
+  * @param p_lMillis maximum time to wait, in milliseconds
+  * @return true if a quiesce message was received, false otherwise
+  * @throws Exception if the sleep in the failure path is itself interrupted
+  */
+ protected boolean waitForQuiesce(long p_lMillis) throws Exception
+ {
+     try
+     {
+         // No subscription: nothing to receive, just pause
+         if (null == m_oTopicSubs)
+         {
+             Thread.sleep(p_lMillis);
+             return false;
+         }
+         if (null == secureQuiesceReceive(p_lMillis))
+             return false;
+
+         m_oLogger.info("Starting Quiesce of "
+             +getClass().getSimpleName());
+         return true;
+     }
+     catch (Exception e)
+     {
+         m_oLogger.error("Problems with waitForQuiesce. ",e);
+         Thread.sleep(p_lMillis);
+         return false;
+     }
+ } //_________________________________________
+
+ /**
+  * Receives from the quiesce subscriber, turning JMS failures into a
+  * logged "nothing received" result.
+  *
+  * @param p_lMillis receive timeout in milliseconds
+  * @return the received message, or null if there is no subscriber,
+  *         nothing arrived in time, or the receive failed
+  */
+ private Message secureQuiesceReceive(long p_lMillis) throws Exception
+ {
+     if (null == m_oTopicSubs)
+         return null;
+     try
+     {
+         return m_oTopicSubs.receive(p_lMillis);
+     }
+     catch (JMSException e)
+     {
+         // put here your recovery code
+         // Until then, at least make the failure visible instead of dropping it
+         m_oLogger.warn(formatLogMsg("Problems receiving from quiesce topic"), e);
+         return null;
+     }
+ } //_________________________________________
+
+
+ /**
+  * Executes one pass for a single child element of the parameter tree.
+  * A GroupOfChilds is created for the element's name on first use, reusing
+  * an existing thread group of that name under this listener's group when
+  * one exists. Failures of the group's execution are logged and mark the
+  * group as being in error; the flag is cleared on the first scan of a cycle.
+  *
+  * @param p_oP     the child parameter element to process
+  * @param p_bFirst true on the first scan of a reload cycle
+  * @throws Exception if the child group cannot be created
+  */
+ protected void oneScan(DomElement p_oP, boolean p_bFirst) throws Exception
+ {
+     String sPrcName = p_oP.getName();
+     if (!m_omChildPrc.containsKey(sPrcName))
+     {
+         ThreadGroup[] oaCh = new ThreadGroup[m_oThrGrp.activeGroupCount()];
+         int iMax = m_oThrGrp.enumerate(oaCh);
+
+         ThreadGroup oThG = null;
+         for (int i1 = 0; null == oThG && i1 < iMax; i1++)
+         {
+             // Compare the name of the enumerated subgroup, not of the parent group
+             if (sPrcName.equals(oaCh[i1].getName()))
+                 oThG = oaCh[i1];
+         }
+         if (null == oThG)
+             oThG = new ThreadGroup(sPrcName);
+         m_omChildPrc.put(sPrcName, newChildGroup(oThG));
+     }
+     GroupOfChilds oChildGrp = m_omChildPrc.get(sPrcName);
+
+     // newChildGroup is overridable and might have produced no group
+     if (null == oChildGrp) return;
+     if (p_bFirst)
+         oChildGrp.m_bError = false;
+
+     try
+     {
+         oChildGrp.execute(p_oP);
+     }
+     catch (Exception e)
+     {
+         oChildGrp.m_bError = true;
+         m_oLogger.error(formatLogMsg("GroupOfChilds.execute"), e);
+     }
+ } //_________________________________________
+
+ protected GroupOfChilds newChildGroup(ThreadGroup pThG) throws Exception
+ { return new GroupOfChilds(pThG); }
+
+ protected class GroupOfChilds implements Observer
+ {
+ // Attribute name: maximum number of concurrent child threads (see setMaxThreads)
+ public static final String PARM_MAX_THREADS = "maxThreads";
+
+ // Thread group in which child threads of this group are started
+ protected ThreadGroup m_oThrGrp;
+ // When true, execute() skips processing; set and cleared by oneScan
+ protected boolean m_bError = false;
+
+ // Class that MsgChildProcess.run() instantiates through its (DomElement) constructor
+ protected Class m_oExecClass;
+ // Copy of the element's parameters made by checkParms, cloned again for each child
+ protected DomElement m_oChParms;
+
+ // m_iQthr: running child-thread count, adjusted through update();
+ // m_iMaxThr: limit applied in doYourJob
+ protected int m_iQthr = 0, m_iMaxThr;
+ // Reusable log-message builder; m_iSbIni is the length of its fixed prefix
+ protected StringBuilder m_sb;
+ protected int m_iSbIni;
+
+ // JMS objects for the listened queue (populated by obtainQueue)
+ protected QueueConnection m_oQconn;
+ protected QueueSession m_oQsess;
+ protected Queue m_oQueue;
+
+ protected GroupOfChilds(ThreadGroup p_oThrGrp) throws Exception
+ {
+ m_oThrGrp = p_oThrGrp;
+ m_sb = new StringBuilder("GroupOfThreads ")
+ .append(m_oThrGrp.getName()).append(" : ");
+ m_iSbIni = m_sb.length();
+ } //________________________________
+
+ /**
+  * Observer callback: an Integer argument is taken as a delta to apply
+  * to the child-thread count; any other argument is ignored.
+  *
+  * @param p_oObs    the notifying observable (unused)
+  * @param p_oUsrObj the notification argument
+  */
+ public void update(Observable p_oObs, Object p_oUsrObj)
+ {
+     if (!(p_oUsrObj instanceof Integer))
+         return;
+
+     final Integer oDelta = (Integer) p_oUsrObj;
+     updQthreads(oDelta.intValue());
+ } //________________________________
+
+ /**
+  * Adds the given (possibly negative) delta to the child-thread count.
+  * Synchronized on this group.
+  *
+  * @param p_i amount to add to the count
+  */
+ private synchronized void updQthreads(int p_i)
+ {
+     m_iQthr = m_iQthr + p_i;
+ } //________________________________
+
+ private void execute(DomElement p_oP) throws Exception
+ {
+ m_sb.setLength(m_iSbIni);
+ if (m_bError)
+ {
+ m_oLogger.warn(m_sb.append(" Skipping execution due to previous errors")
+ .toString());
+ return;
+ }
+ checkParms(p_oP);
+ doYourJob (p_oP);
+ } //________________________________
+
+ protected void setMaxThreads(DomElement p_oP,int p_iMax)
+ {
+ String sAtt = p_oP.getAttr(PARM_MAX_THREADS);
+ m_iMaxThr = (null == sAtt) ? 1 : Integer.parseInt(sAtt);
+ m_iMaxThr = (m_iMaxThr < 1) ? 1
+ : (m_iMaxThr > p_iMax) ? p_iMax
+ : m_iMaxThr;
+
+ } //________________________________
+
+ /**
+  * Loads the named class and verifies that it has a public constructor
+  * taking a single DomElement.
+  *
+  * @param p_sName fully qualified class name
+  * @return the loaded class
+  * @throws Exception if the name is missing, the class cannot be found, or
+  *         it has no (DomElement) constructor; the underlying exception,
+  *         when there is one, is kept as the cause
+  */
+ protected Class checkActionClass(String p_sName) throws Exception
+ {
+     // Class.forName(null) would fail with an uninformative NullPointerException
+     if (EsbUtil.isNullString(p_sName))
+         throw new Exception(formatLogMsg("Missing or invalid action class name"));
+
+     Class oCls;
+     try { oCls = Class.forName(p_sName); }
+     catch (ClassNotFoundException e)
+     {
+         throw new Exception(formatLogMsg("Class "+p_sName
+             +" not found in classpath"), e);
+     }
+     try { oCls.getConstructor(new Class[] {DomElement.class}); }
+     catch (NoSuchMethodException eN)
+     {
+         throw new Exception(formatLogMsg("No appropriate constructor "
+             +"(DomElement) found for class "+p_sName), eN);
+     }
+     return oCls;
+ } //_________________________________________
+
+ protected String obtainAtt(DomElement p_oP, String p_sAtt, String p_sDefault)
+ throws Exception
+ {
+ String sVal = p_oP.getAttr(p_sAtt);
+ if (null==sVal)
+ sVal = p_sDefault;
+ if (EsbUtil.isNullString(sVal) && (null==p_sDefault))
+ throw new Exception(formatLogMsg("Missing or invalid <"+p_sAtt+"> attribute"));
+
+ return sVal;
+ } //________________________________
+
+ /**
+  * Validates the element's parameters and caches what the child threads
+  * need: a copy of the parameters (without the PARMS_THIS_INSTANCE
+  * children), the thread limit, and the action class.
+  *
+  * The action class is resolved here because MsgChildProcess.run()
+  * instantiates m_oExecClass and nothing else assigns it. The attribute is
+  * only mandatory while m_oExecClass is still unset, so a derived class
+  * that assigns the field itself keeps working without the attribute.
+  *
+  * @param p_oP parameter element for this group
+  * @throws Exception if a mandatory attribute is missing or the action
+  *         class is unusable
+  */
+ protected void checkParms(DomElement p_oP) throws Exception
+ {
+     m_sb.setLength(m_iSbIni);
+     m_oChParms = p_oP.cloneObj();
+     m_oChParms.rmvChildsByName(EsbAbstractProcessor.PARMS_THIS_INSTANCE);
+     setMaxThreads(p_oP,1);
+
+     String sAction = p_oP.getAttr(PARM_ACTION_CLASS);
+     if (! EsbUtil.isNullString(sAction))
+         m_oExecClass = checkActionClass(sAction);
+     else if (null == m_oExecClass)
+         throw new Exception(formatLogMsg("Missing or invalid <"
+             +PARM_ACTION_CLASS+"> attribute"));
+
+     // Mandatory: fails when the queue name is absent
+     obtainAtt(p_oP,LISTEN_QUEUE,null);
+ } //________________________________
+
+ /**
+  * (Re)connects to the listened queue: looks up the connection factory and
+  * the queue through JNDI, then creates and starts a non-transacted,
+  * auto-acknowledge queue session.
+  * A connection left over from a previous call is closed first, and a
+  * partially built connection is released on failure, so that m_oQsess is
+  * non-null only when the whole sequence succeeded. Failures are logged,
+  * not rethrown.
+  *
+  * NOTE(review): the queue name and factory are read from the listener's
+  * top-level parameters (m_oParms), whereas checkParms validates
+  * LISTEN_QUEUE on the child element - confirm which one is intended.
+  */
+ protected final void obtainQueue() throws JMSException, NamingException
+ {
+     closeQueueConnection();
+
+     boolean bConnected = false;
+     try
+     {
+         String sQueue = m_oParms.getAttr(LISTEN_QUEUE);
+         if (EsbUtil.isNullString(sQueue))
+             throw new NamingException("Missing or invalid <"+LISTEN_QUEUE+"> attribute");
+         String sFactClass = m_oParms.getAttr(LISTEN_QUEUE_CONN_FACT);
+         if (EsbUtil.isNullString(sFactClass))
+             sFactClass = "ConnectionFactory";
+
+         String sJndiType = EsbSysProps.getJndiServerType();
+         String sJndiURL = EsbSysProps.getJndiServerURL();
+         Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiURL);
+         Object tmp = oJndiCtx.lookup(sFactClass);
+         QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
+
+         m_oQconn = qcf.createQueueConnection();
+         m_oQueue = (Queue) oJndiCtx.lookup(sQueue);
+         m_oQsess = m_oQconn.createQueueSession
+             (false,QueueSession.AUTO_ACKNOWLEDGE);
+         m_oQconn.start();
+         bConnected = true;
+     }
+     catch (Exception e)
+     {
+         m_oLogger.error("Problems connecting to JMS. ",e);
+     }
+     finally
+     {
+         // Do not keep a half-initialised connection around
+         if (! bConnected)
+             closeQueueConnection();
+     }
+
+ } //_________________________________________
+
+ // Closes the current queue connection, if any, and clears the queue fields.
+ private void closeQueueConnection()
+ {
+     if (null != m_oQconn)
+     {
+         try { m_oQconn.close(); }
+         catch (Exception e)
+         { m_oLogger.warn("Problems closing JMS queue connection. ",e); }
+     }
+     m_oQconn = null;
+     m_oQsess = null;
+     m_oQueue = null;
+ } //_________________________________________
+
+ /**
+  * Polls the listened queue until the next parameter reload is due,
+  * starting one child thread per received message while the thread limit
+  * allows it.
+  *
+  * The queue session is obtained on demand (nothing else calls
+  * obtainQueue), and a single receiver is used for the whole call and
+  * closed on exit instead of creating a new one per iteration.
+  *
+  * @param p_oP parameter element for this group (not read here)
+  * @throws Exception if no queue session can be obtained, or on JMS or
+  *         interruption failures
+  */
+ protected void doYourJob(DomElement p_oP) throws Exception
+ {
+     if (null == m_oQsess || null == m_oQueue)
+         obtainQueue();
+     if (null == m_oQsess || null == m_oQueue)
+         throw new Exception(formatLogMsg("Unable to obtain JMS queue session"));
+
+     String sSelector = m_oParms.getAttr(LISTEN_MSG_SELECTOR);
+     MessageConsumer oReader = m_oQsess.createReceiver(m_oQueue, sSelector);
+     try
+     {
+         while(System.currentTimeMillis() < m_lNextReload)
+         {
+             boolean bBusy;
+             // The count is updated under this lock by child threads (updQthreads)
+             synchronized (this)
+             { bBusy = (m_iQthr >= m_iMaxThr); }
+             if (bBusy)
+             {
+                 // Rewind first, otherwise the text piles up on every pass
+                 m_sb.setLength(m_iSbIni);
+                 m_oLogger.info(m_sb.append("Waiting for available threads").toString());
+                 Thread.sleep(5000);
+                 continue;
+             }
+             Message oMsg = oReader.receiveNoWait();
+             if (null==oMsg)
+             {
+                 Thread.sleep(500);
+                 continue;
+             }
+
+             MsgChildProcess oNew = getMsgChildProcess(this,oMsg);
+             new Thread(m_oThrGrp,oNew).start();
+             // Wait a little bit, so thread count will be updated
+             // at some point in the past, this sleep was indispensable
+             // new thread control classes in Java 5 might have solved the problem
+             Thread.sleep(500);
+         }
+     }
+     finally
+     {
+         // Close quietly so a close failure does not mask an exception from the loop
+         try { oReader.close(); }
+         catch (Exception e)
+         { m_oLogger.warn(formatLogMsg("Problems closing queue receiver"), e); }
+     }
+ } //________________________________
+
+ protected MsgChildProcess getMsgChildProcess
+ (GroupOfChilds pDad, Message pMsg) throws Exception
+ { return new MsgChildProcess (pDad,pMsg); }
+
+ } //______________________________________________________
+
+ protected class MsgChildProcess extends Observable implements Runnable
+ {
+ // Owning group: observer of this child's thread-count notifications, and
+ // source of the action class and parameters used in run()
+ protected GroupOfChilds m_oParent; // you can always go there for common stuff
+ // Message passed to the processor's processMessage() in run()
+ protected Message m_oMsg;
+
+ /**
+  * Creates the handler for one message and registers it with its parent
+  * group, which is immediately notified with +1 so that its child-thread
+  * count includes this child before the thread is started.
+  *
+  * @param p_oGrp parent group, observer of this child
+  * @param p_oMsg message that run() will hand to the processor
+  */
+ public MsgChildProcess(GroupOfChilds p_oGrp, Message p_oMsg)
+     throws Exception
+ {
+     m_oParent = p_oGrp;
+     // Without this assignment run() would hand a null message to the processor
+     m_oMsg = p_oMsg;
+     this.addObserver(m_oParent);
+     setChanged();
+     // add 1 to child thread count
+     notifyObservers(Integer.valueOf(1));
+ } //__________________________________
+
+ /**
+  * Instantiates the parent's action class with a private copy of the
+  * parameters, lets it process the message, and sends the OK or error
+  * notifications accordingly.
+  * The parent's child-thread count is decremented in a finally block, so
+  * the slot is released even if something other than an Exception (for
+  * example a linkage error in the action class) terminates the thread.
+  */
+ public void run()
+ {
+     try
+     {
+         Exception oAbend = null;
+         try
+         {
+             Constructor oCnst = m_oParent.m_oExecClass
+                 .getConstructor (new Class[] {DomElement.class});
+             // Each processor instance gets its own copy of the parameters
+             DomElement oParms = m_oParent.m_oChParms.cloneObj();
+             Object oInst = oCnst.newInstance (new Object[] {oParms});
+             ((EsbMsgProcessor)oInst).processMessage(m_oMsg);
+         }
+         catch (Exception e)
+         {
+             m_oLogger.error("run() FAILED",e);
+             oAbend = e;
+         }
+
+         if (null==oAbend)
+             notifyOK();
+         else
+             notifyError(oAbend);
+     }
+     finally
+     {
+         setChanged();
+         // decrease child thread count in parent group
+         notifyObservers(Integer.valueOf(-1));
+     }
+ } //______________________________
+
+ /**
+  * Sends the success notification to every notification list in the
+  * listener's parameters for which NotificationList.isOK() holds.
+  * Best effort: failures are logged and never propagated to the caller.
+  *
+  * NOTE(review): the raw element (oCurr) is passed to sendNotifications
+  * here, whereas notifyError passes the NotificationList - confirm
+  * which one is intended.
+  */
+ public void notifyOK()
+ {
+     try
+     {
+         String sNotif = getOkNotifContent();
+         for (DomElement oCurr : m_oParms.getElementChildren(NotificationList.ELEMENT))
+         {
+             NotificationList oNL = new NotificationList(oCurr);
+             if (! oNL.isOK()) continue;
+             InotificationHandler oHandler = getNotifHandler();
+             // getNotifHandler has already logged why no handler is available
+             if (null == oHandler) return;
+             oHandler.sendNotifications(oCurr,sNotif);
+         }
+     }
+     catch (Exception e)
+     {
+         // Still best effort, but no longer silent
+         m_oLogger.error(formatLogMsg("OK notification FAILED"),e);
+     }
+ } //__________________________________
+
+ /**
+  * Sends the failure notification, followed by the stack trace of the
+  * given exception, to every notification list in the listener's
+  * parameters for which NotificationList.isErr() holds.
+  * Best effort: failures are logged and never propagated to the caller.
+  *
+  * @param p_e the exception that caused the failure; may be null
+  */
+ public void notifyError(Exception p_e)
+ {
+     String sNotif = getErrorNotifContent();
+     ByteArrayOutputStream oBO = new ByteArrayOutputStream();
+     PrintStream oPS = new PrintStream(oBO);
+     try
+     {
+         oPS.println(sNotif);
+         if (null != p_e) p_e.printStackTrace(oPS);
+         oPS.close();
+
+         String sMsg = oBO.toString();
+         for (DomElement oCurr : m_oParms.getElementChildren(NotificationList.ELEMENT))
+         {
+             NotificationList oNL = new NotificationList(oCurr);
+             if (! oNL.isErr()) continue;
+             InotificationHandler oHandler = getNotifHandler();
+             // getNotifHandler has already logged why no handler is available
+             if (null == oHandler) return;
+             oHandler.sendNotifications(oNL,sMsg);
+         }
+     }
+     catch (Exception e)
+     {
+         // Still best effort, but no longer silent
+         m_oLogger.error(formatLogMsg("Error notification FAILED"),e);
+     }
+ } //__________________________________
+
+ protected InotificationHandler getNotifHandler()
+ {
+ try { return NotificationHandlerFactory.getNotifHandler
+ ("remote"
+ ,EsbSysProps.getJndiServerType()
+ ,EsbSysProps.getJndiServerURL()
+ );
+ }
+ catch (Exception e)
+ { m_oLogger.error(formatLogMsg("Notification FAILED"),e);
+ return null;
+ }
+ } //______________________________
+
+ // These methods are meant to be overridden by your own derived class
+ protected String getOkNotifContent()
+ {
+ return "Success";
+ }
+ protected String getErrorNotifContent()
+ {
+ return "FAILURE";
+ }
+
+ } //______________________________________________________
+
+} //____________________________________________________________________________
More information about the jboss-svn-commits
mailing list