[savara-commits] savara SVN: r679 - in branches/experimental/2.0.x: tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Feb 16 16:16:53 EST 2011


Author: objectiser
Date: 2011-02-16 16:16:53 -0500 (Wed, 16 Feb 2011)
New Revision: 679

Added:
   branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/
   branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelatingServiceTracker.java
   branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelationNotifier.java
   branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelationSessionImpl.java
   branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelationSessionManager.java
   branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/ServiceCorrelatorImpl.java
Modified:
   branches/experimental/2.0.x/integration/jboss/common/src/main/configs/pi4soa.xml
   branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/ServiceTrackerClient.java
   branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/TxnMonitor.java
Log:
Needed to take a copy of the pi4soa service correlator to enable it to support events from service descriptions based, and global/located based, endpoints - to support old and new style activity events.

Modified: branches/experimental/2.0.x/integration/jboss/common/src/main/configs/pi4soa.xml
===================================================================
--- branches/experimental/2.0.x/integration/jboss/common/src/main/configs/pi4soa.xml	2011-02-16 17:58:08 UTC (rev 678)
+++ branches/experimental/2.0.x/integration/jboss/common/src/main/configs/pi4soa.xml	2011-02-16 21:16:53 UTC (rev 679)
@@ -21,6 +21,7 @@
 -->
 
 <pi4soa>
+<!-- 
 	<monitor>
 		<serviceTracker class="org.savara.validator.pi4soa.JMSServiceTracker" >
 			<jmsConnectionFactory>ConnectionFactory</jmsConnectionFactory>
@@ -28,4 +29,5 @@
 			<recordMessagePayload>true</recordMessagePayload>
 		</serviceTracker>
 	</monitor>
+-->
 </pi4soa>
\ No newline at end of file

Modified: branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/ServiceTrackerClient.java
===================================================================
--- branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/ServiceTrackerClient.java	2011-02-16 17:58:08 UTC (rev 678)
+++ branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/ServiceTrackerClient.java	2011-02-16 21:16:53 UTC (rev 679)
@@ -65,7 +65,8 @@
 
 			for (Analysis a : m_activity.getAnalysis()) {
 				if (a instanceof ProtocolAnalysis) {
-					ret = ((ProtocolAnalysis)a).getProtocol();
+					ret = ((ProtocolAnalysis)a).getProtocol()+"@"+
+								((ProtocolAnalysis)a).getRole();
 					break;
 				}
 			}
@@ -160,7 +161,7 @@
 			ret.setMessageIdentities(getPrimaryIdentities());
 			ret.setOperationName(m_activity.getOperationName());
 			ret.setFaultName(m_activity.getFaultName());
-			ret.setServiceType(m_activity.getDestinationType());
+			//ret.setServiceType(m_activity.getDestinationType());
 			
 			if (m_activity.getParameter().size() > 0) {				
 				ret.setType(m_activity.getParameter().get(0).getType());

Modified: branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/TxnMonitor.java
===================================================================
--- branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/TxnMonitor.java	2011-02-16 17:58:08 UTC (rev 678)
+++ branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/TxnMonitor.java	2011-02-16 21:16:53 UTC (rev 679)
@@ -30,6 +30,7 @@
 import org.pi4soa.service.correlator.ServiceCorrelatorFactory;
 import org.pi4soa.service.correlator.ServiceCorrelatorListener;
 import org.pi4soa.service.tracker.jms.JMSServiceTrackerClient;
+import org.savara.tools.monitor.correlator.ServiceCorrelatorImpl;
 
 /**
  * The TxnMonitor class is a generic transaction monitor class that
@@ -48,7 +49,7 @@
     public void initialize() throws ServiceException
     {
         
-        m_correlator=ServiceCorrelatorFactory.getServiceCorrelator();
+        m_correlator=new ServiceCorrelatorImpl();	//ServiceCorrelatorFactory.getServiceCorrelator();
         m_correlator.addServiceCorrelatorListener(new CorrelatorListener());
         
         // Obtain service tracker client

Added: branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelatingServiceTracker.java
===================================================================
--- branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelatingServiceTracker.java	                        (rev 0)
+++ branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelatingServiceTracker.java	2011-02-16 21:16:53 UTC (rev 679)
@@ -0,0 +1,491 @@
+/*
+ * Copyright 2005 Pi4 Technologies Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ * Change History:
+ * Aug 25, 2005 : Initial version created by gary
+ */
+package org.savara.tools.monitor.correlator;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.pi4soa.cdl.CDLType;
+import org.pi4soa.cdl.ExchangeDetails;
+import org.pi4soa.cdl.util.CDLTypeUtil;
+import org.pi4soa.service.Channel;
+import org.pi4soa.service.Message;
+import org.pi4soa.service.behavior.MessageDefinition;
+import org.pi4soa.service.behavior.Receive;
+import org.pi4soa.service.behavior.Send;
+import org.pi4soa.service.behavior.ServiceDescription;
+import org.pi4soa.service.session.Session;
+import org.pi4soa.service.session.internal.InternalSession;
+import org.pi4soa.service.tracker.ServiceTracker;
+
+/**
+ * This class provides an implementation of the service tracker
+ * interface, used to detect tracker events from distributed
+ * service endpoints and correlate them back to the choreography
+ * description being monitored.
+ *
+ */
+public class CorrelatingServiceTracker implements ServiceTracker {
+
+	private static final String UNEXPECTED_MESSAGE_EXCEPTION = "UnexpectedMessageException";
+
+	private static final String UNHANDLED_EXCEPTION = "Unhandled exception: ";
+
+	private static final String CORRELATER_REPORTED = "[Correlater reported] ";
+
+	/**
+	 * This is the constructor for the correlating service
+	 * tracker.
+	 *
+	 * @param mgr The correlation session manager
+	 * @param notifier The correlation notifier
+	 */
+	public CorrelatingServiceTracker(CorrelationSessionManager mgr,
+			CorrelationNotifier notifier) {
+		m_sessionManager = mgr;
+		m_notifier = notifier;
+	}
+	
+	/**
+	 * This method initializes the service tracker.
+	 *
+	 */
+	public void initialize() {
+		
+	}
+	
+	/**
+	 * This method indicates that a new service instance
+	 * has started.
+	 * 
+	 * @param service The service
+	 * @param session The session
+	 */
+	public void serviceStarted(ServiceDescription service,
+				Session session) {
+		
+		// The CorrelationSessionManager overrides the 'addSession'
+		// method on the superclass DefaultSessionManager, to
+		// intercept the addition of new service sessions
+		// which are used to create or associated with existing
+		// correlation sessions. This overridden method is used
+		// instead of this tracker method, because by the time
+		// the overridden method is called, the session has
+		// identity information, whereas this method is invoked
+		// just after the newly created service session is
+		// instantiated.
+	}
+
+	/**
+	 * This method indicates that a service instance
+	 * has finished.
+	 * 
+	 * @param service The service
+	 * @param session The session
+	 */
+	public void serviceFinished(ServiceDescription service,
+				Session session) {
+		
+		// The overridden 'removeSession' method in the
+		// CorrelationSessionManager is used, instead of this
+		// tracker callback, to remove a session from the
+		// associated correlation session - to remain
+		// symmetrical with the way that the session is associated
+		// (see note above).
+	}
+
+	/**
+	 * This method indicates that a new sub session
+	 * has started within an existing service instance.
+	 * 
+	 * @param parent The parent session
+	 * @param session The session
+	 */
+	public void subSessionStarted(Session parent, Session session) {
+	}
+	
+	/**
+	 * This method indicates that an existing
+	 * sub session has finished.
+	 * 
+	 * @param parent The parent session
+	 * @param session The session
+	 */
+	public void subSessionFinished(Session parent, Session session) {
+	}
+	
+	/**
+	 * This method registers the fact that a message has been
+	 * sent.
+	 * 
+	 * @param activity The behavioral activity, or null if a stateless service
+	 * @param session The session, or null if a stateless service
+	 * @param channel The channel, or null if a stateless service
+	 * @param mesg The message that has been handled
+	 */
+	public void sentMessage(Send activity, Session session,
+					Channel channel, Message mesg) {
+		
+		// Get the correlation session for the service session
+		CorrelationSessionImpl corrSession=
+					getCorrelationSession(session, mesg);
+		
+		if (corrSession != null) {
+
+			// Derive the choreography description node associated
+			// with the behavioral send activity
+			CDLType cdlType=CDLTypeUtil.getCDLType(
+					corrSession.getChoreographyDescription(),
+						activity.getGlobalDescriptionURI());
+			
+			if (cdlType == null) {
+				logger.severe("Failed to locate CDL type associated " +
+						"with behavior activity '"+activity+
+						"' in choreography '"+
+						corrSession.getChoreographyDescription()+"'");
+				
+			} else if (cdlType instanceof ExchangeDetails) {
+				
+				corrSession.exchangeInitiated((ExchangeDetails)cdlType,
+								channel);
+				
+				// Notify any registered listeners
+				m_notifier.exchangeInitiated((ExchangeDetails)cdlType,
+							channel, mesg, corrSession,
+							session.getId().getServiceDescriptionName());
+			} else {
+				logger.severe("CDL type '"+cdlType+
+						"', associated with behavior activity '"+
+						activity+"' is not an ExchangeDetails type");
+			}
+		}
+	}
+	
+	/**
+	 * This method registers the fact that a message has been
+	 * sent from a stateless service.
+	 * 
+	 * @param defn The message definition
+	 * @param mesg The message that has been handled
+	 */
+	public void sentMessage(MessageDefinition defn, Message mesg) {
+		
+		// Get the correlation session for the service session
+		CorrelationSessionImpl corrSession=
+					getCorrelationSession(null, mesg);
+		
+		if (corrSession != null) {
+
+			// TODO: Not sure if we need to report a stateless
+			// message event to the correlation session??
+			//corrSession.exchangeInitiated((ExchangeDetails)cdlType,
+			//				channel);
+			
+			// Notify any registered listeners
+			m_notifier.exchangeInitiated(null, null, mesg, corrSession,
+						defn.getServiceDescription().getName());
+		}
+	}
+	
+	/**
+	 * This method registers the fact that a message has been
+	 * received.
+	 * 
+	 * @param activity The behavioral activity, or null if a stateless service
+	 * @param session The session, or null if a stateless service
+	 * @param channel The channel, or null if a stateless service
+	 * @param mesg The message that has been handled
+	 */
+	public void receivedMessage(Receive activity, Session session,
+					Channel channel, Message mesg) {
+		
+		// Get the correlation session for the service session
+		CorrelationSessionImpl corrSession=
+				getCorrelationSession(session, mesg);
+
+		if (corrSession != null) {
+
+			// Derive the choreography description node associated
+			// with the behavioral receive activity
+			CDLType cdlType=CDLTypeUtil.getCDLType(
+					corrSession.getChoreographyDescription(),
+						activity.getGlobalDescriptionURI());
+			
+			if (cdlType == null) {
+				logger.severe("Failed to locate CDL type associated " +
+						"with behavior activity '"+activity+
+						"' in choreography '"+
+						corrSession.getChoreographyDescription()+"'");
+				
+			} else if (cdlType instanceof ExchangeDetails) {
+				
+				corrSession.exchangeCompleted((ExchangeDetails)cdlType,
+								channel);
+				
+				// Notify any registered listeners
+				m_notifier.exchangeCompleted((ExchangeDetails)cdlType,
+						channel, mesg, corrSession,
+						session.getId().getServiceDescriptionName());
+				
+			} else {
+				logger.severe("CDL type '"+cdlType+
+						"', associated with behavior activity '"+
+						activity+"' is not an ExchangeDetails type");
+			}
+		}
+	}
+	
+	/**
+	 * This method registers the fact that a message has been
+	 * received from a stateless service.
+	 * 
+	 * @param defn The message definition
+	 * @param mesg The message that has been handled
+	 */
+	public void receivedMessage(MessageDefinition defn, Message mesg) {
+		
+		// Get the correlation session for the service session
+		CorrelationSessionImpl corrSession=
+					getCorrelationSession(null, mesg);
+		
+		if (corrSession != null) {
+
+			// TODO: Not sure if we need to report a stateless
+			// message event to the correlation session??
+			//corrSession.exchangeCompleted((ExchangeDetails)cdlType,
+			//				channel);
+			
+			// Notify any registered listeners
+			m_notifier.exchangeCompleted(null, null, mesg, corrSession,
+						defn.getServiceDescription().getName());
+		}
+	}
+
+	/**
+	 * This method registers that a message was not expected.
+	 * 
+	 * @param session The session, or null if a stateless service
+	 * @param mesg The message that was not expected
+	 * @param reason The optional reason why the message was
+	 * 					considered to be unexpected
+	 * @deprecated Use unexpectedMessage(ServiceDescription sdesc,
+	 *		Session session, Message mesg, String reason)
+	 */
+	public void unexpectedMessage(Session session, Message mesg,
+							String reason) {
+	}
+	
+	/**
+	 * This method registers that a message was not expected.
+	 * 
+	 * @param sdesc The service description, if known
+	 * @param session The session, or null if a stateless service,
+	 * 				or cannot be associated with a session
+	 * @param mesg The message that was not expected
+	 * @param reason The optional reason why the message was
+	 * 					considered to be unexpected
+	 */
+	public void unexpectedMessage(ServiceDescription sdesc,
+			Session session, Message mesg, String reason) {
+		// Get the correlation session for the service session
+		CorrelationSessionImpl corrSession=
+					getCorrelationSession(session, null);
+		
+		if (corrSession != null) {
+			m_notifier.error(CORRELATER_REPORTED+mesg,
+					UNEXPECTED_MESSAGE_EXCEPTION, corrSession,
+					session.getId().getServiceDescriptionName());
+		}		
+	}
+	
+	/**
+	 * This method registers that an exception was not handled.
+	 * 
+	 * @param session The session, or null if a stateless service
+	 * @param excType The exception type
+	 */
+	public void unhandledException(Session session, String excType) {
+		// Get the correlation session for the service session
+		CorrelationSessionImpl corrSession=
+					getCorrelationSession(session, null);
+		
+		if (corrSession != null) {
+			m_notifier.error(CORRELATER_REPORTED+
+					UNHANDLED_EXCEPTION+excType,
+					excType, corrSession,
+					session.getId().getServiceDescriptionName());
+		}
+	}
+	
+	/**
+	 * This method reports information regarding the processing
+	 * of a service session. The details can either be specified
+	 * as a textual string (unstructured data), 
+	 * or as a structured XML fragment.<p>
+	 * 
+	 * @param session The session, or null if a stateless service
+	 * @param details The details
+	 * @param type The optional type
+	 */
+	public void information(Session session, String details) {
+		// Get the correlation session for the service session
+		CorrelationSessionImpl corrSession=
+					getCorrelationSession(session, null);
+		
+		if (corrSession != null) {
+			m_notifier.information(CORRELATER_REPORTED+details,
+					corrSession,
+					session.getId().getServiceDescriptionName());
+		}
+	}
+	
+	/**
+	 * This method reports information regarding the processing
+	 * of a service session. The details can either be specified
+	 * as a textual string (unstructured data), 
+	 * or as a structured XML fragment.<p>
+	 * 
+	 * @param session The session, or null if a stateless service
+	 * @param details The details
+	 * @param exc The optional exception
+	 */
+	public void warning(Session session, String details, Throwable exc) {
+
+		// Get the correlation session for the service session
+		CorrelationSessionImpl corrSession=
+					getCorrelationSession(session, null);
+		
+		if (corrSession != null) {
+			m_notifier.warning(CORRELATER_REPORTED+details,
+					exc.toString(), corrSession,
+					session.getId().getServiceDescriptionName());
+		}
+	}
+	
+	/**
+	 * This method reports information regarding the processing
+	 * of a service session. The details can either be specified
+	 * as a textual string (unstructured data), 
+	 * or as a structured XML fragment.<p>
+	 * 
+	 * @param session The session, or null if a stateless service
+	 * @param details The details
+	 * @param exc The optional exception
+	 */
+	public void error(Session session, String details, Throwable exc) {
+		// Get the correlation session for the service session
+		CorrelationSessionImpl corrSession=
+					getCorrelationSession(session, null);
+		
+		if (corrSession != null) {
+			m_notifier.error(CORRELATER_REPORTED+details,
+					exc.toString(), corrSession,
+					session.getId().getServiceDescriptionName());
+		}
+	}
+	
+	/**
+	 * This method closes the service tracker.
+	 *
+	 */
+	public void close() {
+	}
+
+	/**
+	 * This method returns the top level session associated with
+	 * the supplied session.
+	 * 
+	 * @param subsession The subsession
+	 * @return The top level session
+	 */
+	protected Session getTopLevelSession(Session subsession) {
+		Session ret=subsession;
+		
+		while (ret instanceof InternalSession &&
+				((InternalSession)ret).getParent() != null) {
+			ret = ((InternalSession)ret).getParent();
+		}
+		
+		return(ret);
+	}
+	
+	/**
+	 * This method determines which correlation session should be
+	 * used based on the supplied session and/or message.
+	 * 
+	 * @param session The session, or null if stateless session
+	 * @param mesg The message
+	 * @return The correlation session, or null if not found
+	 */
+	protected CorrelationSessionImpl getCorrelationSession(Session session,
+							Message mesg) {
+		CorrelationSessionImpl ret=null;
+		
+		if (session != null) {
+			
+			Session topLevelSession=getTopLevelSession(session);
+			
+			if (topLevelSession != null) {
+				ret = m_sessionManager.getCorrelationSessionImpl(topLevelSession, mesg);
+			} else {
+				logger.warning("Failed to find top level session for '"+
+						session+"'");
+			}
+			
+			if (ret == null) {
+				logger.severe("Failed to locate correlation session " +
+						"for service session '"+topLevelSession+"'");
+			}
+		} else if (mesg != null) {
+			
+			// Stateless service
+			ret = m_sessionManager.getCorrelationSessionImpl(
+						mesg.getMessageIdentities());
+			
+			if (ret == null) {
+				StringBuffer buf=new StringBuffer();
+				
+				for (int i=0; mesg.getMessageIdentities() != null &&
+						i < mesg.getMessageIdentities().size(); i++) {
+					
+					if (i > 0) {
+						buf.append(',');
+					}
+					buf.append(mesg.getMessageIdentities().get(i).getId());
+				}
+				
+				logger.severe("Failed to locate correlation session " +
+						"for stateless service with identities '"+
+						buf.toString()+"'");
+				
+			} else if (logger.isLoggable(Level.FINE)) {
+				logger.fine("Found correlation session '"+ret+
+							"' for stateless service");
+			}
+		}
+		
+		return(ret);
+	}
+	
+    private static Logger logger = Logger.getLogger("org.pi4soa.service.correlator.impl");
+	
+	private CorrelationSessionManager m_sessionManager=null;
+	private CorrelationNotifier m_notifier=null;
+}

Added: branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelationNotifier.java
===================================================================
--- branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelationNotifier.java	                        (rev 0)
+++ branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelationNotifier.java	2011-02-16 21:16:53 UTC (rev 679)
@@ -0,0 +1,279 @@
+/*
+ * Copyright 2005 Pi4 Technologies Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ * Change History:
+ * Sep 9, 2005 : Initial version created by gary
+ */
+package org.savara.tools.monitor.correlator;
+
+import java.util.Vector;
+
+import org.pi4soa.cdl.ExchangeDetails;
+import org.pi4soa.service.Channel;
+import org.pi4soa.service.Message;
+import org.pi4soa.service.correlator.CorrelationSession;
+import org.pi4soa.service.correlator.ServiceCorrelatorListener;
+
+public class CorrelationNotifier implements ServiceCorrelatorListener {
+
+	/**
+	 * This method indicates that the supplied choreography
+	 * description has been registered with the service
+	 * correlator.
+	 * 
+	 * @param cdlpack The choreography description
+	 */
+	public void choreographyDescriptionRegistered(org.pi4soa.cdl.Package cdlpack) {
+		
+		synchronized(m_listeners) {
+			for (int i=0; i < m_listeners.size(); i++) {
+				ServiceCorrelatorListener l=
+					(ServiceCorrelatorListener)m_listeners.get(i);
+				
+				l.choreographyDescriptionRegistered(cdlpack);
+			}
+		}
+	}
+	
+	/**
+	 * This method indicates that the supplied choreography
+	 * description has been unregistered from the service
+	 * correlator.
+	 * 
+	 * @param cdlpack The choreography description
+	 */
+	public void choreographyDescriptionUnregistered(org.pi4soa.cdl.Package cdlpack) {
+		
+		synchronized(m_listeners) {
+			for (int i=0; i < m_listeners.size(); i++) {
+				ServiceCorrelatorListener l=
+					(ServiceCorrelatorListener)m_listeners.get(i);
+				
+				l.choreographyDescriptionRegistered(cdlpack);
+			}
+		}
+	}
+	
+	/**
+	 * This method indicates that a new correlated session has
+	 * been started.
+	 * 
+	 * @param session The session
+	 */
+	public void sessionStarted(CorrelationSession session) {
+		
+		synchronized(m_listeners) {
+			for (int i=0; i < m_listeners.size(); i++) {
+				ServiceCorrelatorListener l=
+					(ServiceCorrelatorListener)m_listeners.get(i);
+				
+				l.sessionStarted(session);
+			}
+		}
+	}
+	
+	/**
+	 * This method indicates that a correlated session has
+	 * been finished.
+	 * 
+	 * @param session The session
+	 */
+	public void sessionFinished(CorrelationSession session) {
+		
+		synchronized(m_listeners) {
+			for (int i=0; i < m_listeners.size(); i++) {
+				ServiceCorrelatorListener l=
+					(ServiceCorrelatorListener)m_listeners.get(i);
+				
+				l.sessionFinished(session);
+			}
+		}
+	}
+	
+	/**
+	 * This method is invoked to indicate that a choreography
+	 * exchange has been initiated by one participant.
+	 * 
+	 * @param exchange The exchange details
+	 * @param channel The channel associated with the exchange
+	 * @param message The message
+	 * @param session The session
+	 * @param serviceDescriptionName The name of the service
+	 * 				description that caused this exchange to
+	 * 				be initiated
+	 */
+	public void exchangeInitiated(ExchangeDetails exchange,
+			Channel channel, Message message,
+			CorrelationSession session, String serviceDescriptionName) {
+		
+		synchronized(m_listeners) {
+			for (int i=0; i < m_listeners.size(); i++) {
+				ServiceCorrelatorListener l=
+					(ServiceCorrelatorListener)m_listeners.get(i);
+				
+				l.exchangeInitiated(exchange,
+						channel, message, session,
+						serviceDescriptionName);
+			}
+		}
+	}
+
+	/**
+	 * This method is invoked to indicate that a choreography
+	 * exchange has been completed by the target participant.
+	 * 
+	 * @param exchange The exchange details
+	 * @param channel The channel associated with the exchange
+	 * @param message The message
+	 * @param session The session
+	 * @param serviceDescriptionName The name of the service
+	 * 				description that caused this exchange to
+	 * 				be completed
+	 */
+	public void exchangeCompleted(ExchangeDetails exchange,
+			Channel channel, Message message,
+			CorrelationSession session,	String serviceDescriptionName) {
+		
+		synchronized(m_listeners) {
+			for (int i=0; i < m_listeners.size(); i++) {
+				ServiceCorrelatorListener l=
+					(ServiceCorrelatorListener)m_listeners.get(i);
+				
+				l.exchangeCompleted(exchange,
+						channel, message, session,
+						serviceDescriptionName);
+			}
+		}
+	}
+	
+	/**
+	 * This method is invoked to indicate that a message
+	 * was unexpected at a participant.
+	 * 
+	 * @param message The message
+	 * @param session The session
+	 * @param serviceDescriptionName The name of the service
+	 * 				description that caused this unexpected
+	 * 				message error
+	 */
+	public void unexpectedMessage(Message message,
+			CorrelationSession session, String serviceDescriptionName) {
+		
+		synchronized(m_listeners) {
+			for (int i=0; i < m_listeners.size(); i++) {
+				ServiceCorrelatorListener l=
+					(ServiceCorrelatorListener)m_listeners.get(i);
+				
+				l.unexpectedMessage(message, session,
+						serviceDescriptionName);
+			}
+		}
+	}
+	
+    /**
+     * An error occurred related to the specified correlation
+     * session.
+     * 
+     * @param mesg The error message
+     * @param exception The optional exception details
+     * @param session The correlation session
+     * @param serviceDescriptionName The service name
+     */
+    public void error(String mesg, String exception,
+    		CorrelationSession session, String serviceDescriptionName) {
+		
+		synchronized(m_listeners) {
+			for (int i=0; i < m_listeners.size(); i++) {
+				ServiceCorrelatorListener l=
+					(ServiceCorrelatorListener)m_listeners.get(i);
+				
+				l.error(mesg, exception, session,
+						serviceDescriptionName);
+			}
+		}
+	}
+    
+    /**
+     * A warning occurred related to the specified correlation
+     * session.
+     * 
+     * @param mesg The warning message
+     * @param exception The optional exception details
+     * @param session The correlation session
+     * @param serviceDescriptionName The service name
+     */
+    public void warning(String mesg, String exception,
+    		CorrelationSession session, String serviceDescriptionName) {
+		
+		synchronized(m_listeners) {
+			for (int i=0; i < m_listeners.size(); i++) {
+				ServiceCorrelatorListener l=
+					(ServiceCorrelatorListener)m_listeners.get(i);
+				
+				l.warning(mesg, exception, session,
+						serviceDescriptionName);
+			}
+		}
+	}
+    
+    /**
+     * An information event occurred related to the specified correlation
+     * session.
+     * 
+     * @param mesg The information message
+     * @param session The correlation session
+     * @param serviceDescriptionName The service name
+     */
+    public void information(String mesg, CorrelationSession session,
+			String serviceDescriptionName) {
+		
+		synchronized(m_listeners) {
+			for (int i=0; i < m_listeners.size(); i++) {
+				ServiceCorrelatorListener l=
+					(ServiceCorrelatorListener)m_listeners.get(i);
+				
+				l.information(mesg, session,
+						serviceDescriptionName);
+			}
+		}
+	}
+    
+	/**
+	 * This method adds a listener for notifications regarding
+	 * correlated sessions.
+	 * 
+	 * @param l The listener
+	 */
+	public void addServiceCorrelatorListener(ServiceCorrelatorListener l) {
+		synchronized(m_listeners) {
+			m_listeners.add(l);
+		}
+	}
+	
+	/**
+	 * This method removes a listener for notifications regarding
+	 * correlated sessions.
+	 * 
+	 * @param l The listener
+	 */
+	public void removeServiceCorrelatorListener(ServiceCorrelatorListener l) {
+		synchronized(m_listeners) {
+			m_listeners.remove(l);
+		}
+	}
+	
+	private Vector m_listeners=new Vector();	
+}

Added: branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelationSessionImpl.java
===================================================================
--- branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelationSessionImpl.java	                        (rev 0)
+++ branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelationSessionImpl.java	2011-02-16 21:16:53 UTC (rev 679)
@@ -0,0 +1,277 @@
+/*
+ * Copyright 2005 Pi4 Technologies Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ * Change History:
+ * Sep 8, 2005 : Initial version created by gary
+ */
+package org.savara.tools.monitor.correlator;
+
+import java.util.Vector;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.pi4soa.cdl.ExchangeDetails;
+import org.pi4soa.service.Channel;
+import org.pi4soa.service.Identity;
+import org.pi4soa.service.correlator.CorrelationSession;
+import org.pi4soa.service.session.Session;
+
+/**
+ * This class provides an implementation of the correlation
+ * session.
+ *
+ */
+public class CorrelationSessionImpl implements CorrelationSession {
+
+	/**
+	 * This is the constructor for the correlation session.
+	 * 
+	 * @param cdl The choreography description being correlated
+	 */
+	public CorrelationSessionImpl(org.pi4soa.cdl.Package cdl) {
+		m_choreographyDescription = cdl;
+	}
+	
+	/**
+	 * This method returns the choreography description associated
+	 * with the correlated session.
+	 * 
+	 * @return The 
+	 */
+	public org.pi4soa.cdl.Package getChoreographyDescription() {
+		return(m_choreographyDescription);
+	}
+	
+	/**
+	 * This method associates a service session with the correlation
+	 * session.
+	 * 
+	 * @param session The service session
+	 */
+	public void associateServiceSession(Session session) {
+		logger.fine("Associate service session '"+session+"'");
+		
+		m_serviceSessions.add(session);
+	}
+	
+	/**
+	 * This method disassociates a service session from
+	 * the correlation session.
+	 * 
+	 * @param session The service session
+	 */
+	public void disassociateServiceSession(Session session) {
+		logger.fine("Disassociate service session name '"+session+"'");
+		
+		m_serviceSessions.remove(session);
+		
+		// Apply identities to past values - this is used when
+		// all of the sessions are removed, but the session
+		// is not necessarily completed
+		java.util.Set<Identity> ids=session.getPrimaryIdentities();
+		
+		java.util.Iterator<Identity> iter=ids.iterator();
+		while (iter.hasNext()) {
+			Identity cur=iter.next();
+			
+			if (m_pastIdentities.contains(cur) == false) {
+				
+				logger.info("Correlation session '"+this+
+						"' - adding identity to past list: "+cur);
+				
+				m_pastIdentities.add(cur);
+			}
+		}
+	}
+		
+	/**
+	 * This method disassociates a service session from
+	 * the correlation session.
+	 * 
+	 * @param serviceName The service name
+	 */
+	public void disassociateServiceSession(String serviceName) {
+		logger.fine("Disassociate service session name '"+serviceName+"'");
+		
+		for (int i=m_serviceSessions.size()-1; i >= 0; i--) {
+			Session session=(Session)m_serviceSessions.get(i);
+			
+			if (session.getId().getServiceDescriptionName().
+						equals(serviceName)) {
+				
+				disassociateServiceSession(session);
+			}
+		}
+	}
+	
+	/**
+	 * This method returns the number of service sessions
+	 * associated with the correlation session.
+	 * 
+	 * @return The number of service sessions
+	 */
+	public int getNumberOfServiceSessions() {
+		return(m_serviceSessions.size());
+	}
+	
+	/**
+	 * This method is invoked to indicate that a choreography
+	 * exchange has been initiated by one participant.
+	 * 
+	 * @param exchange The exchange details
+	 * @param channel The channel associated with the exchange
+	 */
+	public void exchangeInitiated(ExchangeDetails exchange, Channel channel) {
+		
+		synchronized(m_initiated) {			
+			if (logger.isLoggable(Level.FINE)) {
+				logger.fine("Exchange initiated="+exchange+" on channel="+channel);
+			}
+
+			m_initiated.add(exchange);
+		}
+	}
+
+	/**
+	 * This method is invoked to indicate that a choreography
+	 * exchange has been completed by a target participant.
+	 * 
+	 * @param exchange The exchange details
+	 * @param channel The channel associated with the exchange
+	 */
+	public void exchangeCompleted(ExchangeDetails exchange, Channel channel) {
+		
+		synchronized(m_initiated) {
+			if (logger.isLoggable(Level.FINE)) {
+				logger.fine("Exchange completed="+exchange+" on channel="+channel);
+			}
+
+			if (m_initiated.contains(exchange)) {
+				m_completed.add(exchange);
+				
+				m_initiated.remove(exchange);
+			} else {
+				
+				if (logger.isLoggable(Level.FINE)) {
+					logger.fine("NOTE: Completed exchange="+
+							exchange+" has not been initiated");
+				}
+			}
+		}
+	}
+	
+	/**
+	 * This method returns the identities associated with this
+	 * correlation session.
+	 * 
+	 * @return The identities
+	 */
+	public java.util.List<Identity> getIdentities() {
+		java.util.List<Identity> ret=new java.util.Vector<Identity>();
+		
+		for (int i=0; i < m_pastIdentities.size(); i++) {
+			Identity pastId=(Identity)m_pastIdentities.get(i);
+		
+			if (ret.contains(pastId) == false) {
+				ret.add(pastId);
+			}
+		}
+		
+		for (int i=0; i < m_serviceSessions.size(); i++) {
+			Session session=(Session)m_serviceSessions.get(i);
+			
+			java.util.Set<Identity> ids=session.getPrimaryIdentities();
+			
+			java.util.Iterator<Identity> iter=ids.iterator();
+			
+			while (iter.hasNext()) {
+				Identity cur=iter.next();
+				
+				if (ret.contains(cur) == false) {
+					ret.add(cur);
+				}
+			}
+			
+			if (session.getSessionIdentity() != null &&
+					ret.contains(session.getSessionIdentity()) == false) {
+				ret.add(session.getSessionIdentity());
+			}
+		}
+
+		return(ret);
+	}
+	
+	/**
+	 * This method determines whether the session is identified by
+	 * the supplied ids.
+	 * 
+	 * @param ids The identities
+	 * @return Whether the session is associated with the supplied id
+	 */
+	public boolean isIdentifiedBy(java.util.Collection<Identity> ids) {
+		boolean ret=false;
+		
+		if (ids != null) {
+			java.util.Iterator<Identity> iter=ids.iterator();
+			
+			while (ret == false && iter.hasNext()) {
+				Identity cur=iter.next();
+				
+				if (logger.isLoggable(Level.FINEST)) {
+					logger.finest("Is identified by="+cur);
+				}
+				
+				for (int i=0; ret == false &&
+						i < m_serviceSessions.size(); i++) {
+					Session session=(Session)m_serviceSessions.get(i);
+					
+					ret = session.isIdentifiedBy(cur);
+					
+					if (logger.isLoggable(Level.FINEST)) {
+						logger.finest("Checked id="+cur+
+								" against session="+session+" = "+ret);
+					}
+				}
+				
+				for (int i=0; ret == false &&
+						i < m_pastIdentities.size(); i++) {
+					Identity pastId=(Identity)m_pastIdentities.get(i);
+			
+					ret = pastId.equals(cur);
+					
+					if (logger.isLoggable(Level.FINEST)) {
+						logger.finest("Checked against past id="+pastId+" = "+ret);
+					}
+				}
+			}
+		}
+		
+		if (logger.isLoggable(Level.FINEST)) {
+			logger.finest("Is correlation session ("+this+
+					") identified by "+ids+" = "+ret);
+		}
+		
+		return(ret);
+	}
+	
+    private static Logger logger = Logger.getLogger("org.pi4soa.service.correlator.impl");		
+	
+	private org.pi4soa.cdl.Package m_choreographyDescription=null;
+	private Vector m_serviceSessions=new Vector();
+	private Vector m_initiated=new Vector();
+	private Vector m_completed=new Vector();
+	private java.util.List<Identity> m_pastIdentities=new Vector<Identity>();
+}

Added: branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelationSessionManager.java
===================================================================
--- branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelationSessionManager.java	                        (rev 0)
+++ branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelationSessionManager.java	2011-02-16 21:16:53 UTC (rev 679)
@@ -0,0 +1,359 @@
+/*
+ * Copyright 2005 Pi4 Technologies Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ * Change History:
+ * Sep 8, 2005 : Initial version created by gary
+ */
+package org.savara.tools.monitor.correlator;
+
+import java.util.Hashtable;
+import java.util.Vector;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.pi4soa.service.Identity;
+import org.pi4soa.service.session.DefaultSessionManager;
+import org.pi4soa.service.session.Session;
+import org.pi4soa.service.session.SessionManagerException;
+
+/**
+ * This class provides the management of service and correlation
+ * sessions.
+ *
+ */
+public class CorrelationSessionManager extends DefaultSessionManager {
+
+	/**
+	 * This is the constructor for the correlation session manager.
+	 * 
+	 * @param notifier The notifier
+	 */
+	public CorrelationSessionManager(CorrelationNotifier notifier) {
+		m_notifier = notifier;
+	}
+	
+	/**
+	 * This method adds a new session instance to the session manager,
+	 * and returns a reference that can be used to retrieve the
+	 * session at a later time.
+	 * 
+	 * @param session The session
+	 * @return The session reference for the added session
+	 * @exception SessionManagerException Failed to add session
+	 */
+	@Override
+	public void addSession(Session session)
+					throws SessionManagerException {
+		super.addSession(session);
+		
+		// Synchronize on correlation session list, as
+		// other methods use these lists (pendingId and
+		// correlSession) at the same time
+		synchronized(m_correlationSessions) {
+			if (logger.isLoggable(Level.FINE)) {
+				logger.fine("Adding session '"+session+
+						"' to pending identification list");
+			}
+			
+			// Add session to 'pending identification' list
+			m_pendingIdentification.add(session);
+		}
+	}
+	
+	/**
+	 * This method returns the correlation session implementation
+	 * associated with the supplied session. If more than one
+	 * correlation session is found to have the same identity
+	 * information, then a merge of the correlation sessions will be
+	 * performed.
+	 * 
+	 * @param session The service session
+	 * @param mesg The optional message
+	 * @return The correlation session
+	 */
+	protected CorrelationSessionImpl getCorrelationSessionImpl(Session session,
+							org.pi4soa.service.Message mesg) {
+		CorrelationSessionImpl corrSess=null;
+				
+		synchronized(m_correlationSessions) {
+			boolean pendingStarted=false;
+			
+			// TODO: Could check which choreography session this
+			// session can be added to - then generate 'participant'
+			// joined session messages?
+			
+			// Find an appropriate correlation session to associate
+			// with the session
+			java.util.Set<Identity> primIds=session.getPrimaryIdentities();
+			
+			corrSess = getCorrelationSessionImpl(primIds);
+			
+			if (corrSess == null && mesg != null) {
+				// Check if message identities can find the
+				// correlation session
+				corrSess = getCorrelationSessionImpl(mesg.getMessageIdentities());
+			}
+			
+			if (corrSess == null) {
+				
+				if (primIds == null || primIds.size() == 0) {
+					logger.severe("Correlation session is going to be " +
+							"created with no primary ids available");
+				} else {
+					String str="Primary ids: ";
+					
+					java.util.Iterator<Identity> iter=primIds.iterator();
+					while (iter.hasNext()) {
+						str += iter.next().getId();
+					}
+					
+					logger.info(str);
+				}
+				
+				// Find choreography for this session
+				org.pi4soa.cdl.Package cdlpack=
+						getChoreographyForService(session.getId().
+								getServiceDescriptionName());
+				
+				if (cdlpack == null) {
+					logger.severe("Unable to find choreography for service '"+
+							session.getId().getServiceDescriptionName());
+				} else {
+					corrSess = new CorrelationSessionImpl(cdlpack);
+					m_correlationSessions.add(corrSess);
+					
+					pendingStarted = true;
+					
+					logger.info("New correlation session="+corrSess);
+				}
+			} else {
+				
+				if (corrSess.getNumberOfServiceSessions() == 0) {
+					
+					if (m_pendingCloseSessions.remove(corrSess)) {
+						logger.fine("Removed correlation session " +
+								"from pending close list: "+corrSess);						
+					}
+				}
+			}
+			
+			if (logger.isLoggable(Level.FINE)) {
+				logger.fine("Check if session '"+session+
+						"' is pending identification: "+
+						m_pendingIdentification.contains(session));
+			}
+			
+			if (corrSess != null && m_pendingIdentification.contains(session)) {
+				corrSess.associateServiceSession(session);
+				
+				logger.info("Associating service session '"+
+						session+"' with correlation session '"+
+						corrSess+"'");
+				
+				m_pendingIdentification.remove(session);
+			
+				// TODO: IDEAS: choreography session is used to store the
+				// list of initiated and completed activities (exchanges)
+				// recorded by the send and receive events for the
+				// associated CDL exchange activity. Possibly the list
+				// of completed activities is retained to act as an
+				// audit trail? Not sure if service sessions are necessary
+				// to be recorded against correlation session, but may
+				// provide some relevant details about participants joining
+				// choreography and then leaving.
+			}
+			
+			if (pendingStarted) {
+				m_notifier.sessionStarted(corrSess);
+			}
+		}
+
+		return(corrSess);
+	}
+
+	/**
+	 * This method removes a session instance from the session manager.
+	 * 
+	 * @param session The session
+	 * @exception SessionManagerException Failed to remove session
+	 */
+	@Override
+	public void removeSession(Session session) throws SessionManagerException {
+		
+		synchronized(m_correlationSessions) {
+			// TODO: May want to make this configurable, because we
+			// could just rely on the tracker events from each participant
+			// to inform us when they have completed
+			
+			CorrelationSessionImpl corrSess=
+				getCorrelationSessionImpl(session.getPrimaryIdentities());
+
+			if (logger.isLoggable(Level.FINE)) {
+				logger.fine("Remove session '"+session+
+						"': correlation session="+corrSess);
+			}
+
+			if (corrSess != null) {
+				corrSess.disassociateServiceSession(session);
+				
+				if (logger.isLoggable(Level.FINE)) {
+					logger.fine("Removing session '"+session+
+							"' from pending identification list");
+				}
+
+				// Remove from pending identification list just in case
+				m_pendingIdentification.remove(session);
+				
+				if (corrSess.getNumberOfServiceSessions() == 0) {	
+					pendingClose(corrSess);
+				}
+			}
+		}
+		
+		super.removeSession(session);
+	}
+	
+	/**
+	 * This method removes the supplied correlation session.
+	 * 
+	 * @param cs The correlation session
+	 */
+	public void pendingClose(CorrelationSessionImpl cs) {
+
+		logger.info("Moving correlation session to pending close list: "+
+				cs);
+		
+		m_pendingCloseSessions.add(cs);
+		
+		//m_correlationSessions.remove(corrSess);
+		
+		//m_notifier.sessionFinished(corrSess);
+		//synchronized(m_correlationSessions) {
+			//m_correlationSessions.remove(cs);
+		//}
+	}
+	
+	/**
+	 * This method returns the correlation session implementation
+	 * associated with the supplied session. If more than one
+	 * correlation session is found to have the same identity
+	 * information, then a merge of the correlation sessions will be
+	 * performed.
+	 * 
+	 * @param session The service session
+	 * @return The correlation session
+	 */
+	protected CorrelationSessionImpl getCorrelationSessionImpl(java.util.Collection<Identity> ids) {
+		CorrelationSessionImpl ret=null;
+		
+		synchronized(m_correlationSessions) {
+			// TODO: Get correlation session for primary identities
+			// associated with the supplied session. If none, then
+			// create one - not sure how we get the choreography
+			// description??? - if more than one, then we need to
+			// merge the correlation sessions
+			
+			if (logger.isLoggable(Level.FINE)) {
+				logger.fine("Number of sessions="+m_correlationSessions.size());
+			}
+			
+			Vector sessions=new Vector();
+			for (int i=0; i < m_correlationSessions.size(); i++) {
+				CorrelationSessionImpl cs=(CorrelationSessionImpl)
+						m_correlationSessions.get(i);
+				
+				if (logger.isLoggable(Level.FINE)) {
+					logger.fine("Checking correlation session="+cs);
+				}
+				
+				if (cs.isIdentifiedBy(ids)) {
+					sessions.add(cs);
+				}
+			}
+			
+			if (sessions.size() > 1) {
+				
+				logger.severe("MULTIPLE CORRELATION SESSIONS DETECTED - currently not handled");
+	
+				// Merge sessions
+				
+				// TODO: MERGING
+				
+			} else if (sessions.size() == 1) {
+				ret = (CorrelationSessionImpl)sessions.get(0);
+			}
+		}
+
+		if (logger.isLoggable(Level.FINE)) {
+			logger.fine("Returning correlation session="+ret);
+		}
+		
+		return(ret);
+	}
+	
+	/**
+	 * This method returns the list of correlation sessions.
+	 * 
+	 * @return The correlation sessions
+	 */
+	public java.util.List getCorrelationSessions() {
+		return(m_correlationSessions);
+	}
+	
+	/**
+	 * This method associates the supplied choreography with the
+	 * service name.
+	 * 
+	 * @param serviceName The service name
+	 * @param cdlpack The choreography
+	 */
+	public void registerServiceChoreography(String serviceName,
+			org.pi4soa.cdl.Package cdlpack) {
+		m_serviceChoreographies.put(serviceName, cdlpack);
+	}
+
+	/**
+	 * This method disassociates the supplied choreography from the
+	 * service name.
+	 * 
+	 * @param serviceName The service name
+	 * @param cdlpack The choreography
+	 */
+	public void unregisterServiceChoreography(String serviceName,
+			org.pi4soa.cdl.Package cdlpack) {
+		m_serviceChoreographies.remove(serviceName);
+	}
+
+	/**
+	 * This method returns the choreography description associated
+	 * with the supplied service name. If no choreography description
+	 * can be located, then a null is returned.
+	 * 
+	 * @param serviceName The service name
+	 * @return The choreography, or null if not found
+	 */
+	public org.pi4soa.cdl.Package getChoreographyForService(String serviceName) {
+		return((org.pi4soa.cdl.Package)m_serviceChoreographies.get(serviceName));
+	}
+	
+	private Vector m_correlationSessions=new Vector();
+	private Hashtable m_serviceChoreographies=new Hashtable();
+	private Vector m_pendingIdentification=new Vector();
+	private Vector m_pendingCloseSessions=new Vector();
+	private CorrelationNotifier m_notifier=null;
+	
+    private static Logger logger = Logger.getLogger("org.pi4soa.service.correlator.impl");
+}

Added: branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/ServiceCorrelatorImpl.java
===================================================================
--- branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/ServiceCorrelatorImpl.java	                        (rev 0)
+++ branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/ServiceCorrelatorImpl.java	2011-02-16 21:16:53 UTC (rev 679)
@@ -0,0 +1,544 @@
+/*
+ * Copyright 2005 Pi4 Technologies Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ * Change History:
+ * Aug 25, 2005 : Initial version created by gary
+ */
+package org.savara.tools.monitor.correlator;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Enumeration;
+import java.util.Hashtable;
+import java.util.Vector;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.xml.namespace.QName;
+
+import org.pi4soa.cdl.ParticipantType;
+import org.pi4soa.common.util.NamesUtil;
+import org.pi4soa.common.xml.XMLUtils;
+import org.pi4soa.service.Identity;
+import org.pi4soa.service.ServiceException;
+import org.pi4soa.service.behavior.ServiceDescription;
+import org.pi4soa.service.behavior.projection.BehaviorProjection;
+import org.pi4soa.service.correlator.CorrelationSession;
+import org.pi4soa.service.correlator.ServiceCorrelator;
+import org.pi4soa.service.correlator.ServiceCorrelatorListener;
+import org.pi4soa.service.monitor.DefaultMonitorConfiguration;
+import org.pi4soa.service.monitor.ServiceMonitor;
+import org.pi4soa.service.monitor.ServiceMonitorFactory;
+import org.pi4soa.service.tracker.TrackerEvent;
+import org.pi4soa.service.tracker.TrackerRecord;
+
+/**
+ * This class implements the service correlator interface.
+ *
+ */
+public class ServiceCorrelatorImpl implements ServiceCorrelator {
+
+	/**
+	 * The default constructor for the service correlator
+	 * implementation.
+	 *
+	 */
+	public ServiceCorrelatorImpl() {
+		initialize();
+	}
+	
+	/**
+	 * This method initializes the service correlator.
+	 *
+	 */
+	protected void initialize() {
+
+		m_notifier = new CorrelationNotifier();
+
+		m_sessionManager = new CorrelationSessionManager(m_notifier);
+		
+		m_correlatingServiceTracker =
+			new CorrelatingServiceTracker(m_sessionManager, m_notifier);
+	}
+	
+	/**
+	 * This method registers a choreography description with the
+	 * service correlator, to be informed when service tracker
+	 * activities associated with the description are correlated.
+	 * 
+	 * @param cdl The choreography description
+	 * @exception ServiceException Failed to register
+	 */
+	public void register(org.pi4soa.cdl.Package cdl)
+						throws ServiceException {
+		
+		synchronized(m_choreographyDescriptions) {
+			if (m_choreographyDescriptions.containsKey(cdl)) {
+				throw new ServiceException("Choreography " +
+						"description already registered");
+			}
+				
+			// Generate the endpoint projections and register them
+			// with the monitor (in its service repository)
+			Vector sds=new Vector();
+			java.util.List participants=
+					cdl.getTypeDefinitions().getParticipantTypes();
+			
+			java.util.Iterator iter=participants.iterator();
+			while (iter.hasNext()) {
+				ParticipantType partType=(ParticipantType)
+							iter.next();
+				
+				// CDL File path not provided, as not required
+				// for a service description that is only being
+				// used for correlation
+				ServiceDescription sd=
+					BehaviorProjection.projectServiceDescription(cdl,
+							partType, null);
+								
+				DefaultMonitorConfiguration config=
+					new DefaultMonitorConfiguration();
+				config.setServiceTracker(m_correlatingServiceTracker);
+				config.setSessionManager(m_sessionManager);
+				
+				ServiceMonitor serviceMonitor =
+					ServiceMonitorFactory.getServiceMonitor(config);
+				serviceMonitor.getConfiguration().getServiceRepository().
+						addServiceDescription(sd);
+				
+				logger.info("Registering service monitor for '"+
+						sd.getFullyQualifiedName()+"'");
+				
+				m_serviceMonitors.put(sd.getFullyQualifiedName(),
+								serviceMonitor);
+				
+				m_serviceMonitors.put(getLocatedProtocolName(cdl, partType), serviceMonitor);
+				
+				sds.add(sd);
+				
+				m_sessionManager.registerServiceChoreography(
+						sd.getFullyQualifiedName(), cdl);
+
+				ByteArrayOutputStream baos=new ByteArrayOutputStream();
+				try {
+					org.pi4soa.service.util.ServiceDescriptionManager.save(
+							sd,	baos);
+					logger.info(baos.toString());
+				} catch(Exception e) {
+					e.printStackTrace();
+				}	
+			}
+		
+			// Register choreography description
+			m_choreographyDescriptions.put(cdl, sds);
+		}
+		
+		// Notify any registered listeners
+		m_notifier.choreographyDescriptionRegistered(cdl);
+	}
+	
+	protected String getLocatedProtocolName(org.pi4soa.cdl.Package cdl, ParticipantType partType) {
+		String ret=null;
+		
+		ret = new QName(cdl.getTargetNamespace(), cdl.getName()).toString();
+		
+		ret += "@"+XMLUtils.getLocalname(partType.getName());
+		
+		return(ret);
+	}
+	
+	/**
+	 * This method unregisters the supplied choreography
+	 * description, to ignore further correlation situations
+	 * that occur related to this description.
+	 * 
+	 * @param cdl The choreography description
+	 * @exception ServiceException Failed to register
+	 */
+	public void unregister(org.pi4soa.cdl.Package cdl)
+						throws ServiceException {
+		boolean unregistered=false;
+		
+		synchronized(m_choreographyDescriptions) {
+			if (m_choreographyDescriptions.containsKey(cdl)) {
+			
+				// Remove the endpoint projections, associated with
+				// the supplied choreography description and unregister them
+				// from the monitor's service repository
+				Vector sds=(Vector)m_choreographyDescriptions.get(cdl);
+				
+				for (int i=0; i < sds.size(); i++) {
+					ServiceDescription sd=(ServiceDescription)sds.get(i);
+					
+					ServiceMonitor serviceMonitor=(ServiceMonitor)
+							m_serviceMonitors.get(sd.getFullyQualifiedName());
+					
+					serviceMonitor.getConfiguration().getServiceRepository().
+								removeServiceDescription(sd);
+					
+					// TODO: Need to close down monitor
+					
+					m_serviceMonitors.remove(sd.getFullyQualifiedName());
+					
+					m_sessionManager.unregisterServiceChoreography(
+							sd.getFullyQualifiedName(), cdl);
+				}
+				
+				m_choreographyDescriptions.remove(cdl);
+				
+				unregistered = true;
+			}
+		}
+
+		if (unregistered) {
+ 			// Notify any registered listeners
+			m_notifier.choreographyDescriptionRegistered(cdl);
+		}
+	}
+	
+	/**
+	 * This method returns the list of choreography descriptions
+	 * registered with the service correlator.
+	 * 
+	 * @return The list of choreography descriptions
+	 */
+	public org.pi4soa.cdl.Package[] getChoreographyDescriptions() {
+		org.pi4soa.cdl.Package[] ret=
+			new org.pi4soa.cdl.Package[m_choreographyDescriptions.size()];
+		
+		Enumeration iter=m_choreographyDescriptions.elements();
+		int index=0;
+		while (iter.hasMoreElements()) {
+			ret[index++] = (org.pi4soa.cdl.Package)iter.nextElement();
+		}
+		
+		return(ret);                                           
+	}
+	
+	/**
+	 * This method returns the current list of correlated
+	 * sessions associated with the specified choreography
+	 * description.
+	 * 
+	 * @param cdl The choreography description
+	 * @return The list of correlated sessions
+	 */
+	public CorrelationSession[] getCorrelationSessions(org.pi4soa.cdl.Package cdl) {
+		CorrelationSession[] ret=null;
+		java.util.Vector tmp=new java.util.Vector();
+		
+		java.util.List list=m_sessionManager.getCorrelationSessions();
+		for (int i=0; i < list.size(); i++) {
+			CorrelationSession sess=(CorrelationSession)list.get(i);
+			
+			if (sess.getChoreographyDescription() == cdl) {
+				tmp.add(sess);
+			}
+		}
+		
+		ret = new CorrelationSession[tmp.size()];
+		tmp.copyInto(ret);
+		
+		return(ret);
+	}
+	
+	/**
+	 * This method returns the correlated
+	 * session associated with the specified choreography
+	 * description and set of identities.
+	 * 
+	 * @param cdl The choreography description
+	 * @return The list of correlated sessions
+	 * @exception ServiceException Failed to get single session for
+	 * 					supplied choreography description and ids
+	 */
+	public CorrelationSession getCorrelationSession(org.pi4soa.cdl.Package cdl,
+			java.util.List<Identity> ids) throws ServiceException {
+		CorrelationSession ret=null;
+		
+		CorrelationSession[] sessions=getCorrelationSessions(cdl);
+		for (int i=0; sessions != null &&
+					i < sessions.length; i++) {
+			if (sessions[i].isIdentifiedBy(ids)) {
+				
+				if (ret == null) {
+					ret = sessions[i];
+				} else {
+					throw new ServiceException("Multiple correlation "+
+							"sessions with same identity");
+				}
+			}
+		}
+		
+		return(ret);
+	}
+	
+	/**
+	 * This method adds a listener for notifications regarding
+	 * correlated sessions.
+	 * 
+	 * @param l The listener
+	 */
+	public void addServiceCorrelatorListener(ServiceCorrelatorListener l) {
+		m_notifier.addServiceCorrelatorListener(l);
+	}
+	
+	/**
+	 * This method removes a listener for notifications regarding
+	 * correlated sessions.
+	 * 
+	 * @param l The listener
+	 */
+	public void removeServiceCorrelatorListener(ServiceCorrelatorListener l) {
+		m_notifier.removeServiceCorrelatorListener(l);
+	}
+	
+	/**
+	 * This method is invoked to handle the supplied service
+	 * tracker record.
+	 * 
+	 * @param record The record
+	 */
+	public void handleTrackerRecord(TrackerRecord record) {
+		
+		// Obtain message related tracker events
+		if (record != null && record.getTrackerEvents() != null) {
+			ServiceMonitor serviceMonitor=null;
+			
+			logger.info("Handle correlation record: "+record.toXML());
+
+			if (NamesUtil.isSet(record.getServiceDescriptionName())) {
+				serviceMonitor = (ServiceMonitor)
+					m_serviceMonitors.get(record.getServiceDescriptionName());
+			
+				if (serviceMonitor == null) {
+					logger.info("Could not find Service Monitor for '"+
+							record.getServiceDescriptionName()+"'");
+				}
+			} else {
+				logger.info("Tracker record is not associated with a service");
+			}
+			
+			for (int i=0; serviceMonitor != null &&
+						i < record.getTrackerEvents().length; i++) {
+				TrackerEvent event=record.getTrackerEvents()[i];
+				
+				if (event.getEventType() == null) {
+					
+					logger.warning("Tracker event '"+event+
+								"' has no event type");
+					
+				} else if (event.getEventType().equals(TrackerEvent.RECEIVED_MESSAGE)) {
+					
+					try {
+						serviceMonitor.messageReceived(event.getMessage());
+					} catch(Exception se) {
+						logger.log(Level.SEVERE,
+								"Failed to handle 'messageReceived' tracker event '"+
+								event.toXML()+"': "+se, se);
+						
+						// Report error to notifier
+						java.util.List<Identity> ids=record.getPrimaryIdentities();
+						
+						if ((ids == null || ids.size() == 0) &&
+								record.getSessionIdentity() != null) {
+							ids = new java.util.Vector<Identity>();
+							ids.add(record.getSessionIdentity());
+						}
+						
+						CorrelationSessionImpl cs=
+							m_sessionManager.getCorrelationSessionImpl(ids);
+						
+						m_notifier.error("Correlater detected problem: "+se.getMessage(),
+								event.getException(),
+								cs, record.getServiceDescriptionName());
+					}
+				} else if (event.getEventType().equals(TrackerEvent.SENT_MESSAGE)) {
+					
+					try {
+						serviceMonitor.messageSent(event.getMessage());
+					} catch(Exception se) {
+						logger.log(Level.SEVERE,
+								"Failed to handle 'messageSent' tracker event '"+
+								event.toXML()+"': "+se, se);
+						
+						// Report error to notifier
+						java.util.List<Identity> ids=record.getPrimaryIdentities();
+						
+						if ((ids == null || ids.size() == 0) &&
+								record.getSessionIdentity() != null) {
+							ids = new java.util.Vector<Identity>();
+							ids.add(record.getSessionIdentity());
+						}
+						
+						CorrelationSessionImpl cs=
+							m_sessionManager.getCorrelationSessionImpl(ids);
+						
+						m_notifier.error("Correlater detected problem: "+se.getMessage(),
+								event.getException(),
+								cs, record.getServiceDescriptionName());
+					}
+				} else if (event.getEventType().equals(TrackerEvent.UNEXPECTED_MESSAGE)) {
+					
+					java.util.List<Identity> ids=record.getPrimaryIdentities();
+					
+					if ((ids == null || ids.size() == 0) &&
+							record.getSessionIdentity() != null) {
+						ids = new java.util.Vector<Identity>();
+						ids.add(record.getSessionIdentity());
+					}
+					
+					CorrelationSessionImpl cs=
+						m_sessionManager.getCorrelationSessionImpl(ids);
+					
+					m_notifier.unexpectedMessage(event.getMessage(),
+							cs, record.getServiceDescriptionName());
+					
+				} else if (event.getEventType().equals(TrackerEvent.ERROR)) {
+					
+					java.util.List<Identity> ids=record.getPrimaryIdentities();
+					
+					if ((ids == null || ids.size() == 0) &&
+							record.getSessionIdentity() != null) {
+						ids = new java.util.Vector<Identity>();
+						ids.add(record.getSessionIdentity());
+					}
+					
+					CorrelationSessionImpl cs=
+						m_sessionManager.getCorrelationSessionImpl(ids);
+					
+					m_notifier.error(event.getDetails(),
+							event.getException(),
+							cs, record.getServiceDescriptionName());
+					
+				} else if (event.getEventType().equals(TrackerEvent.WARNING)) {
+					
+					java.util.List<Identity> ids=record.getPrimaryIdentities();
+					
+					if ((ids == null || ids.size() == 0) &&
+							record.getSessionIdentity() != null) {
+						ids = new java.util.Vector<Identity>();
+						ids.add(record.getSessionIdentity());
+					}
+					
+					CorrelationSessionImpl cs=
+						m_sessionManager.getCorrelationSessionImpl(ids);
+					
+					m_notifier.warning(event.getDetails(), 
+							event.getException(),
+							cs, record.getServiceDescriptionName());
+					
+				} else if (event.getEventType().equals(TrackerEvent.INFORMATION)) {
+					
+					java.util.List<Identity> ids=record.getPrimaryIdentities();
+					
+					if ((ids == null || ids.size() == 0) &&
+							record.getSessionIdentity() != null) {
+						ids = new java.util.Vector<Identity>();
+						ids.add(record.getSessionIdentity());
+					}
+					
+					CorrelationSessionImpl cs=
+						m_sessionManager.getCorrelationSessionImpl(ids);
+					
+					m_notifier.information(event.getDetails(), 
+							cs, record.getServiceDescriptionName());
+					
+				} else if (event.getEventType().equals(TrackerEvent.UNHANDLED_EXCEPTION)) {
+					
+					java.util.List<Identity> ids=record.getPrimaryIdentities();
+					
+					if ((ids == null || ids.size() == 0) &&
+							record.getSessionIdentity() != null) {
+						ids = new java.util.Vector<Identity>();
+						ids.add(record.getSessionIdentity());
+					}
+					
+					CorrelationSessionImpl cs=
+						m_sessionManager.getCorrelationSessionImpl(ids);
+					
+					if (logger.isLoggable(Level.FINEST)) {
+						logger.finest("Report as error: "+event.getDetails()+
+									" with exception "+event.getException());
+					}
+					
+					m_notifier.error(event.getDetails(),
+							event.getException(),
+							cs, record.getServiceDescriptionName());
+					
+				} else if (event.getEventType().equals(TrackerEvent.SERVICE_FINISHED)) {
+					
+					// TODO: May be want to have a configuration
+					// parameter to indicate whether these tracker
+					// events should be used to close down service
+					// sessions - may be better to only use the
+					// observable exchanges to achieve this?
+					java.util.List<Identity> ids=record.getPrimaryIdentities();
+					
+					if ((ids == null || ids.size() == 0) &&
+							record.getSessionIdentity() != null) {
+						ids = new java.util.Vector<Identity>();
+						ids.add(record.getSessionIdentity());
+					}
+					
+					CorrelationSessionImpl cs=
+						m_sessionManager.getCorrelationSessionImpl(ids);
+
+					// If session not returned, then assume that
+					// it has already been tied up by the internal
+					// state of the monitored sessions
+					if (cs != null) {
+						cs.disassociateServiceSession(
+								record.getServiceDescriptionName());
+						
+						// Check if correlation session has completed
+						if (cs.getNumberOfServiceSessions() == 0) {							
+							m_sessionManager.pendingClose(cs);
+						}
+					}
+				} else if (logger.isLoggable(Level.FINEST)) {
+					logger.finest("Tracker event type not handled by correlator: "+
+							event.getEventType());
+				}
+			}
+		}
+		
+		// TODO: Could possibly use the 'serviceFinished'
+		// events to indicate that the participant (service)
+		// will no longer be sending further messages -
+		// how can we use this information? Possibly ensuring
+		// that the correlated participant also indicates
+		// the end? Possibly send to the 'correlated session'
+		// which can then enable the information to be
+		// presented
+	}
+	
+	/**
+	 * This method returns the correlation session manager.
+	 * 
+	 * @return The correlation session manager
+	 */
+	protected CorrelationSessionManager getCorrelationSessionManager() {
+		return(m_sessionManager);
+	}
+	
+    private static Logger logger = Logger.getLogger("org.pi4soa.service.correlator.impl");		
+	
+	private Hashtable m_choreographyDescriptions=new Hashtable();
+	private Hashtable m_serviceMonitors=new Hashtable();
+	private CorrelatingServiceTracker m_correlatingServiceTracker=null;
+	private CorrelationSessionManager m_sessionManager=null;
+	private CorrelationNotifier m_notifier=null;
+}



More information about the savara-commits mailing list