[jboss-cvs] JBoss Messaging SVN: r6203 - in branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456: src/main/org/jboss/jms/server/endpoint and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Mar 27 13:12:43 EDT 2009
Author: jbertram at redhat.com
Date: 2009-03-27 13:12:43 -0400 (Fri, 27 Mar 2009)
New Revision: 6203
Added:
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java
Modified:
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/build.xml
Log:
[JBPAPP-1836]
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2009-03-27 17:12:41 UTC (rev 6202)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2009-03-27 17:12:43 UTC (rev 6203)
@@ -219,17 +219,6 @@
*/
public void handleConnectionException(Throwable t, Client client)
{
- if (t instanceof ClientDisconnectedException)
- {
- // This is OK
- if (trace) { log.trace(this + " notified that client " + client + " has disconnected"); }
- return;
- }
- else
- {
- if (trace) { log.trace(this + " detected failure on client " + client, t); }
- }
-
String remotingSessionID = client.getSessionId();
if (remotingSessionID != null)
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2009-03-27 17:12:41 UTC (rev 6202)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2009-03-27 17:12:43 UTC (rev 6203)
@@ -362,7 +362,12 @@
}
}
- public void close() throws JMSException
+ //Reason for synchronization:
+ //Sometimes the server side detects a connection failure while the
+ //client side is still healthy. So it's possible for the client side to call
+ //connection.close() while, at the same time, the server side connection
+ //failure handler calls it as well.
+ public synchronized void close() throws JMSException
{
try
{
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2009-03-27 17:12:41 UTC (rev 6202)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2009-03-27 17:12:43 UTC (rev 6203)
@@ -1167,6 +1167,8 @@
//Note we don't maintain order using a LinkedHashMap since then we lose
//concurrency since we would have to lock it exclusively
+ synchronized (deliveries)
+ {
List entries = new ArrayList(deliveries.entrySet());
//Sort them in reverse delivery id order
@@ -1215,7 +1217,8 @@
executor.shutdownAfterProcessingCurrentlyQueuedTasks();
deliveries.clear();
-
+ }
+
sp.removeSession(id);
Dispatcher.instance.unregisterTarget(id, this);
@@ -1717,7 +1720,19 @@
{
if (trace) { log.trace(this + " acknowledging delivery " + ack); }
- DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(ack.getDeliveryID()));
+ DeliveryRecord rec = null;
+
+ //This is synchronized to prevent the following race:
+ //a clustered server node detects a connection failure and cancels the deliveries,
+ //but a consumer on that connection still gets through to here.
+ //If not synchronized, the remove may obtain the record before the cancel action above has cleared the deliveries map.
+ //The cancel action then puts the message back on the queue and this method also decrements the delivery count.
+ //Since the cancel already decrements the delivery count once, the count would end up being decremented twice
+ //for the same message.
+ synchronized (deliveries)
+ {
+ rec = (DeliveryRecord)deliveries.remove(new Long(ack.getDeliveryID()));
+ }
if (rec == null)
{
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/build.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/build.xml 2009-03-27 17:12:41 UTC (rev 6202)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/build.xml 2009-03-27 17:12:43 UTC (rev 6203)
@@ -346,6 +346,7 @@
<!--
<jvmarg line="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_shmem,server=y,suspend=y,address=antjunit"/>
-->
+<<<<<<< .working
<classpath refid="test.execution.classpath"/>
<formatter type="xml" usefile="${junit.formatter.usefile}"/>
<batchtest todir="${junit.batchtest.todir}"
@@ -368,6 +369,29 @@
</batchtest>
</junit>
</target>
+=======
+ <classpath refid="test.execution.classpath" />
+ <formatter type="xml" usefile="${junit.formatter.usefile}" />
+ <batchtest todir="${junit.batchtest.todir}" haltonfailure="${junit.batchtest.haltonfailure}" haltonerror="${junit.batchtest.haltonerror}">
+ <formatter type="plain" usefile="${junit.formatter.usefile}" />
+ <fileset dir="${build.tests.classes}">
+ <include name="**/messaging/core/**/${test-mask}.class" />
+ <include name="**/jms/**/${test-mask}.class" />
+ <include name="**/messaging/util/**/${test-mask}.class" />
+ <exclude name="**/jms/DeliveryOnConnectionFailureTest.class" />
+ <exclude name="**/jms/MemLeakTest.class" />
+ <exclude name="**/jms/RemotingConnectionConfigurationTest.class" />
+ <exclude name="**/jms/XAResourceRecoveryTest.class" />
+ <exclude name="**/jms/stress/**" />
+ <exclude name="**/jms/crash/**" />
+ <exclude name="**/jms/bridge/**" />
+ <exclude name="**/jms/manual/**" />
+ <exclude name="**/jms/clustering/**" />
+ </fileset>
+ </batchtest>
+ </junit>
+ </target>
+>>>>>>> .merge-right.r6192
<target name="invm-thirdparty-tests" depends="tests-jar, prepare-testdirs, clear-test-logs"
description="Runs all available thirdparty tests an in-VM configuration">
Copied: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java (from rev 6192, branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java)
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java (rev 0)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java 2009-03-27 17:12:43 UTC (rev 6203)
@@ -0,0 +1,291 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.test.messaging.jms;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.jms.server.ServerPeer;
+import org.jboss.jms.server.connectionmanager.SimpleConnectionManager;
+import org.jboss.remoting.Client;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.container.Command;
+import org.jboss.test.messaging.tools.container.Server;
+
+/**
+ * A DeliveryOnConnectionFailureTest
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ *
+ * Created Mar 26, 2009 3:14:28 PM
+ *
+ */
+public class DeliveryOnConnectionFailureTest extends JMSTestCase
+{
+ //Creates the test case, passing the given name to the JMSTestCase constructor.
+ public DeliveryOnConnectionFailureTest(String name)
+ {
+ super(name);
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ //https://jira.jboss.org/jira/browse/JBMESSAGING-1456
+ //"Message stuck" means messages are kept in the delivering state and are never delivered again
+ //unless the server is restarted (for persistent messages).
+ //This can happen under the following conditions:
+ //1. The client ping times out and JBM tries to disconnect from the server (this happens in a cluster).
+ //2. Due to a network/remoting issue, the server receives a 'normal' disconnection event.
+ //3. The server assumes the client has already closed its connection and therefore doesn't clean up.
+ //4. So the connection at the server is left open, including the sessions created on that connection.
+ //5. If those sessions contain messages being delivered, those messages will appear stuck.
+ //To fix this, either the server side always tries to clean up the connection whenever a disconnection happens
+ //or the remoting layer handles this correctly.
+ //
+ //Here we simplify the situation. First we start the server and get a connection to it. Then
+ //we send a message to the server and receive it in a client-ack session without acking it.
+ //Next we directly call client.disconnect() on the remoting client without closing the connection;
+ //the server should cancel the message. Then we receive the message on a second connection and ack it.
+ public void testMessageStuckOnConnectionFailure() throws Exception
+ {
+ ConnectionFactory cf = (JBossConnectionFactory)ic.lookup("/ConnectionFactory");
+
+ JBossConnection conn1 = null;
+ JBossConnection conn2 = null;
+
+ try
+ {
+ //create a connection with a client-ack session, then start it
+ conn1 = (JBossConnection)cf.createConnection();
+ Session sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer prod1 = sess1.createProducer(queue1);
+ TextMessage msg = sess1.createTextMessage("dont-stuck-me!");
+ conn1.start();
+
+ //send a message
+ prod1.send(msg);
+
+ //receive the message but do not ack it
+ MessageConsumer cons1 = sess1.createConsumer(queue1);
+ TextMessage rm = (TextMessage)cons1.receive(2000);
+
+ assertNotNull(rm);
+ assertEquals("dont-stuck-me!", rm.getText());
+
+ //break the connection by disconnecting the underlying remoting client, without closing conn1
+ JMSRemotingConnection jmsConn = ((ClientConnectionDelegate)conn1.getDelegate()).getRemotingConnection();
+ Client rmClient = jmsConn.getRemotingClient();
+ rmClient.disconnect();
+
+ //wait for server side cleanup (fixed 5 second sleep)
+ try
+ {
+ Thread.sleep(5000);
+ }
+ catch (InterruptedException e)
+ {
+ //ignore. NOTE(review): the interrupt status is not restored here.
+ }
+
+ //now receive the message again, on a second connection
+ conn2 = (JBossConnection)cf.createConnection();
+ conn2.start();
+ Session sess2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer cons2 = sess2.createConsumer(queue1);
+ TextMessage rm2 = (TextMessage)cons2.receive(2000);
+
+ assertNotNull(rm2);
+ assertEquals("dont-stuck-me!", rm2.getText());
+ rm2.acknowledge();
+
+ //The message count should be zero.
+ //This is checked in tearDown().
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+
+ }
+
+ //https://jira.jboss.org/jira/browse/JBMESSAGING-1456
+ //Another issue with jira 1456 is a negative message count.
+ //This test verifies that the message count does not go negative.
+ //Error scenario:
+ // 1. The server side detects the connection failure (lease timeout) and closes the connection/session.
+ // 2. The session endpoint cancels the messages being delivered, returning them to the queue.
+ // 3. At the same time the client side has received some of the messages and acknowledges them.
+ // 4. The acknowledge action decreases the delivering count of the queue, and the session endpoint
+ // cancel also decreases the delivering count.
+ // 5. If not synchronized, one message may be canceled and acked at the same time, so the delivering count
+ // is decreased twice for that message, resulting in a negative message count.
+ //
+ //The test first creates a connection and sends messages, then receives the messages, then acks the last
+ //message (client-ack) while, at the same time, simulating a server side connection failure to trigger server side
+ //clean up. This creates the possibility that, when not properly synchronized, the issue
+ //described above happens. At the end the message count is checked; it should always be zero.
+ public void testMessageCountOnConnectionFailure() throws Exception
+ {
+ ConnectionFactory cf = (JBossConnectionFactory)ic.lookup("/ConnectionFactory");
+
+ JBossConnection conn1 = null;
+ JBossConnection conn2 = null;
+
+ try
+ {
+ conn1 = (JBossConnection)cf.createConnection();
+ conn1.start();
+ Session sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ //now send the messages, with bodies "-m0" .. "-m(NUM_MSG-1)"
+ MessageProducer prod1 = sess1.createProducer(queue1);
+
+ final int NUM_MSG = 2000;
+ for (int i = 0; i < NUM_MSG; ++i)
+ {
+ TextMessage tm = sess1.createTextMessage("-m"+i);
+ prod1.send(tm);
+ }
+
+ //receive all but the last message, without acking, checking that they arrive in send order
+ MessageConsumer cons1 = sess1.createConsumer(queue1);
+ for (int j = 0; j < NUM_MSG-1; ++j)
+ {
+ TextMessage rm = (TextMessage)cons1.receive(2000);
+ assertNotNull(rm);
+ assertEquals("-m"+j, rm.getText());
+ }
+
+ //receive the last message; it is acked further below
+ TextMessage lastRm = (TextMessage)cons1.receive(2000);
+ assertNotNull(lastRm);
+ assertEquals("-m"+(NUM_MSG-1), lastRm.getText());
+ //command that triggers the server side connection-failure clean up
+ final ServerClientFailureCommand cmd = new ServerClientFailureCommand();
+ //run the command on a separate thread so that it can overlap with the ack below
+ Thread exeThr = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ ServerManagement.getServer().executeCommand(cmd);
+ }
+ catch (Exception e)
+ {
+ log.error("failed to invoke command", e);
+ fail("failure in executing command."); //NOTE(review): runs on exeThr, not the test thread, so this presumably cannot fail the test - confirm
+ }
+ }
+ };
+
+ exeThr.start(); //NOTE(review): exeThr is never joined in this method
+
+ //ack the last message, making the server side ack happen.
+ lastRm.acknowledge();
+
+ //drain any canceled messages using an auto-ack session on a second connection
+ TextMessage prm = null;
+ conn2 = (JBossConnection)cf.createConnection(); //NOTE(review): may race with the size-1 assertion in ServerClientFailureCommand - confirm
+ conn2.start();
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons2 = sess2.createConsumer(queue1);
+ prm = (TextMessage)cons2.receive(2000);
+ while (prm != null)
+ {
+ prm = (TextMessage)cons2.receive(2000);
+ }
+
+ //check the message count:
+ //tearDown will do the check.
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+ }
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+ public static class ServerClientFailureCommand implements Command
+ {
+ //Command that reports the single client known to the server's connection manager as failed.
+ private static final long serialVersionUID = 2603154447586447658L;
+
+ public Object execute(Server server) throws Exception
+ {
+ ServerPeer peer = server.getServerPeer();
+ //the cast assumes the server peer's connection manager is a SimpleConnectionManager
+ SimpleConnectionManager cm = (SimpleConnectionManager)peer.getConnectionManager();
+ //expect exactly one entry in the clients map, whose value is a map with exactly one entry
+ Map jmsClients = cm.getClients();
+ assertEquals(1, jmsClients.size());
+ Map endpoints = (Map)jmsClients.values().iterator().next();
+ assertEquals(1, endpoints.size());
+ Map.Entry entry = (Map.Entry)endpoints.entrySet().iterator().next();
+ String sessId = (String)entry.getKey();
+
+ // trigger the server side clean up for that session id
+ cm.handleClientFailure(sessId);
+ return null;
+ }
+
+ }
+}
More information about the jboss-cvs-commits
mailing list