[jboss-cvs] JBoss Messaging SVN: r7026 - in branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss: jms/client/delegate and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue May 26 10:58:09 EDT 2009
Author: jbertram at redhat.com
Date: 2009-05-26 10:58:09 -0400 (Tue, 26 May 2009)
New Revision: 7026
Modified:
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/ServerPeer.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java
Log:
[JBPAPP-2030]
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java 2009-05-26 14:51:46 UTC (rev 7025)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java 2009-05-26 14:58:09 UTC (rev 7026)
@@ -31,7 +31,6 @@
import org.jboss.jms.client.delegate.ClientProducerDelegate;
import org.jboss.jms.client.delegate.ClientSessionDelegate;
import org.jboss.jms.client.delegate.DelegateSupport;
-import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
import org.jboss.jms.client.remoting.JMSRemotingConnection;
import org.jboss.jms.client.state.BrowserState;
import org.jboss.jms.client.state.ConnectionState;
@@ -92,14 +91,6 @@
Version versionToUse = connectionDelegate.getVersionToUse();
JMSRemotingConnection remotingConnection = connectionDelegate.getRemotingConnection();
- // install the consolidated remoting connection listener; it will be de-installed on
- // connection closing by ConnectionAspect
-
- ConsolidatedRemotingConnectionListener listener =
- new ConsolidatedRemotingConnectionListener();
-
- remotingConnection.addConnectionListener(listener);
-
if (versionToUse == null)
{
throw new IllegalStateException("Connection version is null");
@@ -109,7 +100,8 @@
new ConnectionState(serverID, connectionDelegate,
remotingConnection, versionToUse);
- listener.setConnectionState(connectionState);
+ remotingConnection.getConnectionListener().setConnectionState(connectionState);
+ remotingConnection.getConnectionListener().start();
connectionDelegate.setState(connectionState);
}
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2009-05-26 14:51:46 UTC (rev 7025)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2009-05-26 14:58:09 UTC (rev 7026)
@@ -30,6 +30,7 @@
import javax.jms.JMSException;
import org.jboss.jms.client.container.JMSClientVMIdentifier;
+import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
import org.jboss.jms.client.remoting.JMSRemotingConnection;
import org.jboss.jms.delegate.ConnectionFactoryDelegate;
import org.jboss.jms.delegate.CreateConnectionResult;
@@ -149,7 +150,7 @@
try
{
- remotingConnection = new JMSRemotingConnection(serverLocatorURI, clientPing, strictTck);
+ remotingConnection = new JMSRemotingConnection(serverLocatorURI, clientPing, strictTck, new ConsolidatedRemotingConnectionListener(), sendAcksAsync);
remotingConnection.start();
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java 2009-05-26 14:51:46 UTC (rev 7025)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java 2009-05-26 14:58:09 UTC (rev 7026)
@@ -42,6 +42,8 @@
private ExceptionListener jmsExceptionListener;
private ConnectionFailureListener remotingListener;
+
+ private boolean started;
// Constructors ---------------------------------------------------------------------------------
@@ -53,6 +55,11 @@
public void handleConnectionException(Throwable throwable, Client client)
{
+ if (!started)
+ {
+ return;
+ }
+
// forward the exception to delegate listener and JMS ExceptionListeners; synchronize
// to avoid race conditions
@@ -162,6 +169,11 @@
}
return state + ".ConsolidatedListener";
}
+
+ public void start()
+ {
+ started = true;
+ }
// Package protected ----------------------------------------------------------------------------
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2009-05-26 14:51:46 UTC (rev 7025)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2009-05-26 14:58:09 UTC (rev 7026)
@@ -256,10 +256,17 @@
public JMSRemotingConnection(String serverLocatorURI, boolean clientPing, boolean strictTck) throws Exception
{
+ this(serverLocatorURI, clientPing, strictTck, null, sendAcksAsync);
+ }
+
+ public JMSRemotingConnection(String serverLocatorURI, boolean clientPing, boolean strictTck, ConsolidatedRemotingConnectionListener listener, boolean sendAcksAsync) throws Exception
+ {
serverLocator = new InvokerLocator(serverLocatorURI);
this.clientPing = clientPing;
this.strictTck = strictTck;
-
+ this.sendAcksAsync = sendAcksAsync;
+ this.remotingConnectionListener = listener;
+
log.trace(this + " created");
}
@@ -316,7 +323,14 @@
{
public Object run() throws Exception
{
- client.connect();
+ if (remotingConnectionListener != null)
+ {
+ client.connect(remotingConnectionListener, serverLocator.getParameters());
+ }
+ else
+ {
+ client.connect();
+ }
onewayClient.connect();
return null;
}
@@ -345,7 +359,7 @@
public void stop()
{
log.trace(this + " stop");
-
+
// explicitly remove the callback listener, to avoid race conditions on server
// (http://jira.jboss.org/jira/browse/JBMESSAGING-535)
@@ -421,6 +435,11 @@
public synchronized void setFailed()
{
failed = true;
+
+ if (client == null)
+ {
+ return;
+ }
// Remoting has the bad habit of letting the job of cleaning after a failed connection up to
// the application. Here, we take care of that, by disconnecting the remoting client, and
@@ -429,7 +448,7 @@
try
{
- client.setDisconnectTimeout(0);
+ client.setDisconnectTimeout(0);
}
catch (Throwable ignore)
{
@@ -465,7 +484,7 @@
return true;
}
- public synchronized void addPlainConnectionListener(ConnectionListener listener)
+ public synchronized void addPlainConnectionListener(final ConnectionListener listener)
{
client.addConnectionListener(listener);
}
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/ServerPeer.java 2009-05-26 14:51:46 UTC (rev 7025)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/ServerPeer.java 2009-05-26 14:58:09 UTC (rev 7026)
@@ -1178,7 +1178,10 @@
{
if (sessions.remove(id) == null)
{
- throw new IllegalStateException("Cannot find session with id " + id + " to remove");
+ //Here we don't throw an exception, as the session may already have been removed by the server-side
+ //failure handler (SimpleConnectionManager), which is deemed normal behavior in an application environment.
+ if (log.isTraceEnabled()) { log.trace("Cannot find session with id " + id + " to remove"); }
+ // throw new IllegalStateException("Cannot find session with id " + id + " to remove");
}
}
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2009-05-26 14:51:46 UTC (rev 7025)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2009-05-26 14:58:09 UTC (rev 7026)
@@ -29,8 +29,6 @@
import java.util.Map;
import java.util.Set;
-import javax.jms.JMSException;
-
import org.jboss.jms.delegate.ConnectionEndpoint;
import org.jboss.jms.server.ConnectionManager;
import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
@@ -167,6 +165,7 @@
return e;
}
+
return null;
}
@@ -219,8 +218,22 @@
*/
public void handleConnectionException(Throwable t, Client client)
{
+ if (t instanceof ClientDisconnectedException)
+ {
+ if (log.isTraceEnabled())
+ {
+ log.trace("Connection is closed normally: " + client);
+ }
+ }
+ else
+ {
+ if (log.isTraceEnabled())
+ {
+ log.trace("Connection is closed on failure event: " + client);
+ }
+ }
String remotingSessionID = client.getSessionId();
-
+
if (remotingSessionID != null)
{
handleClientFailure(remotingSessionID);
@@ -390,7 +403,7 @@
try
{
- ((ServerInvokerCallbackHandler)entry.getValue()).destroy();
+ ((ServerInvokerCallbackHandler)entry.getValue()).shutdown();
}
catch (Throwable ignore)
{
@@ -410,10 +423,10 @@
{
String jmsClientID = remotingSessions.get(jmsSessionID);
- log.warn("A problem has been detected " +
- "with the connection to remote client " +
- jmsSessionID + ", jmsClientID=" + jmsClientID + ". It is possible the client has exited without closing " +
- "its connection(s) or the network has failed. All associated connection resources will be cleaned up.");
+ log.trace("A problem has been detected " +
+ "with the connection to remote client " +
+ jmsSessionID + ", jmsClientID=" + jmsClientID + ". It is possible the client has exited without closing " +
+ "its connection(s) or the network has failed. All associated connection resources will be cleaned up.");
if (jmsClientID != null)
{
@@ -456,7 +469,6 @@
catch (Throwable ignore)
{
}
-
return;
}
}
@@ -482,7 +494,7 @@
try
{
- ((ServerInvokerCallbackHandler)entry.getValue()).destroy();
+ ((ServerInvokerCallbackHandler)entry.getValue()).shutdown();
}
catch (Throwable ignore)
{
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2009-05-26 14:51:46 UTC (rev 7025)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2009-05-26 14:58:09 UTC (rev 7026)
@@ -95,7 +95,7 @@
// Attributes -----------------------------------------------------------------------------------
private String id;
-
+
private volatile boolean closed;
private volatile boolean started;
@@ -362,15 +362,17 @@
}
}
- //reason for synchronization
- //Sometimes the server side detects a connection failure but
- //client side is normal. So it's possible the client side is calling
- //connection.close() while in the mean time the server side connection
- //failure handler call it also.
- public synchronized void close() throws JMSException
+ public void close() throws JMSException
{
try
{
+ //reason for synchronization
+ //Sometimes the server side detects a connection failure but
+ //client side is normal. So it's possible the client side is calling
+ //connection.close() while in the mean time the server side connection
+ //failure handler call it also.
+ synchronized (this)
+ {
if (trace) { log.trace(this + " close()"); }
if (closed)
@@ -442,11 +444,15 @@
temporaryDestinations.clear();
}
+ closed = true;
+ }
+
+ //We do this outside the synchronized block to avoid a deadlock in which
+ //SimpleConnectionManager.handleClientFailure() holds its own lock and then calls this close(), which requires the lock on this endpoint,
+ //while this close() (called from the client) holds its own lock and calls unregisterConnection(), which requires the lock on SimpleConnectionManager.
cm.unregisterConnection(jmsClientVMID, remotingClientSessionID);
Dispatcher.instance.unregisterTarget(id, this);
-
- closed = true;
}
catch (Throwable t)
{
@@ -654,7 +660,10 @@
{
if (sessions.remove(sessionId) == null)
{
- throw new IllegalStateException("Cannot find session with id " + sessionId + " to remove");
+ //Do not throw an exception here, because the session close may be
+ //called from the server side (SimpleConnectionManager) before the client side.
+ if (trace) { log.trace("Cannot find session with id " + sessionId + " to remove"); }
+ //throw new IllegalStateException("Cannot find session with id " + sessionId + " to remove");
}
}
}
@@ -757,7 +766,7 @@
else if (dest.isQueue())
{
if (trace) { log.trace(this + " routing " + msg + " to queue"); }
- if (!postOffice.route(ref, new JMSCondition(true, dest.getName()), tx))
+ if (!postOffice.route(ref, new JMSCondition(true, dest.getName()), tx))
{
throw new JMSException("Failed to route " + ref + " to " + dest.getName());
}
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2009-05-26 14:51:46 UTC (rev 7025)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2009-05-26 14:58:09 UTC (rev 7026)
@@ -1119,16 +1119,22 @@
{
if (consumers.remove(consumerId) == null)
{
- throw new IllegalStateException("Cannot find consumer with id " + consumerId + " to remove");
+ if (trace) { log.trace("Cannot find consumer with id " + consumerId + " to remove"); }
+ //don't throw, as it may be called twice: once from the client and once from the server's connection failure handler.
+ //throw new IllegalStateException("Cannot find consumer with id " + consumerId + " to remove");
}
}
}
void localClose() throws Throwable
{
+
if (closed)
{
- throw new IllegalStateException("Session is already closed");
+ //don't throw the exception, as it may be called twice
+ if (trace) { log.trace("Session is already closed. "); }
+ return;
+ //throw new IllegalStateException("Session is already closed");
}
if (trace) log.trace(this + " close()");
@@ -1169,10 +1175,10 @@
synchronized (deliveries)
{
- List entries = new ArrayList(deliveries.entrySet());
+ List entries = new ArrayList(deliveries.entrySet());
- //Sort them in reverse delivery id order
- Collections.sort(entries,
+ //Sort them in reverse delivery id order
+ Collections.sort(entries,
new Comparator()
{
public int compare(Object obj1, Object obj2)
@@ -1185,38 +1191,44 @@
}
});
- Iterator iter = entries.iterator();
+ Iterator iter = entries.iterator();
- Set channels = new HashSet();
+ Set channels = new HashSet();
- if (trace) { log.trace(this + " cancelling " + entries.size() + " deliveries"); }
+ if (trace) { log.trace(this + " cancelling " + entries.size() + " deliveries"); }
- while (iter.hasNext())
- {
- Map.Entry entry = (Map.Entry)iter.next();
+ while (iter.hasNext())
+ {
+ Map.Entry entry = (Map.Entry)iter.next();
- if (trace) { log.trace(this + " cancelling delivery with delivery id: " + entry.getKey()); }
+ if (trace) { log.trace(this + " cancelling delivery with delivery id: " + entry.getKey()); }
- DeliveryRecord rec = (DeliveryRecord)entry.getValue();
+ DeliveryRecord rec = (DeliveryRecord)entry.getValue();
- rec.del.cancel();
+ /*
+ * https://jira.jboss.org/jira/browse/JBMESSAGING-1440
+ */
+ if (!rec.del.isXAPrepared())
+ {
+ rec.del.cancel();
+ }
- channels.add(rec.del.getObserver());
- }
+ channels.add(rec.del.getObserver());
+ }
- promptDelivery(channels);
+ promptDelivery(channels);
- //Close down the executor
+ //Close down the executor
- //Note we need to wait for ALL tasks to complete NOT just one otherwise we can end up with the following situation
- //prompter is queued and starts to execute
- //prompter almost finishes executing then a message is cancelled due to this session closing
- //this causes another prompter to be queued
- //shutdownAfterProcessingCurrentTask is then called
- //this means the second prompter never runs and the cancelled message doesn't get redelivered
- executor.shutdownAfterProcessingCurrentlyQueuedTasks();
+ //Note we need to wait for ALL tasks to complete NOT just one otherwise we can end up with the following situation
+ //prompter is queued and starts to execute
+ //prompter almost finishes executing then a message is cancelled due to this session closing
+ //this causes another prompter to be queued
+ //shutdownAfterProcessingCurrentTask is then called
+ //this means the second prompter never runs and the cancelled message doesn't get redelivered
+ executor.shutdownAfterProcessingCurrentlyQueuedTasks();
- deliveries.clear();
+ deliveries.clear();
}
sp.removeSession(id);
@@ -1228,7 +1240,11 @@
void cancelDelivery(long deliveryId) throws Throwable
{
- DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(deliveryId));
+ DeliveryRecord rec = null;
+ synchronized(deliveries)
+ {
+ rec = (DeliveryRecord)deliveries.remove(new Long(deliveryId));
+ }
if (rec == null)
{
@@ -1439,7 +1455,7 @@
{
// one way invocation, no acknowledgment sent back by the client
if (trace) { log.trace(this + " submitting message " + ref.getMessage() + " to the remoting layer to be sent asynchronously"); }
-
+
callbackHandler.handleCallbackOneway(callback);
//We store the delivery id so we know to wait for any deliveries in transit on close
@@ -1572,7 +1588,11 @@
private Delivery cancelDeliveryInternal(Cancel cancel) throws Throwable
{
- DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(cancel.getDeliveryId()));
+ DeliveryRecord rec = null;
+ synchronized (deliveries)
+ {
+ rec = (DeliveryRecord)deliveries.remove(new Long(cancel.getDeliveryId()));
+ }
if (rec == null)
{
@@ -1722,7 +1742,7 @@
DeliveryRecord rec = null;
- //I put synchronized here to prevent the following:
+ //I put synchronized here to prevent the following from happening:
//a clustered server node detects connection failure and cancel deliveries.
//but the consumer on it get through to here
//if not synchronized, the remove may get the record before the above cancel action clear up the deliveries map.
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java 2009-05-26 14:51:46 UTC (rev 7025)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java 2009-05-26 14:58:09 UTC (rev 7026)
@@ -32,12 +32,12 @@
* A NamedThreadQueuedExecutor
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * @deprecated
*
*/
public class NamedThreadQueuedExecutor extends QueuedExecutor
-{
- private static final Logger log = Logger.getLogger(NamedThreadQueuedExecutor.class);
-
+{
private final String name;
private static final ThreadGroup jbmGroup = new ThreadGroup("JBM-threads");
@@ -49,10 +49,6 @@
this.name = name;
setThreadFactory(new Factory());
-
- clearThread();
-
- restart();
}
private class Factory implements ThreadFactory
More information about the jboss-cvs-commits
mailing list