[jboss-cvs] JBoss Messaging SVN: r6203 - in branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456: src/main/org/jboss/jms/server/endpoint and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Mar 27 13:12:43 EDT 2009


Author: jbertram at redhat.com
Date: 2009-03-27 13:12:43 -0400 (Fri, 27 Mar 2009)
New Revision: 6203

Added:
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java
Modified:
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/build.xml
Log:
[JBPAPP-1836]

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2009-03-27 17:12:41 UTC (rev 6202)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2009-03-27 17:12:43 UTC (rev 6203)
@@ -219,17 +219,6 @@
     */
    public void handleConnectionException(Throwable t, Client client)
    {  
-      if (t instanceof ClientDisconnectedException)
-      {
-         // This is OK
-         if (trace) { log.trace(this + " notified that client " + client + " has disconnected"); }
-         return;
-      }
-      else
-      {
-         if (trace) { log.trace(this + " detected failure on client " + client, t); }
-      }
-
       String remotingSessionID = client.getSessionId();
       
       if (remotingSessionID != null)

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2009-03-27 17:12:41 UTC (rev 6202)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2009-03-27 17:12:43 UTC (rev 6203)
@@ -362,7 +362,12 @@
       }
    }
 
-   public void close() throws JMSException
+   //Reason for synchronization:
+   //sometimes the server side detects a connection failure while the
+   //client side is still working normally. So it is possible that the client
+   //is calling connection.close() while, in the meantime, the server-side
+   //connection failure handler calls it as well.
+   public synchronized void close() throws JMSException
    {
       try
       {

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2009-03-27 17:12:41 UTC (rev 6202)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2009-03-27 17:12:43 UTC (rev 6203)
@@ -1167,6 +1167,8 @@
       //Note we don't maintain order using a LinkedHashMap since then we lose
       //concurrency since we would have to lock it exclusively
 
+      synchronized (deliveries)
+      {
       List entries = new ArrayList(deliveries.entrySet());
 
       //Sort them in reverse delivery id order
@@ -1215,7 +1217,8 @@
       executor.shutdownAfterProcessingCurrentlyQueuedTasks();
 
       deliveries.clear();
-
+      }
+      
       sp.removeSession(id);
 
       Dispatcher.instance.unregisterTarget(id, this);
@@ -1717,7 +1720,19 @@
    {
       if (trace) { log.trace(this + " acknowledging delivery " + ack); }
 
-      DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(ack.getDeliveryID()));
+      DeliveryRecord rec = null;
+      
+      //This is synchronized to prevent the following race:
+      //a clustered server node detects a connection failure and cancels the deliveries,
+      //but the consumer on that connection still gets through to here.
+      //If not synchronized, the remove may get the record before the cancel action above clears the deliveries map.
+      //The cancel action then puts the message back on the queue and this method also causes the delivery count to decrement.
+      //As the cancel already decreases the delivery count once, the delivery count would end up being decremented twice
+      //for the same message.
+      synchronized (deliveries)
+      {
+         rec = (DeliveryRecord)deliveries.remove(new Long(ack.getDeliveryID()));
+      }
 
       if (rec == null)
       {

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/build.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/build.xml	2009-03-27 17:12:41 UTC (rev 6202)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/build.xml	2009-03-27 17:12:43 UTC (rev 6203)
@@ -346,6 +346,7 @@
          <!--
          <jvmarg line="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_shmem,server=y,suspend=y,address=antjunit"/>
          -->
+<<<<<<< .working
          <classpath refid="test.execution.classpath"/>
          <formatter type="xml" usefile="${junit.formatter.usefile}"/>
          <batchtest todir="${junit.batchtest.todir}"
@@ -368,6 +369,29 @@
          </batchtest>
       </junit>
    </target>
+=======
+			<classpath refid="test.execution.classpath" />
+			<formatter type="xml" usefile="${junit.formatter.usefile}" />
+			<batchtest todir="${junit.batchtest.todir}" haltonfailure="${junit.batchtest.haltonfailure}" haltonerror="${junit.batchtest.haltonerror}">
+				<formatter type="plain" usefile="${junit.formatter.usefile}" />
+				<fileset dir="${build.tests.classes}">
+					<include name="**/messaging/core/**/${test-mask}.class" />
+					<include name="**/jms/**/${test-mask}.class" />
+					<include name="**/messaging/util/**/${test-mask}.class" />
+                                        <exclude name="**/jms/DeliveryOnConnectionFailureTest.class" />
+					<exclude name="**/jms/MemLeakTest.class" />
+					<exclude name="**/jms/RemotingConnectionConfigurationTest.class" />
+					<exclude name="**/jms/XAResourceRecoveryTest.class" />
+					<exclude name="**/jms/stress/**" />
+					<exclude name="**/jms/crash/**" />
+					<exclude name="**/jms/bridge/**" />
+					<exclude name="**/jms/manual/**" />
+					<exclude name="**/jms/clustering/**" />
+				</fileset>
+			</batchtest>
+		</junit>
+	</target>
+>>>>>>> .merge-right.r6192
 
    <target name="invm-thirdparty-tests" depends="tests-jar, prepare-testdirs, clear-test-logs"
            description="Runs all available thirdparty tests an in-VM configuration">

Copied: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java (from rev 6192, branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java)
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java	                        (rev 0)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java	2009-03-27 17:12:43 UTC (rev 6203)
@@ -0,0 +1,291 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.test.messaging.jms;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.jms.server.ServerPeer;
+import org.jboss.jms.server.connectionmanager.SimpleConnectionManager;
+import org.jboss.remoting.Client;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.container.Command;
+import org.jboss.test.messaging.tools.container.Server;
+
+/**
+ * Tests message delivery and message counts after a connection failure (JBMESSAGING-1456).
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ * 
+ * Created Mar 26, 2009 3:14:28 PM
+ *
+ */
+public class DeliveryOnConnectionFailureTest extends JMSTestCase
+{
+
+   public DeliveryOnConnectionFailureTest(String name)
+   {
+      super(name);
+   }
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   
+   //https://jira.jboss.org/jira/browse/JBMESSAGING-1456
+   //"Message stuck" means that messages are kept in the delivering state and are never delivered again
+   //unless the server is restarted (for persistent messages).
+   //This can happen under the following conditions:
+   //1. The client ping times out and JBM tries to disconnect from the server (this happens in a cluster).
+   //2. Due to a network/remoting issue, the server receives a 'normal' disconnection event.
+   //3. The server assumes the client has already closed its connection and therefore doesn't clean up.
+   //4. So the connection at the server is left open, including the sessions created on that connection.
+   //5. If those sessions contain messages being delivered, those messages will appear stuck.
+   //To fix this, either the server side always tries to clean up the connection whenever a disconnection happens,
+   //or the remoting layer handles this correctly.
+   //
+   //Here we simplify the situation. First we start the server and get a connection to it. Then
+   //we send a message to the server using client ack. We receive it without acknowledging it;
+   //next we call client.disconnect() directly from the client without closing the connection.
+   //The server should cancel the message. Then we receive the message again and ack it.
+   public void testMessageStuckOnConnectionFailure() throws Exception
+   {
+      ConnectionFactory cf = (JBossConnectionFactory)ic.lookup("/ConnectionFactory");
+      
+      JBossConnection conn1 = null;
+      JBossConnection conn2 = null;
+
+      try
+      {
+         //create a connection
+         conn1 = (JBossConnection)cf.createConnection();
+         Session sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         MessageProducer prod1 = sess1.createProducer(queue1);
+         TextMessage msg = sess1.createTextMessage("dont-stuck-me!");
+         conn1.start();
+         
+         //send a message
+         prod1.send(msg);
+         
+         //receive the message but not ack
+         MessageConsumer cons1 = sess1.createConsumer(queue1);
+         TextMessage rm = (TextMessage)cons1.receive(2000);
+         
+         assertNotNull(rm);
+         assertEquals("dont-stuck-me!", rm.getText());
+         
+         //break connection.
+         JMSRemotingConnection jmsConn = ((ClientConnectionDelegate)conn1.getDelegate()).getRemotingConnection();
+         Client rmClient = jmsConn.getRemotingClient();
+         rmClient.disconnect();
+         
+         //wait for server side cleanup
+         try
+         {
+            Thread.sleep(5000);
+         }
+         catch (InterruptedException e)
+         {
+            //ignore.
+         }
+         
+         //now receive the message 
+         conn2 = (JBossConnection)cf.createConnection();
+         conn2.start();
+         Session sess2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         MessageConsumer cons2 = sess2.createConsumer(queue1);
+         TextMessage rm2 = (TextMessage)cons2.receive(2000);
+
+         assertNotNull(rm2);
+         assertEquals("dont-stuck-me!", rm2.getText());
+         rm2.acknowledge();
+         
+         //Message count should be zero.
+         //this is checked in tearDown().
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+
+   }
+   
+   //https://jira.jboss.org/jira/browse/JBMESSAGING-1456
+   //Another issue from JIRA 1456 is a negative message count.
+   //This test checks that the message count does not go negative.
+   //Error scenario:
+   // 1. The server side detects the connection failure (lease timeout) and closes the connection/session.
+   // 2. The session endpoint cancels the messages being delivered, returning them to the queue.
+   // 3. At the same time the client side has received some of the messages and acknowledges them.
+   // 4. The acknowledge action decreases the delivering count of the queue, and the session endpoint
+   //    cancel also decreases the delivering count.
+   // 5. If not synchronized, one message may be canceled and acked at the same time, so the delivering count
+   //    is decreased twice for that message, resulting in a negative message count.
+   //
+   //The test first creates a connection and sends messages, then receives the messages, then acks the last
+   //msg (client-ack) while at the same time simulating a server-side connection failure to trigger server-side
+   //cleanup. Without proper synchronization this creates the possibility that the issue described
+   //above happens. At the end the message count is checked; it should always be zero.
+   public void testMessageCountOnConnectionFailure() throws Exception
+   {
+      ConnectionFactory cf = (JBossConnectionFactory)ic.lookup("/ConnectionFactory");
+      
+      JBossConnection conn1 = null;
+      JBossConnection conn2 = null;
+      
+      try
+      {
+         conn1 = (JBossConnection)cf.createConnection();
+         conn1.start();
+         Session sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         
+         //now send messages
+         MessageProducer prod1 = sess1.createProducer(queue1);
+         
+         final int NUM_MSG = 2000;
+         for (int i = 0; i < NUM_MSG; ++i)
+         {
+            TextMessage tm = sess1.createTextMessage("-m"+i);
+            prod1.send(tm);
+         }
+         
+         //receive the messages
+         MessageConsumer cons1 = sess1.createConsumer(queue1);
+         for (int j = 0; j < NUM_MSG-1; ++j)
+         {
+            TextMessage rm = (TextMessage)cons1.receive(2000);
+            assertNotNull(rm);
+            assertEquals("-m"+j, rm.getText());
+         }
+         
+         //last message
+         TextMessage lastRm = (TextMessage)cons1.receive(2000);
+         assertNotNull(lastRm);
+         assertEquals("-m"+(NUM_MSG-1), lastRm.getText());
+         
+         final ServerClientFailureCommand cmd = new ServerClientFailureCommand();
+         
+         Thread exeThr = new Thread()
+         {
+            public void run()
+            {
+               try
+               {
+                  ServerManagement.getServer().executeCommand(cmd);
+               }
+               catch (Exception e)
+               {
+                  log.error("failed to invoke command", e);
+                  fail("failure in executing command.");
+               }               
+            }
+         };
+         
+         exeThr.start();
+
+         //ack the last message so that the server-side acknowledge runs.
+         lastRm.acknowledge();
+
+         //receive possible canceled messages
+         TextMessage prm = null;
+         conn2 = (JBossConnection)cf.createConnection();
+         conn2.start();
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer cons2 = sess2.createConsumer(queue1);
+         prm = (TextMessage)cons2.receive(2000);
+         while (prm != null)
+         {
+            prm = (TextMessage)cons2.receive(2000);
+         }
+         
+         //check message count
+         //tearDown will do the check.
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }      
+   }
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+   public static class ServerClientFailureCommand implements Command
+   {
+
+      private static final long serialVersionUID = 2603154447586447658L;
+
+      public Object execute(Server server) throws Exception
+      {
+         ServerPeer peer = server.getServerPeer();
+
+         SimpleConnectionManager cm = (SimpleConnectionManager)peer.getConnectionManager();
+
+         Map jmsClients = cm.getClients();
+         assertEquals(1, jmsClients.size());
+         Map endpoints = (Map)jmsClients.values().iterator().next();
+         assertEquals(1, endpoints.size());
+         Map.Entry entry = (Map.Entry)endpoints.entrySet().iterator().next();
+         String sessId = (String)entry.getKey();
+
+         // triggering server side clean up
+         cm.handleClientFailure(sessId);
+         return null;
+      }
+      
+   }
+}




More information about the jboss-cvs-commits mailing list