[hornetq-commits] JBoss hornetq SVN: r8257 - in trunk: tests/src/org/hornetq/tests/integration/client and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Nov 10 17:54:22 EST 2009


Author: timfox
Date: 2009-11-10 17:54:21 -0500 (Tue, 10 Nov 2009)
New Revision: 8257

Added:
   trunk/tests/src/org/hornetq/tests/integration/client/SessionClosedOnRemotingConnectionFailureTest.java
   trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
Modified:
   trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/ConsumerCloseTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
   trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
   trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-173

Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java	2009-11-10 20:03:11 UTC (rev 8256)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java	2009-11-10 22:54:21 UTC (rev 8257)
@@ -482,7 +482,7 @@
    }
 
    private void failoverOrReconnect(final Object connectionID, final HornetQException me)
-   {
+   {     
       synchronized (failoverLock)
       {
          if (connection == null || connection.getID() != connectionID)
@@ -620,9 +620,27 @@
          {
             connection.destroy();
 
-            connection = null;
-         }        
+            connection = null;                       
+         }      
          
+         if (connection == null)
+         {
+            // If connection is null it means we didn't succeed in failing over or reconnecting
+            // so we close all the sessions, so they will throw exceptions when attempted to be used
+            
+            for (ClientSessionInternal session: new HashSet<ClientSessionInternal>(sessions))
+            {
+               try
+               {
+                  session.cleanUp();
+               }
+               catch (Exception e)
+               {
+                  log.error("Failed to cleanup session");
+               }
+            }
+         }
+         
          callFailureListeners(me, true);
       }
    }

Modified: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerCloseTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerCloseTest.java	2009-11-10 20:03:11 UTC (rev 8256)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerCloseTest.java	2009-11-10 22:54:21 UTC (rev 8257)
@@ -166,6 +166,8 @@
       session.createQueue(address, queue, false);
 
    }
+   
+   
 
    private ClientSessionFactory sf;
 

Added: trunk/tests/src/org/hornetq/tests/integration/client/SessionClosedOnRemotingConnectionFailureTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/SessionClosedOnRemotingConnectionFailureTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/SessionClosedOnRemotingConnectionFailureTest.java	2009-11-10 22:54:21 UTC (rev 8257)
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.UnitTestCase;
+
+/**
+ * A SessionClosedOnRemotingConnectionFailureTest
+ *
+ * @author Tim Fox
+ 
+ */
+public class SessionClosedOnRemotingConnectionFailureTest extends UnitTestCase
+{
+   private HornetQServer server;
+
+   private ClientSessionFactory sf;
+
+   public void testSessionClosedOnRemotingConnectionFailure() throws Exception
+   {
+      ClientSession session = null;
+
+      try
+      {
+         session = sf.createSession();
+
+         session.createQueue("fooaddress", "fooqueue");
+
+         ClientProducer prod = session.createProducer("fooaddress");
+
+         ClientConsumer cons = session.createConsumer("fooqueue");
+
+         session.start();
+
+         prod.send(session.createClientMessage(false));
+
+         assertNotNull(cons.receive());
+
+         // Now fail the underlying connection
+
+         RemotingConnection connection = ((ClientSessionInternal)session).getConnection();
+
+         connection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+         
+         assertTrue(session.isClosed());
+         
+         assertTrue(prod.isClosed());
+         
+         assertTrue(cons.isClosed());
+         
+         //Now try and use the producer
+         
+         try
+         {
+            prod.send(session.createClientMessage(false));
+            
+            fail("Should throw exception");
+         }
+         catch (HornetQException e)
+         {
+            assertEquals(HornetQException.OBJECT_CLOSED, e.getCode());
+         }
+         
+         try
+         {
+            cons.receive();
+            
+            fail("Should throw exception");
+         }
+         catch (HornetQException e)
+         {
+            assertEquals(HornetQException.OBJECT_CLOSED, e.getCode());            
+         }        
+         
+         session.close();
+      }
+      finally
+      {
+         if (session != null)
+         {
+            session.close();
+         }
+      }
+   }
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      Configuration config = new ConfigurationImpl();
+      config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName()));
+      config.setSecurityEnabled(false);
+      server = HornetQ.newHornetQServer(config, false);
+
+      server.start();
+
+      sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      if (sf != null)
+      {
+         sf.close();
+      }
+
+      if (server != null)
+      {
+         server.stop();
+      }
+
+      sf = null;
+
+      server = null;
+
+      super.tearDown();
+   }
+}

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java	2009-11-10 20:03:11 UTC (rev 8256)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java	2009-11-10 22:54:21 UTC (rev 8257)
@@ -411,13 +411,19 @@
 
       conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
 
-      session.start();
+      //Should throw exception since didn't reconnect
+      
+      try
+      {
+         session.start();
+         
+         fail("Should throw exception");
+      }
+      catch (HornetQException e)
+      {
+         assertEquals(HornetQException.OBJECT_CLOSED, e.getCode());
+      }
 
-      // Should be null since failed to reconnect
-      ClientMessage message = consumer.receive(500);
-
-      assertNull(message);
-
       session.close();
 
       sf.close();

Added: trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java	2009-11-10 22:54:21 UTC (rev 8257)
@@ -0,0 +1,195 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.jms.client;
+
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONFIRMATION_WINDOW_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_THREAD_POOL_MAX_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_USE_GLOBAL_POOLS;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.integration.transports.netty.NettyConnectorFactory;
+import org.hornetq.jms.client.HornetQSession;
+import org.hornetq.tests.util.JMSTestBase;
+import org.hornetq.utils.Pair;
+
+/**
+ * 
+ * A SessionClosedOnRemotingConnectionFailureTest
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class SessionClosedOnRemotingConnectionFailureTest extends JMSTestBase
+{
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(SessionClosedOnRemotingConnectionFailureTest.class);
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testSessionClosedOnRemotingConnectionFailure() throws Exception
+   {
+      List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
+      connectorConfigs.add(new Pair<TransportConfiguration, TransportConfiguration>(new TransportConfiguration(NettyConnectorFactory.class.getName()),
+                                                                                    null));
+
+      List<String> jndiBindings = new ArrayList<String>();
+      jndiBindings.add("/cffoo");
+
+      jmsServer.createConnectionFactory("cffoo",
+                                        connectorConfigs,
+                                        null,
+                                        DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+                                        DEFAULT_CONNECTION_TTL,
+                                        ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+                                        DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
+                                        DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                        DEFAULT_CONSUMER_WINDOW_SIZE,
+                                        DEFAULT_CONSUMER_MAX_RATE,
+                                        DEFAULT_CONFIRMATION_WINDOW_SIZE,
+                                        DEFAULT_PRODUCER_MAX_RATE,
+                                        DEFAULT_BLOCK_ON_ACKNOWLEDGE,
+                                        DEFAULT_BLOCK_ON_PERSISTENT_SEND,
+                                        DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
+                                        DEFAULT_AUTO_GROUP,
+                                        DEFAULT_PRE_ACKNOWLEDGE,
+                                        DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
+                                        DEFAULT_ACK_BATCH_SIZE,
+                                        DEFAULT_ACK_BATCH_SIZE,
+                                        DEFAULT_USE_GLOBAL_POOLS,
+                                        DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+                                        DEFAULT_THREAD_POOL_MAX_SIZE,
+                                        DEFAULT_RETRY_INTERVAL,
+                                        DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+                                        DEFAULT_MAX_RETRY_INTERVAL,
+                                        0,
+                                        false,
+                                        jndiBindings);
+
+
+      cf = (ConnectionFactory)context.lookup("/cffoo");
+                  
+      Connection conn = cf.createConnection();
+
+      Queue queue = createQueue("testQueue");
+
+      try
+      {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = session.createProducer(queue);
+
+         MessageConsumer cons = session.createConsumer(queue);
+
+         conn.start();
+
+         prod.send(session.createMessage());
+
+         assertNotNull(cons.receive());
+
+         // Now fail the underlying connection
+
+         RemotingConnection connection = ((ClientSessionInternal)((HornetQSession)session).getCoreSession()).getConnection();
+
+         connection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+
+         // Now try and use the producer
+
+         try
+         {
+            prod.send(session.createMessage());
+
+            fail("Should throw exception");
+         }
+         catch (JMSException e)
+         {
+            // assertEquals(HornetQException.OBJECT_CLOSED, e.getCode());
+         }
+
+         try
+         {
+            cons.receive();
+
+            fail("Should throw exception");
+         }
+         catch (JMSException e)
+         {
+            // assertEquals(HornetQException.OBJECT_CLOSED, e.getCode());
+         }
+         
+         session.close();
+         
+         conn.close();
+      }
+      finally
+      {
+         try
+         {
+            conn.close();
+         }
+         catch (Throwable igonred)
+         {
+         }
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java	2009-11-10 20:03:11 UTC (rev 8256)
+++ trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java	2009-11-10 22:54:21 UTC (rev 8257)
@@ -251,7 +251,7 @@
 
    public void testGetScheduledCount() throws Exception
    {
-      long delay = 2000;
+      long delay = 500;
       SimpleString address = randomSimpleString();
       SimpleString queue = randomSimpleString();
 
@@ -268,7 +268,7 @@
       assertEquals(1, queueControl.getScheduledCount());
       consumeMessages(0, session, queue);
 
-      Thread.sleep(delay);
+      Thread.sleep(delay * 2);
 
       assertEquals(0, queueControl.getScheduledCount());
       consumeMessages(1, session, queue);

Modified: trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java	2009-11-10 20:03:11 UTC (rev 8256)
+++ trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java	2009-11-10 22:54:21 UTC (rev 8257)
@@ -63,7 +63,7 @@
    protected HornetQServer server;
 
    protected JMSServerManagerImpl jmsServer;
-   
+
    protected ConnectionFactory cf;
 
    protected InVMContext context;
@@ -96,7 +96,7 @@
    {
       return false;
    }
-   
+
    /**
     * @throws Exception
     * @throws NamingException
@@ -104,10 +104,9 @@
    protected Queue createQueue(String name) throws Exception, NamingException
    {
       jmsServer.createQueue(name, "/jms/" + name, null, true);
-      
+
       return (Queue)context.lookup("/jms/" + name);
    }
-   
 
    @Override
    protected void setUp() throws Exception
@@ -178,9 +177,9 @@
       jndiBindings.add("/cf");
 
       createCF(connectorConfigs, jndiBindings);
-      
+
       cf = (ConnectionFactory)context.lookup("/cf");
-      
+
    }
 
    /**
@@ -189,7 +188,7 @@
     * @throws Exception
     */
    protected void createCF(List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
-                         List<String> jndiBindings) throws Exception
+                           List<String> jndiBindings) throws Exception
    {
       int retryInterval = 1000;
       double retryIntervalMultiplier = 1.0;
@@ -198,34 +197,34 @@
       int callTimeout = 30000;
 
       jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest",
-                                            connectorConfigs,
-                                            null,
-                                            DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
-                                            DEFAULT_CONNECTION_TTL,
-                                            callTimeout,                                            
-                                            DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
-                                            DEFAULT_MIN_LARGE_MESSAGE_SIZE,
-                                            DEFAULT_CONSUMER_WINDOW_SIZE,
-                                            DEFAULT_CONSUMER_MAX_RATE,
-                                            DEFAULT_CONFIRMATION_WINDOW_SIZE,
-                                            DEFAULT_PRODUCER_MAX_RATE,
-                                            DEFAULT_BLOCK_ON_ACKNOWLEDGE,
-                                            DEFAULT_BLOCK_ON_PERSISTENT_SEND,
-                                            DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
-                                            DEFAULT_AUTO_GROUP,
-                                            DEFAULT_PRE_ACKNOWLEDGE,
-                                            DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
-                                            DEFAULT_ACK_BATCH_SIZE,
-                                            DEFAULT_ACK_BATCH_SIZE,
-                                            DEFAULT_USE_GLOBAL_POOLS,
-                                            DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
-                                            DEFAULT_THREAD_POOL_MAX_SIZE,                                 
-                                            retryInterval,
-                                            retryIntervalMultiplier,
-                                            DEFAULT_MAX_RETRY_INTERVAL,
-                                            reconnectAttempts,
-                                            failoverOnServerShutdown,
-                                            jndiBindings);
+                                        connectorConfigs,
+                                        null,
+                                        DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+                                        DEFAULT_CONNECTION_TTL,
+                                        callTimeout,
+                                        DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
+                                        DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                        DEFAULT_CONSUMER_WINDOW_SIZE,
+                                        DEFAULT_CONSUMER_MAX_RATE,
+                                        DEFAULT_CONFIRMATION_WINDOW_SIZE,
+                                        DEFAULT_PRODUCER_MAX_RATE,
+                                        DEFAULT_BLOCK_ON_ACKNOWLEDGE,
+                                        DEFAULT_BLOCK_ON_PERSISTENT_SEND,
+                                        DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
+                                        DEFAULT_AUTO_GROUP,
+                                        DEFAULT_PRE_ACKNOWLEDGE,
+                                        DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
+                                        DEFAULT_ACK_BATCH_SIZE,
+                                        DEFAULT_ACK_BATCH_SIZE,
+                                        DEFAULT_USE_GLOBAL_POOLS,
+                                        DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+                                        DEFAULT_THREAD_POOL_MAX_SIZE,
+                                        retryInterval,
+                                        retryIntervalMultiplier,
+                                        DEFAULT_MAX_RETRY_INTERVAL,
+                                        reconnectAttempts,
+                                        failoverOnServerShutdown,
+                                        jndiBindings);
    }
 
 }



More information about the hornetq-commits mailing list