[jboss-cvs] JBoss Messaging SVN: r7770 - in trunk: src/main/org/jboss/messaging/core/management and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Aug 18 08:13:37 EDT 2009
Author: timfox
Date: 2009-08-18 08:13:36 -0400 (Tue, 18 Aug 2009)
New Revision: 7770
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java
trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
trunk/src/main/org/jboss/messaging/core/management/AcceptorControl.java
trunk/src/main/org/jboss/messaging/core/management/impl/AcceptorControlImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMAcceptor.java
trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/spi/Acceptor.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageRedistributionTest.java
Log:
fixed bridge stop timeout issue
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2009-08-18 09:21:52 UTC (rev 7769)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2009-08-18 12:13:36 UTC (rev 7770)
@@ -807,6 +807,11 @@
log.error("Failed to stop discovery group", e);
}
}
+
+ for (ConnectionManager connectionManager : connectionManagerMap.values())
+ {
+ connectionManager.causeExit();
+ }
connectionManagerMap.clear();
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java 2009-08-18 09:21:52 UTC (rev 7769)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java 2009-08-18 12:13:36 UTC (rev 7770)
@@ -67,4 +67,6 @@
void addFailureListener(FailureListener listener);
boolean removeFailureListener(FailureListener listener);
+
+ void causeExit();
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-08-18 09:21:52 UTC (rev 7769)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-08-18 12:13:36 UTC (rev 7770)
@@ -91,7 +91,7 @@
// -----------------------------------------------------------------------------------
private final ClientSessionFactory sessionFactory;
-
+
private final TransportConfiguration connectorConfig;
private final TransportConfiguration backupConfig;
@@ -149,6 +149,8 @@
private Future<?> pingerFuture;
private PingRunnable pingRunnable;
+
+ private volatile boolean exitLoop;
// debug
@@ -184,7 +186,7 @@
final ScheduledExecutorService scheduledThreadPool)
{
this.sessionFactory = sessionFactory;
-
+
this.connectorConfig = connectorConfig;
this.backupConfig = backupConfig;
@@ -288,7 +290,11 @@
if (connection == null)
{
- // This can happen if the connection manager gets closed - e.g. the server gets shut down
+ if (exitLoop)
+ {
+ return null;
+ }
+ // This can happen if the connection manager gets closed - e.g. the server gets shut down
throw new MessagingException(MessagingException.NOT_CONNECTED,
"Unable to connect to server using configuration " + connectorConfig);
@@ -423,7 +429,7 @@
// Should never get here
throw new IllegalStateException("Oh my God it's full of stars!");
}
-
+
// Must be synchronized to prevent it happening concurrently with failover which can lead to
// inconsistencies
public void removeSession(final ClientSessionInternal session)
@@ -434,7 +440,7 @@
synchronized (failoverLock)
{
sessions.remove(session);
-
+
returnConnection(session.getConnection().getID());
}
}
@@ -459,6 +465,13 @@
{
return listeners.remove(listener);
}
+
+
+
+ public void causeExit()
+ {
+ exitLoop = true;
+ }
// Public
// ---------------------------------------------------------------------------------------
@@ -494,7 +507,7 @@
{
// We already failed over/reconnected - probably the first failure came in, all the connections were failed
// over then a async connection exception or disconnect
- // came in for one of the already closed connections, so we return true - we don't want to call the
+ // came in for one of the already closed connections, so we return true - we don't want to call the
// listeners again
return;
@@ -525,9 +538,14 @@
// It can then release the channel 1 lock, and retry (which will cause locking on failoverLock
// until failover is complete
- boolean attemptFailover = (backupConnectorFactory) != null && (failoverOnServerShutdown || me.getCode() != MessagingException.DISCONNECTED);
+ boolean serverShutdown = me.getCode() == MessagingException.DISCONNECTED;
- if (attemptFailover || reconnectAttempts != 0)
+ boolean attemptFailoverOrReconnect = (backupConnectorFactory != null || reconnectAttempts != 0)
+ && (failoverOnServerShutdown || !serverShutdown);
+
+ log.info("Attempting failover or reconnect " + attemptFailoverOrReconnect);
+
+ if (attemptFailoverOrReconnect)
{
lockAllChannel1s();
@@ -576,7 +594,6 @@
oldConnections.add(entry.connection);
}
-
connections.clear();
refCount = 0;
@@ -593,7 +610,7 @@
connector = null;
- if (attemptFailover)
+ if (backupConnectorFactory != null)
{
// Now try failing over to backup
@@ -632,6 +649,8 @@
}
else
{
+ log.info("Just closing connections and calling failure listeners");
+
closeConnectionsAndCallFailureListeners(me);
}
}
@@ -769,6 +788,11 @@
while (true)
{
+ if (exitLoop)
+ {
+ return null;
+ }
+
RemotingConnection connection = getConnection(initialRefCount);
if (connection == null)
@@ -822,7 +846,7 @@
pingRunnable.cancel();
boolean ok = pingerFuture.cancel(false);
-
+
pingRunnable = null;
pingerFuture = null;
@@ -853,13 +877,12 @@
catch (Throwable ignore)
{
}
-
connector = null;
}
}
-
+
public RemotingConnection getConnection(final int initialRefCount)
{
RemotingConnection conn;
@@ -998,14 +1021,14 @@
{
refCount--;
}
-
+
if (entry != null)
{
checkCloseConnections();
}
else
{
- // Can be legitimately null if session was closed before then went to remove session from csf
+ // Can be legitimately null if session was closed before then went to remove session from csf
// and locked since failover had started then after failover removes it but it's already been failed
}
}
@@ -1082,6 +1105,7 @@
if (type == PacketImpl.DISCONNECT)
{
+ log.info("Got a disconnect message");
threadPool.execute(new Runnable()
{
// Must be executed on new thread since cannot block the netty thread for a long time and fail can
@@ -1089,10 +1113,10 @@
public void run()
{
conn.fail(new MessagingException(MessagingException.DISCONNECTED,
- "The connection was closed by the server"));
+ "The connection was closed by the server"));
}
});
- }
+ }
}
}
@@ -1192,26 +1216,26 @@
}
}
}
-
+
private static final class ActualScheduled implements Runnable
{
private final WeakReference<PingRunnable> pingRunnable;
-
+
ActualScheduled(final PingRunnable runnable)
{
this.pingRunnable = new WeakReference<PingRunnable>(runnable);
}
-
+
public void run()
{
PingRunnable runnable = pingRunnable.get();
-
+
if (runnable != null)
{
runnable.run();
}
}
-
+
}
private final class PingRunnable implements Runnable
@@ -1219,14 +1243,14 @@
private boolean cancelled;
private boolean first;
-
+
public synchronized void run()
{
if (cancelled || (stopPingingAfterOne && !first))
{
return;
}
-
+
first = false;
synchronized (connections)
@@ -1236,7 +1260,7 @@
for (ConnectionEntry entry : connections.values())
{
final RemotingConnection connection = entry.connection;
-
+
if (entry.expiryPeriod != -1 && now >= entry.lastCheck + entry.expiryPeriod)
{
if (!connection.checkDataReceived())
@@ -1252,7 +1276,7 @@
connection.fail(me);
}
});
-
+
return;
}
else
@@ -1260,7 +1284,7 @@
entry.lastCheck = now;
}
}
-
+
// Send a ping
Ping ping = new Ping(connectionTTL);
@@ -1271,7 +1295,7 @@
}
}
}
-
+
public synchronized void cancel()
{
cancelled = true;
Modified: trunk/src/main/org/jboss/messaging/core/management/AcceptorControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/AcceptorControl.java 2009-08-18 09:21:52 UTC (rev 7769)
+++ trunk/src/main/org/jboss/messaging/core/management/AcceptorControl.java 2009-08-18 12:13:36 UTC (rev 7770)
@@ -37,4 +37,8 @@
String getFactoryClassName();
Map<String, Object> getParameters();
+
+ void pause() throws Exception;
+
+ void resume() throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/AcceptorControlImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/AcceptorControlImpl.java 2009-08-18 09:21:52 UTC (rev 7769)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/AcceptorControlImpl.java 2009-08-18 12:13:36 UTC (rev 7770)
@@ -83,20 +83,20 @@
acceptor.start();
}
-// public void pause()
-// {
-// acceptor.pause();
-// }
-//
-// public void resume()
-// {
-// acceptor.resume();
-// }
-
+ public void pause()
+ {
+ acceptor.pause();
+ }
+
public void stop() throws Exception
{
acceptor.stop();
}
+
+ public void resume() throws Exception
+ {
+ acceptor.resume();
+ }
// Public --------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMAcceptor.java 2009-08-18 09:21:52 UTC (rev 7769)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMAcceptor.java 2009-08-18 12:13:36 UTC (rev 7770)
@@ -57,6 +57,8 @@
private volatile boolean started;
private final ExecutorFactory executorFactory;
+
+ private boolean paused;
public InVMAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
@@ -82,6 +84,8 @@
InVMRegistry.instance.registerAcceptor(id, this);
started = true;
+
+ paused = false;
}
public synchronized void stop()
@@ -91,7 +95,10 @@
return;
}
- InVMRegistry.instance.unregisterAcceptor(id);
+ if (!paused)
+ {
+ InVMRegistry.instance.unregisterAcceptor(id);
+ }
for (Connection connection : connections.values())
{
@@ -101,12 +108,41 @@
connections.clear();
started = false;
+
+ paused = false;
}
- public boolean isStarted()
+ public synchronized boolean isStarted()
{
return started;
}
+
+ /*
+ * Stop accepting new connections
+ */
+ public synchronized void pause()
+ {
+ if (!started || paused)
+ {
+ return;
+ }
+
+ InVMRegistry.instance.unregisterAcceptor(id);
+
+ paused = true;
+ }
+
+ public synchronized void resume()
+ {
+ if (!paused || !started)
+ {
+ return;
+ }
+
+ InVMRegistry.instance.registerAcceptor(id, this);
+
+ paused = false;
+ }
public BufferHandler getHandler()
{
Modified: trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-08-18 09:21:52 UTC (rev 7769)
+++ trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-08-18 12:13:36 UTC (rev 7770)
@@ -230,18 +230,25 @@
}
failureCheckThread.close();
-
- // We need to stop them accepting first so no new connections are accepted after we send the disconnect message
+
+ // We need to stop them accepting first so no new connections are accepted after we send the disconnect message
for (Acceptor acceptor : acceptors)
{
- acceptor.stop();
+ acceptor.pause();
}
+ log.info("there are " + connections.size() + " connections to close on server close");
for (ConnectionEntry entry : connections.values())
{
+ log.info("sending disconnect message");
entry.connection.getChannel(0, -1, false).sendAndFlush(new PacketImpl(DISCONNECT));
}
-
+
+ for (Acceptor acceptor : acceptors)
+ {
+ acceptor.stop();
+ }
+
acceptors.clear();
connections.clear();
Modified: trunk/src/main/org/jboss/messaging/core/remoting/spi/Acceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/spi/Acceptor.java 2009-08-18 09:21:52 UTC (rev 7769)
+++ trunk/src/main/org/jboss/messaging/core/remoting/spi/Acceptor.java 2009-08-18 12:13:36 UTC (rev 7770)
@@ -33,4 +33,7 @@
*/
public interface Acceptor extends MessagingComponent
{
+ void pause();
+
+ void resume();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-08-18 09:21:52 UTC (rev 7769)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-08-18 12:13:36 UTC (rev 7770)
@@ -277,6 +277,8 @@
public void stop() throws Exception
{
+ log.info("Stopping bridge " + name);
+
if (started)
{
// We need to stop the csf here otherwise the stop runnable never runs since the createobjectsrunnable is
@@ -306,6 +308,8 @@
log.warn("unable to send notification when broadcast group is stopped", e);
}
}
+ log.info("Stopped bridge " + name);
+
}
public boolean isStarted()
@@ -510,6 +514,8 @@
public void connectionFailed(final MessagingException me)
{
+ log.info("bridge " + name + " failed " + me);
+
fail();
}
@@ -535,7 +541,7 @@
}
private void fail()
- {
+ {
if (started)
{
executor.execute(new FailRunnable());
@@ -757,6 +763,8 @@
{
log.error("Failed to stop", e);
}
+
+ log.info("Bridge " + name + " closed objects");
if (!createObjects())
{
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-08-18 09:21:52 UTC (rev 7769)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-08-18 12:13:36 UTC (rev 7770)
@@ -246,8 +246,7 @@
}
public synchronized void stop() throws Exception
- {
- if (!started)
+ { if (!started)
{
return;
}
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java 2009-08-18 09:21:52 UTC (rev 7769)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java 2009-08-18 12:13:36 UTC (rev 7770)
@@ -303,7 +303,7 @@
startServerChannels();
- // paused = false;
+ paused = false;
}
private void startServerChannels()
@@ -364,6 +364,8 @@
}
connections.clear();
+
+ paused = false;
}
public boolean isStarted()
@@ -371,6 +373,47 @@
return (channelFactory != null);
}
+ private boolean paused;
+
+ public synchronized void pause()
+ {
+ if (paused)
+ {
+ return;
+ }
+
+ if (channelFactory == null)
+ {
+ return;
+ }
+
+ // We *pause* the acceptor so no new connections are made
+
+ serverChannelGroup.close().awaitUninterruptibly();
+
+ try
+ {
+ Thread.sleep(500);
+ }
+ catch (Exception e)
+ {
+ }
+
+ paused = true;
+ }
+
+ public synchronized void resume()
+ {
+ if (!paused)
+ {
+ return;
+ }
+
+ startServerChannels();
+
+ paused = false;
+ }
+
// Inner classes -----------------------------------------------------------------------------
@ChannelPipelineCoverage("one")
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageRedistributionTest.java 2009-08-18 09:21:52 UTC (rev 7769)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageRedistributionTest.java 2009-08-18 12:13:36 UTC (rev 7770)
@@ -80,6 +80,8 @@
public void testRedistributionWhenConsumerIsClosed() throws Exception
{
setupCluster(false);
+
+ log.info("Doing test");
startServers(0, 1, 2);
@@ -112,6 +114,8 @@
removeConsumer(1);
verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
+
+ log.info("Test done");
}
public void testRedistributionWhenConsumerIsClosedNotConsumersOnAllNodes() throws Exception
More information about the jboss-cvs-commits
mailing list