[jboss-cvs] JBoss Messaging SVN: r6534 - in tags: JBossMessaging_1_4_0_SP3_CP03_1456 and 10 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Apr 23 07:05:20 EDT 2009
Author: gaohoward
Date: 2009-04-23 07:05:20 -0400 (Thu, 23 Apr 2009)
New Revision: 6534
Added:
tags/JBossMessaging_1_4_0_SP3_CP03_1456/
Modified:
tags/JBossMessaging_1_4_0_SP3_CP03_1456/.classpath
tags/JBossMessaging_1_4_0_SP3_CP03_1456/build-messaging.xml
tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/FailoverCommandCenter.java
tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/JBossConnection.java
tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/JBossConnectionFactory.java
tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/JBossMessageConsumer.java
tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/JBossMessageProducer.java
tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/JBossSession.java
tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/container/ClientConsumer.java
tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/server/ServerPeer.java
tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/wireformat/RequestSupport.java
tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/messaging/core/impl/ClusterRoundRobinDistributor.java
tags/JBossMessaging_1_4_0_SP3_CP03_1456/tests/build.properties
Log:
Make a branch for the message stuck issue.
Copied: tags/JBossMessaging_1_4_0_SP3_CP03_1456 (from rev 5664, tags/JBossMessaging_1_4_0_SP3_CP03)
Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/.classpath
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03/.classpath 2009-01-20 01:38:41 UTC (rev 5664)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/.classpath 2009-04-23 11:05:20 UTC (rev 6534)
@@ -19,7 +19,7 @@
<classpathentry kind="src" path="docs/examples/topic/src"/>
<classpathentry excluding="**/.svn/**/*" kind="src" path="src/main"/>
<classpathentry excluding="**/.svn/**/*" kind="src" path="tests/src"/>
- <classpathentry kind="lib" path="thirdparty/oswego-concurrent/lib/concurrent.jar" sourcepath="/extra/work-clebert/concurrent/concurrent/src"/>
+ <classpathentry kind="lib" path="thirdparty/oswego-concurrent/lib/concurrent.jar" sourcepath="/home/howard/projects/jboss/messaging/1.4/sp3_cp03/thirdparty/oswego-concurrent/lib/concurrent-src.zip"/>
<classpathentry kind="lib" path="thirdparty/jgroups/lib/jgroups.jar"/>
<classpathentry kind="lib" path="thirdparty/apache-log4j/lib/log4j.jar"/>
<classpathentry kind="lib" path="thirdparty/junit/lib/junit.jar"/>
@@ -39,7 +39,7 @@
<classpathentry kind="lib" path="thirdparty/retrotranslator/lib/retrotranslator-runtime.jar"/>
<classpathentry kind="lib" path="thirdparty/retrotranslator/lib/retrotranslator-transformer.jar"/>
<classpathentry kind="lib" path="thirdparty/trove/lib/trove.jar"/>
- <classpathentry kind="lib" path="thirdparty/jboss/remoting/lib/jboss-remoting.jar"/>
+ <classpathentry kind="lib" path="thirdparty/jboss/remoting/lib/jboss-remoting.jar" sourcepath="/home/howard/projects/jboss/remoting/2.2.2.SP10/src/main"/>
<classpathentry kind="lib" path="thirdparty/jboss/jbossts14/lib/jbossjta.jar"/>
<classpathentry kind="var" path="ANT_HOME/lib/ant.jar"/>
<classpathentry kind="var" path="ANT_HOME/lib/ant-junit.jar"/>
Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/build-messaging.xml
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03/build-messaging.xml 2009-01-20 01:38:41 UTC (rev 5664)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/build-messaging.xml 2009-04-23 11:05:20 UTC (rev 6534)
@@ -389,6 +389,7 @@
<include name="org/jboss/messaging/core/impl/message/**/*.class"/>
<include name="org/jboss/messaging/core/contract/**/*.class"/>
<include name="org/jboss/jms/server/remoting/**/*.class"/>
+ <include name="org/jboss/jms/debug/**/*.class"/>
</fileset>
<fileset dir="${build.etc}">
<include name="VERSION"/>
Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/FailoverCommandCenter.java
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03/src/main/org/jboss/jms/client/FailoverCommandCenter.java 2009-01-20 01:38:41 UTC (rev 5664)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/FailoverCommandCenter.java 2009-04-23 11:05:20 UTC (rev 6534)
@@ -9,6 +9,9 @@
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.client.remoting.JMSRemotingConnection;
import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.debug.ClientFailoverTracker;
+import org.jboss.jms.debug.JMSObjectTracker;
+import org.jboss.jms.debug.TrackerFactory;
import org.jboss.jms.delegate.ConnectionFactoryDelegate;
import org.jboss.jms.delegate.CreateConnectionResult;
import org.jboss.logging.Logger;
@@ -71,8 +74,15 @@
JMSRemotingConnection remotingConnection)
throws Exception
{
+
+ ClientFailoverTracker tracker = TrackerFactory.getClientFailoverTracker();
log.debug("failure detected by " + source, reason);
-
+
+ tracker.report("failure detected by " + source + " rmoting " + remotingConnection +
+ " client: " + remotingConnection.getRemotingClient().getSessionId(), reason, true);
+ tracker.report("connection, client id: " + state.getClientID() + " server id: " +
+ state.getServerID() + "conn id: " + state.getDelegate().getID() +
+ " conn remoting " + ((ClientConnectionDelegate)state.getDelegate()).getRemotingConnection(), null, false);
// generate a FAILURE_DETECTED event
broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILURE_DETECTED, source));
@@ -105,11 +115,15 @@
failoverEvent = FailoverEvent.FAILOVER_ALREADY_COMPLETED;
+ tracker.report("somebody already done failover", null, false);
//Return true since failover already completed ok
return true;
}
+ tracker.report("setting remoting connection failed. remoting session id: " +
+ remotingConnection.getRemotingClient().getSessionId(), null, false);
remotingConnection.setFailed();
+ tracker.report("done set failed.", null, false);
}
// Note - failover doesn't occur until _after_ the above check - so the next comment
@@ -121,7 +135,7 @@
broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILOVER_STARTED, this));
int failedNodeID = state.getServerID();
-
+
ConnectionFactoryDelegate clusteredDelegate = state.getClusteredConnectionFactoryDelegate();
// try recreating the connection
@@ -141,10 +155,15 @@
// recursively synchronize state
ClientConnectionDelegate newDelegate = (ClientConnectionDelegate)res.getDelegate();
+ tracker.report("new conn Delegate got: " + newDelegate.getID() + " clientID: (cannot call it)" +
+ " server ID: " + newDelegate.getServerID(), null, false);
+
log.trace("Synchronizing state");
state.getDelegate().synchronizeWith(newDelegate);
log.trace("Synchronized state");
+ tracker.report("state synchronized", null, false);
+
//Now restart the connection if appropriate
//Note! we mus start the connection while the valve is still closed
//Otherwise If a consumer closing is waiting on failover to complete
@@ -157,7 +176,9 @@
if (state.isStarted())
{
log.trace("Starting new connection");
+ tracker.report("restart the new connection" , null, false);
newDelegate.startAfterFailover();
+ tracker.report("restarted" , null, false);
log.trace("Started new connection");
}
@@ -165,10 +186,12 @@
remotingConnection.getConnectionListener().getJMSExceptionListener() != null)
{
log.trace("Adding Exception Listener to new connection");
+ tracker.report("setting exception listener on new delegate: " + newDelegate.getID(), null, false);
newDelegate.setExceptionListener(remotingConnection.getConnectionListener().getJMSExceptionListener());
}
log.trace("Opening valve");
valve.open();
+ tracker.report("valve opened", null, false);
log.trace("Opened valve");
valveOpened = true;
@@ -179,12 +202,14 @@
log.trace("failureDetected() complete");
+ tracker.report("failover end with: " + failoverSuccessful, null, false);
return failoverSuccessful;
}
catch (Exception e)
{
log.error("Failover failed", e);
+ tracker.report("failover failed." , e, false);
throw e;
}
finally
@@ -193,6 +218,7 @@
{
log.trace("finally opening valve");
valve.open();
+ tracker.report("finally valve opened.", null, false);
log.trace("valve opened");
}
@@ -200,14 +226,16 @@
{
log.debug(this + " completed successful failover");
broadcastFailoverEvent(new FailoverEvent(failoverEvent, this));
+ tracker.report("finally failover ok", null, false);
}
else
{
log.debug(this + " aborted failover");
+ tracker.report("finally abort failover", null, false);
ClientConnectionDelegate connDelegate = (ClientConnectionDelegate)state.getDelegate();
connDelegate.closing(-1);
connDelegate.close();
-
+ tracker.report("aborted conn: " + connDelegate.getID(), null, false);
broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILOVER_FAILED, this));
}
}
Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/JBossConnection.java
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03/src/main/org/jboss/jms/client/JBossConnection.java 2009-01-20 01:38:41 UTC (rev 5664)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/JBossConnection.java 2009-04-23 11:05:20 UTC (rev 6534)
@@ -45,8 +45,11 @@
import javax.jms.XATopicSession;
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.delegate.ClientSessionDelegate;
import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.debug.JMSObjectTracker;
+import org.jboss.jms.debug.TrackerFactory;
import org.jboss.jms.delegate.ConnectionDelegate;
import org.jboss.jms.delegate.SessionDelegate;
@@ -130,6 +133,11 @@
{
delegate.closing(-1);
delegate.close();
+
+ JMSObjectTracker tracker = TrackerFactory.getClientJMSObjTracker();
+
+ tracker.connectionEvent(((ClientConnectionDelegate)delegate).getID(), null, "closed.", false);
+ tracker.removeConnection(((ClientConnectionDelegate)delegate).getID());
}
public ConnectionConsumer createConnectionConsumer(Destination destination,
@@ -268,6 +276,11 @@
SessionDelegate sessionDelegate =
delegate.createSessionDelegate(transacted, acknowledgeMode, isXA);
+
+ JMSObjectTracker tracker = TrackerFactory.getClientJMSObjTracker();
+
+ tracker.sessionEvent(((ClientSessionDelegate)sessionDelegate).getID(), null, " created. isXA: " + isXA + " type: " + type, true);
+
return new JBossSession(sessionDelegate, type);
}
Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/JBossConnectionFactory.java
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03/src/main/org/jboss/jms/client/JBossConnectionFactory.java 2009-01-20 01:38:41 UTC (rev 5664)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/JBossConnectionFactory.java 2009-04-23 11:05:20 UTC (rev 6534)
@@ -39,6 +39,8 @@
import javax.naming.NamingException;
import javax.naming.Reference;
+import org.jboss.jms.debug.JMSObjectTracker;
+import org.jboss.jms.debug.TrackerFactory;
import org.jboss.jms.delegate.ConnectionFactoryDelegate;
import org.jboss.jms.delegate.CreateConnectionResult;
import org.jboss.jms.referenceable.SerializableObjectRefAddr;
@@ -62,6 +64,7 @@
private static final Logger log = Logger.getLogger(JBossConnectionFactory.class);
+
// Static ---------------------------------------------------------------------------------------
// Attributes -----------------------------------------------------------------------------------
@@ -203,7 +206,11 @@
// connection factory and the client code version
CreateConnectionResult res = delegate.createConnectionDelegate(username, password, -1);
-
+
+ JMSObjectTracker tracker = TrackerFactory.getClientJMSObjTracker();
+
+ tracker.connectionEvent(res.getDelegate().getID(), null, "created. user: " + username + " isXA: " + isXA + " type: " + type, true);
+
return new JBossConnection(res.getDelegate(), type);
}
Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/JBossMessageConsumer.java
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03/src/main/org/jboss/jms/client/JBossMessageConsumer.java 2009-01-20 01:38:41 UTC (rev 5664)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/JBossMessageConsumer.java 2009-04-23 11:05:20 UTC (rev 6534)
@@ -32,6 +32,9 @@
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
+import org.jboss.jms.client.delegate.ClientConsumerDelegate;
+import org.jboss.jms.debug.JMSObjectTracker;
+import org.jboss.jms.debug.TrackerFactory;
import org.jboss.jms.delegate.ConsumerDelegate;
/**
@@ -49,7 +52,7 @@
// Static --------------------------------------------------------
// Attributes ----------------------------------------------------
-
+
protected ConsumerDelegate delegate;
// Constructors --------------------------------------------------
@@ -93,8 +96,14 @@
public void close() throws JMSException
{
+
+ JMSObjectTracker tracker = TrackerFactory.getClientJMSObjTracker();
+
+ tracker.consumerEvent(((ClientConsumerDelegate)delegate).getID(), null, "closing.", false);
delegate.closing(-1);
delegate.close();
+ tracker.consumerEvent(((ClientConsumerDelegate)delegate).getID(), null, "closed.", false);
+ tracker.removeConsumer(((ClientConsumerDelegate)delegate).getID());
}
// QueueReceiver implementation ----------------------------------
Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/JBossMessageProducer.java
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03/src/main/org/jboss/jms/client/JBossMessageProducer.java 2009-01-20 01:38:41 UTC (rev 5664)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/JBossMessageProducer.java 2009-04-23 11:05:20 UTC (rev 6534)
@@ -33,8 +33,14 @@
import javax.jms.Topic;
import javax.jms.TopicPublisher;
+import org.jboss.jms.client.delegate.ClientProducerDelegate;
+import org.jboss.jms.debug.JMSObjectTracker;
+import org.jboss.jms.debug.MessageTracker;
+import org.jboss.jms.debug.TrackerFactory;
import org.jboss.jms.delegate.ProducerDelegate;
import org.jboss.jms.destination.JBossDestination;
+import org.jboss.jms.message.JBossMessage;
+import org.jboss.jms.message.MessageProxy;
import org.jboss.logging.Logger;
/**
@@ -128,6 +134,9 @@
{
delegate.closing(-1);
delegate.close();
+ JMSObjectTracker tracker = TrackerFactory.getClientJMSObjTracker();
+ tracker.producerEvent(((ClientProducerDelegate)delegate).getID(), null, "closed", false);
+ tracker.removeProducer(((ClientProducerDelegate)delegate).getID());
}
public void send(Message message) throws JMSException
@@ -160,8 +169,19 @@
{
throw new InvalidDestinationException("Not a JBossDestination:" + destination);
}
+
+ delegate.send((JBossDestination)destination, m, deliveryMode, priority, timeToLive);
- delegate.send((JBossDestination)destination, m, deliveryMode, priority, timeToLive);
+ //some tests pass foreign messages, so check it
+ MessageTracker mTracker = TrackerFactory.getClientMessageTracker();
+ if (m instanceof MessageProxy)
+ {
+ mTracker.messageEvent("" + ((MessageProxy)m).getMessage().getMessageID(), "message sent. " + ((ClientProducerDelegate)delegate).getID(), false);
+ }
+ else
+ {
+ mTracker.messageEvent("" + m.getJMSMessageID(), "alien message sent. " + ((ClientProducerDelegate)delegate).getID(), false);
+ }
}
Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/JBossSession.java
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03/src/main/org/jboss/jms/client/JBossSession.java 2009-01-20 01:38:41 UTC (rev 5664)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/JBossSession.java 2009-04-23 11:05:20 UTC (rev 6534)
@@ -53,8 +53,13 @@
import javax.jms.XATopicSession;
import javax.transaction.xa.XAResource;
+import org.jboss.jms.client.delegate.ClientConsumerDelegate;
+import org.jboss.jms.client.delegate.ClientProducerDelegate;
+import org.jboss.jms.client.delegate.ClientSessionDelegate;
import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.state.SessionState;
+import org.jboss.jms.debug.JMSObjectTracker;
+import org.jboss.jms.debug.TrackerFactory;
import org.jboss.jms.delegate.BrowserDelegate;
import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.delegate.ProducerDelegate;
@@ -171,8 +176,13 @@
public void close() throws JMSException
{
+ JMSObjectTracker tracker = TrackerFactory.getClientJMSObjTracker();
+ tracker.sessionEvent(((ClientSessionDelegate)delegate).getID(), null, " closing. ", false);
delegate.closing(-1);
+ tracker.sessionEvent(((ClientSessionDelegate)delegate).getID(), null, " closing done. ", false);
delegate.close();
+ tracker.sessionEvent(((ClientSessionDelegate)delegate).getID(), null, " closed. ", false);
+ tracker.removeSession(((ClientSessionDelegate)delegate).getID());
}
public void recover() throws JMSException
@@ -207,6 +217,10 @@
}
ProducerDelegate producerDelegate = delegate.createProducerDelegate((JBossDestination)d);
+
+ JMSObjectTracker tracker = TrackerFactory.getClientJMSObjTracker();
+ tracker.producerEvent(((ClientProducerDelegate)producerDelegate).getID(), null, "producer created. sess: " + ((ClientSessionDelegate)delegate).getID(), true);
+
return new JBossMessageProducer(producerDelegate);
}
@@ -236,7 +250,9 @@
ConsumerDelegate cd = delegate.
createConsumerDelegate((JBossDestination)d, messageSelector, noLocal, null, false, true);
-
+
+ JMSObjectTracker tracker = TrackerFactory.getClientJMSObjTracker();
+ tracker.consumerEvent(((ClientConsumerDelegate)cd).getID(), null, "created. ", true);
return new JBossMessageConsumer(cd);
}
Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03/src/main/org/jboss/jms/client/container/ClientConsumer.java 2009-01-20 01:38:41 UTC (rev 5664)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/container/ClientConsumer.java 2009-04-23 11:05:20 UTC (rev 6534)
@@ -30,6 +30,8 @@
import javax.jms.MessageListener;
import javax.jms.Session;
+import org.jboss.jms.debug.MessageTracker;
+import org.jboss.jms.debug.TrackerFactory;
import org.jboss.jms.delegate.Cancel;
import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.delegate.DefaultCancel;
@@ -66,7 +68,6 @@
private static final int WAIT_TIMEOUT = 30000;
-
static
{
log = Logger.getLogger(ClientConsumer.class);
@@ -922,6 +923,9 @@
//Add it to the buffer
buffer.addLast(proxy, proxy.getJMSPriority());
+
+ MessageTracker mTracker = TrackerFactory.getClientMessageTracker();
+ mTracker.messageEvent("" + proxy.getMessage().getMessageID(), "message arrives at client", false);
lastDeliveryId = proxy.getDeliveryId();
Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2009-01-20 01:38:41 UTC (rev 5664)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2009-04-23 11:05:20 UTC (rev 6534)
@@ -35,6 +35,8 @@
import org.jboss.jms.client.remoting.JMSRemotingConnection;
import org.jboss.jms.client.state.ConnectionState;
import org.jboss.jms.client.state.HierarchicalState;
+import org.jboss.jms.debug.JMSObjectTracker;
+import org.jboss.jms.debug.TrackerFactory;
import org.jboss.jms.delegate.ConnectionDelegate;
import org.jboss.jms.delegate.IDBlock;
import org.jboss.jms.delegate.SessionDelegate;
@@ -75,7 +77,7 @@
private static final Logger log = Logger.getLogger(ClientConnectionDelegate.class);
// Attributes -----------------------------------------------------------------------------------
-
+
private int serverID;
private transient JMSRemotingConnection remotingConnection;
@@ -151,13 +153,19 @@
RequestSupport req = new CloseRequest(id, version);
doInvoke(client, req);
+
}
public long closing(long sequence) throws JMSException
{
RequestSupport req = new ClosingRequest(sequence, id, version);
- return ((Long)doInvoke(client, req)).longValue();
+ long result = ((Long)doInvoke(client, req)).longValue();
+
+ JMSObjectTracker tracker = TrackerFactory.getClientJMSObjTracker();
+ tracker.connectionEvent(getID(), null, "Closing returned from server. " + result, false);
+
+ return result;
}
// ConnectionDelegate implementation ------------------------------------------------------------
Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2009-01-20 01:38:41 UTC (rev 5664)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2009-04-23 11:05:20 UTC (rev 6534)
@@ -31,6 +31,8 @@
import org.jboss.jms.client.container.JMSClientVMIdentifier;
import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.jms.debug.JMSObjectTracker;
+import org.jboss.jms.debug.TrackerFactory;
import org.jboss.jms.delegate.ConnectionFactoryDelegate;
import org.jboss.jms.delegate.CreateConnectionResult;
import org.jboss.jms.delegate.TopologyResult;
@@ -194,6 +196,12 @@
connectionDelegate.setRemotingConnection(remotingConnection);
connectionDelegate.setVersionToUse(version);
+
+ JMSObjectTracker tracker = TrackerFactory.getClientJMSObjTracker();
+
+ tracker.connectionEvent(connectionDelegate.getID(), null, "conn delegate created: " +
+ "remoting id: " + remotingConnection.getRemotingClient().getSessionId() +
+ "clientVmId: " + JMSClientVMIdentifier.instance, false);
}
else
{
Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2009-01-20 01:38:41 UTC (rev 5664)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2009-04-23 11:05:20 UTC (rev 6534)
@@ -26,6 +26,8 @@
import java.util.HashMap;
import java.util.Map;
+import org.jboss.jms.debug.ClientFailoverTracker;
+import org.jboss.jms.debug.TrackerFactory;
import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.wireformat.JMSWireFormat;
import org.jboss.logging.Logger;
@@ -346,6 +348,8 @@
{
log.trace(this + " stop");
+ ClientFailoverTracker tracker = TrackerFactory.getClientFailoverTracker();
+
// explicitly remove the callback listener, to avoid race conditions on server
// (http://jira.jboss.org/jira/browse/JBMESSAGING-535)
@@ -355,6 +359,7 @@
}
catch(Throwable ignore)
{
+ tracker.report("failed removing callback listeer" + client.getSessionId(), ignore, false);
// very unlikely to get an exception on a local remove (I suspect badly designed API),
// but we're failed anyway, so we don't care too much
@@ -367,25 +372,30 @@
try
{
+ tracker.report("Disconnecting client " + client.getSessionId(), null, false);
client.disconnect();
}
catch (Throwable ignore)
- {
+ {
+ tracker.report("failed to disconnect."+ client.getSessionId(), ignore, false);
log.trace(this + " failed to disconnect the client", ignore);
}
try
{
- onewayClient.disconnect();
+ tracker.report("Disconnecting oneway client: " + onewayClient.getSessionId(), null, false);
+ onewayClient.disconnect();
}
catch (Throwable ignore)
- {
+ {
+ tracker.report("failed to disconnect onway" + onewayClient.getSessionId(), ignore, false);
log.trace(this + " failed to disconnect the client", ignore);
}
client = null;
onewayClient = null;
+ tracker.report("remoting client closed", null, false);
log.trace(this + " closed");
}
@@ -420,6 +430,10 @@
*/
public synchronized void setFailed()
{
+
+ ClientFailoverTracker tracker = TrackerFactory.getClientFailoverTracker();
+ tracker.report("call setFail on : " + this, null, false);
+
failed = true;
// Remoting has the bad habit of letting the job of cleaning after a failed connection up to
@@ -429,23 +443,31 @@
try
{
+ tracker.report("setFailed, set 0 timeout on client: " + client.getSessionId(), null, false);
client.setDisconnectTimeout(0);
+ tracker.report("done set 0 timeout", null, false);
}
catch (Throwable ignore)
{
+ tracker.report("error setting 0 timeout on client " + client.getSessionId(), ignore, false);
log.trace(this + " failed to set disconnect timeout", ignore);
}
try
{
+ tracker.report("setFailed, set 0 timeout oneway client: " + onewayClient.getSessionId(), null, false);
onewayClient.setDisconnectTimeout(0);
+ tracker.report("done set 0 timeout oneway", null, false);
}
catch (Throwable ignore)
- {
+ {
+ tracker.report("error setting 0 timeout on oneway client", ignore, false);
log.trace(this + " failed to set disconnect timeout", ignore);
}
+ tracker.report(" stopping " + this, null, false);
stop();
+ tracker.report(" stopped " + this, null, false);
}
/**
@@ -459,7 +481,7 @@
return false;
}
- client.addConnectionListener(listener);
+ client.addConnectionListener(listener, serverLocator.getParameters());
remotingConnectionListener = listener;
return true;
@@ -467,7 +489,7 @@
public synchronized void addPlainConnectionListener(ConnectionListener listener)
{
- client.addConnectionListener(listener);
+ client.addConnectionListener(listener, serverLocator.getParameters());
}
public synchronized void removePlainConnectionListener(ConnectionListener listener)
Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03/src/main/org/jboss/jms/server/ServerPeer.java 2009-01-20 01:38:41 UTC (rev 5664)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/server/ServerPeer.java 2009-04-23 11:05:20 UTC (rev 6534)
@@ -1178,7 +1178,8 @@
{
if (sessions.remove(id) == null)
{
- throw new IllegalStateException("Cannot find session with id " + id + " to remove");
+ if (log.isTraceEnabled()) { log.trace("Cannot find session with id " + id + " to remove"); }
+ // throw new IllegalStateException("Cannot find session with id " + id + " to remove");
}
}
Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2009-01-20 01:38:41 UTC (rev 5664)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2009-04-23 11:05:20 UTC (rev 6534)
@@ -31,6 +31,8 @@
import javax.jms.JMSException;
+import org.jboss.jms.debug.ServerFailoverTracker;
+import org.jboss.jms.debug.TrackerFactory;
import org.jboss.jms.delegate.ConnectionEndpoint;
import org.jboss.jms.server.ConnectionManager;
import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
@@ -116,6 +118,14 @@
activeConnectionEndpoints.add(endpoint);
+ ServerFailoverTracker tracker = TrackerFactory.getServerFailoverTracker();
+ if (endpoint instanceof ServerConnectionEndpoint)
+ {
+ tracker.report("register conn: " + ((ServerConnectionEndpoint)endpoint).getId() +
+ "clientVM id: " + jmsClientVMID +
+ "remoting id: " + remotingClientSessionID, null, false);
+ }
+
log.debug("registered connection " + endpoint + " as " +
Util.guidToString(remotingClientSessionID));
}
@@ -144,14 +154,24 @@
public synchronized ConnectionEndpoint unregisterConnection(String jmsClientVMId,
String remotingClientSessionID)
{
+
Map<String, ConnectionEndpoint> endpoints = this.jmsClients.get(jmsClientVMId);
+ ServerFailoverTracker tracker = TrackerFactory.getServerFailoverTracker();
+
+ tracker.report("unregistering connection, clientVM: " + jmsClientVMId +
+ "remoting ID: " + remotingClientSessionID, null, false);
+
if (endpoints != null)
{
ConnectionEndpoint e = endpoints.remove(remotingClientSessionID);
if (e != null)
{
+
+ tracker.report("endpint found: " + e +
+ "remoting ID: " + remotingClientSessionID +
+ "clientVM: " + jmsClientVMId, null, false);
activeConnectionEndpoints.remove(e);
}
@@ -160,13 +180,18 @@
if (endpoints.isEmpty())
{
+ tracker.report("no more ednpoints for client: " + jmsClientVMId, null, false);
jmsClients.remove(jmsClientVMId);
}
remotingSessions.remove(remotingClientSessionID);
+ tracker.report("endpoint removed: " + e, null, false);
return e;
}
+
+ tracker.report("no endpoint to remove for " + jmsClientVMId +
+ " remoting session: " + remotingClientSessionID, null, false);
return null;
}
@@ -218,17 +243,17 @@
* @param t - plan for it to be null!
*/
public void handleConnectionException(Throwable t, Client client)
- {
- if (t instanceof ClientDisconnectedException)
- {
- // This is OK
- if (trace) { log.trace(this + " notified that client " + client + " has disconnected"); }
- return;
- }
- else
- {
- if (trace) { log.trace(this + " detected failure on client " + client, t); }
- }
+ {
+ ServerFailoverTracker tracker = TrackerFactory.getServerFailoverTracker();
+
+ if (t instanceof ClientDisconnectedException)
+ {
+ tracker.report("normal disconnection" + client.getSessionId(), t, false);
+ }
+ else
+ {
+ tracker.report("server failure detected for client: " + client.getSessionId(), t, true);
+ }
String remotingSessionID = client.getSessionId();
@@ -236,6 +261,7 @@
{
handleClientFailure(remotingSessionID);
}
+ tracker.report("handle complete for client: " + remotingSessionID, t, false);
}
// ClusterNotificationListener implementation ---------------------------------------------------
@@ -421,13 +447,19 @@
{
String jmsClientID = remotingSessions.get(jmsSessionID);
- log.warn("A problem has been detected " +
- "with the connection to remote client " +
- jmsSessionID + ", jmsClientID=" + jmsClientID + ". It is possible the client has exited without closing " +
- "its connection(s) or the network has failed. All associated connection resources will be cleaned up.");
+// log.warn("A problem has been detected " +
+// "with the connection to remote client " +
+// jmsSessionID + ", jmsClientID=" + jmsClientID + ". It is possible the client has exited without closing " +
+// "its connection(s) or the network has failed. All associated connection resources will be cleaned up.");
+
+ ServerFailoverTracker tracker = TrackerFactory.getServerFailoverTracker();
+
+ tracker.report("clean up for clientVM: " + jmsClientID +
+ " on remoting client: " + jmsSessionID, null, false);
if (jmsClientID != null)
{
+ tracker.report("We have client: " + jmsClientID, null, false);
Map<String, ConnectionEndpoint> endpoints = jmsClients.get(jmsClientID);
if (endpoints != null)
@@ -449,29 +481,37 @@
// sce could also be a mock test.. so this test is required
if (conn instanceof ServerConnectionEndpoint)
{
+ tracker.report("we close callback: " + ((ServerConnectionEndpoint)conn).getId(), null, false);
//Remoting is dumb and doesn't clean up it's state after itself - so we have to do it.
((ServerConnectionEndpoint)conn).closeCallbackClient();
+
}
try
{
+ tracker.report("cleanup closing conn: " + conn.toString(), null, false);
conn.closing(-1);
}
catch (Throwable ignore)
{
+ tracker.report("error cleanup closing conn: " + conn.toString(), ignore, false);
}
try
{
+ tracker.report("cleanup close conn: " + conn.toString(), null, false);
conn.close();
}
catch (Throwable ignore)
{
+ tracker.report("error cleanup close conn: " + conn.toString(), ignore, false);
}
-
+ tracker.report("returning after cleanup conn" + conn.toString(), null, false);
return;
}
}
}
+ tracker.report("getting handlers info: " + jmsClientID +
+ " and remote sessid: " + jmsSessionID, null, false);
Map<String, InvokerCallbackHandler> handlers = cfHandlers.get(jmsClientID);
@@ -485,22 +525,28 @@
{
try
{
+ tracker.report("disconnection callback client for " + jmsSessionID, null, false);
((ServerInvokerCallbackHandler)entry.getValue()).getCallbackClient().disconnect();
}
catch (Throwable ignore)
{
+ tracker.report("error disconn cb for " + jmsSessionID, ignore, false);
}
try
{
+ tracker.report("try destroy cb for " + jmsSessionID, null, false);
((ServerInvokerCallbackHandler)entry.getValue()).destroy();
}
catch (Throwable ignore)
{
+ tracker.report("error destroying cb for " + jmsSessionID, ignore, false);
}
for (ServerConnectionFactoryEndpoint ep: connectionFactories)
{
+ tracker.report("remove facotyr cb for: " + jmsSessionID +
+ " ep: " + ep, null, false);
ep.removeCallbackhandler(entry.getValue());
}
@@ -512,9 +558,13 @@
if (found)
{
+ tracker.report("unregister cf cb: " + jmsClientID +
+ " jmsSessionID: " + jmsSessionID, null, false);
unregisterConnectionFactoryCallback(jmsClientID, jmsSessionID);
}
}
+ tracker.report("done clean cb " + jmsClientID +
+ " jmsSessionID: " + jmsSessionID, null, false);
}
Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2009-01-20 01:38:41 UTC (rev 5664)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2009-04-23 11:05:20 UTC (rev 6534)
@@ -39,6 +39,8 @@
import org.jboss.aop.AspectManager;
import org.jboss.jms.client.delegate.ClientSessionDelegate;
import org.jboss.jms.client.remoting.CallbackManager;
+import org.jboss.jms.debug.JMSObjectTracker;
+import org.jboss.jms.debug.TrackerFactory;
import org.jboss.jms.delegate.ConnectionEndpoint;
import org.jboss.jms.delegate.IDBlock;
import org.jboss.jms.delegate.SessionDelegate;
@@ -93,10 +95,14 @@
private static boolean trace = log.isTraceEnabled();
// Attributes -----------------------------------------------------------------------------------
-
+
private String id;
- private volatile boolean closed;
+ public String getId() {
+ return id;
+}
+
+private volatile boolean closed;
private volatile boolean started;
private String clientID;
@@ -273,6 +279,10 @@
ClientSessionDelegate d = new ClientSessionDelegate(sessionID, dupsOKBatchSize);
+ JMSObjectTracker tracker = TrackerFactory.getServerJMSObjTracker();
+
+ tracker.sessionEvent(sessionID, null, "server session created. conntion: " + this.id, true);
+
log.trace("created " + d);
return d;
@@ -364,12 +374,16 @@
public void close() throws JMSException
{
+ JMSObjectTracker tracker = TrackerFactory.getServerJMSObjTracker();
try
{
+ synchronized (this)
+ {
if (trace) { log.trace(this + " close()"); }
if (closed)
{
+ tracker.connectionEvent(id, null, "already closed", false);
log.warn("Connection is already closed");
return;
}
@@ -437,14 +451,22 @@
temporaryDestinations.clear();
}
+ closed = true;
+ }
+
+ //We do this outside the synchronized block to avoid a deadlock in which
+ //SimpleConnectionManager.handleClientFailure() holds its own lock and then calls this close(), which requires the lock on this object,
+ //while this close() (called from the client) holds the lock on this object and calls unregisterConnection(), which requires the lock on SimpleConnectionManager.
cm.unregisterConnection(jmsClientVMID, remotingClientSessionID);
Dispatcher.instance.unregisterTarget(id, this);
-
- closed = true;
+
+ tracker.connectionEvent(id, null, "server connection closed.", false);
+ tracker.removeConnection(id);
}
catch (Throwable t)
{
+ tracker.connectionEvent(id, null, "server conn closing error " + t, false);
throw ExceptionUtil.handleJMSInvocation(t, this + " close");
}
}
@@ -474,6 +496,7 @@
public void sendTransaction(TransactionRequest request,
boolean checkForDuplicates) throws JMSException
{
+ JMSObjectTracker tracker = TrackerFactory.getServerJMSObjTracker();
try
{
if (closed)
@@ -487,7 +510,9 @@
Transaction tx = tr.createTransaction();
processTransaction(request.getState(), tx, checkForDuplicates);
+ tracker.connectionEvent(id, null, "1pc tx " + tx.getId(), false);
tx.commit();
+ tracker.connectionEvent(id, null, "1pc tx done. " + tx.getId(), false);
}
else if (request.getRequestType() == TransactionRequest.TWO_PHASE_PREPARE_REQUEST)
{
@@ -523,6 +548,7 @@
}
catch (Throwable t)
{
+ tracker.connectionEvent(id, null, "ex sending tx " + t, false);
throw ExceptionUtil.handleJMSInvocation(t, this + " sendTransaction");
}
}
@@ -649,7 +675,10 @@
{
if (sessions.remove(sessionId) == null)
{
- throw new IllegalStateException("Cannot find session with id " + sessionId + " to remove");
+ //Do not throw an exception here, because the session close may be
+ //called from the server side (SimpleConnectionManager) and the client side at the same time.
+ if (trace) { log.trace("Cannot find session with id " + sessionId + " to remove"); }
+ //throw new IllegalStateException("Cannot find session with id " + sessionId + " to remove");
}
}
}
@@ -685,6 +714,8 @@
boolean sendMessage(JBossMessage msg, Transaction tx, boolean checkForDuplicates) throws Exception
{
+
+ JMSObjectTracker tracker = TrackerFactory.getServerJMSObjTracker();
JBossDestination dest = (JBossDestination)msg.getJMSDestination();
if (!dest.isDirect())
@@ -752,8 +783,10 @@
else if (dest.isQueue())
{
if (trace) { log.trace(this + " routing " + msg + " to queue"); }
+ tracker.connectionEvent(id, null, "routing message : " + ref.getMessage().getMessageID(), false);
if (!postOffice.route(ref, new JMSCondition(true, dest.getName()), tx))
{
+ tracker.connectionEvent(id, null, "fail to route msg: " + ref.getMessage().getMessageID() + " with tx: " + tx, false);
throw new JMSException("Failed to route " + ref + " to " + dest.getName());
}
}
Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2009-01-20 01:38:41 UTC (rev 5664)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2009-04-23 11:05:20 UTC (rev 6534)
@@ -30,6 +30,8 @@
import org.jboss.aop.AspectManager;
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
+import org.jboss.jms.debug.JMSObjectTracker;
+import org.jboss.jms.debug.TrackerFactory;
import org.jboss.jms.delegate.ConnectionFactoryEndpoint;
import org.jboss.jms.delegate.CreateConnectionResult;
import org.jboss.jms.delegate.TopologyResult;
@@ -159,6 +161,8 @@
ServerInvokerCallbackHandler callbackHandler)
throws JMSException
{
+
+ JMSObjectTracker tracker = TrackerFactory.getServerJMSObjTracker();
try
{
if (failedNodeID == -1)
@@ -169,6 +173,8 @@
remotingSessionID, clientVMID,
versionToUse,
callbackHandler);
+ tracker.connectionEvent(cd.getID(), null, "created standard. ", true);
+
return new CreateConnectionResult(cd);
}
else
@@ -193,6 +199,7 @@
remotingSessionID, clientVMID,
versionToUse,
callbackHandler);
+ tracker.connectionEvent(cd.getID(), null, "created failover. ", true);
return new CreateConnectionResult(cd);
}
}
Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2009-01-20 01:38:41 UTC (rev 5664)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2009-04-23 11:05:20 UTC (rev 6534)
@@ -25,6 +25,8 @@
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
+import org.jboss.jms.debug.JMSObjectTracker;
+import org.jboss.jms.debug.TrackerFactory;
import org.jboss.jms.delegate.ConsumerEndpoint;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.message.JBossMessage;
@@ -64,7 +66,7 @@
// Static ---------------------------------------------------------------------------------------
// Attributes -----------------------------------------------------------------------------------
-
+
private boolean trace = log.isTraceEnabled();
private String id;
@@ -378,6 +380,9 @@
public void close() throws JMSException
{
+
+ JMSObjectTracker tracker = TrackerFactory.getServerJMSObjTracker();
+ tracker.consumerEvent(this.id, null, "close()", false);
try
{
if (trace)
@@ -386,7 +391,6 @@
}
localClose();
-
sessionEndpoint.removeConsumer(id);
}
catch (Throwable t)
@@ -517,6 +521,8 @@
void localClose() throws Throwable
{
+
+ JMSObjectTracker tracker = TrackerFactory.getServerJMSObjTracker();
if (trace) { log.trace(this + " grabbed the main lock in close() " + this); }
if (remote)
@@ -573,7 +579,8 @@
}
}
}
-
+ tracker.consumerEvent(id, null, "closed.", false);
+ tracker.removeConsumer(id);
}
void start()
Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2009-01-20 01:38:41 UTC (rev 5664)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2009-04-23 11:05:20 UTC (rev 6534)
@@ -40,6 +40,9 @@
import org.jboss.aop.AspectManager;
import org.jboss.jms.client.delegate.ClientBrowserDelegate;
import org.jboss.jms.client.delegate.ClientConsumerDelegate;
+import org.jboss.jms.debug.JMSObjectTracker;
+import org.jboss.jms.debug.MessageTracker;
+import org.jboss.jms.debug.TrackerFactory;
import org.jboss.jms.delegate.Ack;
import org.jboss.jms.delegate.BrowserDelegate;
import org.jboss.jms.delegate.Cancel;
@@ -325,11 +328,15 @@
public void close() throws JMSException
{
+
+ JMSObjectTracker tracker = TrackerFactory.getServerJMSObjTracker();
+ tracker.sessionEvent(this.id, null, "close()", false);
try
{
localClose();
connectionEndpoint.removeSession(id);
+
}
catch (Throwable t)
{
@@ -1119,16 +1126,25 @@
{
if (consumers.remove(consumerId) == null)
{
- throw new IllegalStateException("Cannot find consumer with id " + consumerId + " to remove");
+ if (trace) { log.trace("Cannot find consumer with id " + consumerId + " to remove"); }
+ //Don't throw, as this may be called twice: from the client and from the server's connection failure handler.
+ //throw new IllegalStateException("Cannot find consumer with id " + consumerId + " to remove");
}
}
}
void localClose() throws Throwable
{
+
+ JMSObjectTracker tracker = TrackerFactory.getServerJMSObjTracker();
+ MessageTracker mTracker = TrackerFactory.getServerMessageTracker();
+ tracker.sessionEvent(id, null, "localClose()", false);
+
if (closed)
{
- throw new IllegalStateException("Session is already closed");
+ //Don't throw an exception, as this method may be called twice.
+ return;
+ //throw new IllegalStateException("Session is already closed");
}
if (trace) log.trace(this + " close()");
@@ -1167,6 +1183,9 @@
//Note we don't maintain order using a LinkedHashMap since then we lose
//concurrency since we would have to lock it exclusively
+ synchronized (deliveries)
+ {
+ tracker.sessionEvent(id, null, "sync deliveries.", false);
List entries = new ArrayList(deliveries.entrySet());
//Sort them in reverse delivery id order
@@ -1199,6 +1218,7 @@
rec.del.cancel();
+ mTracker.messageEvent("" + rec.del.getReference().getMessage().getMessageID(), " msg canceled", false);
channels.add(rec.del.getObserver());
}
@@ -1215,17 +1235,25 @@
executor.shutdownAfterProcessingCurrentlyQueuedTasks();
deliveries.clear();
+ }
sp.removeSession(id);
Dispatcher.instance.unregisterTarget(id, this);
closed = true;
+
+ tracker.sessionEvent(id, null, "locally closed", false);
+ tracker.removeSession(id);
}
void cancelDelivery(long deliveryId) throws Throwable
{
- DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(deliveryId));
+ DeliveryRecord rec = null;
+ synchronized(deliveries)
+ {
+ rec = (DeliveryRecord)deliveries.remove(new Long(deliveryId));
+ }
if (rec == null)
{
@@ -1410,6 +1438,8 @@
void performDelivery(MessageReference ref, long deliveryID, ServerConsumerEndpoint consumer)
{
+
+ MessageTracker mTracker = TrackerFactory.getServerMessageTracker();
if (consumer == null)
{
if (trace) { log.trace(this + " consumer is null, cannot perform delivery"); }
@@ -1436,9 +1466,13 @@
{
// one way invocation, no acknowledgment sent back by the client
if (trace) { log.trace(this + " submitting message " + ref.getMessage() + " to the remoting layer to be sent asynchronously"); }
+
+ mTracker.messageEvent("" + ref.getMessage().getMessageID(), "deliver msg to consumer: " + consumer.getID() + " from session: " + this.id, false);
callbackHandler.handleCallbackOneway(callback);
+ mTracker.messageEvent("" + ref.getMessage().getMessageID(), "sent to remoting layer ", false);
+
//We store the delivery id so we know to wait for any deliveries in transit on close
consumer.setLastDeliveryID(deliveryID);
}
@@ -1569,7 +1603,11 @@
private Delivery cancelDeliveryInternal(Cancel cancel) throws Throwable
{
- DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(cancel.getDeliveryId()));
+ DeliveryRecord rec = null;
+ synchronized (deliveries)
+ {
+ rec = (DeliveryRecord)deliveries.remove(new Long(cancel.getDeliveryId()));
+ }
if (rec == null)
{
@@ -1715,16 +1753,33 @@
private boolean acknowledgeDeliveryInternal(Ack ack) throws Throwable
{
+
+ MessageTracker mTracker = TrackerFactory.getServerMessageTracker();
if (trace) { log.trace(this + " acknowledging delivery " + ack); }
- DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(ack.getDeliveryID()));
+ DeliveryRecord rec = null;
+
+ //This is synchronized to prevent the following race:
+ //a clustered server node detects a connection failure and cancels the deliveries,
+ //but an ack from the consumer on that connection still gets through to here.
+ //Without synchronization, the remove below may obtain the record before the cancel action has cleared the deliveries map,
+ //so the cancel action puts the message back on the queue and this method also causes the delivery count to be decremented.
+ //As the cancel already decrements the delivery count once, the delivery count would be decremented twice
+ //for the same message.
+ synchronized (deliveries)
+ {
+ rec = (DeliveryRecord)deliveries.remove(new Long(ack.getDeliveryID()));
+ }
+
if (rec == null)
{
//This can happen if an ack comes in after failover
log.debug("Cannot find " + ack + " to acknowledge, it was probably acknowledged before");
return false;
}
+
+ mTracker.messageEvent(rec.del.getReference().getMessage().getMessageID()+"", "acking from session: " + id, false);
ServerConsumerEndpoint consumer = rec.getConsumer();
@@ -2129,6 +2184,10 @@
}
log.trace(this + " created and registered " + ep);
+
+ JMSObjectTracker tracker = TrackerFactory.getServerJMSObjTracker();
+
+ tracker.consumerEvent(consumerID, null, "created. session: " + id, true);
return stub;
}
Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/wireformat/RequestSupport.java
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03/src/main/org/jboss/jms/wireformat/RequestSupport.java 2009-01-20 01:38:41 UTC (rev 5664)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/jms/wireformat/RequestSupport.java 2009-04-23 11:05:20 UTC (rev 6534)
@@ -24,6 +24,8 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import org.jboss.jms.debug.JMSObjectTracker;
+import org.jboss.jms.debug.TrackerFactory;
import org.jboss.jms.server.ServerPeer;
import org.jboss.remoting.InvocationRequest;
@@ -40,6 +42,8 @@
protected byte version;
+ // protected JMSObjectTracker tracker = TrackerFactory.getServerJMSObjTracker();
+
public RequestSupport()
{
}
Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2009-01-20 01:38:41 UTC (rev 5664)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2009-04-23 11:05:20 UTC (rev 6534)
@@ -28,6 +28,8 @@
import java.util.ListIterator;
import java.util.Set;
+import org.jboss.jms.debug.MessageTracker;
+import org.jboss.jms.debug.TrackerFactory;
import org.jboss.jms.server.MessagingTimeoutFactory;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Channel;
@@ -186,6 +188,7 @@
public Delivery handle(DeliveryObserver sender, MessageReference ref, Transaction tx)
{
+ MessageTracker mTracker = TrackerFactory.getServerMessageTracker();
if (!isActive())
{
if (trace) { log.trace(this + " is not active, returning null delivery for " + ref); }
@@ -214,6 +217,7 @@
// Each channel has its own copy of the reference
ref = ref.copy();
+ mTracker.messageEvent("" + ref.getMessage().getMessageID(), "in queue " + this.getChannelID());
try
{
if (tx == null)
@@ -241,6 +245,8 @@
}
else
{
+ mTracker.messageEvent("" + ref.getMessage().getMessageID(), "to tx: " + tx.getId(), false);
+
if (trace) { log.trace(this + " adding " + ref + " to state " + (tx == null ? "non-transactionally" : "in transaction: " + tx)); }
// add to post commit callback
@@ -285,7 +291,8 @@
public void cancel(Delivery del) throws Throwable
{
- //We may need to update the delivery count in the database
+ MessageTracker mTracker = TrackerFactory.getServerMessageTracker();
+ //We may need to update the delivery count in the database
MessageReference ref = del.getReference();
@@ -296,6 +303,7 @@
if (!del.isRecovered())
{
+ mTracker.messageEvent("" + del.getReference().getMessage().getMessageID(), "cancel msg and dec count: " + deliveringCount.get(), false);
deliveringCount.decrement();
}
@@ -456,10 +464,13 @@
*/
public int getMessageCount()
{
- synchronized (lock)
+ MessageTracker mTracker = TrackerFactory.getServerMessageTracker();
+ synchronized (lock)
{
if (trace) { log.trace("Getting message count mr: "+ messageRefs.size() + " dc " + getDeliveringCount() + " sc " + getScheduledCount()); }
+ mTracker.messageEvent("nullidgetmsgcount: ", " refs: " + messageRefs.size() + " dels: " +
+ getDeliveringCount() + " schds: " + getScheduledCount(), false);
return messageRefs.size() + getDeliveringCount() + getScheduledCount();
}
}
@@ -738,8 +749,11 @@
protected void acknowledgeInternal(Delivery d, Transaction tx, boolean persist) throws Exception
{
+ MessageTracker mTracker = TrackerFactory.getServerMessageTracker();
+
if (tx == null)
{
+ mTracker.messageEvent("" + d.getReference().getMessage().getMessageID(), "acknowledging non-tx", false);
if (persist && recoverable && d.getReference().getMessage().isReliable())
{
pm.removeReference(channelID, d.getReference(), null);
@@ -747,11 +761,15 @@
if (!d.isRecovered())
{
+ mTracker.messageEvent("" + d.getReference().getMessage().getMessageID(), " decrease count", false);
deliveringCount.decrement();
+ mTracker.messageEvent("" + d.getReference().getMessage().getMessageID(), "msg finished with count: " + deliveringCount.get(), false);
+ mTracker.removeMessage("" + d.getReference().getMessage().getMessageID());
}
}
else
{
+ mTracker.messageEvent("" + d.getReference().getMessage().getMessageID(), "acknowledging tx", false);
this.getCallback(tx).addDelivery(d);
if (trace) { log.trace(this + " added " + d + " to memory on transaction " + tx); }
@@ -878,6 +896,7 @@
public void afterCommit(boolean onePhase) throws Exception
{
+ MessageTracker mTracker = TrackerFactory.getServerMessageTracker();
try
{
// We add the references to the state (or schedule them if appropriate)
@@ -923,7 +942,9 @@
if (!del.isRecovered())
{
+ mTracker.messageEvent("" + del.getReference().getMessage().getMessageID(), "decrease count in tx commit", false);
deliveringCount.decrement();
+ mTracker.removeMessage("" + del.getReference().getMessage().getMessageID());
}
}
Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/messaging/core/impl/ClusterRoundRobinDistributor.java
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03/src/main/org/jboss/messaging/core/impl/ClusterRoundRobinDistributor.java 2009-01-20 01:38:41 UTC (rev 5664)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/src/main/org/jboss/messaging/core/impl/ClusterRoundRobinDistributor.java 2009-04-23 11:05:20 UTC (rev 6534)
@@ -23,6 +23,8 @@
import java.util.Iterator;
+import org.jboss.jms.debug.MessageTracker;
+import org.jboss.jms.debug.TrackerFactory;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Delivery;
import org.jboss.messaging.core.contract.DeliveryObserver;
@@ -72,10 +74,13 @@
public Delivery handle(DeliveryObserver observer, MessageReference ref, Transaction tx)
{
+
+ MessageTracker mTracker = TrackerFactory.getServerMessageTracker();
//First try the local distributor
if (trace) { log.trace(this + " first trying with local distributor"); }
+ mTracker.messageEvent("" + ref.getMessage().getMessageID(), "1st to local", false);
Delivery del = localDistributor.handle(observer, ref, tx);
if (trace) { log.trace(this + " local distributor returned " + del); }
@@ -86,6 +91,7 @@
if (trace) { log.trace(this + " trying with remote distributor"); }
+ mTracker.messageEvent("" + ref.getMessage().getMessageID(), "next to remote", false);
del = remoteDistributor.handle(observer, ref, tx);
if (trace) { log.trace(this + " remote distributor returned " + del); }
Modified: tags/JBossMessaging_1_4_0_SP3_CP03_1456/tests/build.properties
===================================================================
--- tags/JBossMessaging_1_4_0_SP3_CP03/tests/build.properties 2009-01-20 01:38:41 UTC (rev 5664)
+++ tags/JBossMessaging_1_4_0_SP3_CP03_1456/tests/build.properties 2009-04-23 11:05:20 UTC (rev 6534)
@@ -3,4 +3,7 @@
#
#test.bind.address=192.168.1.2
+test.bind.address=localhost
+jgroups.bind_addr=localhost
+
More information about the jboss-cvs-commits
mailing list