[jboss-cvs] JBoss Messaging SVN: r7770 - in trunk: src/main/org/jboss/messaging/core/management and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Aug 18 08:13:37 EDT 2009


Author: timfox
Date: 2009-08-18 08:13:36 -0400 (Tue, 18 Aug 2009)
New Revision: 7770

Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/management/AcceptorControl.java
   trunk/src/main/org/jboss/messaging/core/management/impl/AcceptorControlImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMAcceptor.java
   trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/spi/Acceptor.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageRedistributionTest.java
Log:
fixed bridge stop timeout issue

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2009-08-18 09:21:52 UTC (rev 7769)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2009-08-18 12:13:36 UTC (rev 7770)
@@ -807,6 +807,11 @@
             log.error("Failed to stop discovery group", e);
          }
       }
+      
+      for (ConnectionManager connectionManager : connectionManagerMap.values())
+      {
+         connectionManager.causeExit();
+      }
 
       connectionManagerMap.clear();
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java	2009-08-18 09:21:52 UTC (rev 7769)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java	2009-08-18 12:13:36 UTC (rev 7770)
@@ -67,4 +67,6 @@
    void addFailureListener(FailureListener listener);
 
    boolean removeFailureListener(FailureListener listener);
+   
+   void causeExit();
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-08-18 09:21:52 UTC (rev 7769)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-08-18 12:13:36 UTC (rev 7770)
@@ -91,7 +91,7 @@
    // -----------------------------------------------------------------------------------
 
    private final ClientSessionFactory sessionFactory;
-   
+
    private final TransportConfiguration connectorConfig;
 
    private final TransportConfiguration backupConfig;
@@ -149,6 +149,8 @@
    private Future<?> pingerFuture;
 
    private PingRunnable pingRunnable;
+   
+   private volatile boolean exitLoop;
 
    // debug
 
@@ -184,7 +186,7 @@
                                 final ScheduledExecutorService scheduledThreadPool)
    {
       this.sessionFactory = sessionFactory;
-      
+
       this.connectorConfig = connectorConfig;
 
       this.backupConfig = backupConfig;
@@ -288,7 +290,11 @@
 
                   if (connection == null)
                   {
-                     // This can happen if the connection manager gets closed - e.g. the server gets shut down
+                     if (exitLoop)
+                     {                        
+                        return null;
+                     }
+                     // This can happen if the connection manager gets closed - e.g. the server gets shut down
 
                      throw new MessagingException(MessagingException.NOT_CONNECTED,
                                                   "Unable to connect to server using configuration " + connectorConfig);
@@ -423,7 +429,7 @@
       // Should never get here
       throw new IllegalStateException("Oh my God it's full of stars!");
    }
-   
+
    // Must be synchronized to prevent it happening concurrently with failover which can lead to
    // inconsistencies
    public void removeSession(final ClientSessionInternal session)
@@ -434,7 +440,7 @@
          synchronized (failoverLock)
          {
             sessions.remove(session);
-            
+
             returnConnection(session.getConnection().getID());
          }
       }
@@ -459,6 +465,13 @@
    {
       return listeners.remove(listener);
    }
+   
+   
+   
+   public void causeExit()
+   {
+      exitLoop = true;
+   }
 
    // Public
    // ---------------------------------------------------------------------------------------
@@ -494,7 +507,7 @@
          {
             // We already failed over/reconnected - probably the first failure came in, all the connections were failed
             // over then a async connection exception or disconnect
-            // came in for one of the already closed connections, so we return true - we don't want to call the
+            // came in for one of the already closed connections, so we return true - we don't want to call the
             // listeners again
 
             return;
@@ -525,9 +538,14 @@
          // It can then release the channel 1 lock, and retry (which will cause locking on failoverLock
          // until failover is complete
 
-         boolean attemptFailover = (backupConnectorFactory) != null && (failoverOnServerShutdown || me.getCode() != MessagingException.DISCONNECTED);
+         boolean serverShutdown = me.getCode() == MessagingException.DISCONNECTED;
 
-         if (attemptFailover || reconnectAttempts != 0)
+         boolean attemptFailoverOrReconnect = (backupConnectorFactory != null || reconnectAttempts != 0)
+                                                && (failoverOnServerShutdown || !serverShutdown);
+         
+         log.info("Attempting failover or reconnect " + attemptFailoverOrReconnect);
+
+         if (attemptFailoverOrReconnect)
          {
             lockAllChannel1s();
 
@@ -576,7 +594,6 @@
                oldConnections.add(entry.connection);
             }
 
-
             connections.clear();
 
             refCount = 0;
@@ -593,7 +610,7 @@
 
             connector = null;
 
-            if (attemptFailover)
+            if (backupConnectorFactory != null)
             {
                // Now try failing over to backup
 
@@ -632,6 +649,8 @@
          }
          else
          {
+            log.info("Just closing connections and calling failure listeners");
+            
             closeConnectionsAndCallFailureListeners(me);
          }
       }
@@ -769,6 +788,11 @@
 
       while (true)
       {
+         if (exitLoop)
+         {
+            return null;
+         }
+         
          RemotingConnection connection = getConnection(initialRefCount);
 
          if (connection == null)
@@ -822,7 +846,7 @@
             pingRunnable.cancel();
 
             boolean ok = pingerFuture.cancel(false);
-            
+
             pingRunnable = null;
 
             pingerFuture = null;
@@ -853,13 +877,12 @@
          catch (Throwable ignore)
          {
          }
-         
 
          connector = null;
       }
 
    }
-   
+
    public RemotingConnection getConnection(final int initialRefCount)
    {
       RemotingConnection conn;
@@ -998,14 +1021,14 @@
       {
          refCount--;
       }
-      
+
       if (entry != null)
       {
          checkCloseConnections();
       }
       else
       {
-         // Can be legitimately null if session was closed before then went to remove session from csf
+         // Can be legitimately null if session was closed before then went to remove session from csf
          // and locked since failover had started then after failover removes it but it's already been failed
       }
    }
@@ -1082,6 +1105,7 @@
 
          if (type == PacketImpl.DISCONNECT)
          {
+            log.info("Got a disconnect message");
             threadPool.execute(new Runnable()
             {
                // Must be executed on new thread since cannot block the netty thread for a long time and fail can
@@ -1089,10 +1113,10 @@
                public void run()
                {
                   conn.fail(new MessagingException(MessagingException.DISCONNECTED,
-                                                   "The connection was closed by the server"));
+                                                   "The connection was exitLoop by the server"));
                }
             });
-         }         
+         }
       }
    }
 
@@ -1192,26 +1216,26 @@
          }
       }
    }
-   
+
    private static final class ActualScheduled implements Runnable
    {
       private final WeakReference<PingRunnable> pingRunnable;
-      
+
       ActualScheduled(final PingRunnable runnable)
       {
          this.pingRunnable = new WeakReference<PingRunnable>(runnable);
       }
-      
+
       public void run()
       {
          PingRunnable runnable = pingRunnable.get();
-         
+
          if (runnable != null)
          {
             runnable.run();
          }
       }
-      
+
    }
 
    private final class PingRunnable implements Runnable
@@ -1219,14 +1243,14 @@
       private boolean cancelled;
 
       private boolean first;
-      
+
       public synchronized void run()
       {
          if (cancelled || (stopPingingAfterOne && !first))
          {
             return;
          }
-         
+
          first = false;
 
          synchronized (connections)
@@ -1236,7 +1260,7 @@
             for (ConnectionEntry entry : connections.values())
             {
                final RemotingConnection connection = entry.connection;
-               
+
                if (entry.expiryPeriod != -1 && now >= entry.lastCheck + entry.expiryPeriod)
                {
                   if (!connection.checkDataReceived())
@@ -1252,7 +1276,7 @@
                            connection.fail(me);
                         }
                      });
-                     
+
                      return;
                   }
                   else
@@ -1260,7 +1284,7 @@
                      entry.lastCheck = now;
                   }
                }
-               
+
                // Send a ping
 
                Ping ping = new Ping(connectionTTL);
@@ -1271,7 +1295,7 @@
             }
          }
       }
-      
+
       public synchronized void cancel()
       {
          cancelled = true;

Modified: trunk/src/main/org/jboss/messaging/core/management/AcceptorControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/AcceptorControl.java	2009-08-18 09:21:52 UTC (rev 7769)
+++ trunk/src/main/org/jboss/messaging/core/management/AcceptorControl.java	2009-08-18 12:13:36 UTC (rev 7770)
@@ -37,4 +37,8 @@
    String getFactoryClassName();
 
    Map<String, Object> getParameters();
+   
+   void pause() throws Exception;
+   
+   void resume() throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/AcceptorControlImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/AcceptorControlImpl.java	2009-08-18 09:21:52 UTC (rev 7769)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/AcceptorControlImpl.java	2009-08-18 12:13:36 UTC (rev 7770)
@@ -83,20 +83,20 @@
       acceptor.start();
    }
    
-//   public void pause()
-//   {
-//      acceptor.pause();
-//   }
-//   
-//   public void resume()
-//   {
-//      acceptor.resume();
-//   }
-
+   public void pause()
+   {
+      acceptor.pause();
+   }
+   
    public void stop() throws Exception
    {
       acceptor.stop();
    }
+   
+   public void resume() throws Exception
+   {
+      acceptor.resume();
+   }
 
    // Public --------------------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMAcceptor.java	2009-08-18 09:21:52 UTC (rev 7769)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMAcceptor.java	2009-08-18 12:13:36 UTC (rev 7770)
@@ -57,6 +57,8 @@
    private volatile boolean started;
 
    private final ExecutorFactory executorFactory;
+   
+   private boolean paused;
 
    public InVMAcceptor(final Map<String, Object> configuration,
                        final BufferHandler handler,
@@ -82,6 +84,8 @@
       InVMRegistry.instance.registerAcceptor(id, this);
 
       started = true;
+      
+      paused = false;
    }
 
    public synchronized void stop()
@@ -91,7 +95,10 @@
          return;
       }
 
-      InVMRegistry.instance.unregisterAcceptor(id);
+      if (!paused)
+      {
+         InVMRegistry.instance.unregisterAcceptor(id);
+      }
 
       for (Connection connection : connections.values())
       {
@@ -101,12 +108,41 @@
       connections.clear();
 
       started = false;
+      
+      paused = false;
    }
 
-   public boolean isStarted()
+   public synchronized boolean isStarted()
    {
       return started;
    }
+   
+   /*
+    * Stop accepting new connections
+    */
+   public synchronized void pause()
+   {      
+      if (!started || paused)
+      {
+         return;
+      }
+      
+      InVMRegistry.instance.unregisterAcceptor(id);   
+      
+      paused = true;
+   }
+   
+   public synchronized void resume()
+   {
+      if (!paused || !started)
+      {
+         return;
+      }
+      
+      InVMRegistry.instance.registerAcceptor(id, this);
+      
+      paused = false;
+   }
 
    public BufferHandler getHandler()
    {

Modified: trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-08-18 09:21:52 UTC (rev 7769)
+++ trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-08-18 12:13:36 UTC (rev 7770)
@@ -230,18 +230,25 @@
       }
 
       failureCheckThread.close();
-
-      // We need to stop them accepting first so no new connections are accepted after we send the disconnect message
+      
+     // We need to stop them accepting first so no new connections are accepted after we send the disconnect message
       for (Acceptor acceptor : acceptors)
       {
-         acceptor.stop();
+         acceptor.pause();
       }
 
+      log.info("there are " + connections.size() + " connections to close on server close");
       for (ConnectionEntry entry : connections.values())
       {
+         log.info("sending disconnect message");
          entry.connection.getChannel(0, -1, false).sendAndFlush(new PacketImpl(DISCONNECT));
       }
-
+           
+      for (Acceptor acceptor : acceptors)
+      {
+         acceptor.stop();
+      }
+     
       acceptors.clear();
 
       connections.clear();

Modified: trunk/src/main/org/jboss/messaging/core/remoting/spi/Acceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/spi/Acceptor.java	2009-08-18 09:21:52 UTC (rev 7769)
+++ trunk/src/main/org/jboss/messaging/core/remoting/spi/Acceptor.java	2009-08-18 12:13:36 UTC (rev 7770)
@@ -33,4 +33,7 @@
  */
 public interface Acceptor extends MessagingComponent
 {
+   void pause();
+   
+   void resume();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java	2009-08-18 09:21:52 UTC (rev 7769)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java	2009-08-18 12:13:36 UTC (rev 7770)
@@ -277,6 +277,8 @@
 
    public void stop() throws Exception
    {
+      log.info("Stopping bridge " + name);
+      
       if (started)
       {
          // We need to stop the csf here otherwise the stop runnable never runs since the createobjectsrunnable is
@@ -306,6 +308,8 @@
             log.warn("unable to send notification when broadcast group is stopped", e);
          }
       }
+      log.info("Stopped bridge " + name);
+      
    }
 
    public boolean isStarted()
@@ -510,6 +514,8 @@
 
    public void connectionFailed(final MessagingException me)
    {
+      log.info("bridge " + name + " failed " + me);
+      
       fail();
    }
 
@@ -535,7 +541,7 @@
    }
 
    private void fail()
-   {
+   {      
       if (started)
       {
          executor.execute(new FailRunnable());
@@ -757,6 +763,8 @@
          {
             log.error("Failed to stop", e);
          }
+         
+         log.info("Bridge " + name + " closed objects");
 
          if (!createObjects())
          {

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-08-18 09:21:52 UTC (rev 7769)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-08-18 12:13:36 UTC (rev 7770)
@@ -246,8 +246,7 @@
    }
 
    public synchronized void stop() throws Exception
-   {
-      if (!started)
+   {      if (!started)
       {
          return;
       }

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java	2009-08-18 09:21:52 UTC (rev 7769)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java	2009-08-18 12:13:36 UTC (rev 7770)
@@ -303,7 +303,7 @@
 
       startServerChannels();
 
-      // paused = false;
+      paused = false;
    }
 
    private void startServerChannels()
@@ -364,6 +364,8 @@
       }
 
       connections.clear();
+      
+      paused = false;
    }
 
    public boolean isStarted()
@@ -371,6 +373,47 @@
       return (channelFactory != null);
    }
 
+   private boolean paused;
+
+   public synchronized void pause()
+   {
+      if (paused)
+      {
+         return;
+      }
+
+      if (channelFactory == null)
+      {
+         return;
+      }
+
+      // We *pause* the acceptor so no new connections are made
+
+      serverChannelGroup.close().awaitUninterruptibly();
+
+      try
+      {
+         Thread.sleep(500);
+      }
+      catch (Exception e)
+      {
+      }
+
+      paused = true;
+   }
+
+   public synchronized void resume()
+   {
+      if (!paused)
+      {
+         return;
+      }
+
+      startServerChannels();
+
+      paused = false;
+   }
+
    // Inner classes -----------------------------------------------------------------------------
 
    @ChannelPipelineCoverage("one")

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageRedistributionTest.java	2009-08-18 09:21:52 UTC (rev 7769)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageRedistributionTest.java	2009-08-18 12:13:36 UTC (rev 7770)
@@ -80,6 +80,8 @@
    public void testRedistributionWhenConsumerIsClosed() throws Exception
    {
       setupCluster(false);
+      
+      log.info("Doing test");
 
       startServers(0, 1, 2);
 
@@ -112,6 +114,8 @@
       removeConsumer(1);
 
       verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
+      
+      log.info("Test done");
    }
 
    public void testRedistributionWhenConsumerIsClosedNotConsumersOnAllNodes() throws Exception




More information about the jboss-cvs-commits mailing list