[savara-commits] savara SVN: r679 - in branches/experimental/2.0.x: tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Feb 16 16:16:53 EST 2011
Author: objectiser
Date: 2011-02-16 16:16:53 -0500 (Wed, 16 Feb 2011)
New Revision: 679
Added:
branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/
branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelatingServiceTracker.java
branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelationNotifier.java
branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelationSessionImpl.java
branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelationSessionManager.java
branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/ServiceCorrelatorImpl.java
Modified:
branches/experimental/2.0.x/integration/jboss/common/src/main/configs/pi4soa.xml
branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/ServiceTrackerClient.java
branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/TxnMonitor.java
Log:
Needed to take a copy of the pi4soa service correlator to enable it to support events from service descriptions based, and global/located based, endpoints - to support old and new style activity events.
Modified: branches/experimental/2.0.x/integration/jboss/common/src/main/configs/pi4soa.xml
===================================================================
--- branches/experimental/2.0.x/integration/jboss/common/src/main/configs/pi4soa.xml 2011-02-16 17:58:08 UTC (rev 678)
+++ branches/experimental/2.0.x/integration/jboss/common/src/main/configs/pi4soa.xml 2011-02-16 21:16:53 UTC (rev 679)
@@ -21,6 +21,7 @@
-->
<pi4soa>
+<!--
<monitor>
<serviceTracker class="org.savara.validator.pi4soa.JMSServiceTracker" >
<jmsConnectionFactory>ConnectionFactory</jmsConnectionFactory>
@@ -28,4 +29,5 @@
<recordMessagePayload>true</recordMessagePayload>
</serviceTracker>
</monitor>
+-->
</pi4soa>
\ No newline at end of file
Modified: branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/ServiceTrackerClient.java
===================================================================
--- branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/ServiceTrackerClient.java 2011-02-16 17:58:08 UTC (rev 678)
+++ branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/ServiceTrackerClient.java 2011-02-16 21:16:53 UTC (rev 679)
@@ -65,7 +65,8 @@
for (Analysis a : m_activity.getAnalysis()) {
if (a instanceof ProtocolAnalysis) {
- ret = ((ProtocolAnalysis)a).getProtocol();
+ ret = ((ProtocolAnalysis)a).getProtocol()+"@"+
+ ((ProtocolAnalysis)a).getRole();
break;
}
}
@@ -160,7 +161,7 @@
ret.setMessageIdentities(getPrimaryIdentities());
ret.setOperationName(m_activity.getOperationName());
ret.setFaultName(m_activity.getFaultName());
- ret.setServiceType(m_activity.getDestinationType());
+ //ret.setServiceType(m_activity.getDestinationType());
if (m_activity.getParameter().size() > 0) {
ret.setType(m_activity.getParameter().get(0).getType());
Modified: branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/TxnMonitor.java
===================================================================
--- branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/TxnMonitor.java 2011-02-16 17:58:08 UTC (rev 678)
+++ branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/TxnMonitor.java 2011-02-16 21:16:53 UTC (rev 679)
@@ -30,6 +30,7 @@
import org.pi4soa.service.correlator.ServiceCorrelatorFactory;
import org.pi4soa.service.correlator.ServiceCorrelatorListener;
import org.pi4soa.service.tracker.jms.JMSServiceTrackerClient;
+import org.savara.tools.monitor.correlator.ServiceCorrelatorImpl;
/**
* The TxnMonitor class is a generic transaction monitor class that
@@ -48,7 +49,7 @@
public void initialize() throws ServiceException
{
- m_correlator=ServiceCorrelatorFactory.getServiceCorrelator();
+ m_correlator=new ServiceCorrelatorImpl(); //ServiceCorrelatorFactory.getServiceCorrelator();
m_correlator.addServiceCorrelatorListener(new CorrelatorListener());
// Obtain service tracker client
Added: branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelatingServiceTracker.java
===================================================================
--- branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelatingServiceTracker.java (rev 0)
+++ branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelatingServiceTracker.java 2011-02-16 21:16:53 UTC (rev 679)
@@ -0,0 +1,491 @@
+/*
+ * Copyright 2005 Pi4 Technologies Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ * Change History:
+ * Aug 25, 2005 : Initial version created by gary
+ */
+package org.savara.tools.monitor.correlator;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.pi4soa.cdl.CDLType;
+import org.pi4soa.cdl.ExchangeDetails;
+import org.pi4soa.cdl.util.CDLTypeUtil;
+import org.pi4soa.service.Channel;
+import org.pi4soa.service.Message;
+import org.pi4soa.service.behavior.MessageDefinition;
+import org.pi4soa.service.behavior.Receive;
+import org.pi4soa.service.behavior.Send;
+import org.pi4soa.service.behavior.ServiceDescription;
+import org.pi4soa.service.session.Session;
+import org.pi4soa.service.session.internal.InternalSession;
+import org.pi4soa.service.tracker.ServiceTracker;
+
+/**
+ * This class provides an implementation of the service tracker
+ * interface, used to detect tracker events from distributed
+ * service endpoints and correlate them back to the choreography
+ * description being monitored.
+ *
+ */
+public class CorrelatingServiceTracker implements ServiceTracker {
+
+ private static final String UNEXPECTED_MESSAGE_EXCEPTION = "UnexpectedMessageException";
+
+ private static final String UNHANDLED_EXCEPTION = "Unhandled exception: ";
+
+ private static final String CORRELATER_REPORTED = "[Correlater reported] ";
+
+ /**
+ * This is the constructor for the correlating service
+ * tracker.
+ *
+ * @param mgr The correlation session manager
+ * @param notifier The correlation notifier
+ */
+ public CorrelatingServiceTracker(CorrelationSessionManager mgr,
+ CorrelationNotifier notifier) {
+ m_sessionManager = mgr;
+ m_notifier = notifier;
+ }
+
+ /**
+ * This method initializes the service tracker.
+ *
+ */
+ public void initialize() {
+
+ }
+
+ /**
+ * This method indicates that a new service instance
+ * has started.
+ *
+ * @param service The service
+ * @param session The session
+ */
+ public void serviceStarted(ServiceDescription service,
+ Session session) {
+
+ // The CorrelationSessionManager overrides the 'addSession'
+ // method on the superclass DefaultSessionManager, to
+ // intercept the addition of new service sessions
+ // which are used to create or associated with existing
+ // correlation sessions. This overridden method is used
+ // instead of this tracker method, because by the time
+ // the overridden method is called, the session has
+ // identity information, whereas this method is invoked
+ // just after the newly created service session is
+ // instantiated.
+ }
+
+ /**
+ * This method indicates that a service instance
+ * has finished.
+ *
+ * @param service The service
+ * @param session The session
+ */
+ public void serviceFinished(ServiceDescription service,
+ Session session) {
+
+ // The overridden 'removeSession' method in the
+ // CorrelationSessionManager is used, instead of this
+ // tracker callback, to remove a session from the
+ // associated correlation session - to remain
+ // symmetrical with the way that the session is associated
+ // (see note above).
+ }
+
+ /**
+ * This method indicates that a new sub session
+ * has started within an existing service instance.
+ *
+ * @param parent The parent session
+ * @param session The session
+ */
+ public void subSessionStarted(Session parent, Session session) {
+ }
+
+ /**
+ * This method indicates that an existing
+ * sub session has finished.
+ *
+ * @param parent The parent session
+ * @param session The session
+ */
+ public void subSessionFinished(Session parent, Session session) {
+ }
+
+ /**
+ * This method registers the fact that a message has been
+ * sent.
+ *
+ * @param activity The behavioral activity, or null if a stateless service
+ * @param session The session, or null if a stateless service
+ * @param channel The channel, or null if a stateless service
+ * @param mesg The message that has been handled
+ */
+ public void sentMessage(Send activity, Session session,
+ Channel channel, Message mesg) {
+
+ // Get the correlation session for the service session
+ CorrelationSessionImpl corrSession=
+ getCorrelationSession(session, mesg);
+
+ if (corrSession != null) {
+
+ // Derive the choreography description node associated
+ // with the behavioral send activity
+ CDLType cdlType=CDLTypeUtil.getCDLType(
+ corrSession.getChoreographyDescription(),
+ activity.getGlobalDescriptionURI());
+
+ if (cdlType == null) {
+ logger.severe("Failed to locate CDL type associated " +
+ "with behavior activity '"+activity+
+ "' in choreography '"+
+ corrSession.getChoreographyDescription()+"'");
+
+ } else if (cdlType instanceof ExchangeDetails) {
+
+ corrSession.exchangeInitiated((ExchangeDetails)cdlType,
+ channel);
+
+ // Notify any registered listeners
+ m_notifier.exchangeInitiated((ExchangeDetails)cdlType,
+ channel, mesg, corrSession,
+ session.getId().getServiceDescriptionName());
+ } else {
+ logger.severe("CDL type '"+cdlType+
+ "', associated with behavior activity '"+
+ activity+"' is not an ExchangeDetails type");
+ }
+ }
+ }
+
+ /**
+ * This method registers the fact that a message has been
+ * sent from a stateless service.
+ *
+ * @param defn The message definition
+ * @param mesg The message that has been handled
+ */
+ public void sentMessage(MessageDefinition defn, Message mesg) {
+
+ // Get the correlation session for the service session
+ CorrelationSessionImpl corrSession=
+ getCorrelationSession(null, mesg);
+
+ if (corrSession != null) {
+
+ // TODO: Not sure if we need to report a stateless
+ // message event to the correlation session??
+ //corrSession.exchangeInitiated((ExchangeDetails)cdlType,
+ // channel);
+
+ // Notify any registered listeners
+ m_notifier.exchangeInitiated(null, null, mesg, corrSession,
+ defn.getServiceDescription().getName());
+ }
+ }
+
+ /**
+ * This method registers the fact that a message has been
+ * received.
+ *
+ * @param activity The behavioral activity, or null if a stateless service
+ * @param session The session, or null if a stateless service
+ * @param channel The channel, or null if a stateless service
+ * @param mesg The message that has been handled
+ */
+ public void receivedMessage(Receive activity, Session session,
+ Channel channel, Message mesg) {
+
+ // Get the correlation session for the service session
+ CorrelationSessionImpl corrSession=
+ getCorrelationSession(session, mesg);
+
+ if (corrSession != null) {
+
+ // Derive the choreography description node associated
+ // with the behavioral receive activity
+ CDLType cdlType=CDLTypeUtil.getCDLType(
+ corrSession.getChoreographyDescription(),
+ activity.getGlobalDescriptionURI());
+
+ if (cdlType == null) {
+ logger.severe("Failed to locate CDL type associated " +
+ "with behavior activity '"+activity+
+ "' in choreography '"+
+ corrSession.getChoreographyDescription()+"'");
+
+ } else if (cdlType instanceof ExchangeDetails) {
+
+ corrSession.exchangeCompleted((ExchangeDetails)cdlType,
+ channel);
+
+ // Notify any registered listeners
+ m_notifier.exchangeCompleted((ExchangeDetails)cdlType,
+ channel, mesg, corrSession,
+ session.getId().getServiceDescriptionName());
+
+ } else {
+ logger.severe("CDL type '"+cdlType+
+ "', associated with behavior activity '"+
+ activity+"' is not an ExchangeDetails type");
+ }
+ }
+ }
+
+ /**
+ * This method registers the fact that a message has been
+ * received from a stateless service.
+ *
+ * @param defn The message definition
+ * @param mesg The message that has been handled
+ */
+ public void receivedMessage(MessageDefinition defn, Message mesg) {
+
+ // Get the correlation session for the service session
+ CorrelationSessionImpl corrSession=
+ getCorrelationSession(null, mesg);
+
+ if (corrSession != null) {
+
+ // TODO: Not sure if we need to report a stateless
+ // message event to the correlation session??
+ //corrSession.exchangeCompleted((ExchangeDetails)cdlType,
+ // channel);
+
+ // Notify any registered listeners
+ m_notifier.exchangeCompleted(null, null, mesg, corrSession,
+ defn.getServiceDescription().getName());
+ }
+ }
+
+ /**
+ * This method registers that a message was not expected.
+ *
+ * @param session The session, or null if a stateless service
+ * @param mesg The message that was not expected
+ * @param reason The optional reason why the message was
+ * considered to be unexpected
+ * @deprecated Use unexpectedMessage(ServiceDescription sdesc,
+ * Session session, Message mesg, String reason)
+ */
+ public void unexpectedMessage(Session session, Message mesg,
+ String reason) {
+ }
+
+ /**
+ * This method registers that a message was not expected.
+ *
+ * @param sdesc The service description, if known
+ * @param session The session, or null if a stateless service,
+ * or cannot be associated with a session
+ * @param mesg The message that was not expected
+ * @param reason The optional reason why the message was
+ * considered to be unexpected
+ */
+ public void unexpectedMessage(ServiceDescription sdesc,
+ Session session, Message mesg, String reason) {
+ // Get the correlation session for the service session
+ CorrelationSessionImpl corrSession=
+ getCorrelationSession(session, null);
+
+ if (corrSession != null) {
+ m_notifier.error(CORRELATER_REPORTED+mesg,
+ UNEXPECTED_MESSAGE_EXCEPTION, corrSession,
+ session.getId().getServiceDescriptionName());
+ }
+ }
+
+ /**
+ * This method registers that an exception was not handled.
+ *
+ * @param session The session, or null if a stateless service
+ * @param excType The exception type
+ */
+ public void unhandledException(Session session, String excType) {
+ // Get the correlation session for the service session
+ CorrelationSessionImpl corrSession=
+ getCorrelationSession(session, null);
+
+ if (corrSession != null) {
+ m_notifier.error(CORRELATER_REPORTED+
+ UNHANDLED_EXCEPTION+excType,
+ excType, corrSession,
+ session.getId().getServiceDescriptionName());
+ }
+ }
+
+ /**
+ * This method reports information regarding the processing
+ * of a service session. The details can either be specified
+ * as a textual string (unstructured data),
+ * or as a structured XML fragment.<p>
+ *
+ * @param session The session, or null if a stateless service
+ * @param details The details
+ * @param type The optional type
+ */
+ public void information(Session session, String details) {
+ // Get the correlation session for the service session
+ CorrelationSessionImpl corrSession=
+ getCorrelationSession(session, null);
+
+ if (corrSession != null) {
+ m_notifier.information(CORRELATER_REPORTED+details,
+ corrSession,
+ session.getId().getServiceDescriptionName());
+ }
+ }
+
+ /**
+ * This method reports information regarding the processing
+ * of a service session. The details can either be specified
+ * as a textual string (unstructured data),
+ * or as a structured XML fragment.<p>
+ *
+ * @param session The session, or null if a stateless service
+ * @param details The details
+ * @param exc The optional exception
+ */
+ public void warning(Session session, String details, Throwable exc) {
+
+ // Get the correlation session for the service session
+ CorrelationSessionImpl corrSession=
+ getCorrelationSession(session, null);
+
+ if (corrSession != null) {
+ m_notifier.warning(CORRELATER_REPORTED+details,
+ exc.toString(), corrSession,
+ session.getId().getServiceDescriptionName());
+ }
+ }
+
+ /**
+ * This method reports information regarding the processing
+ * of a service session. The details can either be specified
+ * as a textual string (unstructured data),
+ * or as a structured XML fragment.<p>
+ *
+ * @param session The session, or null if a stateless service
+ * @param details The details
+ * @param exc The optional exception
+ */
+ public void error(Session session, String details, Throwable exc) {
+ // Get the correlation session for the service session
+ CorrelationSessionImpl corrSession=
+ getCorrelationSession(session, null);
+
+ if (corrSession != null) {
+ m_notifier.error(CORRELATER_REPORTED+details,
+ exc.toString(), corrSession,
+ session.getId().getServiceDescriptionName());
+ }
+ }
+
+ /**
+ * This method closes the service tracker.
+ *
+ */
+ public void close() {
+ }
+
+ /**
+ * This method returns the top level session associated with
+ * the supplied session.
+ *
+ * @param subsession The subsession
+ * @return The top level session
+ */
+ protected Session getTopLevelSession(Session subsession) {
+ Session ret=subsession;
+
+ while (ret instanceof InternalSession &&
+ ((InternalSession)ret).getParent() != null) {
+ ret = ((InternalSession)ret).getParent();
+ }
+
+ return(ret);
+ }
+
+ /**
+ * This method determines which correlation session should be
+ * used based on the supplied session and/or message.
+ *
+ * @param session The session, or null if stateless session
+ * @param mesg The message
+ * @return The correlation session, or null if not found
+ */
+ protected CorrelationSessionImpl getCorrelationSession(Session session,
+ Message mesg) {
+ CorrelationSessionImpl ret=null;
+
+ if (session != null) {
+
+ Session topLevelSession=getTopLevelSession(session);
+
+ if (topLevelSession != null) {
+ ret = m_sessionManager.getCorrelationSessionImpl(topLevelSession, mesg);
+ } else {
+ logger.warning("Failed to find top level session for '"+
+ session+"'");
+ }
+
+ if (ret == null) {
+ logger.severe("Failed to locate correlation session " +
+ "for service session '"+topLevelSession+"'");
+ }
+ } else if (mesg != null) {
+
+ // Stateless service
+ ret = m_sessionManager.getCorrelationSessionImpl(
+ mesg.getMessageIdentities());
+
+ if (ret == null) {
+ StringBuffer buf=new StringBuffer();
+
+ for (int i=0; mesg.getMessageIdentities() != null &&
+ i < mesg.getMessageIdentities().size(); i++) {
+
+ if (i > 0) {
+ buf.append(',');
+ }
+ buf.append(mesg.getMessageIdentities().get(i).getId());
+ }
+
+ logger.severe("Failed to locate correlation session " +
+ "for stateless service with identities '"+
+ buf.toString()+"'");
+
+ } else if (logger.isLoggable(Level.FINE)) {
+ logger.fine("Found correlation session '"+ret+
+ "' for stateless service");
+ }
+ }
+
+ return(ret);
+ }
+
+ private static Logger logger = Logger.getLogger("org.pi4soa.service.correlator.impl");
+
+ private CorrelationSessionManager m_sessionManager=null;
+ private CorrelationNotifier m_notifier=null;
+}
Added: branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelationNotifier.java
===================================================================
--- branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelationNotifier.java (rev 0)
+++ branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelationNotifier.java 2011-02-16 21:16:53 UTC (rev 679)
@@ -0,0 +1,279 @@
+/*
+ * Copyright 2005 Pi4 Technologies Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ * Change History:
+ * Sep 9, 2005 : Initial version created by gary
+ */
+package org.savara.tools.monitor.correlator;
+
+import java.util.Vector;
+
+import org.pi4soa.cdl.ExchangeDetails;
+import org.pi4soa.service.Channel;
+import org.pi4soa.service.Message;
+import org.pi4soa.service.correlator.CorrelationSession;
+import org.pi4soa.service.correlator.ServiceCorrelatorListener;
+
+public class CorrelationNotifier implements ServiceCorrelatorListener {
+
+ /**
+ * This method indicates that the supplied choreography
+ * description has been registered with the service
+ * correlator.
+ *
+ * @param cdlpack The choreography description
+ */
+ public void choreographyDescriptionRegistered(org.pi4soa.cdl.Package cdlpack) {
+
+ synchronized(m_listeners) {
+ for (int i=0; i < m_listeners.size(); i++) {
+ ServiceCorrelatorListener l=
+ (ServiceCorrelatorListener)m_listeners.get(i);
+
+ l.choreographyDescriptionRegistered(cdlpack);
+ }
+ }
+ }
+
+ /**
+ * This method indicates that the supplied choreography
+ * description has been unregistered from the service
+ * correlator.
+ *
+ * @param cdlpack The choreography description
+ */
+ public void choreographyDescriptionUnregistered(org.pi4soa.cdl.Package cdlpack) {
+
+ synchronized(m_listeners) {
+ for (int i=0; i < m_listeners.size(); i++) {
+ ServiceCorrelatorListener l=
+ (ServiceCorrelatorListener)m_listeners.get(i);
+
+ l.choreographyDescriptionRegistered(cdlpack);
+ }
+ }
+ }
+
+ /**
+ * This method indicates that a new correlated session has
+ * been started.
+ *
+ * @param session The session
+ */
+ public void sessionStarted(CorrelationSession session) {
+
+ synchronized(m_listeners) {
+ for (int i=0; i < m_listeners.size(); i++) {
+ ServiceCorrelatorListener l=
+ (ServiceCorrelatorListener)m_listeners.get(i);
+
+ l.sessionStarted(session);
+ }
+ }
+ }
+
+ /**
+ * This method indicates that a correlated session has
+ * been finished.
+ *
+ * @param session The session
+ */
+ public void sessionFinished(CorrelationSession session) {
+
+ synchronized(m_listeners) {
+ for (int i=0; i < m_listeners.size(); i++) {
+ ServiceCorrelatorListener l=
+ (ServiceCorrelatorListener)m_listeners.get(i);
+
+ l.sessionFinished(session);
+ }
+ }
+ }
+
+ /**
+ * This method is invoked to indicate that a choreography
+ * exchange has been initiated by one participant.
+ *
+ * @param exchange The exchange details
+ * @param channel The channel associated with the exchange
+ * @param message The message
+ * @param session The session
+ * @param serviceDescriptionName The name of the service
+ * description that caused this exchange to
+ * be initiated
+ */
+ public void exchangeInitiated(ExchangeDetails exchange,
+ Channel channel, Message message,
+ CorrelationSession session, String serviceDescriptionName) {
+
+ synchronized(m_listeners) {
+ for (int i=0; i < m_listeners.size(); i++) {
+ ServiceCorrelatorListener l=
+ (ServiceCorrelatorListener)m_listeners.get(i);
+
+ l.exchangeInitiated(exchange,
+ channel, message, session,
+ serviceDescriptionName);
+ }
+ }
+ }
+
+ /**
+ * This method is invoked to indicate that a choreography
+ * exchange has been completed by the target participant.
+ *
+ * @param exchange The exchange details
+ * @param channel The channel associated with the exchange
+ * @param message The message
+ * @param session The session
+ * @param serviceDescriptionName The name of the service
+ * description that caused this exchange to
+ * be completed
+ */
+ public void exchangeCompleted(ExchangeDetails exchange,
+ Channel channel, Message message,
+ CorrelationSession session, String serviceDescriptionName) {
+
+ synchronized(m_listeners) {
+ for (int i=0; i < m_listeners.size(); i++) {
+ ServiceCorrelatorListener l=
+ (ServiceCorrelatorListener)m_listeners.get(i);
+
+ l.exchangeCompleted(exchange,
+ channel, message, session,
+ serviceDescriptionName);
+ }
+ }
+ }
+
+ /**
+ * This method is invoked to indicate that a message
+ * was unexpected at a participant.
+ *
+ * @param message The message
+ * @param session The session
+ * @param serviceDescriptionName The name of the service
+ * description that caused this unexpected
+ * message error
+ */
+ public void unexpectedMessage(Message message,
+ CorrelationSession session, String serviceDescriptionName) {
+
+ synchronized(m_listeners) {
+ for (int i=0; i < m_listeners.size(); i++) {
+ ServiceCorrelatorListener l=
+ (ServiceCorrelatorListener)m_listeners.get(i);
+
+ l.unexpectedMessage(message, session,
+ serviceDescriptionName);
+ }
+ }
+ }
+
+ /**
+ * An error occurred related to the specified correlation
+ * session.
+ *
+ * @param mesg The error message
+ * @param exception The optional exception details
+ * @param session The correlation session
+ * @param serviceDescriptionName The service name
+ */
+ public void error(String mesg, String exception,
+ CorrelationSession session, String serviceDescriptionName) {
+
+ synchronized(m_listeners) {
+ for (int i=0; i < m_listeners.size(); i++) {
+ ServiceCorrelatorListener l=
+ (ServiceCorrelatorListener)m_listeners.get(i);
+
+ l.error(mesg, exception, session,
+ serviceDescriptionName);
+ }
+ }
+ }
+
+ /**
+ * A warning occurred related to the specified correlation
+ * session.
+ *
+ * @param mesg The warning message
+ * @param exception The optional exception details
+ * @param session The correlation session
+ * @param serviceDescriptionName The service name
+ */
+ public void warning(String mesg, String exception,
+ CorrelationSession session, String serviceDescriptionName) {
+
+ synchronized(m_listeners) {
+ for (int i=0; i < m_listeners.size(); i++) {
+ ServiceCorrelatorListener l=
+ (ServiceCorrelatorListener)m_listeners.get(i);
+
+ l.warning(mesg, exception, session,
+ serviceDescriptionName);
+ }
+ }
+ }
+
+ /**
+ * An information event occurred related to the specified correlation
+ * session.
+ *
+ * @param mesg The information message
+ * @param session The correlation session
+ * @param serviceDescriptionName The service name
+ */
+ public void information(String mesg, CorrelationSession session,
+ String serviceDescriptionName) {
+
+ synchronized(m_listeners) {
+ for (int i=0; i < m_listeners.size(); i++) {
+ ServiceCorrelatorListener l=
+ (ServiceCorrelatorListener)m_listeners.get(i);
+
+ l.information(mesg, session,
+ serviceDescriptionName);
+ }
+ }
+ }
+
+ /**
+ * This method adds a listener for notifications regarding
+ * correlated sessions.
+ *
+ * @param l The listener
+ */
+ public void addServiceCorrelatorListener(ServiceCorrelatorListener l) {
+ synchronized(m_listeners) {
+ m_listeners.add(l);
+ }
+ }
+
+ /**
+ * This method removes a listener for notifications regarding
+ * correlated sessions.
+ *
+ * @param l The listener
+ */
+ public void removeServiceCorrelatorListener(ServiceCorrelatorListener l) {
+ synchronized(m_listeners) {
+ m_listeners.remove(l);
+ }
+ }
+
+ private Vector m_listeners=new Vector();
+}
Added: branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelationSessionImpl.java
===================================================================
--- branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelationSessionImpl.java (rev 0)
+++ branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelationSessionImpl.java 2011-02-16 21:16:53 UTC (rev 679)
@@ -0,0 +1,277 @@
+/*
+ * Copyright 2005 Pi4 Technologies Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ * Change History:
+ * Sep 8, 2005 : Initial version created by gary
+ */
+package org.savara.tools.monitor.correlator;
+
+import java.util.Vector;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.pi4soa.cdl.ExchangeDetails;
+import org.pi4soa.service.Channel;
+import org.pi4soa.service.Identity;
+import org.pi4soa.service.correlator.CorrelationSession;
+import org.pi4soa.service.session.Session;
+
+/**
+ * This class provides an implementation of the correlation
+ * session.
+ *
+ */
+public class CorrelationSessionImpl implements CorrelationSession {
+
+ /**
+ * This is the constructor for the correlation session.
+ *
+ * @param cdl The choreography description being correlated
+ */
+ public CorrelationSessionImpl(org.pi4soa.cdl.Package cdl) {
+ m_choreographyDescription = cdl;
+ }
+
+ /**
+ * This method returns the choreography description associated
+ * with the correlated session.
+ *
+ * @return The
+ */
+ public org.pi4soa.cdl.Package getChoreographyDescription() {
+ return(m_choreographyDescription);
+ }
+
+ /**
+ * This method associates a service session with the correlation
+ * session.
+ *
+ * @param session The service session
+ */
+ public void associateServiceSession(Session session) {
+ logger.fine("Associate service session '"+session+"'");
+
+ m_serviceSessions.add(session);
+ }
+
+ /**
+ * This method disassociates a service session from
+ * the correlation session.
+ *
+ * @param session The service session
+ */
+ public void disassociateServiceSession(Session session) {
+ logger.fine("Disassociate service session name '"+session+"'");
+
+ m_serviceSessions.remove(session);
+
+ // Apply identities to past values - this is used when
+ // all of the sessions are removed, but the session
+ // is not necessarily completed
+ java.util.Set<Identity> ids=session.getPrimaryIdentities();
+
+ java.util.Iterator<Identity> iter=ids.iterator();
+ while (iter.hasNext()) {
+ Identity cur=iter.next();
+
+ if (m_pastIdentities.contains(cur) == false) {
+
+ logger.info("Correlation session '"+this+
+ "' - adding identity to past list: "+cur);
+
+ m_pastIdentities.add(cur);
+ }
+ }
+ }
+
+ /**
+ * This method disassociates a service session from
+ * the correlation session.
+ *
+ * @param serviceName The service name
+ */
+ public void disassociateServiceSession(String serviceName) {
+ logger.fine("Disassociate service session name '"+serviceName+"'");
+
+ for (int i=m_serviceSessions.size()-1; i >= 0; i--) {
+ Session session=(Session)m_serviceSessions.get(i);
+
+ if (session.getId().getServiceDescriptionName().
+ equals(serviceName)) {
+
+ disassociateServiceSession(session);
+ }
+ }
+ }
+
+ /**
+ * This method returns the number of service sessions
+ * associated with the correlation session.
+ *
+ * @return The number of service sessions
+ */
+ public int getNumberOfServiceSessions() {
+ return(m_serviceSessions.size());
+ }
+
+ /**
+ * This method is invoked to indicate that a choreography
+ * exchange has been initiated by one participant.
+ *
+ * @param exchange The exchange details
+ * @param channel The channel associated with the exchange
+ */
+ public void exchangeInitiated(ExchangeDetails exchange, Channel channel) {
+
+ synchronized(m_initiated) {
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("Exchange initiated="+exchange+" on channel="+channel);
+ }
+
+ m_initiated.add(exchange);
+ }
+ }
+
+ /**
+ * This method is invoked to indicate that a choreography
+ * exchange has been completed by a target participant.
+ *
+ * @param exchange The exchange details
+ * @param channel The channel associated with the exchange
+ */
+ public void exchangeCompleted(ExchangeDetails exchange, Channel channel) {
+
+ synchronized(m_initiated) {
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("Exchange completed="+exchange+" on channel="+channel);
+ }
+
+ if (m_initiated.contains(exchange)) {
+ m_completed.add(exchange);
+
+ m_initiated.remove(exchange);
+ } else {
+
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("NOTE: Completed exchange="+
+ exchange+" has not been initiated");
+ }
+ }
+ }
+ }
+
+ /**
+ * This method returns the identities associated with this
+ * correlation session.
+ *
+ * @return The identities
+ */
+ public java.util.List<Identity> getIdentities() {
+ java.util.List<Identity> ret=new java.util.Vector<Identity>();
+
+ for (int i=0; i < m_pastIdentities.size(); i++) {
+ Identity pastId=(Identity)m_pastIdentities.get(i);
+
+ if (ret.contains(pastId) == false) {
+ ret.add(pastId);
+ }
+ }
+
+ for (int i=0; i < m_serviceSessions.size(); i++) {
+ Session session=(Session)m_serviceSessions.get(i);
+
+ java.util.Set<Identity> ids=session.getPrimaryIdentities();
+
+ java.util.Iterator<Identity> iter=ids.iterator();
+
+ while (iter.hasNext()) {
+ Identity cur=iter.next();
+
+ if (ret.contains(cur) == false) {
+ ret.add(cur);
+ }
+ }
+
+ if (session.getSessionIdentity() != null &&
+ ret.contains(session.getSessionIdentity()) == false) {
+ ret.add(session.getSessionIdentity());
+ }
+ }
+
+ return(ret);
+ }
+
+ /**
+ * This method determines whether the session is identified by
+ * the supplied ids.
+ *
+ * @param ids The identities
+ * @return Whether the session is associated with the supplied id
+ */
+ public boolean isIdentifiedBy(java.util.Collection<Identity> ids) {
+ boolean ret=false;
+
+ if (ids != null) {
+ java.util.Iterator<Identity> iter=ids.iterator();
+
+ while (ret == false && iter.hasNext()) {
+ Identity cur=iter.next();
+
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.finest("Is identified by="+cur);
+ }
+
+ for (int i=0; ret == false &&
+ i < m_serviceSessions.size(); i++) {
+ Session session=(Session)m_serviceSessions.get(i);
+
+ ret = session.isIdentifiedBy(cur);
+
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.finest("Checked id="+cur+
+ " against session="+session+" = "+ret);
+ }
+ }
+
+ for (int i=0; ret == false &&
+ i < m_pastIdentities.size(); i++) {
+ Identity pastId=(Identity)m_pastIdentities.get(i);
+
+ ret = pastId.equals(cur);
+
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.finest("Checked against past id="+pastId+" = "+ret);
+ }
+ }
+ }
+ }
+
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.finest("Is correlation session ("+this+
+ ") identified by "+ids+" = "+ret);
+ }
+
+ return(ret);
+ }
+
+ private static Logger logger = Logger.getLogger("org.pi4soa.service.correlator.impl");
+
+ private org.pi4soa.cdl.Package m_choreographyDescription=null;
+ private Vector m_serviceSessions=new Vector();
+ private Vector m_initiated=new Vector();
+ private Vector m_completed=new Vector();
+ private java.util.List<Identity> m_pastIdentities=new Vector<Identity>();
+}
Added: branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelationSessionManager.java
===================================================================
--- branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelationSessionManager.java (rev 0)
+++ branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/CorrelationSessionManager.java 2011-02-16 21:16:53 UTC (rev 679)
@@ -0,0 +1,359 @@
+/*
+ * Copyright 2005 Pi4 Technologies Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ * Change History:
+ * Sep 8, 2005 : Initial version created by gary
+ */
+package org.savara.tools.monitor.correlator;
+
+import java.util.Hashtable;
+import java.util.Vector;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.pi4soa.service.Identity;
+import org.pi4soa.service.session.DefaultSessionManager;
+import org.pi4soa.service.session.Session;
+import org.pi4soa.service.session.SessionManagerException;
+
+/**
+ * This class provides the management of service and correlation
+ * sessions.
+ *
+ */
+public class CorrelationSessionManager extends DefaultSessionManager {
+
+ /**
+ * This is the constructor for the correlation session manager.
+ *
+ * @param notifier The notifier
+ */
+ public CorrelationSessionManager(CorrelationNotifier notifier) {
+ m_notifier = notifier;
+ }
+
+ /**
+ * This method adds a new session instance to the session manager,
+ * and returns a reference that can be used to retrieve the
+ * session at a later time.
+ *
+ * @param session The session
+ * @return The session reference for the added session
+ * @exception SessionManagerException Failed to add session
+ */
+ @Override
+ public void addSession(Session session)
+ throws SessionManagerException {
+ super.addSession(session);
+
+ // Synchronize on correlation session list, as
+ // other methods use these lists (pendingId and
+ // correlSession) at the same time
+ synchronized(m_correlationSessions) {
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("Adding session '"+session+
+ "' to pending identification list");
+ }
+
+ // Add session to 'pending identification' list
+ m_pendingIdentification.add(session);
+ }
+ }
+
+ /**
+ * This method returns the correlation session implementation
+ * associated with the supplied session. If more than one
+ * correlation session is found to have the same identity
+ * information, then a merge of the correlation sessions will be
+ * performed.
+ *
+ * @param session The service session
+ * @param mesg The optional message
+ * @return The correlation session
+ */
+ protected CorrelationSessionImpl getCorrelationSessionImpl(Session session,
+ org.pi4soa.service.Message mesg) {
+ CorrelationSessionImpl corrSess=null;
+
+ synchronized(m_correlationSessions) {
+ boolean pendingStarted=false;
+
+ // TODO: Could check which choreography session this
+ // session can be added to - then generate 'participant'
+ // joined session messages?
+
+ // Find an appropriate correlation session to associate
+ // with the session
+ java.util.Set<Identity> primIds=session.getPrimaryIdentities();
+
+ corrSess = getCorrelationSessionImpl(primIds);
+
+ if (corrSess == null && mesg != null) {
+ // Check if message identities can find the
+ // correlation session
+ corrSess = getCorrelationSessionImpl(mesg.getMessageIdentities());
+ }
+
+ if (corrSess == null) {
+
+ if (primIds == null || primIds.size() == 0) {
+ logger.severe("Correlation session is going to be " +
+ "created with no primary ids available");
+ } else {
+ String str="Primary ids: ";
+
+ java.util.Iterator<Identity> iter=primIds.iterator();
+ while (iter.hasNext()) {
+ str += iter.next().getId();
+ }
+
+ logger.info(str);
+ }
+
+ // Find choreography for this session
+ org.pi4soa.cdl.Package cdlpack=
+ getChoreographyForService(session.getId().
+ getServiceDescriptionName());
+
+ if (cdlpack == null) {
+ logger.severe("Unable to find choreography for service '"+
+ session.getId().getServiceDescriptionName());
+ } else {
+ corrSess = new CorrelationSessionImpl(cdlpack);
+ m_correlationSessions.add(corrSess);
+
+ pendingStarted = true;
+
+ logger.info("New correlation session="+corrSess);
+ }
+ } else {
+
+ if (corrSess.getNumberOfServiceSessions() == 0) {
+
+ if (m_pendingCloseSessions.remove(corrSess)) {
+ logger.fine("Removed correlation session " +
+ "from pending close list: "+corrSess);
+ }
+ }
+ }
+
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("Check if session '"+session+
+ "' is pending identification: "+
+ m_pendingIdentification.contains(session));
+ }
+
+ if (corrSess != null && m_pendingIdentification.contains(session)) {
+ corrSess.associateServiceSession(session);
+
+ logger.info("Associating service session '"+
+ session+"' with correlation session '"+
+ corrSess+"'");
+
+ m_pendingIdentification.remove(session);
+
+ // TODO: IDEAS: choreography session is used to store the
+ // list of initiated and completed activities (exchanges)
+ // recorded by the send and receive events for the
+ // associated CDL exchange activity. Possibly the list
+ // of completed activities is retained to act as an
+ // audit trail? Not sure if service sessions are necessary
+ // to be recorded against correlation session, but may
+ // provide some relevant details about participants joining
+ // choreography and then leaving.
+ }
+
+ if (pendingStarted) {
+ m_notifier.sessionStarted(corrSess);
+ }
+ }
+
+ return(corrSess);
+ }
+
+ /**
+ * This method removes a session instance from the session manager.
+ *
+ * @param session The session
+ * @exception SessionManagerException Failed to remove session
+ */
+ @Override
+ public void removeSession(Session session) throws SessionManagerException {
+
+ synchronized(m_correlationSessions) {
+ // TODO: May want to make this configurable, because we
+ // could just rely on the tracker events from each participant
+ // to inform us when they have completed
+
+ CorrelationSessionImpl corrSess=
+ getCorrelationSessionImpl(session.getPrimaryIdentities());
+
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("Remove session '"+session+
+ "': correlation session="+corrSess);
+ }
+
+ if (corrSess != null) {
+ corrSess.disassociateServiceSession(session);
+
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("Removing session '"+session+
+ "' from pending identification list");
+ }
+
+ // Remove from pending identification list just in case
+ m_pendingIdentification.remove(session);
+
+ if (corrSess.getNumberOfServiceSessions() == 0) {
+ pendingClose(corrSess);
+ }
+ }
+ }
+
+ super.removeSession(session);
+ }
+
+ /**
+ * This method removes the supplied correlation session.
+ *
+ * @param cs The correlation session
+ */
+ public void pendingClose(CorrelationSessionImpl cs) {
+
+ logger.info("Moving correlation session to pending close list: "+
+ cs);
+
+ m_pendingCloseSessions.add(cs);
+
+ //m_correlationSessions.remove(corrSess);
+
+ //m_notifier.sessionFinished(corrSess);
+ //synchronized(m_correlationSessions) {
+ //m_correlationSessions.remove(cs);
+ //}
+ }
+
+ /**
+ * This method returns the correlation session implementation
+ * associated with the supplied session. If more than one
+ * correlation session is found to have the same identity
+ * information, then a merge of the correlation sessions will be
+ * performed.
+ *
+ * @param session The service session
+ * @return The correlation session
+ */
+ protected CorrelationSessionImpl getCorrelationSessionImpl(java.util.Collection<Identity> ids) {
+ CorrelationSessionImpl ret=null;
+
+ synchronized(m_correlationSessions) {
+ // TODO: Get correlation session for primary identities
+ // associated with the supplied session. If none, then
+ // create one - not sure how we get the choreography
+ // description??? - if more than one, then we need to
+ // merge the correlation sessions
+
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("Number of sessions="+m_correlationSessions.size());
+ }
+
+ Vector sessions=new Vector();
+ for (int i=0; i < m_correlationSessions.size(); i++) {
+ CorrelationSessionImpl cs=(CorrelationSessionImpl)
+ m_correlationSessions.get(i);
+
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("Checking correlation session="+cs);
+ }
+
+ if (cs.isIdentifiedBy(ids)) {
+ sessions.add(cs);
+ }
+ }
+
+ if (sessions.size() > 1) {
+
+ logger.severe("MULTIPLE CORRELATION SESSIONS DETECTED - currently not handled");
+
+ // Merge sessions
+
+ // TODO: MERGING
+
+ } else if (sessions.size() == 1) {
+ ret = (CorrelationSessionImpl)sessions.get(0);
+ }
+ }
+
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("Returning correlation session="+ret);
+ }
+
+ return(ret);
+ }
+
+ /**
+ * This method returns the list of correlation sessions.
+ *
+ * @return The correlation sessions
+ */
+ public java.util.List getCorrelationSessions() {
+ return(m_correlationSessions);
+ }
+
+ /**
+ * This method associates the supplied choreography with the
+ * service name.
+ *
+ * @param serviceName The service name
+ * @param cdlpack The choreography
+ */
+ public void registerServiceChoreography(String serviceName,
+ org.pi4soa.cdl.Package cdlpack) {
+ m_serviceChoreographies.put(serviceName, cdlpack);
+ }
+
+ /**
+ * This method disassociates the supplied choreography from the
+ * service name.
+ *
+ * @param serviceName The service name
+ * @param cdlpack The choreography
+ */
+ public void unregisterServiceChoreography(String serviceName,
+ org.pi4soa.cdl.Package cdlpack) {
+ m_serviceChoreographies.remove(serviceName);
+ }
+
+ /**
+ * This method returns the choreography description associated
+ * with the supplied service name. If no choreography description
+ * can be located, then a null is returned.
+ *
+ * @param serviceName The service name
+ * @return The choreography, or null if not found
+ */
+ public org.pi4soa.cdl.Package getChoreographyForService(String serviceName) {
+ return((org.pi4soa.cdl.Package)m_serviceChoreographies.get(serviceName));
+ }
+
+ private Vector m_correlationSessions=new Vector();
+ private Hashtable m_serviceChoreographies=new Hashtable();
+ private Vector m_pendingIdentification=new Vector();
+ private Vector m_pendingCloseSessions=new Vector();
+ private CorrelationNotifier m_notifier=null;
+
+ private static Logger logger = Logger.getLogger("org.pi4soa.service.correlator.impl");
+}
Added: branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/ServiceCorrelatorImpl.java
===================================================================
--- branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/ServiceCorrelatorImpl.java (rev 0)
+++ branches/experimental/2.0.x/tools/plugins/org.savara.tools.monitor/src/java/org/savara/tools/monitor/correlator/ServiceCorrelatorImpl.java 2011-02-16 21:16:53 UTC (rev 679)
@@ -0,0 +1,544 @@
+/*
+ * Copyright 2005 Pi4 Technologies Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ * Change History:
+ * Aug 25, 2005 : Initial version created by gary
+ */
+package org.savara.tools.monitor.correlator;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Enumeration;
+import java.util.Hashtable;
+import java.util.Vector;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.xml.namespace.QName;
+
+import org.pi4soa.cdl.ParticipantType;
+import org.pi4soa.common.util.NamesUtil;
+import org.pi4soa.common.xml.XMLUtils;
+import org.pi4soa.service.Identity;
+import org.pi4soa.service.ServiceException;
+import org.pi4soa.service.behavior.ServiceDescription;
+import org.pi4soa.service.behavior.projection.BehaviorProjection;
+import org.pi4soa.service.correlator.CorrelationSession;
+import org.pi4soa.service.correlator.ServiceCorrelator;
+import org.pi4soa.service.correlator.ServiceCorrelatorListener;
+import org.pi4soa.service.monitor.DefaultMonitorConfiguration;
+import org.pi4soa.service.monitor.ServiceMonitor;
+import org.pi4soa.service.monitor.ServiceMonitorFactory;
+import org.pi4soa.service.tracker.TrackerEvent;
+import org.pi4soa.service.tracker.TrackerRecord;
+
+/**
+ * This class implements the service correlator interface.
+ *
+ */
+public class ServiceCorrelatorImpl implements ServiceCorrelator {
+
+ /**
+ * The default constructor for the service correlator
+ * implementation.
+ *
+ */
+ public ServiceCorrelatorImpl() {
+ initialize();
+ }
+
+ /**
+ * This method initializes the service correlator.
+ *
+ */
+ protected void initialize() {
+
+ m_notifier = new CorrelationNotifier();
+
+ m_sessionManager = new CorrelationSessionManager(m_notifier);
+
+ m_correlatingServiceTracker =
+ new CorrelatingServiceTracker(m_sessionManager, m_notifier);
+ }
+
+ /**
+ * This method registers a choreography description with the
+ * service correlator, to be informed when service tracker
+ * activities associated with the description are correlated.
+ *
+ * @param cdl The choreography description
+ * @exception ServiceException Failed to register
+ */
+ public void register(org.pi4soa.cdl.Package cdl)
+ throws ServiceException {
+
+ synchronized(m_choreographyDescriptions) {
+ if (m_choreographyDescriptions.containsKey(cdl)) {
+ throw new ServiceException("Choreography " +
+ "description already registered");
+ }
+
+ // Generate the endpoint projections and register them
+ // with the monitor (in its service repository)
+ Vector sds=new Vector();
+ java.util.List participants=
+ cdl.getTypeDefinitions().getParticipantTypes();
+
+ java.util.Iterator iter=participants.iterator();
+ while (iter.hasNext()) {
+ ParticipantType partType=(ParticipantType)
+ iter.next();
+
+ // CDL File path not provided, as not required
+ // for a service description that is only being
+ // used for correlation
+ ServiceDescription sd=
+ BehaviorProjection.projectServiceDescription(cdl,
+ partType, null);
+
+ DefaultMonitorConfiguration config=
+ new DefaultMonitorConfiguration();
+ config.setServiceTracker(m_correlatingServiceTracker);
+ config.setSessionManager(m_sessionManager);
+
+ ServiceMonitor serviceMonitor =
+ ServiceMonitorFactory.getServiceMonitor(config);
+ serviceMonitor.getConfiguration().getServiceRepository().
+ addServiceDescription(sd);
+
+ logger.info("Registering service monitor for '"+
+ sd.getFullyQualifiedName()+"'");
+
+ m_serviceMonitors.put(sd.getFullyQualifiedName(),
+ serviceMonitor);
+
+ m_serviceMonitors.put(getLocatedProtocolName(cdl, partType), serviceMonitor);
+
+ sds.add(sd);
+
+ m_sessionManager.registerServiceChoreography(
+ sd.getFullyQualifiedName(), cdl);
+
+ ByteArrayOutputStream baos=new ByteArrayOutputStream();
+ try {
+ org.pi4soa.service.util.ServiceDescriptionManager.save(
+ sd, baos);
+ logger.info(baos.toString());
+ } catch(Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ // Register choreography description
+ m_choreographyDescriptions.put(cdl, sds);
+ }
+
+ // Notify any registered listeners
+ m_notifier.choreographyDescriptionRegistered(cdl);
+ }
+
+ protected String getLocatedProtocolName(org.pi4soa.cdl.Package cdl, ParticipantType partType) {
+ String ret=null;
+
+ ret = new QName(cdl.getTargetNamespace(), cdl.getName()).toString();
+
+ ret += "@"+XMLUtils.getLocalname(partType.getName());
+
+ return(ret);
+ }
+
+ /**
+ * This method unregisters the supplied choreography
+ * description, to ignore further correlation situations
+ * that occur related to this description.
+ *
+ * @param cdl The choreography description
+ * @exception ServiceException Failed to register
+ */
+ public void unregister(org.pi4soa.cdl.Package cdl)
+ throws ServiceException {
+ boolean unregistered=false;
+
+ synchronized(m_choreographyDescriptions) {
+ if (m_choreographyDescriptions.containsKey(cdl)) {
+
+ // Remove the endpoint projections, associated with
+ // the supplied choreography description and unregister them
+ // from the monitor's service repository
+ Vector sds=(Vector)m_choreographyDescriptions.get(cdl);
+
+ for (int i=0; i < sds.size(); i++) {
+ ServiceDescription sd=(ServiceDescription)sds.get(i);
+
+ ServiceMonitor serviceMonitor=(ServiceMonitor)
+ m_serviceMonitors.get(sd.getFullyQualifiedName());
+
+ serviceMonitor.getConfiguration().getServiceRepository().
+ removeServiceDescription(sd);
+
+ // TODO: Need to close down monitor
+
+ m_serviceMonitors.remove(sd.getFullyQualifiedName());
+
+ m_sessionManager.unregisterServiceChoreography(
+ sd.getFullyQualifiedName(), cdl);
+ }
+
+ m_choreographyDescriptions.remove(cdl);
+
+ unregistered = true;
+ }
+ }
+
+ if (unregistered) {
+ // Notify any registered listeners
+ m_notifier.choreographyDescriptionRegistered(cdl);
+ }
+ }
+
+ /**
+ * This method returns the list of choreography descriptions
+ * registered with the service correlator.
+ *
+ * @return The list of choreography descriptions
+ */
+ public org.pi4soa.cdl.Package[] getChoreographyDescriptions() {
+ org.pi4soa.cdl.Package[] ret=
+ new org.pi4soa.cdl.Package[m_choreographyDescriptions.size()];
+
+ Enumeration iter=m_choreographyDescriptions.elements();
+ int index=0;
+ while (iter.hasMoreElements()) {
+ ret[index++] = (org.pi4soa.cdl.Package)iter.nextElement();
+ }
+
+ return(ret);
+ }
+
+ /**
+ * This method returns the current list of correlated
+ * sessions associated with the specified choreography
+ * description.
+ *
+ * @param cdl The choreography description
+ * @return The list of correlated sessions
+ */
+ public CorrelationSession[] getCorrelationSessions(org.pi4soa.cdl.Package cdl) {
+ CorrelationSession[] ret=null;
+ java.util.Vector tmp=new java.util.Vector();
+
+ java.util.List list=m_sessionManager.getCorrelationSessions();
+ for (int i=0; i < list.size(); i++) {
+ CorrelationSession sess=(CorrelationSession)list.get(i);
+
+ if (sess.getChoreographyDescription() == cdl) {
+ tmp.add(sess);
+ }
+ }
+
+ ret = new CorrelationSession[tmp.size()];
+ tmp.copyInto(ret);
+
+ return(ret);
+ }
+
+ /**
+ * This method returns the correlated
+ * session associated with the specified choreography
+ * description and set of identities.
+ *
+ * @param cdl The choreography description
+ * @return The list of correlated sessions
+ * @exception ServiceException Failed to get single session for
+ * supplied choreography description and ids
+ */
+ public CorrelationSession getCorrelationSession(org.pi4soa.cdl.Package cdl,
+ java.util.List<Identity> ids) throws ServiceException {
+ CorrelationSession ret=null;
+
+ CorrelationSession[] sessions=getCorrelationSessions(cdl);
+ for (int i=0; sessions != null &&
+ i < sessions.length; i++) {
+ if (sessions[i].isIdentifiedBy(ids)) {
+
+ if (ret == null) {
+ ret = sessions[i];
+ } else {
+ throw new ServiceException("Multiple correlation "+
+ "sessions with same identity");
+ }
+ }
+ }
+
+ return(ret);
+ }
+
+ /**
+ * This method adds a listener for notifications regarding
+ * correlated sessions.
+ *
+ * @param l The listener
+ */
+ public void addServiceCorrelatorListener(ServiceCorrelatorListener l) {
+ m_notifier.addServiceCorrelatorListener(l);
+ }
+
+ /**
+ * This method removes a listener for notifications regarding
+ * correlated sessions.
+ *
+ * @param l The listener
+ */
+ public void removeServiceCorrelatorListener(ServiceCorrelatorListener l) {
+ m_notifier.removeServiceCorrelatorListener(l);
+ }
+
+ /**
+ * This method is invoked to handle the supplied service
+ * tracker record.
+ *
+ * @param record The record
+ */
+ public void handleTrackerRecord(TrackerRecord record) {
+
+ // Obtain message related tracker events
+ if (record != null && record.getTrackerEvents() != null) {
+ ServiceMonitor serviceMonitor=null;
+
+ logger.info("Handle correlation record: "+record.toXML());
+
+ if (NamesUtil.isSet(record.getServiceDescriptionName())) {
+ serviceMonitor = (ServiceMonitor)
+ m_serviceMonitors.get(record.getServiceDescriptionName());
+
+ if (serviceMonitor == null) {
+ logger.info("Could not find Service Monitor for '"+
+ record.getServiceDescriptionName()+"'");
+ }
+ } else {
+ logger.info("Tracker record is not associated with a service");
+ }
+
+ for (int i=0; serviceMonitor != null &&
+ i < record.getTrackerEvents().length; i++) {
+ TrackerEvent event=record.getTrackerEvents()[i];
+
+ if (event.getEventType() == null) {
+
+ logger.warning("Tracker event '"+event+
+ "' has no event type");
+
+ } else if (event.getEventType().equals(TrackerEvent.RECEIVED_MESSAGE)) {
+
+ try {
+ serviceMonitor.messageReceived(event.getMessage());
+ } catch(Exception se) {
+ logger.log(Level.SEVERE,
+ "Failed to handle 'messageReceived' tracker event '"+
+ event.toXML()+"': "+se, se);
+
+ // Report error to notifier
+ java.util.List<Identity> ids=record.getPrimaryIdentities();
+
+ if ((ids == null || ids.size() == 0) &&
+ record.getSessionIdentity() != null) {
+ ids = new java.util.Vector<Identity>();
+ ids.add(record.getSessionIdentity());
+ }
+
+ CorrelationSessionImpl cs=
+ m_sessionManager.getCorrelationSessionImpl(ids);
+
+ m_notifier.error("Correlater detected problem: "+se.getMessage(),
+ event.getException(),
+ cs, record.getServiceDescriptionName());
+ }
+ } else if (event.getEventType().equals(TrackerEvent.SENT_MESSAGE)) {
+
+ try {
+ serviceMonitor.messageSent(event.getMessage());
+ } catch(Exception se) {
+ logger.log(Level.SEVERE,
+ "Failed to handle 'messageSent' tracker event '"+
+ event.toXML()+"': "+se, se);
+
+ // Report error to notifier
+ java.util.List<Identity> ids=record.getPrimaryIdentities();
+
+ if ((ids == null || ids.size() == 0) &&
+ record.getSessionIdentity() != null) {
+ ids = new java.util.Vector<Identity>();
+ ids.add(record.getSessionIdentity());
+ }
+
+ CorrelationSessionImpl cs=
+ m_sessionManager.getCorrelationSessionImpl(ids);
+
+ m_notifier.error("Correlater detected problem: "+se.getMessage(),
+ event.getException(),
+ cs, record.getServiceDescriptionName());
+ }
+ } else if (event.getEventType().equals(TrackerEvent.UNEXPECTED_MESSAGE)) {
+
+ java.util.List<Identity> ids=record.getPrimaryIdentities();
+
+ if ((ids == null || ids.size() == 0) &&
+ record.getSessionIdentity() != null) {
+ ids = new java.util.Vector<Identity>();
+ ids.add(record.getSessionIdentity());
+ }
+
+ CorrelationSessionImpl cs=
+ m_sessionManager.getCorrelationSessionImpl(ids);
+
+ m_notifier.unexpectedMessage(event.getMessage(),
+ cs, record.getServiceDescriptionName());
+
+ } else if (event.getEventType().equals(TrackerEvent.ERROR)) {
+
+ java.util.List<Identity> ids=record.getPrimaryIdentities();
+
+ if ((ids == null || ids.size() == 0) &&
+ record.getSessionIdentity() != null) {
+ ids = new java.util.Vector<Identity>();
+ ids.add(record.getSessionIdentity());
+ }
+
+ CorrelationSessionImpl cs=
+ m_sessionManager.getCorrelationSessionImpl(ids);
+
+ m_notifier.error(event.getDetails(),
+ event.getException(),
+ cs, record.getServiceDescriptionName());
+
+ } else if (event.getEventType().equals(TrackerEvent.WARNING)) {
+
+ java.util.List<Identity> ids=record.getPrimaryIdentities();
+
+ if ((ids == null || ids.size() == 0) &&
+ record.getSessionIdentity() != null) {
+ ids = new java.util.Vector<Identity>();
+ ids.add(record.getSessionIdentity());
+ }
+
+ CorrelationSessionImpl cs=
+ m_sessionManager.getCorrelationSessionImpl(ids);
+
+ m_notifier.warning(event.getDetails(),
+ event.getException(),
+ cs, record.getServiceDescriptionName());
+
+ } else if (event.getEventType().equals(TrackerEvent.INFORMATION)) {
+
+ java.util.List<Identity> ids=record.getPrimaryIdentities();
+
+ if ((ids == null || ids.size() == 0) &&
+ record.getSessionIdentity() != null) {
+ ids = new java.util.Vector<Identity>();
+ ids.add(record.getSessionIdentity());
+ }
+
+ CorrelationSessionImpl cs=
+ m_sessionManager.getCorrelationSessionImpl(ids);
+
+ m_notifier.information(event.getDetails(),
+ cs, record.getServiceDescriptionName());
+
+ } else if (event.getEventType().equals(TrackerEvent.UNHANDLED_EXCEPTION)) {
+
+ java.util.List<Identity> ids=record.getPrimaryIdentities();
+
+ if ((ids == null || ids.size() == 0) &&
+ record.getSessionIdentity() != null) {
+ ids = new java.util.Vector<Identity>();
+ ids.add(record.getSessionIdentity());
+ }
+
+ CorrelationSessionImpl cs=
+ m_sessionManager.getCorrelationSessionImpl(ids);
+
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.finest("Report as error: "+event.getDetails()+
+ " with exception "+event.getException());
+ }
+
+ m_notifier.error(event.getDetails(),
+ event.getException(),
+ cs, record.getServiceDescriptionName());
+
+ } else if (event.getEventType().equals(TrackerEvent.SERVICE_FINISHED)) {
+
+ // TODO: May be want to have a configuration
+ // parameter to indicate whether these tracker
+ // events should be used to close down service
+ // sessions - may be better to only use the
+ // observable exchanges to achieve this?
+ java.util.List<Identity> ids=record.getPrimaryIdentities();
+
+ if ((ids == null || ids.size() == 0) &&
+ record.getSessionIdentity() != null) {
+ ids = new java.util.Vector<Identity>();
+ ids.add(record.getSessionIdentity());
+ }
+
+ CorrelationSessionImpl cs=
+ m_sessionManager.getCorrelationSessionImpl(ids);
+
+ // If session not returned, then assume that
+ // it has already been tied up by the internal
+ // state of the monitored sessions
+ if (cs != null) {
+ cs.disassociateServiceSession(
+ record.getServiceDescriptionName());
+
+ // Check if correlation session has completed
+ if (cs.getNumberOfServiceSessions() == 0) {
+ m_sessionManager.pendingClose(cs);
+ }
+ }
+ } else if (logger.isLoggable(Level.FINEST)) {
+ logger.finest("Tracker event type not handled by correlator: "+
+ event.getEventType());
+ }
+ }
+ }
+
+ // TODO: Could possibly use the 'serviceFinished'
+ // events to indicate that the participant (service)
+ // will no longer be sending further messages -
+ // how can we use this information? Possibly ensuring
+ // that the correlated participant also indicates
+ // the end? Possibly send to the 'correlated session'
+ // which can then enable the information to be
+ // presented
+ }
+
+ /**
+ * This method returns the correlation session manager.
+ *
+ * @return The correlation session manager
+ */
+ protected CorrelationSessionManager getCorrelationSessionManager() {
+ return(m_sessionManager);
+ }
+
+ private static Logger logger = Logger.getLogger("org.pi4soa.service.correlator.impl");
+
+ private Hashtable m_choreographyDescriptions=new Hashtable();
+ private Hashtable m_serviceMonitors=new Hashtable();
+ private CorrelatingServiceTracker m_correlatingServiceTracker=null;
+ private CorrelationSessionManager m_sessionManager=null;
+ private CorrelationNotifier m_notifier=null;
+}
More information about the savara-commits
mailing list