[jboss-cvs] JBoss Messaging SVN: r5477 - in trunk: src/main/org/jboss/messaging/core/cluster/impl and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Dec 8 10:40:33 EST 2008


Author: timfox
Date: 2008-12-08 10:40:33 -0500 (Mon, 08 Dec 2008)
New Revision: 5477

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/AutomaticFailoverWithDiscoveryTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java
   trunk/src/main/org/jboss/messaging/core/config/cluster/BroadcastGroupConfiguration.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
Log:
More failover/discovery tests


Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2008-12-08 14:22:21 UTC (rev 5476)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2008-12-08 15:40:33 UTC (rev 5477)
@@ -51,9 +51,9 @@
    public static final String DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME = "org.jboss.messaging.core.client.impl.RoundRobinConnectionLoadBalancingPolicy";
 
    public static final long DEFAULT_PING_PERIOD = 5000;
-   
-   //5 minutes - normally this should be much higher than ping period, this allows clients to re-attach on live
-   //or backup without fear of session having already been closed when connection times out.
+
+   // 5 minutes - normally this should be much higher than ping period, this allows clients to re-attach on live
+   // or backup without fear of session having already been closed when connection times out.
    public static final long DEFAULT_CONNECTION_TTL = 5 * 60000;
 
    // Any message beyond this size is considered a large message (to be sent in chunks)
@@ -101,7 +101,7 @@
    private ConnectionManager[] connectionManagerArray;
 
    private final long pingPeriod;
-   
+
    private final long connectionTTL;
 
    private final long callTimeout;
@@ -137,7 +137,7 @@
 
    private final DiscoveryGroup discoveryGroup;
 
-   private boolean receivedBroadcast = false;
+   private volatile boolean receivedBroadcast = false;
 
    private final long initialWaitTimeout;
 
@@ -224,23 +224,15 @@
                                    final long retryInterval,
                                    final double retryIntervalMultiplier,
                                    final int maxRetriesBeforeFailover,
-                                   final int maxRetriesAfterFailover) throws MessagingException
+                                   final int maxRetriesAfterFailover) throws Exception
    {
-      try
-      {
-         InetAddress groupAddress = InetAddress.getByName(discoveryGroupName);
+      InetAddress groupAddress = InetAddress.getByName(discoveryGroupName);
 
-         discoveryGroup = new DiscoveryGroupImpl(groupAddress, discoveryGroupPort, discoveryRefreshTimeout);
+      discoveryGroup = new DiscoveryGroupImpl(groupAddress, discoveryGroupPort, discoveryRefreshTimeout);
 
-         discoveryGroup.registerListener(this);
+      discoveryGroup.registerListener(this);
 
-         discoveryGroup.start();
-      }
-      catch (Exception e)
-      {
-         // TODO - better execption
-         throw new MessagingException(MessagingException.INTERNAL_ERROR, "Failed to connect discovery group");
-      }
+      discoveryGroup.start();
 
       this.initialWaitTimeout = initialWaitTimeout;
       this.loadBalancingPolicy = instantiateLoadBalancingPolicy(connectionloadBalancingPolicyClassName);
@@ -818,17 +810,17 @@
 
    // Private --------------------------------------------------------------------------------
 
-   private synchronized ClientSession createSessionInternal(final String username,
-                                                            final String password,
-                                                            final boolean xa,
-                                                            final boolean autoCommitSends,
-                                                            final boolean autoCommitAcks,
-                                                            final boolean preAcknowledge,
-                                                            final int ackBatchSize) throws MessagingException
+   private ClientSession createSessionInternal(final String username,
+                                               final String password,
+                                               final boolean xa,
+                                               final boolean autoCommitSends,
+                                               final boolean autoCommitAcks,
+                                               final boolean preAcknowledge,
+                                               final int ackBatchSize) throws MessagingException
    {
       if (discoveryGroup != null && !receivedBroadcast)
-      {
-         boolean ok = discoveryGroup.waitForBroadcast(initialWaitTimeout);
+      {         
+         boolean ok = discoveryGroup.waitForBroadcast(initialWaitTimeout);         
 
          if (!ok)
          {
@@ -837,26 +829,30 @@
          }
       }
 
-      int pos = loadBalancingPolicy.select(connectionManagerArray.length);
+      synchronized (this)
+      {
 
-      ConnectionManager connectionManager = connectionManagerArray[pos];
+         int pos = loadBalancingPolicy.select(connectionManagerArray.length);
 
-      return connectionManager.createSession(username,
-                                             password,
-                                             xa,
-                                             autoCommitSends,
-                                             autoCommitAcks,
-                                             preAcknowledge,
-                                             ackBatchSize,
-                                             minLargeMessageSize,
-                                             blockOnAcknowledge,
-                                             autoGroup,
-                                             sendWindowSize,
-                                             consumerWindowSize,
-                                             consumerMaxRate,
-                                             producerMaxRate,
-                                             blockOnNonPersistentSend,
-                                             blockOnPersistentSend);
+         ConnectionManager connectionManager = connectionManagerArray[pos];
+
+         return connectionManager.createSession(username,
+                                                password,
+                                                xa,
+                                                autoCommitSends,
+                                                autoCommitAcks,
+                                                preAcknowledge,
+                                                ackBatchSize,
+                                                minLargeMessageSize,
+                                                blockOnAcknowledge,
+                                                autoGroup,
+                                                sendWindowSize,
+                                                consumerWindowSize,
+                                                consumerMaxRate,
+                                                producerMaxRate,
+                                                blockOnNonPersistentSend,
+                                                blockOnPersistentSend);
+      }
    }
 
    private ConnectionLoadBalancingPolicy instantiateLoadBalancingPolicy(final String className)

Modified: trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java	2008-12-08 14:22:21 UTC (rev 5476)
+++ trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java	2008-12-08 15:40:33 UTC (rev 5477)
@@ -71,7 +71,7 @@
    private volatile boolean started;
       
    public DiscoveryGroupImpl(final InetAddress groupAddress, final int groupPort, final long timeout) throws Exception
-   {
+   {      
       socket = new MulticastSocket(groupPort);
 
       socket.joinGroup(groupAddress);
@@ -131,7 +131,7 @@
    }
    
    public boolean waitForBroadcast(final long timeout)
-   {
+   {      
       synchronized (waitLock)
       { 
          long start = System.currentTimeMillis();
@@ -142,7 +142,7 @@
          {      
             try
             {
-               waitLock.wait(toWait);
+               waitLock.wait(toWait);        
             }
             catch (InterruptedException e)
             {               
@@ -160,7 +160,7 @@
          received = false;
          
          return ret;
-      }
+      }      
    }
 
    public void run()

Modified: trunk/src/main/org/jboss/messaging/core/config/cluster/BroadcastGroupConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/cluster/BroadcastGroupConfiguration.java	2008-12-08 14:22:21 UTC (rev 5476)
+++ trunk/src/main/org/jboss/messaging/core/config/cluster/BroadcastGroupConfiguration.java	2008-12-08 15:40:33 UTC (rev 5477)
@@ -26,6 +26,7 @@
 import java.io.Serializable;
 import java.util.List;
 
+import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.util.Pair;
 
 /**
@@ -41,6 +42,9 @@
 {
    private static final long serialVersionUID = 1052413739064253955L;
    
+   private static final Logger log = Logger.getLogger(BroadcastGroupConfiguration.class);
+
+   
    private final String name;
    
    private final String localBindAddress;

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java	2008-12-08 14:22:21 UTC (rev 5476)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java	2008-12-08 15:40:33 UTC (rev 5477)
@@ -138,7 +138,7 @@
       ByteArrayOutputStream bos = new ByteArrayOutputStream();
 
       ObjectOutputStream oos = new ObjectOutputStream(bos);
-
+      
       oos.writeInt(connectorPairs.size());
 
       for (Pair<TransportConfiguration, TransportConfiguration> connectorPair : connectorPairs)

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2008-12-08 14:22:21 UTC (rev 5476)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2008-12-08 15:40:33 UTC (rev 5477)
@@ -111,7 +111,7 @@
       {
          return;
       }
-
+      
       for (BroadcastGroupConfiguration config : configuration.getBroadcastGroupConfigurations())
       {
          deployBroadcastGroup(config);

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/AutomaticFailoverWithDiscoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/AutomaticFailoverWithDiscoveryTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/AutomaticFailoverWithDiscoveryTest.java	2008-12-08 15:40:33 UTC (rev 5477)
@@ -0,0 +1,248 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.FailureListener;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.util.Pair;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * 
+ * A AutomaticFailoverWithDiscoveryTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 8 Dec 2008 14:52:21
+ *
+ *
+ */
+public class AutomaticFailoverWithDiscoveryTest extends TestCase
+{
+   private static final Logger log = Logger.getLogger(AutomaticFailoverWithDiscoveryTest.class);
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+   private MessagingService liveService;
+
+   private MessagingService backupService;
+
+   private final Map<String, Object> backupParams = new HashMap<String, Object>();
+   
+   private final String groupAddress = "230.1.2.3";
+
+   private final int groupPort = 8765;
+
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testFailover() throws Exception
+   {
+      ClientSessionFactoryImpl sf = new ClientSessionFactoryImpl(groupAddress, groupPort);
+
+      ClientSession session = sf.createSession(false, true, true);
+      
+      log.info("Created session");
+
+      session.createQueue(ADDRESS, ADDRESS, null, false, false, true);
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      final int numMessages = 1000;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().putString("aardvarks");
+         message.getBody().flip();
+         producer.send(message);
+      }
+
+      RemotingConnection conn1 = ((ClientSessionImpl)session).getConnection();
+
+      // Simulate failure on connection
+      conn1.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+      session.start();
+
+      for (int i = 0; i < numMessages / 2; i++)
+      {
+         ClientMessage message2 = consumer.receive();
+
+         assertEquals("aardvarks", message2.getBody().getString());
+
+         assertEquals(i, message2.getProperty(new SimpleString("count")));
+
+         message2.acknowledge();
+      }
+
+      session.close();
+
+      session = sf.createSession(false, true, true);
+
+      consumer = session.createConsumer(ADDRESS);
+
+      session.start();
+
+      for (int i = numMessages / 2; i < numMessages; i++)
+      {
+         ClientMessage message2 = consumer.receive();
+
+         assertEquals("aardvarks", message2.getBody().getString());
+
+         assertEquals(i, message2.getProperty(new SimpleString("count")));
+
+         message2.acknowledge();
+      }
+
+      ClientMessage message3 = consumer.receive(250);
+
+      session.close();
+
+      assertNull(message3);
+
+      assertEquals(0, sf.numSessions());
+
+      assertEquals(0, sf.numConnections());
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      Configuration backupConf = new ConfigurationImpl();
+      backupConf.setSecurityEnabled(false);
+      backupConf.setClustered(true);
+      backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+      backupConf.getAcceptorConfigurations()
+                .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+                                                backupParams));
+      backupConf.setBackup(true);
+      backupService = MessagingServiceImpl.newNullStorageMessagingServer(backupConf);
+      backupService.start();
+
+      Configuration liveConf = new ConfigurationImpl();
+      liveConf.setSecurityEnabled(false);
+      liveConf.getAcceptorConfigurations()
+              .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+      Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+      TransportConfiguration backupTC = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                   backupParams, "backup-connector");
+      connectors.put(backupTC.getName(), backupTC);
+      liveConf.setConnectorConfigurations(connectors);
+      liveConf.setBackupConnectorName(backupTC.getName());
+      liveConf.setClustered(true);
+      
+      List<Pair<String, String>> connectorNames = new ArrayList<Pair<String, String>>();
+      connectorNames.add(new Pair<String, String>(backupTC.getName(), null));
+      
+      final long broadcastPeriod = 250;
+
+      final String bcGroupName = "bc1";
+
+      final String localBindAddress = "localhost";
+
+      final int localBindPort = 5432;
+      
+      BroadcastGroupConfiguration bcConfig1 = new BroadcastGroupConfiguration(bcGroupName,
+                                                                              localBindAddress,
+                                                                              localBindPort,
+                                                                              groupAddress,
+                                                                              groupPort,
+                                                                              broadcastPeriod,
+                                                                              connectorNames);
+      
+      Set<BroadcastGroupConfiguration> bcConfigs1 = new HashSet<BroadcastGroupConfiguration>();
+      bcConfigs1.add(bcConfig1);
+      liveConf.setBroadcastGroupConfigurations(bcConfigs1);
+      
+      liveService = MessagingServiceImpl.newNullStorageMessagingServer(liveConf);
+      liveService.start();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      assertEquals(0, backupService.getServer().getRemotingService().getConnections().size());
+
+      backupService.stop();
+
+      assertEquals(0, liveService.getServer().getRemotingService().getConnections().size());
+
+      liveService.stop();
+
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}




More information about the jboss-cvs-commits mailing list