[jboss-svn-commits] JBL Code SVN: r8363 - in labs/jbossesb/trunk/product/core: listeners/src/org/jboss/internal/soa/esb/listeners listeners/src/org/jboss/soa/esb/listeners listeners/src/org/jboss/soa/esb/listeners/message listeners/src/org/jboss/soa/esb/listeners/newgateway rosetta/src/org/jboss/soa/esb/couriers rosetta/src/org/jboss/soa/esb/helpers
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Sat Dec 16 04:41:10 EST 2006
Author: estebanschifman
Date: 2006-12-16 04:40:54 -0500 (Sat, 16 Dec 2006)
New Revision: 8363
Added:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/newgateway/
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/newgateway/GatewayListener.java
Modified:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/listeners/DefaultListenerManager.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractManagedListener.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerManager.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/couriers/CourierUtil.java
labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/helpers/ConfigTree.java
Log:
More progress on new listener classes
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/listeners/DefaultListenerManager.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/listeners/DefaultListenerManager.java 2006-12-16 01:54:48 UTC (rev 8362)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/internal/soa/esb/listeners/DefaultListenerManager.java 2006-12-16 09:40:54 UTC (rev 8363)
@@ -27,7 +27,6 @@
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Observable;
import java.util.concurrent.ConcurrentHashMap;
@@ -265,7 +264,7 @@
{
synchronized (_synchAllReady)
{
- _childrenStarted = new HashMap<Class, Boolean>();
+ _childrenStarted = new HashMap<AbstractManagedListener, Boolean>();
for (ConfigTree oCurr : _config.getAllChildren())
{
String sClass = oCurr.getAttribute(PARM_LISTENER_CLASS);
@@ -305,19 +304,18 @@
+ " - Continuing with cached version", e);
}
}
- // m_oState = State.Shutting_down;
+// _status = State.Shutting_down;
_status = State.Done_OK;
_status.setCompletionCode(0);
- _logger.debug("Finishing_____________________________________________________");
// Close the command queue...
- try {
+ try
+ {
if (null != _commandQueue)
_commandQueue.close();
- } catch (CommandQueueException e) {
- _logger.error("Error closing Command Queue.", e);
}
+ catch (CommandQueueException e) { _logger.debug("Error closing Command Queue.", e); }
} // ________________________________
private void tryToLaunchChildListener(ConfigTree p_oP, String p_sClassName)
@@ -329,7 +327,7 @@
AbstractManagedListener listener = (AbstractManagedListener)
oConst.newInstance(new Object[] { this,p_oP});
- _childrenStarted.put(listener.getClass(), Boolean.FALSE);
+ _childrenStarted.put(listener, Boolean.FALSE);
((Observable)listener).addObserver(this);
new Thread(listener).start();
}
@@ -462,8 +460,12 @@
*/
public void requestEnd()
{
+ _endTimeStamp = System.currentTimeMillis()+1000;
+ _nextReload = _endTimeStamp+100;
+ _logger.debug("Waiting for child threads to finish___________________________");
+ waitForCompletion();
+ _logger.debug("ListenerManager ends__________________________________________");
_endRequested=true;
- _endTimeStamp = 0;
}
/* (non-Javadoc)
@@ -591,16 +593,6 @@
}
} //____________________________
- public EPR getEpr(String serviceCategoryName, String serviceName) throws RegistryException
- {
- return null;
- } //____________________________
-
- public List<EPR> getEprs(String serviceCategoryName, String serviceName) throws RegistryException
- {
- return null;
- } //____________________________
-
// this method will typically run in the invoker's thread
// (which btw might be the same as the run() thread if this not launched in a separate thread
public void waitUntilReady()
@@ -627,11 +619,50 @@
} while (someChildPending);
} //____________________________
+ public void xxx()
+ {
+ boolean someChildPending = true;
+ do
+ {
+ Collection<Boolean> allStarted = null;
+ synchronized (_synchAllReady)
+ {
+ allStarted = (null==_childrenStarted) ? null : _childrenStarted.values();
+ }
+ if (null== allStarted)
+ break;
+ someChildPending = false;
+ for (Boolean curr : allStarted)
+ if (Boolean.TRUE.equals(curr))
+ someChildPending = true;
+ if (someChildPending)
+ try { Thread.sleep(_pauseTimeMillis); }
+ catch (InterruptedException e) { return; }
+
+ } while (someChildPending);
+ } //____________________________
+
+ public void waitForCompletion()
+ {
+ try { Thread.sleep(2000); }
+ catch (InterruptedException e) {}
+ boolean bRunning = true;
+ while (bRunning && null!=_childrenStarted && _childrenStarted.size()>0)
+ {
+ bRunning = false;
+ for (AbstractManagedListener curr :_childrenStarted.keySet())
+ {
+ if (curr.hasActiveTreads())
+ bRunning = true;
+ }
+ }
+ } //____________________________
+
// Child processes must let us know when they're ready
public void update(Observable o, Object arg)
{
if (null!=_childrenStarted && (arg instanceof Boolean))
- _childrenStarted.put(o.getClass(), (Boolean)arg);
+ _childrenStarted.put((AbstractManagedListener)o, (Boolean)arg);
} //____________________________
@@ -643,7 +674,7 @@
private State _status = State.Uninitialised;
public State getState() { return _status; }
- private Map<Class,Boolean> _childrenStarted;
+ private Map<AbstractManagedListener,Boolean> _childrenStarted;
private Map<String,Long> _paramFileTimeStamps=new ConcurrentHashMap<String,Long>();
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractManagedListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractManagedListener.java 2006-12-16 01:54:48 UTC (rev 8362)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractManagedListener.java 2006-12-16 09:40:54 UTC (rev 8363)
@@ -83,14 +83,14 @@
_eprCategoryName= _config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
_eprName = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
- String attr = _config.getAttribute(ListenerTagNames.MAX_THREADS);
+ String attr = _config.getAttribute(ListenerTagNames.MAX_THREADS_TAG);
if (!Util.isNullString(attr))
try { _maxThreads = Integer.parseInt(attr); }
catch (Exception e) { attr = null; }
if (Util.isNullString(attr))
{
_maxThreads = _defaultMaxThreads;
- _logger.debug("Missing or invalid "+ListenerTagNames.MAX_THREADS+" attribute"
+ _logger.debug("Missing or invalid "+ListenerTagNames.MAX_THREADS_TAG+" attribute"
+" - Using default value of <"+_maxThreads+">");
}
} //________________________________
@@ -108,16 +108,6 @@
_controller.unRegister(_eprCategoryName, _eprName, _epr);
} //________________________________
- public boolean ensureThreadAvailable()
- {
- while ( (_qRunningThreads >= _maxThreads)
- &&_controller.continueLooping())
- try { Thread.sleep(_pauseLapseInMillis); }
- catch (InterruptedException e) { return false; }
-
- return _controller.continueLooping();
- } //________________________________
-
/**
* Wait until the registration process finished, a PickupCourier was obtained
* , and the pool thread is instantiated
@@ -138,6 +128,11 @@
updateThreadCount((Integer)arg);
} //________________________________
+ public boolean hasActiveTreads()
+ {
+ return _qRunningThreads > 0;
+ } //________________________________
+
protected void resetThreadCount()
{
synchronized (_synchThreads)
@@ -148,7 +143,7 @@
{
synchronized (_synchThreads)
{ _qRunningThreads += i.intValue();}
- _logger.debug("Thread pool ("+getClass().getSimpleName()+") used ="+_qRunningThreads+"/"+_maxThreads);
+// _logger.debug("Thread pool ("+getClass().getSimpleName()+") used ="+_qRunningThreads+"/"+_maxThreads);
} //________________________________
protected String obtainAttribute(String p_sAtt, String p_sDefault)
@@ -161,7 +156,7 @@
return (null != sVal) ? sVal : p_sDefault;
} // ________________________________
-
+
/**
* Loops until controlling process determines
* <br/>Invokes appropriate Courier to obtain incoming ESB Messages
@@ -184,8 +179,13 @@
while (_controller.continueLooping())
{
// Only pickup a message when a thread is available
- if (! ensureThreadAvailable())
- break;
+ if (_qRunningThreads >= _maxThreads)
+ {
+// _logger.debug("+++++++++++"+Thread.currentThread()+_qRunningThreads);
+ try { Thread.sleep(_pauseLapseInMillis); }
+ catch (InterruptedException e) { break;}
+ continue;
+ }
long lWait = _controller.millisToWait();
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerManager.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerManager.java 2006-12-16 01:54:48 UTC (rev 8362)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerManager.java 2006-12-16 09:40:54 UTC (rev 8363)
@@ -22,7 +22,6 @@
package org.jboss.soa.esb.listeners;
import java.text.SimpleDateFormat;
-import java.util.List;
import java.util.Observer;
import org.jboss.soa.esb.addressing.EPR;
@@ -75,22 +74,4 @@
*/
public abstract void unRegister(String serviceCategoryName, String serviceName, EPR epr) throws RegistryException;
- /**
- * Obtain an EPR under requested category and name
- *
- * @param serviceCategoryName - name of the category of the service ('Content Based Routing')
- * @param serviceName - name of the service ("
- * @throws RegistryException
- */
- public abstract EPR getEpr(String serviceCategoryName, String serviceName) throws RegistryException;
-
- /**
- * Obtain a list of EPRs under requested category and name
- *
- * @param serviceCategoryName - name of the category of the service ('Content Based Routing')
- * @param serviceName - name of the service ("
- * @throws RegistryException
- */
- public abstract List<EPR> getEprs(String serviceCategoryName, String serviceName) throws RegistryException;
-
}
\ No newline at end of file
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java 2006-12-16 01:54:48 UTC (rev 8362)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerTagNames.java 2006-12-16 09:40:54 UTC (rev 8363)
@@ -27,8 +27,12 @@
public class ListenerTagNames
{
+ /** EPRs */
+ public static final String URL_TAG = "URL";
+ public static final String PROTOCOL_TAG = "protocol";
+
/** Threading */
- public static final String MAX_THREADS = "maxThreads";
+ public static final String MAX_THREADS_TAG = "maxThreads";
/** Registry */
public static final String SERVICE_CATEGORY_NAME_TAG = "service-category";
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java 2006-12-16 01:54:48 UTC (rev 8362)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/ListenerUtil.java 2006-12-16 09:40:54 UTC (rev 8363)
@@ -1,6 +1,24 @@
package org.jboss.soa.esb.listeners;
+import java.io.File;
+import java.net.URL;
+import java.util.Collection;
+
import org.apache.log4j.Logger;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.eprs.FTPEpr;
+import org.jboss.soa.esb.addressing.eprs.FileEpr;
+import org.jboss.soa.esb.addressing.eprs.JMSEpr;
+import org.jboss.soa.esb.couriers.Courier;
+import org.jboss.soa.esb.couriers.CourierFactory;
+import org.jboss.soa.esb.couriers.CourierUtil;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.helpers.NamingContext;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.services.registry.Registry;
+import org.jboss.soa.esb.services.registry.RegistryFactory;
+import org.jboss.soa.esb.util.Util;
public class ListenerUtil
{
@@ -23,9 +41,7 @@
ListenerManager manager = ListenerManagerFactory.getInstance().getListenerManager(parametersName);
if (inNewThread)
{
- _logger.debug(Thread.currentThread()+ " Waiting for all child listeners to start");
- manager.waitUntilReady();
- _logger.debug(Thread.currentThread()+ " All child listeners ready");
+ new Thread(manager).start();
}
else
{
@@ -38,5 +54,194 @@
return manager;
} //________________________________
+ public static EPR tryToDeliver(Message message, String category, String name)
+ throws Exception
+ {
+ Courier courier = null;
+ Registry reg = RegistryFactory.getRegistry();
+ Collection<EPR> all = reg.findEPRs(category, name);
+ if (null!=all)
+ for (EPR epr : all)
+ {
+ try
+ {
+ courier = CourierFactory.getCourier(epr);
+ if (!courier.deliver(message))
+ continue;
+ return epr;
+ }
+ finally { CourierUtil.cleanCourier(courier); }
+ }
+ String service = "["+category+","+name+"]";
+ String txt = (null==all || all.size()<1)
+ ? "No EPRs registered for "+service
+ : "Unable to deliver message to registered EPRs for "+service
+ ;
+ throw new Exception(txt);
+ } //________________________________
+
+ public static EPR assembleEpr(ConfigTree tree)
+ throws ConfigurationException
+ {
+ String protocol = tree.getAttribute(ListenerTagNames.PROTOCOL_TAG);
+ ConfigTree eprElement = tree.getFirstChild(protocol+"EPR");
+
+ try
+ {
+ if ("jms" .equals(protocol)) return jmsEprFromElement(eprElement);
+ if ("file" .equals(protocol)) return fileEprFromElement(eprElement);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Problem",e);
+ throw new ConfigurationException("xx",e);
+ }
+
+ throw new ConfigurationException("Unknown protocol <"+protocol+">");
+
+ } //________________________________
+
+ public static JMSEpr jmsEprFromElement(ConfigTree tree)
+ throws ConfigurationException
+ {
+ try
+ {
+
+ String name = tree.getAttribute(JMSEpr.DESTINATION_NAME_TAG,null);
+
+ String type = getAttrAndWarn(tree, JMSEpr.DESTINATION_TYPE_TAG,"queue");
+ String jndiURL = getAttrAndWarn(tree, JMSEpr.JNDI_URL_TAG ,"localhost");
+ String jndiContextFactory
+ = getAttrAndWarn(tree,JMSEpr.JNDI_CONTEXT_FACTORY,NamingContext.JBOSS_INITIAL_CONTEXT_FACTORY);
+ String jndiPkgPrefix
+ =getAttrAndWarn(tree,JMSEpr.JNDI_PKG_PREFIX_TAG,NamingContext.JBOSS_URL_PKG_PREFIX);
+ String jmsFactoryClass
+ = getAttrAndWarn(tree,JMSEpr.CONNECTION_FACTORY_TAG, "ConnectionFactory");
+
+ String selector = tree.getAttribute(JMSEpr.MESSAGE_SELECTOR_TAG);
+ if (Util.isNullString(selector))
+ _logger.debug("No value specified for "+JMSEpr.MESSAGE_SELECTOR_TAG+" attribute"
+ +" - All messages in queue <"+name+"> will be picked up by listener");
+
+ JMSEpr epr = new JMSEpr(type,name,jmsFactoryClass
+ ,jndiURL,jndiContextFactory,jndiPkgPrefix,selector);
+
+ return epr;
+ }
+ catch (Exception e) { throw new ConfigurationException(e); }
+ } //________________________________
+
+
+ public static FileEpr fileEprFromElement(ConfigTree tree)
+ throws Exception
+ {
+ URL url = new URL(tree.getAttribute(ListenerTagNames.URL_TAG,null));
+ String protocol = url.getProtocol();
+
+ if ("file".equals(protocol))
+ {
+ if (!new File(url.getFile()).isDirectory())
+ throw new ConfigurationException("Attribute "
+ +ListenerTagNames.URL_TAG+" must reference a directory");
+ }
+
+ FileEpr epr =
+ ("file".equals(protocol)) ? new FileEpr(url)
+ :("ftp".equals(protocol)) ? new FTPEpr (url)
+ : null
+ ;
+ if (null==epr)
+ throw new ConfigurationException("Unsupported file protocol : "+protocol);
+
+ epr.setInputSuffix(tree.getAttribute(FileEpr.INPUT_SUFFIX_TAG,null));
+
+ boolean bErrorDel = Boolean.parseBoolean
+ (getAttrAndWarn(tree,FileEpr.ERROR_DEL_TAG,"true"));
+ epr.setErrorDelete(bErrorDel);
+
+ String errorDir = tree.getAttribute(FileEpr.ERROR_DIR_TAG);
+ String errorSuffix = tree.getAttribute(FileEpr.ERROR_SUFFIX_TAG);
+ if (bErrorDel)
+ {
+ if (null!= errorDir || null!=errorSuffix)
+ _logger.warn("If you don't specify "+FileEpr.ERROR_DEL_TAG+"'false' ,"
+ +FileEpr.ERROR_DIR_TAG+" and "+FileEpr.ERROR_SUFFIX_TAG
+ +" will have no effect because files in error will be deleted");
+ }
+ if (null==errorDir)
+ {
+ errorDir = url.getFile();
+ warnDefault(FileEpr.ERROR_DIR_TAG, errorDir);
+ }
+ if (null==errorSuffix)
+ {
+ errorSuffix = ".esbERROR";
+ warnDefault(FileEpr.ERROR_SUFFIX_TAG, errorSuffix);
+ }
+
+
+ boolean bPostDel = Boolean.parseBoolean
+ (getAttrAndWarn(tree,FileEpr.POST_DEL_TAG,"false"));
+ epr.setPostDelete(bPostDel);
+
+ String postDir = tree.getAttribute(FileEpr.POST_DIR_TAG);
+ String postSuffix = tree.getAttribute(FileEpr.POST_SUFFIX_TAG);
+ if (bPostDel)
+ {
+ if (null!= postDir || null!=postSuffix)
+ _logger.warn("If you specify "+FileEpr.POST_DEL_TAG+"'true' ,"
+ +FileEpr.POST_DIR_TAG+" and "+FileEpr.POST_SUFFIX_TAG
+ +" will have no effect because processed input messages will be deleted");
+ }
+ if (null==postDir)
+ {
+ postDir = url.getFile();
+ warnDefault(FileEpr.POST_DIR_TAG, postDir);
+ }
+ if (null==postSuffix)
+ {
+ postSuffix = ".esbDONE";
+ warnDefault(FileEpr.POST_SUFFIX_TAG, postSuffix);
+ }
+
+ if (epr instanceof FTPEpr)
+ {
+ FTPEpr ftp = (FTPEpr)epr;
+ ftp.setUserName(getAttrAndWarn(tree, FTPEpr.USERNAME_TAG, null));
+ ftp.setPassword(getAttrAndWarn(tree, FTPEpr.PASSWORD_TAG, ""));
+ ftp.setPassive(Boolean.valueOf(getAttrAndWarn(tree,FTPEpr.PASSIVE_TAG,"false")));
+ }
+ return epr;
+ } //________________________________
+
+
+ public static String getAttrAndWarn(ConfigTree tree, String tag,String defaultValue)
+ throws ConfigurationException
+ {
+ String value = null;
+ try
+ {
+ value = tree.getAttribute(tag);
+ if (null==value)
+ if (null==defaultValue)
+ throw new ConfigurationException("Missing or invalid "+tag+" attribute");
+ else
+ {
+ warnDefault(tag,defaultValue);
+ value = defaultValue;
+ }
+ return value;
+ }
+ catch (Exception e) { throw new ConfigurationException(e); }
+ } //________________________________
+
+ private static final boolean LOGWARN=false;
+ private static void warnDefault(String tag, String defaultValue)
+ {
+ if (LOGWARN)
+ _logger.debug("No value specified for "+tag+" attribute"
+ +" - Using default value: '"+defaultValue+"'");
+ } //________________________________
+
private static final Logger _logger = Logger.getLogger(ListenerUtil.class);
}
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java 2006-12-16 01:54:48 UTC (rev 8362)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java 2006-12-16 09:40:54 UTC (rev 8363)
@@ -81,12 +81,12 @@
for (String currMethod : saMethodList)
{
+// _logger.debug("Attempting to invoke "+_currentClass.getName()+" method "+currMethod);
Method method = _currentClass.getMethod(currMethod,new Class[] {Message.class});
// The processing result of each action feeds into the processing of the next action...
try
{
- _logger.debug("Invoking "+_currentClass.getName()+" method "+method.toString());
Message next = (Message)method.invoke(_currentProcessor,new Object[] {_message} );
if(next==null)
{
@@ -130,8 +130,8 @@
finally
{
processingComplete = true;
+ setChanged();
notifyObservers(new Integer(-1));
- setChanged();
}
}
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2006-12-16 01:54:48 UTC (rev 8362)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2006-12-16 09:40:54 UTC (rev 8363)
@@ -22,17 +22,17 @@
package org.jboss.soa.esb.listeners.message;
-import java.lang.reflect.Method;
-
import org.jboss.internal.soa.esb.couriers.PickUpOnlyCourier;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.couriers.CourierException;
import org.jboss.soa.esb.couriers.CourierFactory;
import org.jboss.soa.esb.couriers.CourierTimeoutException;
+import org.jboss.soa.esb.couriers.CourierUtil;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.listeners.AbstractManagedListener;
import org.jboss.soa.esb.listeners.ListenerManager;
import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.listeners.ListenerUtil;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.services.registry.RegistryException;
import org.jboss.soa.esb.util.Util;
@@ -72,10 +72,16 @@
{
super.checkMyParms();
+ // make sure a protocol was specified
+ obtainAttribute(ListenerTagNames.PROTOCOL_TAG, null);
+
if (Util.isNullString(_eprCategoryName))
throw new ConfigurationException("Missing or invalid "+ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
if (Util.isNullString(_eprName))
throw new ConfigurationException("Missing or invalid "+ListenerTagNames.SERVICE_NAME_TAG);
+
+ _epr = ListenerUtil.assembleEpr(_config);
+// _logger.debug(EPRHelper.toXMLString(_epr));
} // ________________________________
@Override
@@ -83,34 +89,6 @@
@Override
- public boolean finalizeRun()
- {
- try
- {
- if (null!=_pickUpCourier)
- try
- {
- Method cleanMethod = _pickUpCourier.getClass().getMethod("cleanup", new Class[] {});
- cleanMethod.invoke(_pickUpCourier, new Object[] {});
- }
- catch (NoSuchMethodException e) {/* OK Just don't invoke it */ }
- catch (Exception e)
- {
- _logger.error("Problems invoking courier.clean() Method",e);
- }
-
- unregisterProcess();
- return true;
- }
- catch (RegistryException re)
- {
- _logger.warn("Could not un register service " + re.getLocalizedMessage());
- return false;
- }
-
- } //________________________________
-
- @Override
public boolean initializeRun()
{
try
@@ -128,6 +106,23 @@
} //________________________________
@Override
+ public boolean finalizeRun()
+ {
+ try
+ {
+ CourierUtil.cleanCourier(_pickUpCourier);
+ unregisterProcess();
+ return true;
+ }
+ catch (RegistryException re)
+ {
+ _logger.warn("Could not un register service " + re.getLocalizedMessage());
+ return false;
+ }
+
+ } //________________________________
+
+ @Override
public void waitForEventAndProcess(long maxWaitMillis)
{
Message message = null;
Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/newgateway/GatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/newgateway/GatewayListener.java 2006-12-16 01:54:48 UTC (rev 8362)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/newgateway/GatewayListener.java 2006-12-16 09:40:54 UTC (rev 8363)
@@ -0,0 +1,116 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners.newgateway;
+
+import org.jboss.internal.soa.esb.couriers.DeliverOnlyCourier;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.couriers.CourierUtil;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.AbstractManagedListener;
+import org.jboss.soa.esb.listeners.ListenerManager;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.util.Util;
+
+/**
+ * Esb abstract Gateway listener.
+ * <p/> Transport specific gateways must extend this class
+ * <br/>Keeps a thread pool to instantiate composer classes when an incoming event is received
+ *
+ * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ */
+
+public abstract class GatewayListener extends AbstractManagedListener
+{
+ /**
+ * public constructor
+ * @param controller ListenerManager - the controlling process
+ * @param config ConfigTree - Containing 'static' configuration for this instance
+ * @throws Exception
+ */
+ public GatewayListener(ListenerManager controller, ConfigTree config)
+ throws ConfigurationException
+ {
+ super (controller, config);
+ } // _______________________________
+
+ /**
+ * Check for mandatory and optional attributes in parameter tree
+ *
+ * @throws ConfigurationException -
+ * if mandatory atts are not right or actionClass not in
+ * classpath
+ */
+ protected void checkMyParms() throws ConfigurationException
+ {
+ super.checkMyParms();
+ _targetServiceCategory = obtainAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG,null);
+ _targetServiceName = obtainAttribute(ListenerTagNames.SERVICE_NAME_TAG,null);
+ } // ________________________________
+
+ @Override
+ public boolean isMessageAware() { return false; }
+
+
+ @Override
+ public boolean finalizeRun()
+ {
+ try
+ {
+ CourierUtil.cleanCourier(_deliverCourier);
+ unregisterProcess();
+ return true;
+ }
+ catch (RegistryException re)
+ {
+ _logger.warn("Could not un register service " + re.getLocalizedMessage());
+ return false;
+ }
+
+ } //________________________________
+
+ @Override
+ public boolean initializeRun()
+ {
+ try
+ {
+ if (! Util.isNullString(_eprCategoryName))
+ registerProcess();
+ return true;
+ }
+ catch (Exception re)
+ {
+ _logger.fatal("Could not register service " + re.getLocalizedMessage(),re);
+ return false;
+ }
+
+ } //________________________________
+
+ protected String _targetServiceCategory;
+ protected String _targetServiceName;
+ protected EPR _targetEpr;
+
+ protected DeliverOnlyCourier _deliverCourier;
+}
Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/couriers/CourierUtil.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/couriers/CourierUtil.java 2006-12-16 01:54:48 UTC (rev 8362)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/couriers/CourierUtil.java 2006-12-16 09:40:54 UTC (rev 8363)
@@ -31,6 +31,7 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
+import java.lang.reflect.Method;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
@@ -39,6 +40,8 @@
import javax.xml.parsers.ParserConfigurationException;
import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.couriers.DeliverOnlyCourier;
+import org.jboss.internal.soa.esb.couriers.PickUpOnlyCourier;
import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.eprs.JMSEpr;
import org.jboss.soa.esb.helpers.KeyValuePair;
@@ -50,9 +53,7 @@
{
public static final String CORRELATION_ID_TAG = "messageCorrelationId";
- private CourierUtil()
- {
- }
+ private CourierUtil() {}
public static List<KeyValuePair> propertiesFromSelector(String selector)
throws Exception
@@ -233,5 +234,26 @@
courier.deliver(message);
}
+ public static void cleanCourier (PickUpOnlyCourier courier) { cleanableCleanup(courier);}
+ public static void cleanCourier (DeliverOnlyCourier courier){ cleanableCleanup(courier);}
+ public static void cleanCourier (TwoWayCourier courier) { cleanableCleanup(courier);}
+
+ private static void cleanableCleanup (Object courier)
+ {
+ if (null!=courier)
+ try
+ {
+ Method cleanMethod = courier.getClass().getMethod("cleanup", new Class[] {});
+ cleanMethod.invoke(courier, new Object[] {});
+ }
+ catch (NoSuchMethodException e) {/* OK Just don't invoke it */ }
+ catch (Exception e)
+ {
+ _logger.error("Problems invoking clean() Method for class "
+ +courier.getClass().getSimpleName(),e);
+ }
+ } //________________________________
+
+
protected static Logger _logger = Logger.getLogger(CourierUtil.class);
}
Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/helpers/ConfigTree.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/helpers/ConfigTree.java 2006-12-16 01:54:48 UTC (rev 8362)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/helpers/ConfigTree.java 2006-12-16 09:40:54 UTC (rev 8363)
@@ -134,6 +134,22 @@
return (null==_attributes) ? null : _attributes.get(name);
} // _______________________________
/**
+ * retrieve the value assigned to an attribute key
+ * @param name String - the search key
+ * @param defaultValue String - the default value to return if attribute is not set
+ * @return String - the value assigned to the specified key
+ */
+ public String getAttribute(String name, String defaultValue)
+ throws Exception
+ {
+ String ret = (null==_attributes) ? null : _attributes.get(name);
+ if (null!=ret)
+ return ret;
+ if (null!=defaultValue)
+ return defaultValue;
+ throw new Exception("Invalid or missing <"+name+"> attribute ");
+ } // _______________________________
+ /**
* obtain the list of all attribute names
* @return Set<String> - the set of keys that have been assigned a non null value
*/
More information about the jboss-svn-commits
mailing list