[jboss-svn-commits] JBL Code SVN: r5261 - labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Mon Jul 24 14:08:53 EDT 2006


Author: estebanschifman
Date: 2006-07-24 14:08:51 -0400 (Mon, 24 Jul 2006)
New Revision: 5261

Modified:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
Log:
Fixed bugs in JmsQueueListener

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java	2006-07-24 15:42:39 UTC (rev 5260)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java	2006-07-24 18:08:51 UTC (rev 5261)
@@ -23,6 +23,11 @@
 
 public class JmsQueueListener
 {  
+	public static void main(String[] args) throws Exception
+	{
+	    new JmsQueueListener(args[0]);
+	}
+
   // You can override this value at constructor time of your
   // derived class after calling super(String)
   protected int	m_iDfltReloadMillis= 180000 // default interval between
@@ -77,7 +82,7 @@
   	{ try 
   	  {	String sMsg = (null == m_oParms) 
   				? "Initial Parameter loading" : "Reloading Params";
-  	    m_oLogger.info(formatLogMsg(sMsg));
+  	    m_oLogger.debug(formatLogMsg(sMsg));
   		m_oParms	= m_oParmRepos.getElement(m_oParmsName); 
   	  }
   	  catch (Exception e)
@@ -113,7 +118,7 @@
       {
         oneScan(oCurr, bFirst);
       }
-      if (waitForQuiesce(1000))
+      if (waitForQuiesce(5000))
       { m_bEndRequested = true;
         return;
       }
@@ -258,9 +263,8 @@
     protected QueueConnection m_oQconn;
     protected QueueSession	m_oQsess;
     protected Queue			m_oQueue;
+    protected String		m_sSelector;
     
-    protected String		m_sSelectorService;
-    
     protected GroupOfChilds(ThreadGroup p_oThrGrp) throws Exception
     {
       m_oThrGrp = p_oThrGrp;
@@ -273,15 +277,11 @@
     {
       if (p_oUsrObj instanceof Integer)
       {
-        updQthreads( ( (Integer) p_oUsrObj).intValue());
+        int iAdd = ((Integer) p_oUsrObj).intValue();
+        m_iQthr += iAdd;
       }
     } //________________________________
 
-    private synchronized void updQthreads(int p_i)
-    {
-      m_iQthr += p_i;
-    } //________________________________
-
     private void execute(DomElement p_oP) throws Exception
     {
       m_sb.setLength(m_iSbIni);
@@ -338,19 +338,20 @@
     	m_sb.setLength(m_iSbIni);
     	m_oChParms	= p_oP.cloneObj();
     	m_oChParms.rmvChildsByName(EsbAbstractProcessor.PARMS_THIS_INSTANCE);
-		setMaxThreads(p_oP,1);
+		setMaxThreads(p_oP,10);
 		
 		obtainAtt(p_oP,LISTEN_QUEUE,null);
 		
-		m_sSelectorService = obtainAtt(p_oP,LISTEN_MSG_SELECTOR,null);
+		m_sSelector = obtainAtt(p_oP,LISTEN_MSG_SELECTOR,null);
 		
-		
-		
-		
 		String sClass = obtainAtt(p_oP,PARM_ACTION_CLASS,null);
 		if (EsbUtil.isNullString(sClass))
 			throw new Exception(formatLogMsg("Missing value for "+PARM_ACTION_CLASS));
 		m_oExecClass = Class.forName(sClass);
+		Constructor m_oConstructor = m_oExecClass.getConstructor	
+				(new Class[]  {DomElement.class});
+	   if (null==m_oConstructor)
+		   throw new Exception ("No constructor "+sClass+"(DomElement) found");
 	} //________________________________
 
 	  protected final void obtainQueue(DomElement p_oP) throws JMSException, NamingException
@@ -387,49 +388,46 @@
 
 	protected void doYourJob(DomElement p_oP) throws Exception
 	{
+		obtainQueue(p_oP);
 		while(System.currentTimeMillis() < m_lNextReload)
         {
-		  if (m_iQthr > m_iMaxThr)
-		  {	m_oLogger.info(m_sb.append("Waiting for available threads").toString());
-		  	m_sb.setLength(m_iSbIni);
-          	Thread.sleep(5000);
-          	continue;
-		  }
-		  obtainQueue(p_oP);
-	      //String sSelector	= m_oParms.getAttr(LISTEN_MSG_SELECTOR);
-		  MessageConsumer oReader = m_oQsess.createReceiver(m_oQueue, m_sSelectorService);
-	      Message oMsg = (null==oReader) ? null
-	    		  : oReader.receiveNoWait();
-	      if (null==oMsg)
-	      {	Thread.sleep(1000);
-	    	continue;
-	      }
+		  if (m_iQthr >= m_iMaxThr)
+			  return;
 
-		  MsgChildProcess oNew = getMsgChildProcess(this,oMsg);
+		  MsgChildProcess oNew = getMsgChildProcess(this,p_oP);
 	      new Thread(m_oThrGrp,oNew).start();
 	        // Wait a little bit, so thread count will be updated
 	        // at some point in the past, this sleep was indispensable
 	        // new thread control classes in Java 5 might have solved the problem
 	        Thread.sleep(500);
         }
+		if (null!=m_oQsess)
+		      try { m_oQsess.close(); }    
+			catch (Exception e1) {}
+		if (null!=m_oQconn)
+		      try { m_oQconn.close(); }    
+			catch (Exception e2) {}
+
 	} //________________________________
 	
 	protected MsgChildProcess getMsgChildProcess
-		(GroupOfChilds pDad, Message pMsg) throws Exception
-		{ return new MsgChildProcess (pDad,pMsg); }
+		(GroupOfChilds pDad, DomElement p_oP) throws Exception
+		{ return new MsgChildProcess (pDad,p_oP); }
 	
   } //______________________________________________________
   
   protected class MsgChildProcess extends Observable implements Runnable
   { 
 	protected GroupOfChilds m_oParent;  // you can always go there for common stuff
-	protected Message		m_oMsg;
+	protected DomElement	m_oParms;
+	protected EsbMsgProcessor m_oExec;
 
-	public MsgChildProcess(GroupOfChilds p_oGrp, Message p_oMsg)
+	public MsgChildProcess(GroupOfChilds p_oGrp, DomElement p_oParms)
   		throws Exception
   	{
+	  m_oLogger.debug("Child "+p_oParms.getName());
       m_oParent	= p_oGrp;
-      m_oMsg	= p_oMsg;
+      m_oParms	= p_oParms;
       this.addObserver(m_oParent);
       setChanged();
       // add 1 to child thread count
@@ -438,26 +436,40 @@
 
 	 public void run()
 	 {
-       Exception oAbend = null;
+	   Class[] oaArgs = {DomElement.class};
+	   MessageConsumer oReader = null;		 
 	   try
 	   {
-		   Constructor oCnst = m_oParent.m_oExecClass
-		   		.getConstructor	(new Class[]  {DomElement.class});
-		   DomElement oParms = m_oParent.m_oChParms.cloneObj();
-		   Object oInst = oCnst.newInstance (new Object[] {oParms});
-		   EsbMsgProcessor oMP = (EsbMsgProcessor)oInst;
-		   oMP.processMessage(m_oMsg);
-		   
+	     oReader = m_oParent.m_oQsess.createReceiver
+	   			(m_oParent.m_oQueue, m_oParent.m_sSelector);
+	   
+	   	 long lSlack;
+	   	 while ((lSlack=m_lNextReload-System.currentTimeMillis()) > 0)
+	   	 {
+		   Message oMsg = (null==oReader) ? null
+	    		  : oReader.receive(lSlack);
+		   if (null==oMsg)
+			   continue;
+
+		   try
+		   {	
+			   DomElement oParms = m_oParms.cloneObj();
+		   	   Constructor oCns = m_oParent.m_oExecClass.getConstructor(oaArgs);
+			   Object oInst = oCns.newInstance (new Object[] {oParms});
+		   	   m_oExec = (EsbMsgProcessor)oInst;
+		   	   m_oExec.processMessage(oMsg);
+		   	   notifyOK();
+		   }
+		   catch (Exception e) 
+		   { e.printStackTrace();
+			   notifyError(e);
+	   	   }
+	     }
 	   }
-	   catch (Exception e) 
-	   { m_oLogger.error("run() FAILED",e);
-       	 oAbend = e;
+	   catch (JMSException oJ)
+	   {
+		   m_oLogger.error(oJ);
 	   }
-	   
-	   if (null==oAbend)
-		   notifyOK();
-	   else
-		   notifyError(oAbend);
 
 	   setChanged();
 	   // decrease child thread count in parent group
@@ -467,11 +479,11 @@
 	  public void notifyOK()
 	  { try
 	    { 
-		  String sNotif = getOkNotifContent();
+		  Object oNotif = m_oExec.getOkNotification();
 		  for (DomElement oCurr : m_oParms.getElementChildren(NotificationList.ELEMENT))
 	      { NotificationList oNL = new NotificationList(oCurr);
 	        if (! oNL.isOK())    continue;
-	        getNotifHandler().sendNotifications(oCurr,sNotif);
+	        getNotifHandler().sendNotifications(oCurr,oNotif);
 	      }
 	    }
 	    catch (Exception e) {}
@@ -479,11 +491,13 @@
 
 	  public void notifyError(Exception p_e)
 	  { 
-		String sNotif = getErrorNotifContent();
+		Object oNotif = (null==m_oExec) 
+			? "No action class instantiated" 
+			: m_oExec.getErrorNotification();
 		ByteArrayOutputStream oBO = new ByteArrayOutputStream();
 	    PrintStream oPS = new PrintStream(oBO);
 	    try
-	    { oPS.println(sNotif);
+	    { oPS.println(oNotif.toString());
 	      if (null != p_e) p_e.printStackTrace(oPS);
 	      oPS.close();
 
@@ -511,21 +525,6 @@
 			}
 	  } //______________________________
 	  
-	  // These methods to be overriden by you own derived class
-	  protected String getOkNotifContent()
-	  {
-		  return "Success";
-	  }
-	  protected String getErrorNotifContent()
-	  {
-		  return "FAILURE";
-	  }
-
   } //______________________________________________________
   
-  public static void main(String[] args) throws Exception
-  {
-    new JmsQueueListener(args[0]);
-  }
-
 } //____________________________________________________________________________




More information about the jboss-svn-commits mailing list