[jboss-svn-commits] JBL Code SVN: r5261 - labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Mon Jul 24 14:08:53 EDT 2006
Author: estebanschifman
Date: 2006-07-24 14:08:51 -0400 (Mon, 24 Jul 2006)
New Revision: 5261
Modified:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
Log:
Fixed bugs in JmsQueueListener
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java 2006-07-24 15:42:39 UTC (rev 5260)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java 2006-07-24 18:08:51 UTC (rev 5261)
@@ -23,6 +23,11 @@
public class JmsQueueListener
{
+ public static void main(String[] args) throws Exception
+ {
+ new JmsQueueListener(args[0]);
+ }
+
// You can override this value at constructor time of your
// derived class after calling super(String)
protected int m_iDfltReloadMillis= 180000 // default interval between
@@ -77,7 +82,7 @@
{ try
{ String sMsg = (null == m_oParms)
? "Initial Parameter loading" : "Reloading Params";
- m_oLogger.info(formatLogMsg(sMsg));
+ m_oLogger.debug(formatLogMsg(sMsg));
m_oParms = m_oParmRepos.getElement(m_oParmsName);
}
catch (Exception e)
@@ -113,7 +118,7 @@
{
oneScan(oCurr, bFirst);
}
- if (waitForQuiesce(1000))
+ if (waitForQuiesce(5000))
{ m_bEndRequested = true;
return;
}
@@ -258,9 +263,8 @@
protected QueueConnection m_oQconn;
protected QueueSession m_oQsess;
protected Queue m_oQueue;
+ protected String m_sSelector;
- protected String m_sSelectorService;
-
protected GroupOfChilds(ThreadGroup p_oThrGrp) throws Exception
{
m_oThrGrp = p_oThrGrp;
@@ -273,15 +277,11 @@
{
if (p_oUsrObj instanceof Integer)
{
- updQthreads( ( (Integer) p_oUsrObj).intValue());
+ int iAdd = ((Integer) p_oUsrObj).intValue();
+ m_iQthr += iAdd;
}
} //________________________________
- private synchronized void updQthreads(int p_i)
- {
- m_iQthr += p_i;
- } //________________________________
-
private void execute(DomElement p_oP) throws Exception
{
m_sb.setLength(m_iSbIni);
@@ -338,19 +338,20 @@
m_sb.setLength(m_iSbIni);
m_oChParms = p_oP.cloneObj();
m_oChParms.rmvChildsByName(EsbAbstractProcessor.PARMS_THIS_INSTANCE);
- setMaxThreads(p_oP,1);
+ setMaxThreads(p_oP,10);
obtainAtt(p_oP,LISTEN_QUEUE,null);
- m_sSelectorService = obtainAtt(p_oP,LISTEN_MSG_SELECTOR,null);
+ m_sSelector = obtainAtt(p_oP,LISTEN_MSG_SELECTOR,null);
-
-
-
String sClass = obtainAtt(p_oP,PARM_ACTION_CLASS,null);
if (EsbUtil.isNullString(sClass))
throw new Exception(formatLogMsg("Missing value for "+PARM_ACTION_CLASS));
m_oExecClass = Class.forName(sClass);
+ Constructor m_oConstructor = m_oExecClass.getConstructor
+ (new Class[] {DomElement.class});
+ if (null==m_oConstructor)
+ throw new Exception ("No constructor "+sClass+"(DomElement) found");
} //________________________________
protected final void obtainQueue(DomElement p_oP) throws JMSException, NamingException
@@ -387,49 +388,46 @@
protected void doYourJob(DomElement p_oP) throws Exception
{
+ obtainQueue(p_oP);
while(System.currentTimeMillis() < m_lNextReload)
{
- if (m_iQthr > m_iMaxThr)
- { m_oLogger.info(m_sb.append("Waiting for available threads").toString());
- m_sb.setLength(m_iSbIni);
- Thread.sleep(5000);
- continue;
- }
- obtainQueue(p_oP);
- //String sSelector = m_oParms.getAttr(LISTEN_MSG_SELECTOR);
- MessageConsumer oReader = m_oQsess.createReceiver(m_oQueue, m_sSelectorService);
- Message oMsg = (null==oReader) ? null
- : oReader.receiveNoWait();
- if (null==oMsg)
- { Thread.sleep(1000);
- continue;
- }
+ if (m_iQthr >= m_iMaxThr)
+ return;
- MsgChildProcess oNew = getMsgChildProcess(this,oMsg);
+ MsgChildProcess oNew = getMsgChildProcess(this,p_oP);
new Thread(m_oThrGrp,oNew).start();
// Wait a little bit, so thread count will be updated
// at some point in the past, this sleep was indispensable
// new thread control classes in Java 5 might have solved the problem
Thread.sleep(500);
}
+ if (null!=m_oQsess)
+ try { m_oQsess.close(); }
+ catch (Exception e1) {}
+ if (null!=m_oQconn)
+ try { m_oQconn.close(); }
+ catch (Exception e2) {}
+
} //________________________________
protected MsgChildProcess getMsgChildProcess
- (GroupOfChilds pDad, Message pMsg) throws Exception
- { return new MsgChildProcess (pDad,pMsg); }
+ (GroupOfChilds pDad, DomElement p_oP) throws Exception
+ { return new MsgChildProcess (pDad,p_oP); }
} //______________________________________________________
protected class MsgChildProcess extends Observable implements Runnable
{
protected GroupOfChilds m_oParent; // you can always go there for common stuff
- protected Message m_oMsg;
+ protected DomElement m_oParms;
+ protected EsbMsgProcessor m_oExec;
- public MsgChildProcess(GroupOfChilds p_oGrp, Message p_oMsg)
+ public MsgChildProcess(GroupOfChilds p_oGrp, DomElement p_oParms)
throws Exception
{
+ m_oLogger.debug("Child "+p_oParms.getName());
m_oParent = p_oGrp;
- m_oMsg = p_oMsg;
+ m_oParms = p_oParms;
this.addObserver(m_oParent);
setChanged();
// add 1 to child thread count
@@ -438,26 +436,40 @@
public void run()
{
- Exception oAbend = null;
+ Class[] oaArgs = {DomElement.class};
+ MessageConsumer oReader = null;
try
{
- Constructor oCnst = m_oParent.m_oExecClass
- .getConstructor (new Class[] {DomElement.class});
- DomElement oParms = m_oParent.m_oChParms.cloneObj();
- Object oInst = oCnst.newInstance (new Object[] {oParms});
- EsbMsgProcessor oMP = (EsbMsgProcessor)oInst;
- oMP.processMessage(m_oMsg);
-
+ oReader = m_oParent.m_oQsess.createReceiver
+ (m_oParent.m_oQueue, m_oParent.m_sSelector);
+
+ long lSlack;
+ while ((lSlack=m_lNextReload-System.currentTimeMillis()) > 0)
+ {
+ Message oMsg = (null==oReader) ? null
+ : oReader.receive(lSlack);
+ if (null==oMsg)
+ continue;
+
+ try
+ {
+ DomElement oParms = m_oParms.cloneObj();
+ Constructor oCns = m_oParent.m_oExecClass.getConstructor(oaArgs);
+ Object oInst = oCns.newInstance (new Object[] {oParms});
+ m_oExec = (EsbMsgProcessor)oInst;
+ m_oExec.processMessage(oMsg);
+ notifyOK();
+ }
+ catch (Exception e)
+ { e.printStackTrace();
+ notifyError(e);
+ }
+ }
}
- catch (Exception e)
- { m_oLogger.error("run() FAILED",e);
- oAbend = e;
+ catch (JMSException oJ)
+ {
+ m_oLogger.error(oJ);
}
-
- if (null==oAbend)
- notifyOK();
- else
- notifyError(oAbend);
setChanged();
// decrease child thread count in parent group
@@ -467,11 +479,11 @@
public void notifyOK()
{ try
{
- String sNotif = getOkNotifContent();
+ Object oNotif = m_oExec.getOkNotification();
for (DomElement oCurr : m_oParms.getElementChildren(NotificationList.ELEMENT))
{ NotificationList oNL = new NotificationList(oCurr);
if (! oNL.isOK()) continue;
- getNotifHandler().sendNotifications(oCurr,sNotif);
+ getNotifHandler().sendNotifications(oCurr,oNotif);
}
}
catch (Exception e) {}
@@ -479,11 +491,13 @@
public void notifyError(Exception p_e)
{
- String sNotif = getErrorNotifContent();
+ Object oNotif = (null==m_oExec)
+ ? "No action class instantiated"
+ : m_oExec.getErrorNotification();
ByteArrayOutputStream oBO = new ByteArrayOutputStream();
PrintStream oPS = new PrintStream(oBO);
try
- { oPS.println(sNotif);
+ { oPS.println(oNotif.toString());
if (null != p_e) p_e.printStackTrace(oPS);
oPS.close();
@@ -511,21 +525,6 @@
}
} //______________________________
- // These methods to be overriden by you own derived class
- protected String getOkNotifContent()
- {
- return "Success";
- }
- protected String getErrorNotifContent()
- {
- return "FAILURE";
- }
-
} //______________________________________________________
- public static void main(String[] args) throws Exception
- {
- new JmsQueueListener(args[0]);
- }
-
} //____________________________________________________________________________
More information about the jboss-svn-commits mailing list