[jboss-svn-commits] JBL Code SVN: r6863 - labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Tue Oct 17 13:10:08 EDT 2006
Author: estebanschifman
Date: 2006-10-17 13:10:05 -0400 (Tue, 17 Oct 2006)
New Revision: 6863
Modified:
labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/EsbListenerController.java
labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/JmsQueueListener.java
Log:
Include file based registry utility (EPRManager)
Modified: labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/EsbListenerController.java
===================================================================
--- labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/EsbListenerController.java 2006-10-17 15:53:45 UTC (rev 6862)
+++ labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/EsbListenerController.java 2006-10-17 17:10:05 UTC (rev 6863)
@@ -22,10 +22,7 @@
package org.jboss.soa.esb.message.listeners;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.PrintStream;
-import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.text.SimpleDateFormat;
import java.util.Date;
@@ -36,23 +33,22 @@
import org.jboss.internal.soa.esb.command.CommandQueue;
import org.jboss.internal.soa.esb.command.CommandQueueException;
import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.actions.ActionDefinitionFactory;
-import org.jboss.soa.esb.common.Configuration;
+import org.jboss.soa.esb.addressing.util.EPRManager;
+import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.common.ModulePropertyManager;
-import org.jboss.soa.esb.helpers.DomElement;
-import org.jboss.soa.esb.notification.NotificationList;
+import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.parameters.ParamRepositoryException;
import org.jboss.soa.esb.parameters.ParamRepositoryFactory;
-import org.jboss.soa.esb.services.NotificationHandlerFactory;
-import org.jboss.soa.esb.services.NotificationManager;
import org.jboss.soa.esb.util.Util;
import org.xml.sax.SAXException;
+import com.arjuna.common.util.propertyservice.PropertyManager;
+
/**
- * Controlling class that will launch listener child threads for supported
- * transport listener classes, as indicated in the configuration XML tree
- * pointed by arg[0]
+ * Controlling class that will launch 'message aware' listener child threads for supported
+ * transport listener classes, as indicated in the configuration XML tree used in the constructor
+ * If you use the 'main' method, configuration file is expected in arg[0]
*
* <p />
* Can be launched as uppermost controller (it has a main(args) method)
@@ -68,12 +64,12 @@
* End time for this instance can also be set using the PARM_END_TIME attribute
* <p />
*
- * @author Esteban
- *
+ * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
*/
public class EsbListenerController implements Runnable {
- private static Logger m_oLogger = Logger.getLogger(EsbListenerController.class);
+ private static Logger _logger = Logger.getLogger(EsbListenerController.class);
public static void main(String[] args) throws Exception {
EsbListenerController oProc = new EsbListenerController(args[0]);
@@ -81,7 +77,7 @@
EsbListenerController.State oS = oProc.getState();
if (null != oS.getException()) {
- m_oLogger.error("EsbListenerController <" + args[0] + "> FAILED\n", oS
+ _logger.error("EsbListenerController <" + args[0] + "> FAILED\n", oS
.getException());
}
System.exit(oS.getCompletionCode());
@@ -97,14 +93,11 @@
// Attribute name that denotes listener class to be instantiated in a child
// thread
// This attribute is not in the root node but in first level child
- // DomElements
+ // ConfigTrees
public static final String PARM_LISTENER_CLASS = "listenerClass";
- public static final String PARM_ACTIONS = "actions";
- public static final String PARM_MAX_THREADS = "maxThreads";
- public static final String CHLD_EMAIL_PARMS = "EmailProperties";
private String m_sParmsName;
- private DomElement m_oParms;
+ private ConfigTree m_oParms;
private HashMap<String, Object> m_oAtts;
@@ -156,9 +149,6 @@
};
private CommandQueue commandQueue;
-
- private ActionDefinitionFactory actionDefinitionFactory;
-
private static CommandQueue defaultCommandQueue = null;
/**
@@ -182,26 +172,25 @@
}
/**
- * Construct a Listener Manager using the specified listener configuration.
+ * Construct a Listener Controller using the specified listener configuration.
*
* @param config
* The configuration.
* @throws Exception
* Unable to load/use the supplied configuration.
*/
- public EsbListenerController(DomElement config) throws Exception {
+ public EsbListenerController(ConfigTree config) throws Exception {
m_oParms = config;
m_oState = State.Loading_parameters;
- try {
- checkParms(m_oParms);
- setEmailSystemProperties();
- } catch (Exception e) {
- String configSource = config.getAttr("configSource");
+ try { checkParms(m_oParms); }
+ catch (Exception e)
+ {
+ String configSource = config.getAttribute("configSource");
m_oState = State.Exception_thrown;
m_oState.m_oException = e;
- m_oLogger.fatal("Listener configuration and startup error. Config Source: "
+ _logger.fatal("Listener configuration and startup error. Config Source: "
+ (configSource != null ? configSource
: "unknown"), e);
@@ -216,7 +205,7 @@
* @param reposParam
* The name of the repository entry containing the Listener
* configuration.
- * @return Listener Configuration as {@link DomElement}.
+ * @return Listener Configuration as {@link ConfigTree}.
* @throws IOException
* Unable to access the repository.
* @throws ParamRepositoryException
@@ -224,12 +213,12 @@
* @throws SAXException
* Unable to parse the configuration.
*/
- private static DomElement getListenerConfig(String reposParam)
+ private static ConfigTree getListenerConfig(String reposParam)
throws IOException, ParamRepositoryException, SAXException {
String sXml = ParamRepositoryFactory.getInstance().get(reposParam);
- DomElement config = DomElement.fromXml(sXml);
+ ConfigTree config = ConfigTree.fromXml(sXml);
- config.setAttr("configSource", "param-repository:" + reposParam);
+ config.setAttribute("configSource", "param-repository:" + reposParam);
return config;
}
@@ -239,25 +228,25 @@
* values to some of them
*
* @param p_oP
- * DomElement - Where to look for the mandatory/optional
+ * ConfigTree - Where to look for the mandatory/optional
* configuration attributes
* @throws Exception -
* If attributes are wrong or not enough for a proper runtime
* configuration
*/
- public void checkParms(DomElement p_oP) throws Exception {
+ public void checkParms(ConfigTree p_oP) throws Exception {
// We've just loaded - set to false until next reload requested
m_bReloadRequested = false;
commandQueue = createCommandQueue(p_oP);
// Open the command queue...
if (null!=commandQueue)
- commandQueue.open(p_oP);
+ commandQueue.open(org.jboss.soa.esb.helpers.DomElement.fromConfigTree(p_oP));
// if PARM_RELOAD_SECS not set, and no command queue
// then reload every 10 minutes
// If there is a command queue, run until command is received
- String sRldSecs = p_oP.getAttr(PARM_RELOAD_SECS);
+ String sRldSecs = p_oP.getAttribute(PARM_RELOAD_SECS);
m_lNextReload = (null != sRldSecs)
? System.currentTimeMillis() + 1000 * Long.parseLong(sRldSecs)
: (null == commandQueue)
@@ -267,17 +256,15 @@
// if PARM_END_TIME not set try to run forever
// not a good practice if command queue is not set
// Expected date format is "yyyyMMdd hh:mm:ss"
- String sEndT = p_oP.getAttr(PARM_END_TIME);
+ String sEndT = p_oP.getAttribute(PARM_END_TIME);
m_lEndTime = (null == sEndT) ? Long.MAX_VALUE : s_oDateParse.parse(
sEndT).getTime();
// Read and initialise the action definitions...
- DomElement actionConfig = p_oP.getFirstElementChild("Actions");
+ ConfigTree actionConfig = p_oP.getFirstChild("Actions");
if(actionConfig == null) {
throw new ConfigurationException("No 'Actions' configuration.");
}
- actionDefinitionFactory = new ActionDefinitionFactory(actionConfig);
-
} // ________________________________
/**
@@ -285,14 +272,14 @@
* @param config EsbListenerController config.
* @return EsbListenerController CommandQueue instance.
*/
- private CommandQueue createCommandQueue(DomElement config) {
- String commandQueueClass = config.getAttr("command-queue-class");
+ private CommandQueue createCommandQueue(ConfigTree config) {
+ String commandQueueClass = config.getAttribute("command-queue-class");
if(commandQueueClass != null) {
try {
return (CommandQueue) Class.forName(commandQueueClass).newInstance();
} catch (Exception e) {
- m_oLogger.error("Failed to instantiate CommandQueue ["+ commandQueueClass + "]. Defaulting to no Command Queue", e);
+ _logger.error("Failed to instantiate CommandQueue ["+ commandQueueClass + "]. Defaulting to no Command Queue", e);
}
}
@@ -321,8 +308,8 @@
public void run() {
while (endNotRequested()) {
m_oState = State.Running;
- for (DomElement oCurr : m_oParms.getAllElemChildren()) {
- String sClass = oCurr.getAttr(PARM_LISTENER_CLASS);
+ for (ConfigTree oCurr : m_oParms.getAllChildren()) {
+ String sClass = oCurr.getAttribute(PARM_LISTENER_CLASS);
if (Util.isNullString(sClass))
continue;
tryToLaunchChildListener(oCurr, sClass);
@@ -336,14 +323,13 @@
if (m_sParmsName != null && timeToReload()) {
try {
m_oState = State.Loading_parameters;
- m_oLogger
+ _logger
.info("Reloading parameters _____________________________________________________");
- DomElement oNew = EsbListenerController.getListenerConfig(m_sParmsName);
+ ConfigTree oNew = EsbListenerController.getListenerConfig(m_sParmsName);
checkParms(oNew);
m_oParms = oNew;
- setEmailSystemProperties();
} catch (Exception e) {
- m_oLogger.error("Failed to reload parameters"
+ _logger.error("Failed to reload parameters"
+ " - Continuing with cached version", e);
}
}
@@ -352,28 +338,29 @@
m_oState = State.Done_OK;
m_oState.m_iCompletionCode = 0;
- m_oLogger
+ _logger
.info("Finishing_____________________________________________________");
// Close the command queue...
try {
commandQueue.close();
} catch (CommandQueueException e) {
- m_oLogger.error("Error closing Command Queue.", e);
+ _logger.error("Error closing Command Queue.", e);
}
} // ________________________________
- private void tryToLaunchChildListener(DomElement p_oP, String p_sClassName) {
- try {
+ private void tryToLaunchChildListener(ConfigTree p_oP, String p_sClassName) {
+ try
+ {
Class oListener = Class.forName(p_sClassName);
- Constructor oConst = oListener.getConstructor(new Class[] {
- this.getClass(), DomElement.class, ActionDefinitionFactory.class });
- Runnable oRun = (Runnable) oConst.newInstance(new Object[] { this,
- p_oP, actionDefinitionFactory });
+ Constructor oConst = oListener.getConstructor(new Class[] {this.getClass(), ConfigTree.class});
+ Runnable oRun = (Runnable) oConst.newInstance(new Object[] { this,p_oP});
new Thread(oRun).start();
- } catch (Exception e) {
- m_oLogger.error("Cannot launch <" + p_sClassName + ">\n", e);
}
+ catch (Exception e)
+ {
+ _logger.error("Cannot launch <" + p_sClassName + ">\n", e);
+ }
} // ________________________________
long millisToWait() {
@@ -384,7 +371,7 @@
long lToGo = millisToWait();
if (null == commandQueue) {
- m_oLogger.debug("About to sleep " + lToGo);
+ _logger.debug("About to sleep " + lToGo);
// No command queue nor topic - Just sleep until time
// exhausted, or thread interrupted
try {
@@ -401,7 +388,7 @@
// that's why time to go is recalculated on each cycle
while ((lToGo = millisToWait()) > 0) {
try {
- m_oLogger.info("Waiting for command ... timeout=" + lToGo + " millis");
+ _logger.info("Waiting for command ... timeout=" + lToGo + " millis");
String oM = commandQueue.receiveCommand(lToGo);
if (null == oM) {
@@ -412,7 +399,7 @@
break;
}
} catch (CommandQueueException eJ) {
- m_oLogger.info("receive on command queue failed", eJ);
+ _logger.info("receive on command queue failed", eJ);
}
}
} // ________________________________
@@ -456,12 +443,12 @@
String sLow = sTxt.trim().toLowerCase();
if (sLow.startsWith("shutdown")) {
m_bEndRequested = true;
- m_oLogger.info("Shutdown has been requested");
+ _logger.info("Shutdown has been requested");
return;
}
if (sLow.startsWith("reload param")) {
m_bReloadRequested = true;
- m_oLogger
+ _logger
.info("Request for parameter reload has been received");
return;
}
@@ -472,10 +459,10 @@
String sTime = (sa.length < 3 || null == sa[2]) ? "23:59:59"
: sa[2];
Date oEnd = s_oDateParse.parse(sDate + " " + sTime);
- m_oLogger.info("New end date set to : " + oEnd);
+ _logger.info("New end date set to : " + oEnd);
m_lEndTime = oEnd.getTime();
} catch (Exception eDat) {
- m_oLogger.info("Problems with endTime command", eDat);
+ _logger.info("Problems with endTime command", eDat);
}
}
} // ________________________________
@@ -525,136 +512,30 @@
public boolean continueLooping() {
return (endNotRequested() && !timeToReload());
} // ________________________________
-
- private static final String[] s_saMailProps = { Environment.SMTP_HOST,
- Environment.SMTP_USERNAME, Environment.SMTP_PASSWORD,
- Environment.SMTP_PORT, Environment.SMTP_FROM,
- Environment.SMTP_AUTH };
-
- private void setEmailSystemProperties() {
- DomElement oEmail = m_oParms.getFirstElementChild(CHLD_EMAIL_PARMS);
- if (null != oEmail)
- for (String sCurr : s_saMailProps) {
- String sProp = oEmail.getAttr(sCurr);
- if (null != sProp)
- ModulePropertyManager.getPropertyManager(ModulePropertyManager.TRANSPORTS_MODULE).setProperty(sCurr, sProp);
- }
+
+ private static EPRManager getEprManager()
+ {
+ PropertyManager manager = ModulePropertyManager.getPropertyManager(ModulePropertyManager.CORE_MODULE);
+ String sDir = manager.getProperty(Environment.REGISTRY_FILE_HELPER_DIR,".");
+ return EPRManager.getInstance(sDir);
+ }
+
+ public void register (String name, EPR address)
+ {
+ try { getEprManager().saveEPR(name,address); }
+ catch (IOException e)
+ {
+ _logger.fatal("Cannot register service",e);
+ }
} // ________________________________
- /**
- * Find an attribute in the tree (arg 0) or assign default value (arg 2)
- *
- * @param p_oP
- * DomElement - look for attributes in this Element only
- * @param p_sAtt
- * String - Name of attribute to find
- * @param p_sDefault
- * String -default value if requested attribute is not there
- * @return String - value of attribute, or default value (if null)
- * @throws Exception -
- * If requested attribute not found and no default value
- * supplied by invoker
- */
- public static String obtainAtt(DomElement p_oP, String p_sAtt, String p_sDefault)
- throws ConfigurationException {
- String sVal = p_oP.getAttr(p_sAtt);
- if ((null == sVal) && (null == p_sDefault))
- throw new ConfigurationException("Missing or invalid <" + p_sAtt + "> attribute");
-
- return (null != sVal) ? sVal : p_sDefault;
- } // ________________________________
-
- /**
- * Find child nodes named "NotificationList" that contain an attribute
- * 'type' that starts with "ok" (case insensitive)
- *
- * @param p_oP -
- * DomElement to search for "NotificationList" child Elements
- * @param p_oSer
- * Serializable - Will constitute the body of the notification
- */
- public static void notifyOK(DomElement p_oP, Serializable p_oSer) {
- if(p_oSer == null) {
- return;
- }
-
- try {
- Serializable oNotif = p_oSer;
- for (DomElement oCurr : p_oP
- .getElementChildren(NotificationList.ELEMENT)) {
- NotificationList oNL = new NotificationList(oCurr);
- if (!oNL.isOK())
- continue;
- getNotifHandler().sendNotifications(oCurr, oNotif);
- }
- } catch (Exception e) {
+ public void unRegister (String name)
+ {
+ try { getEprManager().removeEPR(name); }
+ catch (IOException e)
+ {
+ _logger.fatal("Cannot un-register service",e);
}
- } // __________________________________
-
- /**
- * Find child nodes named "NotificationList" that contain an attribute
- * 'type' that starts with "err" (case insensitive) or no 'type' attribute
- * set
- *
- * @param p_oP -
- * DomElement to search for "NotificationList" child Elements
- * @param p_e -
- * Exception if not null, will be appended to the body
- * @param p_oSer
- * Serializable - Will be included at the beginning of the body
- * of the notification
- */
- public static void notifyError(DomElement p_oP, Exception p_e, Serializable p_oSer) {
- if(p_oSer == null) {
- return;
- }
-
- Serializable oNotif = p_oSer;
- ByteArrayOutputStream oBO = new ByteArrayOutputStream();
- PrintStream oPS = new PrintStream(oBO);
- try {
- oPS.println(oNotif.toString());
- if (null != p_e)
- p_e.printStackTrace(oPS);
- oPS.close();
-
- String sMsg = oBO.toString();
- for (DomElement oCurr : p_oP
- .getElementChildren(NotificationList.ELEMENT)) {
- NotificationList oNL = new NotificationList(oCurr);
- if (!oNL.isErr())
- continue;
- getNotifHandler().sendNotifications(oNL, sMsg);
- }
- } catch (Exception e) {
- }
} // ________________________________
- private static NotificationManager s_oNH;
-
- private static final Object s_oSync = new Integer(0);
-
- /**
- * Lazy instantiator of a InotificationHandler
- *
- * @return - a reference to an implementation of the interface or null if it
- * can't be instantiated
- */
- protected static NotificationManager getNotifHandler() {
- if (null != s_oNH)
- return s_oNH;
- synchronized (s_oSync) {
- if (null == s_oNH)
- try {
- s_oNH = NotificationHandlerFactory.getNotifHandler(
- "remote", Configuration.getJndiServerType(),
- Configuration.getJndiServerURL());
- } catch (Exception e) {
- Logger.getLogger(EsbListenerController.class).error(
- "Notification FAILED", e);
- }
- }
- return s_oNH;
- } // ______________________________
-
} // ____________________________________________________________________________
Modified: labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/JmsQueueListener.java
===================================================================
--- labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/JmsQueueListener.java 2006-10-17 15:53:45 UTC (rev 6862)
+++ labs/jbossesb/workspace/eschifman/trunk/product/core/listeners/src/org/jboss/soa/esb/message/listeners/JmsQueueListener.java 2006-10-17 17:10:05 UTC (rev 6863)
@@ -22,6 +22,8 @@
package org.jboss.soa.esb.message.listeners;
+import java.net.URISyntaxException;
+
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
@@ -39,7 +41,6 @@
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.actions.ActionUtils;
import org.jboss.soa.esb.addressing.helpers.JMSEpr;
-import org.jboss.soa.esb.addressing.EPR;
/**
* Esb Message aware JMS queue listener.
@@ -78,7 +79,7 @@
*/
public void run()
{
- // TODO Register 'this' servicing _epr
+ registerService();
while (_controller.continueLooping())
{
@@ -99,10 +100,20 @@
}
}
- // TODO UN Register 'this' servicing _epr
+ unRegisterService();
} // _______________________________
+
+ private void registerService()
+ {
+ _controller.register(_eprName,_epr);
+ } //________________________________
+ private void unRegisterService()
+ {
+ _controller.unRegister(_eprName);
+ } //________________________________
+
/**
* Check for mandatory and optional attributes in parameter tree
*
@@ -115,15 +126,8 @@
// Default value of obtainAttribute is null - Exception will be thrown
String sQueue = obtainAttribute(JMSEpr.DESTINATION_NAME_TAG, null);
-/*
- String sEpr = obtainAttribute(ListenerPropertyNames.EPR_NAME,null);
- try { _epr = EPRManager.getInstance("myDir").loadEPR(sEpr); }
- catch (IOException e)
- {
- _logger.error(e);
- throw new ConfigurationException("Problems loading EPR",e);
- }
-*/
+ _eprName = obtainAttribute(ListenerPropertyNames.EPR_NAME,null);
+
// No problem if selector is null - everything in queue will be returned
_sSelector = _config.getAttribute(ListenerPropertyNames.MESSAGE_SELECTOR);
@@ -136,9 +140,19 @@
Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiURL);
String sFactClass = obtainAttribute(JMSEpr.CONNECTION_FACTORY_TAG, "ConnectionFactory");
- Exception thrown = null;
+ _epr = new JMSEpr(JMSEpr.QUEUE_TYPE,sQueue,sFactClass);
try
{
+ _epr.getAddr().addExtension("jndiType",sJndiType);
+ _epr.getAddr().addExtension("jndiURL",sJndiURL);
+ if (null!=_sSelector)
+ _epr.getAddr().addExtension("msgSelector",_sSelector);
+ }
+ catch (URISyntaxException e) {}
+
+ Exception thrown = null;
+ try
+ {
Object tmp = oJndiCtx.lookup(sFactClass);
QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
@@ -224,7 +238,8 @@
protected EsbListenerController _controller;
protected ConfigTree _config;
- protected EPR _epr;
+ protected String _eprName;
+ protected JMSEpr _epr;
protected MessageConsumer _receiver;
protected boolean _bError = false;
protected QueueConnection _oQconn;
More information about the jboss-svn-commits
mailing list