[hornetq-commits] JBoss hornetq SVN: r8257 - in trunk: tests/src/org/hornetq/tests/integration/client and 4 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Nov 10 17:54:22 EST 2009
Author: timfox
Date: 2009-11-10 17:54:21 -0500 (Tue, 10 Nov 2009)
New Revision: 8257
Added:
trunk/tests/src/org/hornetq/tests/integration/client/SessionClosedOnRemotingConnectionFailureTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
Modified:
trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerCloseTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-173
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-11-10 20:03:11 UTC (rev 8256)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-11-10 22:54:21 UTC (rev 8257)
@@ -482,7 +482,7 @@
}
private void failoverOrReconnect(final Object connectionID, final HornetQException me)
- {
+ {
synchronized (failoverLock)
{
if (connection == null || connection.getID() != connectionID)
@@ -620,9 +620,27 @@
{
connection.destroy();
- connection = null;
- }
+ connection = null;
+ }
+ if (connection == null)
+ {
+ // If connection is null it means we didn't succeed in failing over or reconnecting
+ // so we close all the sessions, so they will throw exceptions when attempted to be used
+
+ for (ClientSessionInternal session: new HashSet<ClientSessionInternal>(sessions))
+ {
+ try
+ {
+ session.cleanUp();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to cleanup session");
+ }
+ }
+ }
+
callFailureListeners(me, true);
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerCloseTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerCloseTest.java 2009-11-10 20:03:11 UTC (rev 8256)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerCloseTest.java 2009-11-10 22:54:21 UTC (rev 8257)
@@ -166,6 +166,8 @@
session.createQueue(address, queue, false);
}
+
+
private ClientSessionFactory sf;
Added: trunk/tests/src/org/hornetq/tests/integration/client/SessionClosedOnRemotingConnectionFailureTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/SessionClosedOnRemotingConnectionFailureTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/SessionClosedOnRemotingConnectionFailureTest.java 2009-11-10 22:54:21 UTC (rev 8257)
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.UnitTestCase;
+
+/**
+ * A SessionClosedOnRemotingConnectionFailureTest
+ *
+ * @author Tim Fox
+
+ */
+public class SessionClosedOnRemotingConnectionFailureTest extends UnitTestCase
+{
+ private HornetQServer server;
+
+ private ClientSessionFactory sf;
+
+ public void testSessionClosedOnRemotingConnectionFailure() throws Exception
+ {
+ ClientSession session = null;
+
+ try
+ {
+ session = sf.createSession();
+
+ session.createQueue("fooaddress", "fooqueue");
+
+ ClientProducer prod = session.createProducer("fooaddress");
+
+ ClientConsumer cons = session.createConsumer("fooqueue");
+
+ session.start();
+
+ prod.send(session.createClientMessage(false));
+
+ assertNotNull(cons.receive());
+
+ // Now fail the underlying connection
+
+ RemotingConnection connection = ((ClientSessionInternal)session).getConnection();
+
+ connection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+
+ assertTrue(session.isClosed());
+
+ assertTrue(prod.isClosed());
+
+ assertTrue(cons.isClosed());
+
+ //Now try and use the producer
+
+ try
+ {
+ prod.send(session.createClientMessage(false));
+
+ fail("Should throw exception");
+ }
+ catch (HornetQException e)
+ {
+ assertEquals(HornetQException.OBJECT_CLOSED, e.getCode());
+ }
+
+ try
+ {
+ cons.receive();
+
+ fail("Should throw exception");
+ }
+ catch (HornetQException e)
+ {
+ assertEquals(HornetQException.OBJECT_CLOSED, e.getCode());
+ }
+
+ session.close();
+ }
+ finally
+ {
+ if (session != null)
+ {
+ session.close();
+ }
+ }
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ Configuration config = new ConfigurationImpl();
+ config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName()));
+ config.setSecurityEnabled(false);
+ server = HornetQ.newHornetQServer(config, false);
+
+ server.start();
+
+ sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ if (sf != null)
+ {
+ sf.close();
+ }
+
+ if (server != null)
+ {
+ server.stop();
+ }
+
+ sf = null;
+
+ server = null;
+
+ super.tearDown();
+ }
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java 2009-11-10 20:03:11 UTC (rev 8256)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java 2009-11-10 22:54:21 UTC (rev 8257)
@@ -411,13 +411,19 @@
conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
- session.start();
+ //Should throw exception since didn't reconnect
+
+ try
+ {
+ session.start();
+
+ fail("Should throw exception");
+ }
+ catch (HornetQException e)
+ {
+ assertEquals(HornetQException.OBJECT_CLOSED, e.getCode());
+ }
- // Should be null since failed to reconnect
- ClientMessage message = consumer.receive(500);
-
- assertNull(message);
-
session.close();
sf.close();
Added: trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java 2009-11-10 22:54:21 UTC (rev 8257)
@@ -0,0 +1,195 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.jms.client;
+
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONFIRMATION_WINDOW_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_THREAD_POOL_MAX_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_USE_GLOBAL_POOLS;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.integration.transports.netty.NettyConnectorFactory;
+import org.hornetq.jms.client.HornetQSession;
+import org.hornetq.tests.util.JMSTestBase;
+import org.hornetq.utils.Pair;
+
+/**
+ *
+ * A SessionClosedOnRemotingConnectionFailureTest
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class SessionClosedOnRemotingConnectionFailureTest extends JMSTestBase
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(SessionClosedOnRemotingConnectionFailureTest.class);
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testSessionClosedOnRemotingConnectionFailure() throws Exception
+ {
+ List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
+ connectorConfigs.add(new Pair<TransportConfiguration, TransportConfiguration>(new TransportConfiguration(NettyConnectorFactory.class.getName()),
+ null));
+
+ List<String> jndiBindings = new ArrayList<String>();
+ jndiBindings.add("/cffoo");
+
+ jmsServer.createConnectionFactory("cffoo",
+ connectorConfigs,
+ null,
+ DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ DEFAULT_CONNECTION_TTL,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+ DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
+ DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ DEFAULT_CONSUMER_WINDOW_SIZE,
+ DEFAULT_CONSUMER_MAX_RATE,
+ DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ DEFAULT_PRODUCER_MAX_RATE,
+ DEFAULT_BLOCK_ON_ACKNOWLEDGE,
+ DEFAULT_BLOCK_ON_PERSISTENT_SEND,
+ DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
+ DEFAULT_AUTO_GROUP,
+ DEFAULT_PRE_ACKNOWLEDGE,
+ DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
+ DEFAULT_ACK_BATCH_SIZE,
+ DEFAULT_ACK_BATCH_SIZE,
+ DEFAULT_USE_GLOBAL_POOLS,
+ DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+ DEFAULT_THREAD_POOL_MAX_SIZE,
+ DEFAULT_RETRY_INTERVAL,
+ DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ DEFAULT_MAX_RETRY_INTERVAL,
+ 0,
+ false,
+ jndiBindings);
+
+
+ cf = (ConnectionFactory)context.lookup("/cffoo");
+
+ Connection conn = cf.createConnection();
+
+ Queue queue = createQueue("testQueue");
+
+ try
+ {
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = session.createProducer(queue);
+
+ MessageConsumer cons = session.createConsumer(queue);
+
+ conn.start();
+
+ prod.send(session.createMessage());
+
+ assertNotNull(cons.receive());
+
+ // Now fail the underlying connection
+
+ RemotingConnection connection = ((ClientSessionInternal)((HornetQSession)session).getCoreSession()).getConnection();
+
+ connection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+
+ // Now try and use the producer
+
+ try
+ {
+ prod.send(session.createMessage());
+
+ fail("Should throw exception");
+ }
+ catch (JMSException e)
+ {
+ // assertEquals(HornetQException.OBJECT_CLOSED, e.getCode());
+ }
+
+ try
+ {
+ cons.receive();
+
+ fail("Should throw exception");
+ }
+ catch (JMSException e)
+ {
+ // assertEquals(HornetQException.OBJECT_CLOSED, e.getCode());
+ }
+
+ session.close();
+
+ conn.close();
+ }
+ finally
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (Throwable igonred)
+ {
+ }
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2009-11-10 20:03:11 UTC (rev 8256)
+++ trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2009-11-10 22:54:21 UTC (rev 8257)
@@ -251,7 +251,7 @@
public void testGetScheduledCount() throws Exception
{
- long delay = 2000;
+ long delay = 500;
SimpleString address = randomSimpleString();
SimpleString queue = randomSimpleString();
@@ -268,7 +268,7 @@
assertEquals(1, queueControl.getScheduledCount());
consumeMessages(0, session, queue);
- Thread.sleep(delay);
+ Thread.sleep(delay * 2);
assertEquals(0, queueControl.getScheduledCount());
consumeMessages(1, session, queue);
Modified: trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java 2009-11-10 20:03:11 UTC (rev 8256)
+++ trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java 2009-11-10 22:54:21 UTC (rev 8257)
@@ -63,7 +63,7 @@
protected HornetQServer server;
protected JMSServerManagerImpl jmsServer;
-
+
protected ConnectionFactory cf;
protected InVMContext context;
@@ -96,7 +96,7 @@
{
return false;
}
-
+
/**
* @throws Exception
* @throws NamingException
@@ -104,10 +104,9 @@
protected Queue createQueue(String name) throws Exception, NamingException
{
jmsServer.createQueue(name, "/jms/" + name, null, true);
-
+
return (Queue)context.lookup("/jms/" + name);
}
-
@Override
protected void setUp() throws Exception
@@ -178,9 +177,9 @@
jndiBindings.add("/cf");
createCF(connectorConfigs, jndiBindings);
-
+
cf = (ConnectionFactory)context.lookup("/cf");
-
+
}
/**
@@ -189,7 +188,7 @@
* @throws Exception
*/
protected void createCF(List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
- List<String> jndiBindings) throws Exception
+ List<String> jndiBindings) throws Exception
{
int retryInterval = 1000;
double retryIntervalMultiplier = 1.0;
@@ -198,34 +197,34 @@
int callTimeout = 30000;
jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest",
- connectorConfigs,
- null,
- DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- DEFAULT_CONNECTION_TTL,
- callTimeout,
- DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
- DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- DEFAULT_CONSUMER_WINDOW_SIZE,
- DEFAULT_CONSUMER_MAX_RATE,
- DEFAULT_CONFIRMATION_WINDOW_SIZE,
- DEFAULT_PRODUCER_MAX_RATE,
- DEFAULT_BLOCK_ON_ACKNOWLEDGE,
- DEFAULT_BLOCK_ON_PERSISTENT_SEND,
- DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
- DEFAULT_AUTO_GROUP,
- DEFAULT_PRE_ACKNOWLEDGE,
- DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
- DEFAULT_ACK_BATCH_SIZE,
- DEFAULT_ACK_BATCH_SIZE,
- DEFAULT_USE_GLOBAL_POOLS,
- DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- DEFAULT_THREAD_POOL_MAX_SIZE,
- retryInterval,
- retryIntervalMultiplier,
- DEFAULT_MAX_RETRY_INTERVAL,
- reconnectAttempts,
- failoverOnServerShutdown,
- jndiBindings);
+ connectorConfigs,
+ null,
+ DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ DEFAULT_CONNECTION_TTL,
+ callTimeout,
+ DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
+ DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ DEFAULT_CONSUMER_WINDOW_SIZE,
+ DEFAULT_CONSUMER_MAX_RATE,
+ DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ DEFAULT_PRODUCER_MAX_RATE,
+ DEFAULT_BLOCK_ON_ACKNOWLEDGE,
+ DEFAULT_BLOCK_ON_PERSISTENT_SEND,
+ DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
+ DEFAULT_AUTO_GROUP,
+ DEFAULT_PRE_ACKNOWLEDGE,
+ DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
+ DEFAULT_ACK_BATCH_SIZE,
+ DEFAULT_ACK_BATCH_SIZE,
+ DEFAULT_USE_GLOBAL_POOLS,
+ DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+ DEFAULT_THREAD_POOL_MAX_SIZE,
+ retryInterval,
+ retryIntervalMultiplier,
+ DEFAULT_MAX_RETRY_INTERVAL,
+ reconnectAttempts,
+ failoverOnServerShutdown,
+ jndiBindings);
}
}
More information about the hornetq-commits
mailing list