[jboss-cvs] JBoss Messaging SVN: r7198 - in trunk: src/main/org/jboss/messaging/core/remoting/server/impl and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jun 4 14:04:05 EDT 2009


Author: timfox
Date: 2009-06-04 14:04:04 -0400 (Thu, 04 Jun 2009)
New Revision: 7198

Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java
Log:
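ping changes part 2: remove the schedulePingersOneShot debug flags in favour of cancelPingerForConnectionID(), and create the replicating connection through a ConnectionManager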

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2009-06-04 16:45:19 UTC (rev 7197)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2009-06-04 18:04:04 UTC (rev 7198)
@@ -871,6 +871,11 @@
 
       updateConnectionManagerArray();
    }
+   
+   public ConnectionManager[] getConnectionManagers()
+   {
+      return connectionManagerArray;
+   }
 
    // Protected ------------------------------------------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java	2009-06-04 16:45:19 UTC (rev 7197)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java	2009-06-04 18:04:04 UTC (rev 7198)
@@ -24,6 +24,7 @@
 
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.RemotingConnection;
 
 /**
  * A ConnectionManager
@@ -60,4 +61,6 @@
    int numSessions();
    
    void close();
+   
+   RemotingConnection getConnection(final int initialRefCount);
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-06-04 16:45:19 UTC (rev 7197)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-06-04 18:04:04 UTC (rev 7198)
@@ -146,7 +146,7 @@
    private boolean inFailoverOrReconnect;
    
    private Connector connector;
-   
+        
    private Map<Object, FailedConnectionRunnable> failRunnables = new ConcurrentHashMap<Object, FailedConnectionRunnable>();
 
    private Map<Object, Pinger> pingRunnables = new ConcurrentHashMap<Object, Pinger>();
@@ -164,8 +164,6 @@
       debugConns = new ConcurrentHashMap<TransportConfiguration, Set<RemotingConnection>>();
    }
    
-   public static boolean schedulePingersOneShot = false;
-   
    // Static
    // ---------------------------------------------------------------------------------------
 
@@ -458,6 +456,13 @@
 
    // Public
    // ---------------------------------------------------------------------------------------
+   
+   public void cancelPingerForConnectionID(final Object connectionID)
+   {
+      Pinger pinger = pingRunnables.get(connectionID);
+      
+      pinger.close();
+   }
 
    // Protected
    // ------------------------------------------------------------------------------------
@@ -839,7 +844,7 @@
       }
    }
 
-   private RemotingConnection getConnection(final int initialRefCount)
+   public RemotingConnection getConnection(final int initialRefCount)
    {
       RemotingConnection conn;
 
@@ -941,17 +946,8 @@
          {
             Pinger pinger = new Pinger(conn);
             
-            Future<?> pingerFuture;
-            
-            if (schedulePingersOneShot)
-            {
-               pingerFuture = scheduledThreadPool.schedule(pinger, connectionTTL / 2, TimeUnit.MILLISECONDS);
-            }
-            else
-            {
-               pingerFuture = scheduledThreadPool.scheduleAtFixedRate(pinger, connectionTTL / 2, connectionTTL / 2, TimeUnit.MILLISECONDS);
-            }
-                        
+            Future<?> pingerFuture = scheduledThreadPool.scheduleAtFixedRate(pinger, connectionTTL / 2, connectionTTL / 2, TimeUnit.MILLISECONDS);
+                                   
             pinger.setFuture(pingerFuture);
             
             pingRunnables.put(conn.getID(), pinger);

Modified: trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-06-04 16:45:19 UTC (rev 7197)
+++ trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-06-04 18:04:04 UTC (rev 7198)
@@ -97,9 +97,6 @@
 
    private Map<Object, Pinger> pingRunnables = new ConcurrentHashMap<Object, Pinger>();
 
-   // For debug
-   public static boolean schedulePingersOneShot;
-
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -355,6 +352,13 @@
 
    // Public --------------------------------------------------------
 
+   public void cancelPingerForConnectionID(final Object connectionID)
+   {
+      Pinger pinger = pingRunnables.get(connectionID);
+      
+      pinger.close();
+   }
+   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -397,17 +401,8 @@
       {
          Pinger pingRunnable = new Pinger(conn);
 
-         Future<?> pingFuture;
+         Future<?> pingFuture = scheduledThreadPool.scheduleAtFixedRate(pingRunnable, 0, pingPeriod, TimeUnit.MILLISECONDS);         
 
-         if (schedulePingersOneShot)
-         {
-            pingFuture = scheduledThreadPool.schedule(pingRunnable, 0, TimeUnit.MILLISECONDS);
-         }
-         else
-         {
-            pingFuture = scheduledThreadPool.scheduleAtFixedRate(pingRunnable, 0, pingPeriod, TimeUnit.MILLISECONDS);
-         }
-
          pingRunnable.setFuture(pingFuture);
 
          pingRunnables.put(conn.getID(), pingRunnable);

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-06-04 16:45:19 UTC (rev 7197)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-06-04 18:04:04 UTC (rev 7198)
@@ -31,6 +31,8 @@
 import javax.management.MBeanServer;
 
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ConnectionManager;
+import org.jboss.messaging.core.client.impl.ConnectionManagerImpl;
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.cluster.DivertConfiguration;
@@ -67,15 +69,11 @@
 import org.jboss.messaging.core.remoting.FailureListener;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateStartupInfoMessage;
 import org.jboss.messaging.core.remoting.server.RemotingService;
 import org.jboss.messaging.core.remoting.server.impl.RemotingServiceImpl;
-import org.jboss.messaging.core.remoting.spi.Connection;
-import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
-import org.jboss.messaging.core.remoting.spi.ConnectorFactory;
 import org.jboss.messaging.core.security.CheckType;
 import org.jboss.messaging.core.security.JBMSecurityManager;
 import org.jboss.messaging.core.security.Role;
@@ -187,16 +185,10 @@
 
    private final Map<String, ServerSession> sessions = new ConcurrentHashMap<String, ServerSession>();
 
-   private ConnectorFactory backupConnectorFactory;
-
-   private Map<String, Object> backupConnectorParams;
-
    private RemotingConnection replicatingConnection;
 
    private Channel replicatingChannel;
 
-   private Object replicatingChannelLock = new Object();
-
    private final Object initialiseLock = new Object();
 
    private boolean initialised;
@@ -342,6 +334,8 @@
 
          replicatingConnection = null;
          replicatingChannel = null;
+         
+         replicatingConnectionManager.close();
       }
 
       resourceManager.stop();
@@ -618,66 +612,8 @@
       }
    }
 
-   public Channel getReplicatingChannel()
-   {
-      synchronized (replicatingChannelLock)
-      {
-         if (replicatingChannel == null && backupConnectorFactory != null)
-         {
-            NoCacheConnectionLifeCycleListener listener = new NoCacheConnectionLifeCycleListener();
+   private ConnectionManager replicatingConnectionManager;
 
-            replicatingConnection = (RemotingConnectionImpl)RemotingConnectionImpl.createConnection(backupConnectorFactory,
-                                                                                                    backupConnectorParams,
-                                                                                                    ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
-                                                                                                    threadPool,
-                                                                                                    listener);
-
-            if (replicatingConnection == null)
-            {
-               return null;
-            }
-
-            listener.conn = replicatingConnection;
-
-            replicatingChannel = replicatingConnection.getChannel(2, -1, false);
-
-            replicatingConnection.addFailureListener(new FailureListener()
-            {
-               public boolean connectionFailed(MessagingException me)
-               {
-                  replicatingChannel.executeOutstandingDelayedResults();
-
-                  return true;
-               }
-            });
-
-            // First time we get channel we send a message down it informing the backup of our node id -
-            // backup and live must have the same node id
-
-            Packet packet = new ReplicateStartupInfoMessage(uuid, storageManager.getCurrentUniqueID());
-
-            final Future future = new Future();
-
-            replicatingChannel.replicatePacket(packet, 1, new Runnable()
-            {
-               public void run()
-               {
-                  future.run();
-               }
-            });
-
-            boolean ok = future.await(10000);
-
-            if (!ok)
-            {
-               throw new IllegalStateException("Timed out waiting for response from backup for initialisation");
-            }
-         }
-      }
-
-      return replicatingChannel;
-   }
-
    public MessagingServerControl getMessagingServerControl()
    {
       return messagingServerControl;
@@ -1087,6 +1023,11 @@
       }
    }
 
+   public Channel getReplicatingChannel()
+   {
+      return replicatingChannel;
+   }
+
    private boolean setupReplicatingConnection() throws Exception
    {
       String backupConnectorName = configuration.getBackupConnectorName();
@@ -1101,36 +1042,67 @@
          }
          else
          {
+            replicatingConnectionManager = new ConnectionManagerImpl(backupConnector,
+                                                                     null,
+                                                                     false,
+                                                                     1,
+                                                                     ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+                                                                     ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+                                                                     ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD * 3,
+                                                                     0,
+                                                                     1.0d,
+                                                                     0,
+                                                                     threadPool,
+                                                                     scheduledPool);
 
-            ClassLoader loader = Thread.currentThread().getContextClassLoader();
-            try
+            replicatingConnection = replicatingConnectionManager.getConnection(1);
+
+            if (replicatingConnection != null)
             {
-               Class<?> clz = loader.loadClass(backupConnector.getFactoryClassName());
-               backupConnectorFactory = (ConnectorFactory)clz.newInstance();
+               replicatingChannel = replicatingConnection.getChannel(2, -1, false);
+
+               replicatingConnection.addFailureListener(new FailureListener()
+               {
+                  public boolean connectionFailed(MessagingException me)
+                  {
+                     replicatingChannel.executeOutstandingDelayedResults();
+
+                     return true;
+                  }
+               });
+
+               // First time we get channel we send a message down it informing the backup of our node id -
+               // backup and live must have the same node id
+
+               Packet packet = new ReplicateStartupInfoMessage(uuid, storageManager.getCurrentUniqueID());
+
+               final Future future = new Future();
+
+               replicatingChannel.replicatePacket(packet, 1, new Runnable()
+               {
+                  public void run()
+                  {
+                     future.run();
+                  }
+               });
+
+               boolean ok = future.await(10000);
+
+               if (!ok)
+               {
+                  throw new IllegalStateException("Timed out waiting for response from backup for initialisation");
+               }
             }
-            catch (Exception e)
+            else
             {
-               throw new IllegalArgumentException("Error instantiating interceptor \"" + backupConnector.getFactoryClassName() +
-                                                           "\"",
-                                                  e);
-            }
+               log.warn("Backup server MUST be started before live server. Initialisation will proceed.");
 
-            backupConnectorParams = backupConnector.getParams();
+               return false; 
+            }
          }
       }
 
-      Channel replicatingChannel = getReplicatingChannel();
-
-      if (replicatingChannel == null && backupConnectorFactory != null)
-      {
-         log.warn("Backup server MUST be started before live server. Initialisation will proceed.");
-
-         return false;
-      }
-      else
-      {
-         return true;
-      }
+      return true;      
    }
 
    private void loadJournal() throws Exception
@@ -1450,38 +1422,4 @@
       }
    }
 
-   private class NoCacheConnectionLifeCycleListener implements ConnectionLifeCycleListener
-   {
-      private RemotingConnection conn;
-
-      public void connectionCreated(final Connection connection)
-      {
-      }
-
-      public void connectionDestroyed(final Object connectionID)
-      {
-         if (conn != null)
-         {
-            conn.destroy();
-         }
-      }
-
-      public void connectionException(final Object connectionID, final MessagingException me)
-      {
-         backupConnectorFactory = null;
-
-         if (conn != null)
-         {
-            // Execute on different thread to avoid deadlocks
-            threadPool.execute(new Runnable()
-            {
-               public void run()
-               {
-                  conn.fail(me);
-               }
-            });
-         }
-      }
-   }
-
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java	2009-06-04 16:45:19 UTC (rev 7197)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java	2009-06-04 18:04:04 UTC (rev 7198)
@@ -29,6 +29,7 @@
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
 import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.client.impl.ConnectionManagerImpl;
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
@@ -76,7 +77,7 @@
    {
       final long clientFailureCheckPeriod = 500;
 
-      ClientSessionFactoryInternal sf1 = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+      ClientSessionFactoryImpl sf1 = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
                                                                       
       sf1.setClientFailureCheckPeriod(clientFailureCheckPeriod);
       sf1.setConnectionTTL((long)(clientFailureCheckPeriod * 1.5));      
@@ -106,7 +107,7 @@
 
       final RemotingConnectionImpl conn1 = (RemotingConnectionImpl)((ClientSessionImpl)session1).getConnection();
 
-      //conn1.stopPingingAfterOne();
+      ((ConnectionManagerImpl)sf1.getConnectionManagers()[0]).cancelPingerForConnectionID(conn1.getID());
 
       Thread.sleep(3 * clientFailureCheckPeriod);
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java	2009-06-04 16:45:19 UTC (rev 7197)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java	2009-06-04 18:04:04 UTC (rev 7198)
@@ -22,24 +22,21 @@
 
 package org.jboss.messaging.tests.integration.remoting;
 
-import static org.jboss.messaging.core.remoting.server.impl.RemotingServiceImpl.schedulePingersOneShot;
-
 import java.util.Set;
 
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.ClientSessionFactory;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.client.impl.ClientSessionInternal;
 import org.jboss.messaging.core.client.impl.ConnectionManagerImpl;
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.FailureListener;
-import org.jboss.messaging.core.remoting.Interceptor;
-import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
 import org.jboss.messaging.core.remoting.server.impl.RemotingServiceImpl;
 import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.tests.util.ServiceTestBase;
@@ -223,9 +220,6 @@
 
       ClientSessionFactoryImpl csf = new ClientSessionFactoryImpl(transportConfig);
       
-      //We want to make sure only ping is sent
-      ConnectionManagerImpl.schedulePingersOneShot = true;
-      
       csf.setClientFailureCheckPeriod(PING_INTERVAL);
       csf.setConnectionTTL((long)(PING_INTERVAL * 1.5));
       
@@ -237,6 +231,12 @@
 
       session.addFailureListener(clientListener);
 
+      RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionInternal)session).getConnection();
+
+      // We need to get it to stop pinging
+      
+      ((ConnectionManagerImpl)csf.getConnectionManagers()[0]).cancelPingerForConnectionID(conn.getID());
+            
       RemotingConnection serverConn = null;
 
       while (serverConn == null)
@@ -281,52 +281,17 @@
       assertNotNull(serverListener.getException());
 
       session.close();
-      
-      ConnectionManagerImpl.schedulePingersOneShot = false;
    }
 
    /*
    * Test the client triggering failure due to no pong received in time
    */
-   public void testClientFailureNoServerPing() throws Exception
-   {
-      Interceptor noPongInterceptor = new Interceptor()
-      {
-         boolean allowPing = true;
-         
-         public boolean intercept(Packet packet, RemotingConnection conn) throws MessagingException
-         {
-            log.info("In interceptor, packet is " + packet.getType());
-            if (packet.getType() == PacketImpl.PING)
-            {
-               if (allowPing)
-               {
-                  log.info("allow 1 ping");
-                  allowPing = false;
-                  return true;
-               }
-               else
-               {
-                  log.info("Ignoring Ping packet.. it will be dropped");
-                  return false;
-               }
-            }
-            else
-            {
-               return true;
-            }
-         }
-      };
-
-      server.getRemotingService().addInterceptor(noPongInterceptor);
-
+   public void testClientFailureNoPong() throws Exception
+   {      
       TransportConfiguration transportConfig = new TransportConfiguration("org.jboss.messaging.integration.transports.netty.NettyConnectorFactory");
 
       ClientSessionFactory csf = new ClientSessionFactoryImpl(transportConfig);
       
-      //We want to make sure only one server->client ping is sent
-      RemotingServiceImpl.schedulePingersOneShot = true;
-      
       csf.setClientFailureCheckPeriod(PING_INTERVAL);
       csf.setConnectionTTL((long)(PING_INTERVAL * 1.5));
 
@@ -353,25 +318,20 @@
             Thread.sleep(10);
          }
       }
-
+      
       Listener serverListener = new Listener();
 
       serverConn.addFailureListener(serverListener);
-
+      
+      ((RemotingServiceImpl)server.getRemotingService()).cancelPingerForConnectionID(serverConn.getID());
+      
       Thread.sleep(3 * PING_INTERVAL);
       
       assertNotNull(clientListener.getException());
-
-      // We receive an exception on the server in this case too
-      assertNotNull(serverListener.getException());
-
+      
       assertEquals(0, server.getRemotingService().getConnections().size());
 
-      server.getRemotingService().removeInterceptor(noPongInterceptor);
-
       session.close();
-      
-      RemotingServiceImpl.schedulePingersOneShot = false;
    }
 
    // Package protected ---------------------------------------------




More information about the jboss-cvs-commits mailing list