[jboss-cvs] JBoss Messaging SVN: r7673 - in trunk: src/main/org/jboss/messaging/core/client/impl and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Aug 6 13:09:25 EDT 2009
Author: timfox
Date: 2009-08-06 13:09:24 -0400 (Thu, 06 Aug 2009)
New Revision: 7673
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/client/FailureDeadlockTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java
trunk/tests/src/org/jboss/messaging/tests/integration/jms/JBossConnectionFactoryTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1702 and https://jira.jboss.org/jira/browse/JBMESSAGING-1702
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java 2009-08-06 09:27:50 UTC (rev 7672)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java 2009-08-06 17:09:24 UTC (rev 7673)
@@ -23,6 +23,7 @@
package org.jboss.messaging.core.client;
import java.util.List;
+import java.util.concurrent.Executor;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.exception.MessagingException;
@@ -172,4 +173,6 @@
void setDiscoveryRefreshTimeout(long discoveryRefreshTimeout);
void close();
+ /** Returns the factory's thread pool as an Executor (ClientSessionFactoryImpl returns its threadPool field). */
+ Executor getThreadPool();
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2009-08-06 09:27:50 UTC (rev 7672)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2009-08-06 17:09:24 UTC (rev 7673)
@@ -20,6 +20,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -847,6 +848,11 @@
closed = true;
}
+ // Exposes this factory's threadPool field; JBossConnection wraps it in an OrderedExecutorFactory for its exception-listener callbacks
+ public Executor getThreadPool()
+ {
+ return threadPool;
+ }
// DiscoveryListener implementation --------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-08-06 09:27:50 UTC (rev 7672)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-08-06 17:09:24 UTC (rev 7673)
@@ -515,7 +515,9 @@
{
return false;
}
-
+
+ boolean done = false;
+
synchronized (failoverLock)
{
if (connectionID != null && !connections.containsKey(connectionID))
@@ -554,9 +556,7 @@
// until failover is complete
boolean attemptFailover = (backupConnectorFactory) != null && (failoverOnServerShutdown || me.getCode() != MessagingException.DISCONNECTED);
-
- boolean done = false;
-
+
if (attemptFailover || reconnectAttempts != 0)
{
lockAllChannel1s();
@@ -607,7 +607,7 @@
}
closePingers();
-
+
connections.clear();
refCount = 0;
@@ -666,16 +666,16 @@
else
{
// Just fail the connections
-
- closePingers();
-
- failConnection(me);
+
+ failConnections(me);
}
inFailoverOrReconnect = false;
- return done;
}
+
+ return done;
+
}
private void closePingers()
@@ -829,7 +829,7 @@
}
private void checkCloseConnections()
- {
+ {
if (refCount == 0)
{
// Close connections
@@ -863,6 +863,7 @@
connector = null;
}
+
}
public RemotingConnection getConnection(final int initialRefCount)
@@ -1020,23 +1021,25 @@
{
// Can be legitimately null if session was closed before then went to remove session from csf
// and locked since failover had started then after failover removes it but it's already been failed
- }
+ }
}
- private void failConnection(final MessagingException me)
+ private void failConnections(final MessagingException me)
{
synchronized (failConnectionLock)
{
// When a single connection fails, we fail *all* the connections
-
+
Set<ConnectionEntry> copy = new HashSet<ConnectionEntry>(connections.values());
for (ConnectionEntry entry : copy)
{
entry.connection.fail(me);
}
-
+
refCount = 0;
+
+ checkCloseConnections();
}
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java 2009-08-06 09:27:50 UTC (rev 7672)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java 2009-08-06 17:09:24 UTC (rev 7673)
@@ -126,7 +126,7 @@
public void close()
{
if (future != null)
- {
+ {
future.cancel(false);
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-08-06 09:27:50 UTC (rev 7672)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-08-06 17:09:24 UTC (rev 7673)
@@ -249,15 +249,6 @@
for (Channel channel : channels.values())
{
- // channel.lock.lock();
- // try
- // {
- // channel.sendCondition.signalAll();
- // }
- // finally
- // {
- // channel.lock.unlock();
- // }
channel.returnBlocking();
}
}
@@ -447,14 +438,4 @@
}
}
- private static class DelegatingBufferHandler extends AbstractBufferHandler
- {
- RemotingConnection conn;
-
- public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
- {
- conn.bufferReceived(connectionID, buffer);
- }
- }
-
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-08-06 09:27:50 UTC (rev 7672)
+++ trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-08-06 17:09:24 UTC (rev 7673)
@@ -297,7 +297,7 @@
// arrive the connection will get closed
scheduledThreadPool.schedule(runnable, INITIAL_PING_TIMEOUT, TimeUnit.MILLISECONDS);
-
+
if (config.isBackup())
{
serverSideReplicatingConnection = rc;
@@ -385,7 +385,7 @@
pingRunnable.setFuture(pingFuture);
- pingers.put(conn.getID(), pingRunnable);
+ pingers.put(conn.getID(), pingRunnable);
}
private RemotingConnection closeConnection(final Object connectionID)
@@ -393,12 +393,12 @@
RemotingConnection connection = connections.remove(connectionID);
Pinger pinger = pingers.remove(connectionID);
-
+
if (pinger != null)
{
pinger.close();
}
-
+
return connection;
}
@@ -441,7 +441,7 @@
public synchronized void run()
{
if (!gotInitialPing)
- {
+ {
// Never received initial ping
log.warn("Did not receive initial ping from " + conn.getRemoteAddress() + ", connection will be closed");
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java 2009-08-06 09:27:50 UTC (rev 7672)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java 2009-08-06 17:09:24 UTC (rev 7673)
@@ -24,6 +24,7 @@
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.Executor;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
@@ -53,6 +54,7 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.FailureListener;
import org.jboss.messaging.core.version.Version;
+import org.jboss.messaging.utils.OrderedExecutorFactory;
import org.jboss.messaging.utils.SimpleString;
import org.jboss.messaging.utils.UUIDGenerator;
import org.jboss.messaging.utils.VersionLoader;
@@ -121,6 +123,8 @@
private final int transactionBatchSize;
private ClientSession initialSession;
+
+ private final Executor executor;
// Constructors ---------------------------------------------------------------------------------
@@ -141,6 +145,8 @@
this.clientID = clientID;
this.sessionFactory = sessionFactory;
+
+ this.executor = new OrderedExecutorFactory(sessionFactory.getThreadPool()).getExecutor();
uid = UUIDGenerator.getInstance().generateSimpleStringUUID();
@@ -543,11 +549,20 @@
if (exceptionListener != null)
{
- JMSException je = new JMSException(me.toString());
+ final JMSException je = new JMSException(me.toString());
je.initCause(me);
-
- exceptionListener.onException(je);
+
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ synchronized (exceptionListener)
+ {
+ exceptionListener.onException(je);
+ }
+ }
+ });
}
failed = true;
Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/FailureDeadlockTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/FailureDeadlockTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/FailureDeadlockTest.java 2009-08-06 17:09:24 UTC (rev 7673)
@@ -0,0 +1,191 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.server.Messaging;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.jms.client.JBossConnectionFactory;
+import org.jboss.messaging.jms.client.JBossSession;
+import org.jboss.messaging.jms.server.impl.JMSServerManagerImpl;
+import org.jboss.messaging.tests.integration.jms.server.management.NullInitialContext;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+/**
+ * Integration tests for connection-failure handling in the JMS client
+ * (https://jira.jboss.org/jira/browse/JBMESSAGING-1702), run against a
+ * server that is reached through the in-VM acceptor/connector.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class FailureDeadlockTest extends UnitTestCase
+{
+ private static final Logger log = Logger.getLogger(FailureDeadlockTest.class);
+ // Core messaging server created in setUp() and stopped in tearDown()
+ private MessagingServer server;
+ // JMS layer wrapped around the core server; setUp() starts it
+ private JMSServerManagerImpl jmsServer;
+ // Two factories on the in-VM connector; testDeadlock() takes one connection from each
+ private JBossConnectionFactory cf1;
+
+ private JBossConnectionFactory cf2;
+ // Configures an unsecured server with an in-VM acceptor, starts the JMS server manager and builds both factories
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ Configuration conf = new ConfigurationImpl();
+ conf.setSecurityEnabled(false);
+ conf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+ server = Messaging.newMessagingServer(conf, false); // NOTE(review): second argument presumably disables persistence - confirm
+ jmsServer = new JMSServerManagerImpl(server);
+ jmsServer.setContext(new NullInitialContext());
+ jmsServer.start();
+ cf1 = new JBossConnectionFactory(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ cf2 = new JBossConnectionFactory(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+ }
+ // Stops the server if it was started; a failure to stop is printed and swallowed, so super.tearDown() still runs
+ @Override
+ protected void tearDown() throws Exception
+ {
+ if (server != null && server.isStarted())
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ server = null;
+
+ }
+
+ super.tearDown();
+ }
+
+ // https://jira.jboss.org/jira/browse/JBMESSAGING-1702
+ // Tests that two connection failures executing concurrently and invoking the same
+ // exception listener (which itself closes one of the connections) don't deadlock
+ public void testDeadlock() throws Exception
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ final Connection conn1 = cf1.createConnection();
+ // Dig out the core RemotingConnection beneath the JMS session so it can be failed directly
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ RemotingConnection rc1 = ((ClientSessionImpl)((JBossSession)sess1).getCoreSession()).getConnection();
+
+ final Connection conn2 = cf2.createConnection();
+
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ RemotingConnection rc2 = ((ClientSessionImpl)((JBossSession)sess2).getCoreSession()).getConnection();
+ // Shared listener: whichever connection reports the failure, it closes conn2
+ ExceptionListener listener1 = new ExceptionListener()
+ {
+ public void onException(JMSException exception)
+ {
+ try
+ {
+ conn2.close();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to close connection2", e);
+ }
+ }
+ };
+
+ conn1.setExceptionListener(listener1);
+
+ conn2.setExceptionListener(listener1);
+ // Fail both underlying connections from separate threads so the two failures can run concurrently
+ Failer f1 = new Failer(rc1);
+
+ Failer f2 = new Failer(rc2);
+
+ f1.start();
+
+ f2.start();
+ // A deadlock inside fail() would show up here as a join that never returns
+ f1.join();
+
+ f2.join();
+
+ conn1.close();
+
+ conn2.close();
+ }
+ }
+ // Helper thread that fails the given connection with a NOT_CONNECTED MessagingException
+ private class Failer extends Thread
+ {
+ RemotingConnection conn;
+
+ Failer(RemotingConnection conn)
+ {
+ this.conn = conn;
+ }
+
+ public void run()
+ {
+ conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
+ }
+ }
+
+
+ // https://jira.jboss.org/jira/browse/JBMESSAGING-1702
+ // Makes sure that a failed connection is removed from the connection manager, so that it can't be handed out by a subsequent call
+ public void testUsingDeadConnection() throws Exception
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ final Connection conn1 = cf1.createConnection();
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ RemotingConnection rc1 = ((ClientSessionImpl)((JBossSession)sess1).getCoreSession()).getConnection();
+
+ rc1.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
+ // sess2 itself is unused; the test passes as long as creating a session after the failure does not throw
+ Session sess2 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn1.close();
+ }
+ }
+
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/JBossConnectionFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/JBossConnectionFactoryTest.java 2009-08-06 09:27:50 UTC (rev 7672)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/JBossConnectionFactoryTest.java 2009-08-06 17:09:24 UTC (rev 7673)
@@ -58,15 +58,6 @@
{
private static final Logger log = Logger.getLogger(JBossConnectionFactoryTest.class);
- // private MessagingServer server;
- //
- // private JMSServerManagerImpl jmsServer;
- //
- // private JBossConnectionFactory cf;
- //
- // private static final String Q_NAME = "ConnectionTestQueue";
- //
-
private final String groupAddress = "230.1.2.3";
private final int groupPort = 8765;
More information about the jboss-cvs-commits mailing list