[jboss-cvs] JBoss Messaging SVN: r7029 - in branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456: src/main/org/jboss/jms/client/container and 8 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue May 26 11:36:57 EDT 2009
Author: jbertram at redhat.com
Date: 2009-05-26 11:36:56 -0400 (Tue, 26 May 2009)
New Revision: 7029
Added:
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java
Modified:
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/etc/remoting/remoting-bisocket-service.xml
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/etc/remoting/remoting-sslbisocket-service.xml
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/ServerPeer.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/build.xml
Log:
[JBPAPP-2030]
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/etc/remoting/remoting-bisocket-service.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/etc/remoting/remoting-bisocket-service.xml 2009-05-26 15:06:52 UTC (rev 7028)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/etc/remoting/remoting-bisocket-service.xml 2009-05-26 15:36:56 UTC (rev 7029)
@@ -57,7 +57,11 @@
<attribute name="secondaryBindPort">xyz</attribute>
<attribute name="secondaryConnectPort">abc</attribute>
-->
-
+
+ <attribute name="failureDisconnectTimeout" isParam="true">0</attribute>
+ <attribute name="callbackErrorsAllowed">1</attribute>
+ <attribute name="useClientConnectionIdentity" isParam="true">true</attribute>
+
</invoker>
<handlers>
<handler subsystem="JMS">org.jboss.jms.server.remoting.JMSServerInvocationHandler</handler>
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/etc/remoting/remoting-sslbisocket-service.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/etc/remoting/remoting-sslbisocket-service.xml 2009-05-26 15:06:52 UTC (rev 7028)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/etc/remoting/remoting-sslbisocket-service.xml 2009-05-26 15:36:56 UTC (rev 7029)
@@ -56,6 +56,10 @@
<attribute name="secondaryConnectPort">abc</attribute>
-->
+ <attribute name="failureDisconnectTimeout" isParam="true">0</attribute>
+ <attribute name="callbackErrorsAllowed">1</attribute>
+ <attribute name="useClientConnectionIdentity" isParam="true">true</attribute>
+
</invoker>
<handlers>
<handler subsystem="JMS">org.jboss.jms.server.remoting.JMSServerInvocationHandler</handler>
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java 2009-05-26 15:06:52 UTC (rev 7028)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java 2009-05-26 15:36:56 UTC (rev 7029)
@@ -31,7 +31,6 @@
import org.jboss.jms.client.delegate.ClientProducerDelegate;
import org.jboss.jms.client.delegate.ClientSessionDelegate;
import org.jboss.jms.client.delegate.DelegateSupport;
-import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
import org.jboss.jms.client.remoting.JMSRemotingConnection;
import org.jboss.jms.client.state.BrowserState;
import org.jboss.jms.client.state.ConnectionState;
@@ -92,14 +91,6 @@
Version versionToUse = connectionDelegate.getVersionToUse();
JMSRemotingConnection remotingConnection = connectionDelegate.getRemotingConnection();
- // install the consolidated remoting connection listener; it will be de-installed on
- // connection closing by ConnectionAspect
-
- ConsolidatedRemotingConnectionListener listener =
- new ConsolidatedRemotingConnectionListener();
-
- remotingConnection.addConnectionListener(listener);
-
if (versionToUse == null)
{
throw new IllegalStateException("Connection version is null");
@@ -109,7 +100,8 @@
new ConnectionState(serverID, connectionDelegate,
remotingConnection, versionToUse);
- listener.setConnectionState(connectionState);
+ remotingConnection.getConnectionListener().setConnectionState(connectionState);
+ remotingConnection.getConnectionListener().start();
connectionDelegate.setState(connectionState);
}
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2009-05-26 15:06:52 UTC (rev 7028)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2009-05-26 15:36:56 UTC (rev 7029)
@@ -30,6 +30,7 @@
import javax.jms.JMSException;
import org.jboss.jms.client.container.JMSClientVMIdentifier;
+import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
import org.jboss.jms.client.remoting.JMSRemotingConnection;
import org.jboss.jms.delegate.ConnectionFactoryDelegate;
import org.jboss.jms.delegate.CreateConnectionResult;
@@ -149,7 +150,7 @@
try
{
- remotingConnection = new JMSRemotingConnection(serverLocatorURI, clientPing, strictTck);
+ remotingConnection = new JMSRemotingConnection(serverLocatorURI, clientPing, strictTck, new ConsolidatedRemotingConnectionListener(), sendAcksAsync);
remotingConnection.start();
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java 2009-05-26 15:06:52 UTC (rev 7028)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java 2009-05-26 15:36:56 UTC (rev 7029)
@@ -42,6 +42,8 @@
private ExceptionListener jmsExceptionListener;
private ConnectionFailureListener remotingListener;
+
+ private boolean started;
// Constructors ---------------------------------------------------------------------------------
@@ -53,6 +55,11 @@
public void handleConnectionException(Throwable throwable, Client client)
{
+ if (!started)
+ {
+ return;
+ }
+
// forward the exception to delegate listener and JMS ExceptionListeners; synchronize
// to avoid race conditions
@@ -162,6 +169,11 @@
}
return state + ".ConsolidatedListener";
}
+
+ public void start()
+ {
+ started = true;
+ }
// Package protected ----------------------------------------------------------------------------
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2009-05-26 15:06:52 UTC (rev 7028)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2009-05-26 15:36:56 UTC (rev 7029)
@@ -256,10 +256,17 @@
public JMSRemotingConnection(String serverLocatorURI, boolean clientPing, boolean strictTck) throws Exception
{
+ this(serverLocatorURI, clientPing, strictTck, null, sendAcksAsync);
+ }
+
+ public JMSRemotingConnection(String serverLocatorURI, boolean clientPing, boolean strictTck, ConsolidatedRemotingConnectionListener listener, boolean sendAcksAsync) throws Exception
+ {
serverLocator = new InvokerLocator(serverLocatorURI);
this.clientPing = clientPing;
this.strictTck = strictTck;
-
+ this.sendAcksAsync = sendAcksAsync;
+ this.remotingConnectionListener = listener;
+
log.trace(this + " created");
}
@@ -316,7 +323,14 @@
{
public Object run() throws Exception
{
- client.connect();
+ if (remotingConnectionListener != null)
+ {
+ client.connect(remotingConnectionListener, serverLocator.getParameters());
+ }
+ else
+ {
+ client.connect();
+ }
onewayClient.connect();
return null;
}
@@ -345,7 +359,7 @@
public void stop()
{
log.trace(this + " stop");
-
+
// explicitly remove the callback listener, to avoid race conditions on server
// (http://jira.jboss.org/jira/browse/JBMESSAGING-535)
@@ -421,6 +435,11 @@
public synchronized void setFailed()
{
failed = true;
+
+ if (client == null)
+ {
+ return;
+ }
// Remoting has the bad habit of letting the job of cleaning after a failed connection up to
// the application. Here, we take care of that, by disconnecting the remoting client, and
@@ -429,7 +448,7 @@
try
{
- client.setDisconnectTimeout(0);
+ client.setDisconnectTimeout(0);
}
catch (Throwable ignore)
{
@@ -465,7 +484,7 @@
return true;
}
- public synchronized void addPlainConnectionListener(ConnectionListener listener)
+ public synchronized void addPlainConnectionListener(final ConnectionListener listener)
{
client.addConnectionListener(listener);
}
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/ServerPeer.java 2009-05-26 15:06:52 UTC (rev 7028)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/ServerPeer.java 2009-05-26 15:36:56 UTC (rev 7029)
@@ -1178,7 +1178,10 @@
{
if (sessions.remove(id) == null)
{
- throw new IllegalStateException("Cannot find session with id " + id + " to remove");
+ //here we don't throw exception as the session may have been removed already due to server side
+ //failure handler (SimpleConnectionManager), which deemed to be normal behavior in application environment.
+ if (log.isTraceEnabled()) { log.trace("Cannot find session with id " + id + " to remove"); }
+ // throw new IllegalStateException("Cannot find session with id " + id + " to remove");
}
}
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2009-05-26 15:06:52 UTC (rev 7028)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2009-05-26 15:36:56 UTC (rev 7029)
@@ -29,8 +29,6 @@
import java.util.Map;
import java.util.Set;
-import javax.jms.JMSException;
-
import org.jboss.jms.delegate.ConnectionEndpoint;
import org.jboss.jms.server.ConnectionManager;
import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
@@ -167,6 +165,7 @@
return e;
}
+
return null;
}
@@ -221,17 +220,20 @@
{
if (t instanceof ClientDisconnectedException)
{
- // This is OK
- if (trace) { log.trace(this + " notified that client " + client + " has disconnected"); }
- return;
+ if (log.isTraceEnabled())
+ {
+ log.trace("Connection is closed normally: " + client);
+ }
}
else
{
- if (trace) { log.trace(this + " detected failure on client " + client, t); }
+ if (log.isTraceEnabled())
+ {
+ log.trace("Connection is closed on failure event: " + client);
+ }
}
+ String remotingSessionID = client.getSessionId();
- String remotingSessionID = client.getSessionId();
-
if (remotingSessionID != null)
{
handleClientFailure(remotingSessionID);
@@ -401,7 +403,7 @@
try
{
- ((ServerInvokerCallbackHandler)entry.getValue()).destroy();
+ ((ServerInvokerCallbackHandler)entry.getValue()).shutdown();
}
catch (Throwable ignore)
{
@@ -421,10 +423,10 @@
{
String jmsClientID = remotingSessions.get(jmsSessionID);
- log.warn("A problem has been detected " +
- "with the connection to remote client " +
- jmsSessionID + ", jmsClientID=" + jmsClientID + ". It is possible the client has exited without closing " +
- "its connection(s) or the network has failed. All associated connection resources will be cleaned up.");
+ log.trace("A problem has been detected " +
+ "with the connection to remote client " +
+ jmsSessionID + ", jmsClientID=" + jmsClientID + ". It is possible the client has exited without closing " +
+ "its connection(s) or the network has failed. All associated connection resources will be cleaned up.");
if (jmsClientID != null)
{
@@ -467,7 +469,6 @@
catch (Throwable ignore)
{
}
-
return;
}
}
@@ -493,7 +494,7 @@
try
{
- ((ServerInvokerCallbackHandler)entry.getValue()).destroy();
+ ((ServerInvokerCallbackHandler)entry.getValue()).shutdown();
}
catch (Throwable ignore)
{
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2009-05-26 15:06:52 UTC (rev 7028)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2009-05-26 15:36:56 UTC (rev 7029)
@@ -95,7 +95,7 @@
// Attributes -----------------------------------------------------------------------------------
private String id;
-
+
private volatile boolean closed;
private volatile boolean started;
@@ -366,6 +366,13 @@
{
try
{
+ //reason for synchronization
+ //Sometimes the server side detects a connection failure but
+ //client side is normal. So it's possible the client side is calling
+ //connection.close() while in the mean time the server side connection
+ //failure handler call it also.
+ synchronized (this)
+ {
if (trace) { log.trace(this + " close()"); }
if (closed)
@@ -437,11 +444,15 @@
temporaryDestinations.clear();
}
+ closed = true;
+ }
+
+ //we put this outside the sync loop to avoid dead lock where
+ //SimpleConnectionManager.handleClientFailure() holds itself and then tries to call this close(), which requires lock on this
+ //meanwhile this close() (called from client) holds itself and call unregisterConnection(), which requires lock on SimpleConnectionManager.
cm.unregisterConnection(jmsClientVMID, remotingClientSessionID);
Dispatcher.instance.unregisterTarget(id, this);
-
- closed = true;
}
catch (Throwable t)
{
@@ -649,7 +660,10 @@
{
if (sessions.remove(sessionId) == null)
{
- throw new IllegalStateException("Cannot find session with id " + sessionId + " to remove");
+ //Here not to throw exception, because it is possible that the session close can be
+ //called from server side (SimpleConnectionManager) before client side.
+ if (trace) { log.trace("Cannot find session with id " + sessionId + " to remove"); }
+ //throw new IllegalStateException("Cannot find session with id " + sessionId + " to remove");
}
}
}
@@ -752,7 +766,7 @@
else if (dest.isQueue())
{
if (trace) { log.trace(this + " routing " + msg + " to queue"); }
- if (!postOffice.route(ref, new JMSCondition(true, dest.getName()), tx))
+ if (!postOffice.route(ref, new JMSCondition(true, dest.getName()), tx))
{
throw new JMSException("Failed to route " + ref + " to " + dest.getName());
}
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2009-05-26 15:06:52 UTC (rev 7028)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2009-05-26 15:36:56 UTC (rev 7029)
@@ -1119,16 +1119,22 @@
{
if (consumers.remove(consumerId) == null)
{
- throw new IllegalStateException("Cannot find consumer with id " + consumerId + " to remove");
+ if (trace) { log.trace("Cannot find consumer with id " + consumerId + " to remove"); }
+ //don't throw, as it maybe called twice from client and server's connection failure handler.
+ //throw new IllegalStateException("Cannot find consumer with id " + consumerId + " to remove");
}
}
}
void localClose() throws Throwable
{
+
if (closed)
{
- throw new IllegalStateException("Session is already closed");
+ //don't throw the exception as it maybe called twice
+ if (trace) { log.trace("Session is already closed. "); }
+ return;
+ //throw new IllegalStateException("Session is already closed");
}
if (trace) log.trace(this + " close()");
@@ -1167,10 +1173,12 @@
//Note we don't maintain order using a LinkedHashMap since then we lose
//concurrency since we would have to lock it exclusively
- List entries = new ArrayList(deliveries.entrySet());
+ synchronized (deliveries)
+ {
+ List entries = new ArrayList(deliveries.entrySet());
- //Sort them in reverse delivery id order
- Collections.sort(entries,
+ //Sort them in reverse delivery id order
+ Collections.sort(entries,
new Comparator()
{
public int compare(Object obj1, Object obj2)
@@ -1183,39 +1191,46 @@
}
});
- Iterator iter = entries.iterator();
+ Iterator iter = entries.iterator();
- Set channels = new HashSet();
+ Set channels = new HashSet();
- if (trace) { log.trace(this + " cancelling " + entries.size() + " deliveries"); }
+ if (trace) { log.trace(this + " cancelling " + entries.size() + " deliveries"); }
- while (iter.hasNext())
- {
- Map.Entry entry = (Map.Entry)iter.next();
+ while (iter.hasNext())
+ {
+ Map.Entry entry = (Map.Entry)iter.next();
- if (trace) { log.trace(this + " cancelling delivery with delivery id: " + entry.getKey()); }
+ if (trace) { log.trace(this + " cancelling delivery with delivery id: " + entry.getKey()); }
- DeliveryRecord rec = (DeliveryRecord)entry.getValue();
+ DeliveryRecord rec = (DeliveryRecord)entry.getValue();
- rec.del.cancel();
+ /*
+ * https://jira.jboss.org/jira/browse/JBMESSAGING-1440
+ */
+ if (!rec.del.isXAPrepared())
+ {
+ rec.del.cancel();
+ }
- channels.add(rec.del.getObserver());
- }
+ channels.add(rec.del.getObserver());
+ }
- promptDelivery(channels);
+ promptDelivery(channels);
- //Close down the executor
+ //Close down the executor
- //Note we need to wait for ALL tasks to complete NOT just one otherwise we can end up with the following situation
- //prompter is queued and starts to execute
- //prompter almost finishes executing then a message is cancelled due to this session closing
- //this causes another prompter to be queued
- //shutdownAfterProcessingCurrentTask is then called
- //this means the second prompter never runs and the cancelled message doesn't get redelivered
- executor.shutdownAfterProcessingCurrentlyQueuedTasks();
+ //Note we need to wait for ALL tasks to complete NOT just one otherwise we can end up with the following situation
+ //prompter is queued and starts to execute
+ //prompter almost finishes executing then a message is cancelled due to this session closing
+ //this causes another prompter to be queued
+ //shutdownAfterProcessingCurrentTask is then called
+ //this means the second prompter never runs and the cancelled message doesn't get redelivered
+ executor.shutdownAfterProcessingCurrentlyQueuedTasks();
- deliveries.clear();
-
+ deliveries.clear();
+ }
+
sp.removeSession(id);
Dispatcher.instance.unregisterTarget(id, this);
@@ -1225,7 +1240,11 @@
void cancelDelivery(long deliveryId) throws Throwable
{
- DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(deliveryId));
+ DeliveryRecord rec = null;
+ synchronized(deliveries)
+ {
+ rec = (DeliveryRecord)deliveries.remove(new Long(deliveryId));
+ }
if (rec == null)
{
@@ -1436,7 +1455,7 @@
{
// one way invocation, no acknowledgment sent back by the client
if (trace) { log.trace(this + " submitting message " + ref.getMessage() + " to the remoting layer to be sent asynchronously"); }
-
+
callbackHandler.handleCallbackOneway(callback);
//We store the delivery id so we know to wait for any deliveries in transit on close
@@ -1569,7 +1588,11 @@
private Delivery cancelDeliveryInternal(Cancel cancel) throws Throwable
{
- DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(cancel.getDeliveryId()));
+ DeliveryRecord rec = null;
+ synchronized (deliveries)
+ {
+ rec = (DeliveryRecord)deliveries.remove(new Long(cancel.getDeliveryId()));
+ }
if (rec == null)
{
@@ -1717,7 +1740,19 @@
{
if (trace) { log.trace(this + " acknowledging delivery " + ack); }
- DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(ack.getDeliveryID()));
+ DeliveryRecord rec = null;
+
+ //I put synchronized here to prevent the following from happening:
+ //a clustered server node detects connection failure and cancel deliveries.
+ //but the consumer on it get through to here
+ //if not synchronized, the remove may get the record before the above cancel action clear up the deliveries map.
+ //so the cancel action makes the message back to queue and this method cause the delivery count to decrement.
+ //as the cancel will decrease the delivery count once, so this will result the delivery count being decremented twice
+ //for one same message.
+ synchronized (deliveries)
+ {
+ rec = (DeliveryRecord)deliveries.remove(new Long(ack.getDeliveryID()));
+ }
if (rec == null)
{
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java 2009-05-26 15:06:52 UTC (rev 7028)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java 2009-05-26 15:36:56 UTC (rev 7029)
@@ -32,12 +32,12 @@
* A NamedThreadQueuedExecutor
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * @deprecated
*
*/
public class NamedThreadQueuedExecutor extends QueuedExecutor
-{
- private static final Logger log = Logger.getLogger(NamedThreadQueuedExecutor.class);
-
+{
private final String name;
private static final ThreadGroup jbmGroup = new ThreadGroup("JBM-threads");
@@ -49,10 +49,6 @@
this.name = name;
setThreadFactory(new Factory());
-
- clearThread();
-
- restart();
}
private class Factory implements ThreadFactory
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/build.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/build.xml 2009-05-26 15:06:52 UTC (rev 7028)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/build.xml 2009-05-26 15:36:56 UTC (rev 7029)
@@ -346,28 +346,27 @@
<!--
<jvmarg line="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_shmem,server=y,suspend=y,address=antjunit"/>
-->
- <classpath refid="test.execution.classpath"/>
- <formatter type="xml" usefile="${junit.formatter.usefile}"/>
- <batchtest todir="${junit.batchtest.todir}"
- haltonfailure="${junit.batchtest.haltonfailure}"
- haltonerror="${junit.batchtest.haltonerror}">
- <formatter type="plain" usefile="${junit.formatter.usefile}"/>
- <fileset dir="${build.tests.classes}">
- <include name="**/messaging/core/**/${test-mask}.class"/>
- <include name="**/jms/**/${test-mask}.class"/>
- <include name="**/messaging/util/**/${test-mask}.class"/>
- <exclude name="**/jms/MemLeakTest.class"/>
- <exclude name="**/jms/RemotingConnectionConfigurationTest.class"/>
- <exclude name="**/jms/XAResourceRecoveryTest.class"/>
- <exclude name="**/jms/stress/**"/>
- <exclude name="**/jms/crash/**"/>
- <exclude name="**/jms/bridge/**"/>
- <exclude name="**/jms/manual/**"/>
- <exclude name="**/jms/clustering/**"/>
- </fileset>
- </batchtest>
- </junit>
- </target>
+ <classpath refid="test.execution.classpath" />
+ <formatter type="xml" usefile="${junit.formatter.usefile}" />
+ <batchtest todir="${junit.batchtest.todir}" haltonfailure="${junit.batchtest.haltonfailure}" haltonerror="${junit.batchtest.haltonerror}">
+ <formatter type="plain" usefile="${junit.formatter.usefile}" />
+ <fileset dir="${build.tests.classes}">
+ <include name="**/messaging/core/**/${test-mask}.class" />
+ <include name="**/jms/**/${test-mask}.class" />
+ <include name="**/messaging/util/**/${test-mask}.class" />
+ <exclude name="**/jms/DeliveryOnConnectionFailureTest.class" />
+ <exclude name="**/jms/MemLeakTest.class" />
+ <exclude name="**/jms/RemotingConnectionConfigurationTest.class" />
+ <exclude name="**/jms/XAResourceRecoveryTest.class" />
+ <exclude name="**/jms/stress/**" />
+ <exclude name="**/jms/crash/**" />
+ <exclude name="**/jms/bridge/**" />
+ <exclude name="**/jms/manual/**" />
+ <exclude name="**/jms/clustering/**" />
+ </fileset>
+ </batchtest>
+ </junit>
+ </target>
<target name="invm-thirdparty-tests" depends="tests-jar, prepare-testdirs, clear-test-logs"
description="Runs all available thirdparty tests an in-VM configuration">
Copied: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java (from rev 6192, branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java)
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java (rev 0)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java 2009-05-26 15:36:56 UTC (rev 7029)
@@ -0,0 +1,291 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.test.messaging.jms;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.jms.server.ServerPeer;
+import org.jboss.jms.server.connectionmanager.SimpleConnectionManager;
+import org.jboss.remoting.Client;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.container.Command;
+import org.jboss.test.messaging.tools.container.Server;
+
+/**
+ * A DeliveryOnConnectionFailureTest
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ *
+ * Created Mar 26, 2009 3:14:28 PM
+ *
+ */
+public class DeliveryOnConnectionFailureTest extends JMSTestCase
+{
+
+ public DeliveryOnConnectionFailureTest(String name)
+ {
+ super(name);
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ //https://jira.jboss.org/jira/browse/JBMESSAGING-1456
+ //Message Stuck means messages are kept in delivering state and never be delivered again
+ //unless the server is restarted (for persistent messages).
+ //this can happen in the following conditions:
+ //1. The client ping timeout and JBM tries to disconnect from the server (this happens in cluster).
+ //2. Due to the network/remoting issue, the server will receive a 'normal' disconnection event
+ //3. The server assumes the client has already closed it's connection and therefore doesn't clean up
+ //4. So the connection at the server is left open, including the sessions created on that connection.
+ //5. If the sessions contains messages in delivering, those messages will appear stuck.
+ //To fix this, either the server side always tries to clean up the connection whenever a disconnection happens
+ //or the remoting layer handle this correctly.
+ //
+ //Here we simplify the situation. First start the server and get a connection to it. Then
+ //we send a message to the server with client ack. We receive it without ack,
+ //next we directly call the client.disconnect() from client without closing the connection
+ //the server should cancel the message. Then we receive the message and ack it.
+ public void testMessageStuckOnConnectionFailure() throws Exception
+ {
+ ConnectionFactory cf = (JBossConnectionFactory)ic.lookup("/ConnectionFactory");
+
+ JBossConnection conn1 = null;
+ JBossConnection conn2 = null;
+
+ try
+ {
+ //create a connection
+ conn1 = (JBossConnection)cf.createConnection();
+ Session sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer prod1 = sess1.createProducer(queue1);
+ TextMessage msg = sess1.createTextMessage("dont-stuck-me!");
+ conn1.start();
+
+ //send a message
+ prod1.send(msg);
+
+ //receive the message but not ack
+ MessageConsumer cons1 = sess1.createConsumer(queue1);
+ TextMessage rm = (TextMessage)cons1.receive(2000);
+
+ assertNotNull(rm);
+ assertEquals("dont-stuck-me!", rm.getText());
+
+ //break connection.
+ JMSRemotingConnection jmsConn = ((ClientConnectionDelegate)conn1.getDelegate()).getRemotingConnection();
+ Client rmClient = jmsConn.getRemotingClient();
+ rmClient.disconnect();
+
+ //wait for server side cleanup
+ try
+ {
+ Thread.sleep(5000);
+ }
+ catch (InterruptedException e)
+ {
+ //ignore.
+ }
+
+ //now receive the message
+ conn2 = (JBossConnection)cf.createConnection();
+ conn2.start();
+ Session sess2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer cons2 = sess2.createConsumer(queue1);
+ TextMessage rm2 = (TextMessage)cons2.receive(2000);
+
+ assertNotNull(rm2);
+ assertEquals("dont-stuck-me!", rm2.getText());
+ rm2.acknowledge();
+
+ //Message count should be zero.
+ //this is checked in tearDown().
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+
+ }
+
+ //https://jira.jboss.org/jira/browse/JBMESSAGING-1456
+ //another issue with jira 1456 is negative message count.
+ //This test guarantees the message count won't go negative
+ //Error Scenario:
+ // 1. Server side detects the connection failure (lease timeout) and close the connection/session
+ // 2. The session endpoint will cancel the messages being delivered to the queue.
+ // 3. At the same time the client side received some of the messages and acknowledge them
+ // 4. The acknowledge action will decrease the delivering count of the queue, and the session endpoint
+ // cancel also decrease the delivering count.
+ // 5. If not synchronized, one message may be canceled and acked at the same time, so the delivering count
+ // will be decreased twice for each message, resulting in negative message count.
+ //
+ //The test first creates a connection and send messages, then it receives the messages, then ack the last
+ //msg (client-ack), at the same time, simulate the server side connection failure to trigger server side
+ //clean up. This will create a possibility that when not properly synchronized, the above
+ //described issue may happen. At the end check the message count, it should always be zero.
+ public void testMessageCountOnConnectionFailure() throws Exception
+ {
+ ConnectionFactory cf = (JBossConnectionFactory)ic.lookup("/ConnectionFactory");
+
+ JBossConnection conn1 = null;
+ JBossConnection conn2 = null;
+
+ try
+ {
+ conn1 = (JBossConnection)cf.createConnection();
+ conn1.start();
+ Session sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ //now send messages
+ MessageProducer prod1 = sess1.createProducer(queue1);
+
+ final int NUM_MSG = 2000;
+ for (int i = 0; i < NUM_MSG; ++i)
+ {
+ TextMessage tm = sess1.createTextMessage("-m"+i);
+ prod1.send(tm);
+ }
+
+ //receive the messages
+ MessageConsumer cons1 = sess1.createConsumer(queue1);
+ for (int j = 0; j < NUM_MSG-1; ++j)
+ {
+ TextMessage rm = (TextMessage)cons1.receive(2000);
+ assertNotNull(rm);
+ assertEquals("-m"+j, rm.getText());
+ }
+
+ //last message
+ TextMessage lastRm = (TextMessage)cons1.receive(2000);
+ assertNotNull(lastRm);
+ assertEquals("-m"+(NUM_MSG-1), lastRm.getText());
+
+ final ServerClientFailureCommand cmd = new ServerClientFailureCommand();
+
+ Thread exeThr = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ ServerManagement.getServer().executeCommand(cmd);
+ }
+ catch (Exception e)
+ {
+ log.error("failed to invoke command", e);
+ fail("failure in executing command.");
+ }
+ }
+ };
+
+ exeThr.start();
+
+ //ack last message, making server side ack happening.
+ lastRm.acknowledge();
+
+ //receive possible canceled messages
+ TextMessage prm = null;
+ conn2 = (JBossConnection)cf.createConnection();
+ conn2.start();
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons2 = sess2.createConsumer(queue1);
+ prm = (TextMessage)cons2.receive(2000);
+ while (prm != null)
+ {
+ prm = (TextMessage)cons2.receive(2000);
+ }
+
+ //check message count
+ //tearDown will do the check.
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+ }
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+ public static class ServerClientFailureCommand implements Command
+ {
+
+ private static final long serialVersionUID = 2603154447586447658L;
+
+ public Object execute(Server server) throws Exception
+ {
+ ServerPeer peer = server.getServerPeer();
+
+ SimpleConnectionManager cm = (SimpleConnectionManager)peer.getConnectionManager();
+
+ Map jmsClients = cm.getClients();
+ assertEquals(1, jmsClients.size());
+ Map endpoints = (Map)jmsClients.values().iterator().next();
+ assertEquals(1, endpoints.size());
+ Map.Entry entry = (Map.Entry)endpoints.entrySet().iterator().next();
+ String sessId = (String)entry.getKey();
+
+ // triggering server side clean up
+ cm.handleClientFailure(sessId);
+ return null;
+ }
+
+ }
+}
More information about the jboss-cvs-commits
mailing list