Author: objectiser
Date: 2009-02-11 15:22:47 -0500 (Wed, 11 Feb 2009)
New Revision: 490
Modified:
cdl/trunk/validator/jbossesb/src/main/java/org/jboss/soa/overlord/validator/jbossesb/ServiceValidatorManager.java
cdl/trunk/validator/jbossesb/src/main/java/org/jboss/soa/overlord/validator/jbossesb/ValidatorFilter.java
cdl/trunk/validator/jbossesb/src/main/java/org/jboss/soa/overlord/validator/jbossesb/pi4soa/ValidatorConfigGenerator.java
Log:
Added support for dynamically defined reply-to destinations - rather than having to put
the ValidationAction in the jboss-esb.xml configuration.
Modified:
cdl/trunk/validator/jbossesb/src/main/java/org/jboss/soa/overlord/validator/jbossesb/ServiceValidatorManager.java
===================================================================
--- cdl/trunk/validator/jbossesb/src/main/java/org/jboss/soa/overlord/validator/jbossesb/ServiceValidatorManager.java 2009-02-10 22:37:26 UTC (rev 489)
+++ cdl/trunk/validator/jbossesb/src/main/java/org/jboss/soa/overlord/validator/jbossesb/ServiceValidatorManager.java 2009-02-11 20:22:47 UTC (rev 490)
@@ -89,10 +89,39 @@
* is unknown
*/
public java.util.List<ServiceValidator> getInputServiceValidators(Endpoint endpoint) {
- return(m_inputValidators.get(endpoint));
+ java.util.List<ServiceValidator> ret=
+ m_inputValidators.get(endpoint);
+
+ if (ret == null) {
+ ret = m_replyToManager.getInputServiceValidators(endpoint);
+ }
+
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("Return input validators '"+endpoint+"': "+ret);
+ }
+
+ return(ret);
}
/**
+ * This method determines whether the supplied endpoint will be
+ * associated with a dynamic reply-to endpoint.
+ *
+ * @param endpoint The endpoint
+ * @return Whether the endpoint has a dynamic reply-to
+ */
+ public boolean isInputDynamicReplyTo(Endpoint endpoint) {
+ boolean ret=m_inputDynaReplyTos.contains(endpoint);
+
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("Is input endpoint '"+endpoint+
+ "' a dynamic reply-to: "+ret);
+ }
+
+ return(ret);
+ }
+
+ /**
* This method returns the list of service validators associated
* with the supplied output endpoint.
*
@@ -101,10 +130,72 @@
* is unknown
*/
public java.util.List<ServiceValidator> getOutputServiceValidators(Endpoint endpoint) {
- return(m_outputValidators.get(endpoint));
+ java.util.List<ServiceValidator> ret=
+ m_outputValidators.get(endpoint);
+
+ if (ret == null) {
+ ret = m_replyToManager.getOutputServiceValidators(endpoint);
+ }
+
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("Return output validators '"+endpoint+"': "+ret);
+ }
+
+ return(ret);
}
/**
+ * This method determines whether the supplied endpoint will be
+ * associated with a dynamic reply-to endpoint.
+ *
+ * @param endpoint The endpoint
+ * @return Whether the endpoint has a dynamic reply-to
+ */
+ public boolean isOutputDynamicReplyTo(Endpoint endpoint) {
+ boolean ret=m_outputDynaReplyTos.contains(endpoint);
+
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("Is output endpoint '"+endpoint+
+ "' a dynamic reply-to: "+ret);
+ }
+
+ return(ret);
+ }
+
+ /**
+ * This method registers a list of service validators against
+ * a dynamic 'reply-to' endpoint.
+ *
+ * @param endpoint The endpoint
+ * @param validators The list of service validators
+ */
+ public void registerInputReplyToValidators(Endpoint endpoint,
+ java.util.List<ServiceValidator> validators) {
+
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("Register input reply-to '"+endpoint+"': "+validators);
+ }
+
+ m_replyToManager.registerInputDynamicReplyTo(endpoint, validators);
+ }
+
+ /**
+ * This method registers a list of service validators against
+ * a dynamic 'reply-to' endpoint.
+ *
+ * @param endpoint The endpoint
+ * @param validators The list of service validators
+ */
+ public void registerOutputReplyToValidators(Endpoint endpoint,
+ java.util.List<ServiceValidator> validators) {
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("Register output reply-to '"+endpoint+"': "+validators);
+ }
+
+ m_replyToManager.registerOutputDynamicReplyTo(endpoint, validators);
+ }
+
+ /**
* This method returns a ServiceValidator associated with the
* supplied validator name.
*
@@ -224,6 +315,12 @@
java.util.Set<Endpoint> existingOutputEndpoints=
new java.util.HashSet<Endpoint>(m_outputValidators.keySet());
+ java.util.Set<Endpoint> existingInputDynaReplyTos=
+ new java.util.HashSet<Endpoint>(m_inputDynaReplyTos);
+
+ java.util.Set<Endpoint> existingOutputDynaReplyTos=
+ new java.util.HashSet<Endpoint>(m_outputDynaReplyTos);
+
java.io.InputStream is=ValidatorFilter.class.getClassLoader().
getResourceAsStream(CONFIG_FILE);
@@ -232,9 +329,25 @@
}
try {
- updateConfiguration(ConfigTree.fromInputStream(is), existingValidatorNames,
- existingInputEndpoints, existingOutputEndpoints);
+ ConfigTree config=ConfigTree.fromInputStream(is);
+ if (config.getName().equals(VALIDATOR_NODE)) {
+ String active=config.getAttribute(ACTIVE_ATTR);
+
+ if (active != null && active.equalsIgnoreCase("true")) {
+
+ logger.info("Setting validators into active mode");
+ m_active = true;
+ } else {
+ logger.info("Setting validators into passive mode");
+ m_active = false;
+ }
+ }
+
+ updateConfiguration(config, existingValidatorNames,
+ existingInputEndpoints, existingOutputEndpoints,
+ existingInputDynaReplyTos, existingOutputDynaReplyTos);
+
// Work through choreography files in the models directory
java.io.File[] files=m_modelsDir.listFiles();
@@ -247,13 +360,15 @@
ValidatorConfigFactory.getValidatorConfig(files[i]);
if (vm != null) {
- ConfigTree config=vm.getConfiguration();
+ ConfigTree vConfig=vm.getConfiguration();
- if (config != null) {
- updateConfiguration(config,
+ if (vConfig != null) {
+ updateConfiguration(vConfig,
existingValidatorNames,
existingInputEndpoints,
- existingOutputEndpoints);
+ existingOutputEndpoints,
+ existingInputDynaReplyTos,
+ existingOutputDynaReplyTos);
} else {
logger.severe("Failed to obtain configuration for model '"+
files[i].getName()+"'");
@@ -275,6 +390,13 @@
m_inputValidators.remove(key);
}
+ iter = existingInputDynaReplyTos.iterator();
+
+ while (iter.hasNext()) {
+ Endpoint key=iter.next();
+ m_inputDynaReplyTos.remove(key);
+ }
+
// Any remaining output keys need to be removed
iter = existingOutputEndpoints.iterator();
@@ -283,6 +405,13 @@
m_outputValidators.remove(key);
}
+ iter = existingOutputDynaReplyTos.iterator();
+
+ while (iter.hasNext()) {
+ Endpoint key=iter.next();
+ m_outputDynaReplyTos.remove(key);
+ }
+
// Any remaining service description names need to
// have their associated monitors removed
java.util.Iterator<ValidatorName> sviter=
@@ -315,11 +444,15 @@
* @param existingValidatorNames The list of validator names
* @param existingInputEndpoints The list of input endpoints
* @param existingOutputEndpoints The list of output endpoints
+ * @param existingInputDynaReplyTos The set of input dynamic replyTo endpoints
+ * @param existingOutputDynaReplyTos The set of output dynamic replyTo endpoints
*/
protected void updateConfiguration(ConfigTree config,
java.util.Set<ValidatorName> existingValidatorNames,
java.util.Set<Endpoint> existingInputEndpoints,
- java.util.Set<Endpoint> existingOutputEndpoints) {
+ java.util.Set<Endpoint> existingOutputEndpoints,
+ java.util.Set<Endpoint> existingInputDynaReplyTos,
+ java.util.Set<Endpoint> existingOutputDynaReplyTos) {
logger.info("Update Service Validator Configuration");
if (config != null) {
@@ -443,6 +576,20 @@
}
existingInputEndpoints.remove(endpoint);
+
+ // Check if dynamic replyTo
+ if (inputs[j].getAttribute(DYNAMIC_REPLY_TO_ATTR,
+ "false").equalsIgnoreCase("true")) {
+
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.finest("Input endpoint '"+
+ endpoint+"' has dynamic replyTo destination");
+ }
+
+ m_inputDynaReplyTos.add(endpoint);
+
+ existingInputDynaReplyTos.remove(endpoint);
+ }
}
}
@@ -474,6 +621,20 @@
}
existingOutputEndpoints.remove(endpoint);
+
+ // Check if dynamic replyTo
+ if (outputs[j].getAttribute(DYNAMIC_REPLY_TO_ATTR,
+ "false").equalsIgnoreCase("true")) {
+
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.finest("Output endpoint '"+
+ endpoint+"' has dynamic replyTo destination");
+ }
+
+ m_outputDynaReplyTos.add(endpoint);
+
+ existingOutputDynaReplyTos.remove(endpoint);
+ }
}
}
}
@@ -499,6 +660,8 @@
private static final Logger logger =
Logger.getLogger("org.jboss.soa.overlord.validator.jbossesb");
private static final String EPR_ATTR = "epr";
+ private static final String DYNAMIC_REPLY_TO_ATTR = "dynamicReplyTo";
+
private static final String OUTPUT_NODE = "output";
private static final String INPUT_NODE = "input";
private static final String SERVICE_NODE = "service";
@@ -517,7 +680,10 @@
new java.util.HashMap<ValidatorName,ServiceValidator>();
private java.util.Map<Endpoint,java.util.List<ServiceValidator>> m_inputValidators=new java.util.Hashtable<Endpoint,java.util.List<ServiceValidator>>();
private java.util.Map<Endpoint,java.util.List<ServiceValidator>> m_outputValidators=new java.util.Hashtable<Endpoint,java.util.List<ServiceValidator>>();
+ private java.util.Set<Endpoint> m_inputDynaReplyTos=new java.util.HashSet<Endpoint>();
+ private java.util.Set<Endpoint> m_outputDynaReplyTos=new java.util.HashSet<Endpoint>();
private boolean m_active=false;
+ private DynamicReplyToEndpointManager m_replyToManager=new DynamicReplyToEndpointManager();
/**
* This class is responsible for monitoring the models folder,
@@ -592,4 +758,171 @@
private long m_lastUpdate=0;
}
+
+ public class DynamicReplyToEndpointManager extends Thread {
+
+ /**
+ * The default constructor
+ */
+ public DynamicReplyToEndpointManager() {
+ setDaemon(true);
+
+ start();
+ }
+
+ /**
+ * The run method is responsible for ensuring the dynamic
+ * 'reply-to' endpoints are cleaned up periodically.
+ */
+ public void run() {
+
+ while(true) {
+
+ try {
+ synchronized(this) {
+ wait(10000);
+ }
+ } catch(Exception e) {
+ logger.severe("Failed to wait");
+ }
+
+ // Shift main entries to an emptied 'pending delete'
+ // map
+ synchronized(m_inputs) {
+ if (logger.isLoggable(Level.FINEST)) {
+ java.util.Iterator<Endpoint> iter=
+ m_inputsPendingDelete.keySet().iterator();
+ while (iter.hasNext()) {
+ Endpoint endpoint=iter.next();
+ java.util.List<ServiceValidator> validators=
+ m_inputsPendingDelete.get(endpoint);
+ logger.finest("Deleting input (reply-to) validators '"+
+ endpoint+"': "+validators);
+ }
+ }
+
+ m_inputsPendingDelete.clear();
+ m_inputsPendingDelete.putAll(m_inputs);
+ m_inputs.clear();
+ }
+
+ synchronized(m_outputs) {
+ if (logger.isLoggable(Level.FINEST)) {
+ java.util.Iterator<Endpoint> iter=
+ m_outputsPendingDelete.keySet().iterator();
+ while (iter.hasNext()) {
+ Endpoint endpoint=iter.next();
+ java.util.List<ServiceValidator> validators=
+ m_outputsPendingDelete.get(endpoint);
+ logger.finest("Deleting output (reply-to) validators '"+
+ endpoint+"': "+validators);
+ }
+ }
+
+ m_outputsPendingDelete.clear();
+ m_outputsPendingDelete.putAll(m_outputs);
+ m_outputs.clear();
+ }
+ }
+ }
+
+ /**
+ * This method returns the list of service validators
+ * associated with the dynamic reply-to endpoint.
+ *
+ * @param endpoint The input endpoint
+ * @return The list of service validators, or null if
+ * not found
+ */
+ public java.util.List<ServiceValidator> getInputServiceValidators(Endpoint endpoint) {
+ java.util.List<ServiceValidator> ret=null;
+
+ synchronized(m_inputs) {
+ ret = m_inputs.get(endpoint);
+
+ if (ret == null) {
+ ret = m_inputsPendingDelete.get(endpoint);
+ }
+ }
+
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.finest("Return input (reply-to) validators '"+endpoint+"': "+ret);
+ }
+
+ return(ret);
+ }
+
+ /**
+ * This method returns the list of service validators
+ * associated with the dynamic reply-to endpoint.
+ *
+ * @param endpoint The output endpoint
+ * @return The list of service validators, or null if
+ * not found
+ */
+ public java.util.List<ServiceValidator> getOutputServiceValidators(Endpoint endpoint) {
+ java.util.List<ServiceValidator> ret=null;
+
+ synchronized(m_outputs) {
+ ret = m_outputs.get(endpoint);
+
+ if (ret == null) {
+ ret = m_outputsPendingDelete.get(endpoint);
+ }
+ }
+
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.finest("Return output (reply-to) validators '"+endpoint+"': "+ret);
+ }
+
+ return(ret);
+ }
+
+ /**
+ * This method registers a list of service validators against
+ * a dynamic 'reply-to' endpoint.
+ *
+ * @param endpoint The endpoint
+ * @param validators The list of service validators
+ */
+ public void registerInputDynamicReplyTo(Endpoint endpoint,
+ java.util.List<ServiceValidator> validators) {
+
+ synchronized(m_inputs) {
+ m_inputs.put(endpoint, validators);
+
+ // May not be necessary, as entry would not be
+ // used - but could save memory?
+ m_inputsPendingDelete.remove(endpoint);
+ }
+ }
+
+ /**
+ * This method registers a list of service validators against
+ * a dynamic 'reply-to' endpoint.
+ *
+ * @param endpoint The endpoint
+ * @param validators The list of service validators
+ */
+ public void registerOutputDynamicReplyTo(Endpoint endpoint,
+ java.util.List<ServiceValidator> validators) {
+
+ synchronized(m_outputs) {
+ m_outputs.put(endpoint, validators);
+
+ // May not be necessary, as entry would not be
+ // used - but could save memory?
+ m_outputsPendingDelete.remove(endpoint);
+ }
+ }
+
+ private java.util.Map<Endpoint,java.util.List<ServiceValidator>> m_inputs=
+ new java.util.HashMap<Endpoint,java.util.List<ServiceValidator>>();
+ private java.util.Map<Endpoint,java.util.List<ServiceValidator>> m_outputs=
+ new java.util.HashMap<Endpoint,java.util.List<ServiceValidator>>();
+ private java.util.Map<Endpoint,java.util.List<ServiceValidator>> m_inputsPendingDelete=
+ new java.util.HashMap<Endpoint,java.util.List<ServiceValidator>>();
+ private java.util.Map<Endpoint,java.util.List<ServiceValidator>> m_outputsPendingDelete=
+ new java.util.HashMap<Endpoint,java.util.List<ServiceValidator>>();
+ }
}
Modified:
cdl/trunk/validator/jbossesb/src/main/java/org/jboss/soa/overlord/validator/jbossesb/ValidatorFilter.java
===================================================================
--- cdl/trunk/validator/jbossesb/src/main/java/org/jboss/soa/overlord/validator/jbossesb/ValidatorFilter.java 2009-02-10 22:37:26 UTC (rev 489)
+++ cdl/trunk/validator/jbossesb/src/main/java/org/jboss/soa/overlord/validator/jbossesb/ValidatorFilter.java 2009-02-11 20:22:47 UTC (rev 490)
@@ -56,20 +56,46 @@
java.util.List<ServiceValidator> validators=
ServiceValidatorManager.instance().getOutputServiceValidators(endpoint);
- for (int i=0; validators != null &&
+ if (validators != null && validators.size() > 0) {
+ boolean validated=false;
+ Exception ex=null;
+
+ for (int i=0; validators != null &&
i < validators.size(); i++) {
+ try {
+ validators.get(i).messageSent(msg);
+ validated = true;
+ } catch(Exception t) {
+ // Ignore
+ ex = t;
+ }
+ }
- try {
- validators.get(i).messageSent(msg);
-
- } catch(Throwable t) {
+ // Only raise exception if none of the service
+ // validators were able to validate the message
+ if (validated == false) {
logger.log(java.util.logging.Level.SEVERE,
- "Failed to handle sent message", t);
+ "Failed to handle sent message", ex);
if (ServiceValidatorManager.instance().isActive()) {
- throw new CourierException("Failed to handle sent message", t);
+ throw new CourierException("Failed to handle sent message", ex);
}
- }
+ } else {
+
+ // Check whether a dynamic reply is expected
+ if (ServiceValidatorManager.instance().isOutputDynamicReplyTo(endpoint)) {
+
+ // Register interest in the 'reply-to' endpoint
+ Endpoint replyTo=getReplyToEndpoint(msg);
+
+ if (replyTo != null) {
+ ServiceValidatorManager.instance().registerInputReplyToValidators(replyTo,
+ validators);
+ } else {
+ logger.severe("Unable to get 'reply-to' endpoint for message: "+msg);
+ }
+ }
+ }
}
}
@@ -91,22 +117,48 @@
if (endpoint != null) {
java.util.List<ServiceValidator> validators=
- ServiceValidatorManager.instance().getInputServiceValidators(endpoint);
+ ServiceValidatorManager.instance().getInputServiceValidators(endpoint);
- for (int i=0; validators != null &&
+ if (validators != null && validators.size() > 0) {
+ boolean validated=false;
+ Exception ex=null;
+
+ for (int i=0; validators != null &&
i < validators.size(); i++) {
+ try {
+ validators.get(i).messageReceived(msg);
+ validated = true;
+ } catch(Exception t) {
+ // Ignore
+ ex = t;
+ }
+ }
- try {
- validators.get(i).messageReceived(msg);
-
- } catch(Throwable t) {
+ // Only raise exception if none of the service
+ // validators were able to validate the message
+ if (validated == false) {
logger.log(java.util.logging.Level.SEVERE,
- "Failed to handle received message", t);
+ "Failed to handle received message", ex);
if (ServiceValidatorManager.instance().isActive()) {
- throw new CourierException("Failed to handle received message", t);
+ throw new CourierException("Failed to handle received message", ex);
}
- }
+ } else {
+
+ // Check whether a dynamic reply is expected
+ if (ServiceValidatorManager.instance().isInputDynamicReplyTo(endpoint)) {
+
+ // Register interest in the 'reply-to' endpoint
+ Endpoint replyTo=getReplyToEndpoint(msg);
+
+ if (replyTo != null) {
+ ServiceValidatorManager.instance().registerOutputReplyToValidators(replyTo,
+ validators);
+ } else {
+ logger.severe("Unable to get 'reply-to' endpoint for message: "+msg);
+ }
+ }
+ }
}
}
@@ -145,5 +197,37 @@
return(ret);
}
+ /**
+ * This method returns an endpoint associated with the 'to'
+ * destination of the supplied message.
+ *
+ * @param msg The message
+ * @return The endpoint, or null if not relevant
+ */
+ protected Endpoint getReplyToEndpoint(Message msg) {
+ Endpoint ret=null;
+
+ if (msg != null && msg.getHeader() != null &&
+ msg.getHeader().getCall() != null &&
+ msg.getHeader().getCall().getReplyTo() != null &&
+ msg.getHeader().getCall().getReplyTo().getAddr() != null) {
+
+ String key=msg.getHeader().getCall().getReplyTo().getAddr().getAddress();
+ int ind=-1;
+
+ if (key.startsWith(JMS_PROTOCOL_PREFIX) &&
+ ((ind=key.indexOf(QUEUE_PREFIX)) != -1 ||
+ (ind=key.indexOf(TOPIC_PREFIX)) != -1)) {
+ ret = new Endpoint(JMS_PROTOCOL_PREFIX+key.substring(ind));
+ }
+ }
+
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.finest("Reply-To Destination for message '"+msg+"' is: "+ret);
+ }
+
+ return(ret);
+ }
+
private static final Logger logger =
Logger.getLogger("org.jboss.soa.overlord.validator.jbossesb");
}
Modified:
cdl/trunk/validator/jbossesb/src/main/java/org/jboss/soa/overlord/validator/jbossesb/pi4soa/ValidatorConfigGenerator.java
===================================================================
--- cdl/trunk/validator/jbossesb/src/main/java/org/jboss/soa/overlord/validator/jbossesb/pi4soa/ValidatorConfigGenerator.java 2009-02-10 22:37:26 UTC (rev 489)
+++ cdl/trunk/validator/jbossesb/src/main/java/org/jboss/soa/overlord/validator/jbossesb/pi4soa/ValidatorConfigGenerator.java 2009-02-11 20:22:47 UTC (rev 490)
@@ -123,6 +123,7 @@
public class InputOutputAnalyser extends DefaultCDLVisitor {
+ private static final String DYNAMIC_REPLY_TO_ATTR = "dynamicReplyTo";
private static final String JBOSSESB_ANNOTATION = "jbossesb";
private static final String JBOSSESB_ELEMENT = "jbossesb";
private static final String DESTINATION_ELEMENT = "destination";
@@ -231,8 +232,7 @@
org.w3c.dom.Element dest, boolean from) {
String elemName=null;
- if (dest != null &&
- dest.getAttribute("temporary").equalsIgnoreCase("true") == false) {
+ if (dest != null) {
if (from) {
if (details.getAction() == ExchangeActionType.REQUEST) {
elemName = OUTPUT_ELEMENT;
@@ -247,6 +247,8 @@
}
}
+ org.w3c.dom.Element elem=m_service.getOwnerDocument().createElement(elemName);
+
// Parameter has been stored in a structured manner
// to support use of templates and presentations,
// so need to extract the value
@@ -254,9 +256,21 @@
m_templateProcessor.getTemplateParameters(dest.getAttribute(NAME_ATTR));
if (params != null && params.size() > 0) {
- org.w3c.dom.Element elem=m_service.getOwnerDocument().createElement(elemName);
elem.setAttribute(EPR_ATTR, params.get(0).getValue());
-
+ }
+
+ if (dest.hasAttribute(DYNAMIC_REPLY_TO_ATTR)) {
+ params=m_templateProcessor.getTemplateParameters(dest.getAttribute(DYNAMIC_REPLY_TO_ATTR));
+
+ if (params != null && params.size() > 0) {
+
+ if (params.get(0).getValue().equalsIgnoreCase("true")) {
+ elem.setAttribute(DYNAMIC_REPLY_TO_ATTR, "true");
+ }
+ }
+ }
+
+ if (elem.hasAttribute(EPR_ATTR)) {
m_service.appendChild(elem);
}
}