[jboss-cvs] JBoss Messaging SVN: r4965 - in trunk: src/main/org/jboss/messaging/core/client and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Sep 17 07:18:49 EDT 2008


Author: timfox
Date: 2008-09-17 07:18:49 -0400 (Wed, 17 Sep 2008)
New Revision: 4965

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleAutomaticFailoverTest.java
Removed:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ReplicationTest.java
Modified:
   trunk/src/config/jbm-jndi.xml
   trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
   trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
Log:
Failover


Modified: trunk/src/config/jbm-jndi.xml
===================================================================
--- trunk/src/config/jbm-jndi.xml	2008-09-17 08:45:41 UTC (rev 4964)
+++ trunk/src/config/jbm-jndi.xml	2008-09-17 11:18:49 UTC (rev 4965)
@@ -22,12 +22,16 @@
       <connector>
          <factory-class>org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
       </connector>
+      <backup-connector>
+         <factory-class>org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
+         <params>
+            <param key="jbm.remoting.netty.host" value="somehost" type="String"/>
+         </params>
+      </backup-connector>
       <entry name="/ClusteredConnectionFactory"/>
       <entry name="/ClusteredXAConnectionFactory"/>
       <entry name="java:/ClusteredConnectionFactory"/>
       <entry name="java:/ClusteredXAConnectionFactory"/>
-      <supports-failover>true</supports-failover>
-      <supports-load-balancing>true</supports-load-balancing>
    </connection-factory>
 
    <connection-factory name="MyExampleConnectionFactory">

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java	2008-09-17 08:45:41 UTC (rev 4964)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java	2008-09-17 11:18:49 UTC (rev 4965)
@@ -80,7 +80,15 @@
    Map<String, Object> getTransportParams();
 
    void setTransportParams(final Map<String, Object> transportParams);
+   
+   ConnectorFactory getBackupConnectorFactory();
 
+   void setBackupConnectorFactory(final ConnectorFactory connectorFactory);
+
+   Map<String, Object> getBackupTransportParams();
+
+   void setBackupTransportParams(final Map<String, Object> transportParams);
+
    long getPingPeriod();
 
    void setPingPeriod(final long pingPeriod);

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2008-09-17 08:45:41 UTC (rev 4964)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2008-09-17 11:18:49 UTC (rev 4965)
@@ -120,8 +120,6 @@
 
    private final Set<ClientSessionInternal> sessions = new ConcurrentHashSet<ClientSessionInternal>();
 
-   private volatile boolean sessionsCreated;
-
    // Static
    // ---------------------------------------------------------------------------------------
 
@@ -309,7 +307,7 @@
 
    public void setConnectorFactory(final ConnectorFactory connectorFactory)
    {
-      if (sessionsCreated)
+      if (!sessions.isEmpty())
       {
          throw new IllegalStateException("Cannot set connector factory after connections have been created");
       }
@@ -324,7 +322,7 @@
 
    public void setTransportParams(final Map<String, Object> transportParams)
    {
-      if (sessionsCreated)
+      if (!sessions.isEmpty())
       {
          throw new IllegalStateException("Cannot set transport params after connections have been created");
       }
@@ -339,7 +337,7 @@
 
    public void setBackupConnectorFactory(final ConnectorFactory connectorFactory)
    {
-      if (sessionsCreated)
+      if (!sessions.isEmpty())
       {
          throw new IllegalStateException("Cannot set backup connector factory after connections have been created");
       }
@@ -354,7 +352,7 @@
 
    public void setBackupTransportParams(final Map<String, Object> transportParams)
    {
-      if (sessionsCreated)
+      if (!sessions.isEmpty())
       {
          throw new IllegalStateException("Cannot set backup transport params after connections have been created");
       }
@@ -528,8 +526,6 @@
 
          sessionChannel.setHandler(handler);
 
-         sessionsCreated = true;
-
          return session;
       }
       catch (Throwable t)

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-09-17 08:45:41 UTC (rev 4964)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-09-17 11:18:49 UTC (rev 4965)
@@ -565,10 +565,12 @@
 
          channel.sendBlocking(new PacketImpl(SESS_CLOSE));
       }
-      finally
+      catch (Throwable ignore)
       {
-         doCleanup();
+         //Session close should always return without exception
       }
+      
+      doCleanup();      
    }
 
    public ClientMessage createClientMessage(byte type, boolean durable, long expiration, long timestamp, byte priority)
@@ -750,24 +752,20 @@
    {
       ClientConsumerInternal consumer = consumers.get(consumerID);
 
-      if (consumer == null)
+      if (consumer != null)
       {
-         throw new IllegalArgumentException("Cannot find consumer with id " + consumerID);
-      }
-
-      consumer.handleMessage(message);
+         consumer.handleMessage(message);         
+      }      
    }
 
    public void receiveProducerCredits(final long producerID, final int credits) throws Exception
    {
       ClientProducerInternal producer = producers.get(producerID);
 
-      if (producer == null)
+      if (producer != null)
       {
-         throw new IllegalArgumentException("Cannot find producer with id " + producerID);
-      }
-
-      producer.receiveCredits(credits);
+         producer.receiveCredits(credits);         
+      }     
    }
 
    public void handleFailover(final RemotingConnection backupConnection)

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-09-17 08:45:41 UTC (rev 4964)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-09-17 11:18:49 UTC (rev 4965)
@@ -488,11 +488,6 @@
 
    private void doWrite(final Packet packet)
    {
-      if (destroyed)
-      {
-         return;
-      }
-
       final MessagingBuffer buffer = transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
 
       packet.encode(buffer);
@@ -944,6 +939,11 @@
       // This must never be called by more than one thread concurrently
       public Packet sendBlocking(final Packet packet) throws MessagingException
       {
+         if (connection.destroyed)
+         {
+            throw new MessagingException(MessagingException.NOT_CONNECTED, "Cannot write to connection - it is destroyed");
+         }
+         
          lock.readLock().lock();
 
          try

Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java	2008-09-17 08:45:41 UTC (rev 4964)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java	2008-09-17 11:18:49 UTC (rev 4965)
@@ -1,24 +1,24 @@
 /*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
 
 package org.jboss.messaging.jms.server.impl;
 
@@ -41,34 +41,52 @@
  * @author <a href="ataylor at redhat.com">Andy Taylor</a>
  * @author <a href="tim.fox at jboss.com">Tim Fox</a>
  */
-public class JMSServerDeployer extends XmlDeployer 
+public class JMSServerDeployer extends XmlDeployer
 {
    Logger log = Logger.getLogger(JMSServerDeployer.class);
-   
+
    public static final int DEFAULT_DUPS_OK_BATCH_SIZE = 1000;
 
    private JMSServerManager jmsServerManager;
 
    private static final String CLIENTID_ELEMENT = "client-id";
+
    private static final String PING_PERIOD_ELEMENT = "ping-period";
+
    private static final String CALL_TIMEOUT_ELEMENT = "call-timeout";
+
    private static final String DUPS_OK_BATCH_SIZE_ELEMENT = "dups-ok-batch-size";
+
    private static final String CONSUMER_WINDOW_SIZE_ELEMENT = "consumer-window-size";
+
    private static final String CONSUMER_MAX_RATE_ELEMENT = "consumer-max-rate";
+
    private static final String PRODUCER_WINDOW_SIZE_ELEMENT = "producer-window-size";
+
    private static final String PRODUCER_MAX_RATE_ELEMENT = "producer-max-rate";
+
    private static final String BLOCK_ON_ACKNOWLEDGE_ELEMENT = "block-on-acknowledge";
+
    private static final String SEND_NP_MESSAGES_SYNCHRONOUSLY_ELEMENT = "send-np-messages-synchronously";
+
    private static final String SEND_P_MESSAGES_SYNCHRONOUSLY_ELEMENT = "send-p-messages-synchronously";
+
    private static final String CONNECTOR_ELEMENT = "connector";
+
    private static final String BACKUP_CONNECTOR_ELEMENT = "backup-connector";
+
    private static final String FACTORY_CLASS_ELEMENT = "factory-class";
+
    private static final String PARAMS_ELEMENT = "params";
+
    private static final String PARAM_ELEMENT = "param";
-   
+
    private static final String ENTRY_NODE_NAME = "entry";
+
    private static final String CONNECTION_FACTORY_NODE_NAME = "connection-factory";
+
    private static final String QUEUE_NODE_NAME = "queue";
+
    private static final String TOPIC_NODE_NAME = "topic";
 
    public JMSServerDeployer(DeploymentManager deploymentManager)
@@ -88,7 +106,7 @@
     */
    public String[] getElementTagName()
    {
-      return new String[]{QUEUE_NODE_NAME, TOPIC_NODE_NAME, CONNECTION_FACTORY_NODE_NAME};
+      return new String[] { QUEUE_NODE_NAME, TOPIC_NODE_NAME, CONNECTION_FACTORY_NODE_NAME };
    }
 
    /**
@@ -113,7 +131,7 @@
       if (node.getNodeName().equals(CONNECTION_FACTORY_NODE_NAME))
       {
          NodeList children = node.getChildNodes();
-         
+
          long pingPeriod = ClientSessionFactoryImpl.DEFAULT_PING_PERIOD;
          long callTimeout = ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT;
          String clientID = null;
@@ -125,13 +143,13 @@
          boolean blockOnAcknowledge = ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
          boolean blockOnNonPersistentSend = ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
          boolean blockOnPersistentSend = ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
-         
+
          List<String> jndiBindings = new ArrayList<String>();
-         String connectorFactoryClassName = null;         
+         String connectorFactoryClassName = null;
          Map<String, Object> params = new HashMap<String, Object>();
-         String backupConnectorFactoryClassName = null;         
+         String backupConnectorFactoryClassName = null;
          Map<String, Object> backupParams = new HashMap<String, Object>();
-         
+
          for (int j = 0; j < children.getLength(); j++)
          {
             if (PING_PERIOD_ELEMENT.equalsIgnoreCase(children.item(j).getNodeName()))
@@ -186,50 +204,51 @@
             else if (CONNECTOR_ELEMENT.equalsIgnoreCase(children.item(j).getNodeName()))
             {
                NodeList children2 = children.item(j).getChildNodes();
-                                                         
+
                for (int l = 0; l < children2.getLength(); l++)
-               {                                  
+               {
                   String nodeName = children2.item(l).getNodeName();
-                  
+
                   if (FACTORY_CLASS_ELEMENT.equalsIgnoreCase(nodeName))
-                  {                    
+                  {
                      connectorFactoryClassName = children2.item(l).getTextContent();
                   }
                   else if (PARAMS_ELEMENT.equalsIgnoreCase(nodeName))
-                  {                                                             
+                  {
                      NodeList nlParams = children2.item(l).getChildNodes();
-                     
+
                      for (int m = 0; m < nlParams.getLength(); m++)
                      {
                         if (PARAM_ELEMENT.equalsIgnoreCase(nlParams.item(m).getNodeName()))
                         {
                            Node paramNode = nlParams.item(m);
-                           
+
                            NamedNodeMap attributes = paramNode.getAttributes();
-                           
+
                            Node nkey = attributes.getNamedItem("key");
-                           
+
                            String key = nkey.getTextContent();
-                           
+
                            Node nValue = attributes.getNamedItem("value");
-                           
+
                            String value = nValue.getTextContent();
-                           
+
                            Node nType = attributes.getNamedItem("type");
-                           
+
                            String type = nType.getTextContent();
-                           
+
                            if (type.equalsIgnoreCase("Integer"))
                            {
                               try
                               {
                                  Integer iVal = Integer.parseInt(value);
-                                 
+
                                  params.put(key, iVal);
                               }
                               catch (NumberFormatException e2)
                               {
-                                 throw new IllegalArgumentException("Remoting acceptor parameter " + value + " is not a valid Integer");
+                                 throw new IllegalArgumentException("Remoting acceptor parameter " + value +
+                                                                    " is not a valid Integer");
                               }
                            }
                            else if (type.equalsIgnoreCase("Long"))
@@ -237,23 +256,24 @@
                               try
                               {
                                  Long lVal = Long.parseLong(value);
-                                 
+
                                  params.put(key, lVal);
                               }
                               catch (NumberFormatException e2)
                               {
-                                 throw new IllegalArgumentException("Remoting acceptor parameter " + value + " is not a valid Long");
+                                 throw new IllegalArgumentException("Remoting acceptor parameter " + value +
+                                                                    " is not a valid Long");
                               }
                            }
                            else if (type.equalsIgnoreCase("String"))
                            {
-                              params.put(key, value);                             
+                              params.put(key, value);
                            }
                            else if (type.equalsIgnoreCase("Boolean"))
                            {
                               Boolean lVal = Boolean.parseBoolean(value);
-                                 
-                              params.put(key, lVal);                              
+
+                              params.put(key, lVal);
                            }
                            else
                            {
@@ -261,56 +281,57 @@
                            }
                         }
                      }
-                  }                                                                  
+                  }
                }
             }
             else if (BACKUP_CONNECTOR_ELEMENT.equalsIgnoreCase(children.item(j).getNodeName()))
             {
                NodeList children2 = children.item(j).getChildNodes();
-                                                         
+
                for (int l = 0; l < children2.getLength(); l++)
-               {                                  
+               {
                   String nodeName = children2.item(l).getNodeName();
-                  
+
                   if (FACTORY_CLASS_ELEMENT.equalsIgnoreCase(nodeName))
-                  {                    
+                  {
                      backupConnectorFactoryClassName = children2.item(l).getTextContent();
                   }
                   else if (PARAMS_ELEMENT.equalsIgnoreCase(nodeName))
-                  {                                                             
+                  {
                      NodeList nlParams = children2.item(l).getChildNodes();
-                     
+
                      for (int m = 0; m < nlParams.getLength(); m++)
                      {
                         if (PARAM_ELEMENT.equalsIgnoreCase(nlParams.item(m).getNodeName()))
                         {
                            Node paramNode = nlParams.item(m);
-                           
+
                            NamedNodeMap attributes = paramNode.getAttributes();
-                           
+
                            Node nkey = attributes.getNamedItem("key");
-                           
+
                            String key = nkey.getTextContent();
-                           
+
                            Node nValue = attributes.getNamedItem("value");
-                           
+
                            String value = nValue.getTextContent();
-                           
+
                            Node nType = attributes.getNamedItem("type");
-                           
+
                            String type = nType.getTextContent();
-                           
+
                            if (type.equalsIgnoreCase("Integer"))
                            {
                               try
                               {
                                  Integer iVal = Integer.parseInt(value);
-                                 
+
                                  backupParams.put(key, iVal);
                               }
                               catch (NumberFormatException e2)
                               {
-                                 throw new IllegalArgumentException("Remoting acceptor parameter " + value + " is not a valid Integer");
+                                 throw new IllegalArgumentException("Remoting acceptor parameter " + value +
+                                                                    " is not a valid Integer");
                               }
                            }
                            else if (type.equalsIgnoreCase("Long"))
@@ -318,23 +339,24 @@
                               try
                               {
                                  Long lVal = Long.parseLong(value);
-                                 
+
                                  backupParams.put(key, lVal);
                               }
                               catch (NumberFormatException e2)
                               {
-                                 throw new IllegalArgumentException("Remoting acceptor parameter " + value + " is not a valid Long");
+                                 throw new IllegalArgumentException("Remoting acceptor parameter " + value +
+                                                                    " is not a valid Long");
                               }
                            }
                            else if (type.equalsIgnoreCase("String"))
                            {
-                              backupParams.put(key, value);                             
+                              backupParams.put(key, value);
                            }
                            else if (type.equalsIgnoreCase("Boolean"))
                            {
                               Boolean lVal = Boolean.parseBoolean(value);
-                                 
-                              backupParams.put(key, lVal);                              
+
+                              backupParams.put(key, lVal);
                            }
                            else
                            {
@@ -342,33 +364,42 @@
                            }
                         }
                      }
-                  }                                                                  
+                  }
                }
             }
          }
-         
+
          if (connectorFactoryClassName == null)
          {
             throw new IllegalArgumentException("connector-factory-class-name must be specified in configuration");
          }
-         
-         TransportConfiguration connectorConfig =
-            new TransportConfiguration(connectorFactoryClassName, params);
-         
+
+         TransportConfiguration connectorConfig = new TransportConfiguration(connectorFactoryClassName, params);
+
          TransportConfiguration backupConnectorConfig = null;
-         
+
          if (backupConnectorFactoryClassName != null)
          {
             backupConnectorConfig = new TransportConfiguration(backupConnectorFactoryClassName, backupParams);
          }
-                  
+
          String name = node.getAttributes().getNamedItem(getKeyAttribute()).getNodeValue();
-                  
-         jmsServerManager.createConnectionFactory(name, connectorConfig, backupConnectorConfig,
-                  pingPeriod, callTimeout, clientID, dupsOKBatchSize, 
-               consumerWindowSize, consumerMaxRate, producerWindowSize, producerMaxRate, 
-               blockOnAcknowledge, blockOnNonPersistentSend, 
-               blockOnPersistentSend, jndiBindings);
+
+         jmsServerManager.createConnectionFactory(name,
+                                                  connectorConfig,
+                                                  backupConnectorConfig,
+                                                  pingPeriod,
+                                                  callTimeout,
+                                                  clientID,
+                                                  dupsOKBatchSize,
+                                                  consumerWindowSize,
+                                                  consumerMaxRate,
+                                                  producerWindowSize,
+                                                  producerMaxRate,
+                                                  blockOnAcknowledge,
+                                                  blockOnNonPersistentSend,
+                                                  blockOnPersistentSend,
+                                                  jndiBindings);
       }
       else if (node.getNodeName().equals(QUEUE_NODE_NAME))
       {
@@ -419,14 +450,14 @@
       else if (node.getNodeName().equals(QUEUE_NODE_NAME))
       {
          String queueName = node.getAttributes().getNamedItem(getKeyAttribute()).getNodeValue();
-         //TODO: https://jira.jboss.org/jira/browse/JBMESSAGING-1413
-         //jmsServerManager.destroyQueue(queueName);
+         // TODO: https://jira.jboss.org/jira/browse/JBMESSAGING-1413
+         // jmsServerManager.destroyQueue(queueName);
       }
       else if (node.getNodeName().equals(TOPIC_NODE_NAME))
       {
          String topicName = node.getAttributes().getNamedItem(getKeyAttribute()).getNodeValue();
-         //TODO: https://jira.jboss.org/jira/browse/JBMESSAGING-1413
-         //jmsServerManager.destroyTopic(topicName);
+         // TODO: https://jira.jboss.org/jira/browse/JBMESSAGING-1413
+         // jmsServerManager.destroyTopic(topicName);
       }
    }
 

Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java	2008-09-17 08:45:41 UTC (rev 4964)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java	2008-09-17 11:18:49 UTC (rev 4965)
@@ -1,23 +1,23 @@
 /*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
  */
 
 package org.jboss.messaging.jms.server.impl;
@@ -37,7 +37,6 @@
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.remoting.spi.ConnectorFactory;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.jms.JBossQueue;
@@ -56,8 +55,7 @@
  */
 public class JMSServerManagerImpl implements JMSServerManager
 {
-   private static final Logger log = Logger
-         .getLogger(JMSServerManagerImpl.class);
+   private static final Logger log = Logger.getLogger(JMSServerManagerImpl.class);
 
    /**
     * the initial context to bind to
@@ -81,9 +79,10 @@
    private final JMSManagementService managementService;
 
    public JMSServerManagerImpl(final MessagingServerControlMBean server,
-         final PostOffice postOffice, final StorageManager storageManager,
-         final HierarchicalRepository<QueueSettings> queueSettingsRepository,
-         final JMSManagementService managementService)
+                               final PostOffice postOffice,
+                               final StorageManager storageManager,
+                               final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+                               final JMSManagementService managementService)
    {
       this.messagingServer = server;
       this.postOffice = postOffice;
@@ -97,7 +96,8 @@
       try
       {
          initialContext = new InitialContext();
-      } catch (NamingException e)
+      }
+      catch (NamingException e)
       {
          log.error("Unable to create Initial Context", e);
       }
@@ -116,8 +116,7 @@
       return messagingServer.getVersion();
    }
 
-   public boolean createQueue(final String queueName, final String jndiBinding)
-         throws Exception
+   public boolean createQueue(final String queueName, final String jndiBinding) throws Exception
    {
       JBossQueue jBossQueue = new JBossQueue(queueName);
       postOffice.addDestination(jBossQueue.getSimpleAddress(), true);
@@ -128,13 +127,16 @@
          addToDestinationBindings(queueName, jndiBinding);
       }
       Binding binding = postOffice.getBinding(jBossQueue.getSimpleAddress());
-      managementService.registerQueue(jBossQueue, binding.getQueue(),
-            jndiBinding, postOffice, storageManager, queueSettingsRepository);
+      managementService.registerQueue(jBossQueue,
+                                      binding.getQueue(),
+                                      jndiBinding,
+                                      postOffice,
+                                      storageManager,
+                                      queueSettingsRepository);
       return added;
    }
 
-   public boolean createTopic(final String topicName, final String jndiBinding)
-         throws Exception
+   public boolean createTopic(final String topicName, final String jndiBinding) throws Exception
    {
       JBossTopic jBossTopic = new JBossTopic(topicName);
       postOffice.addDestination(jBossTopic.getSimpleAddress(), true);
@@ -143,8 +145,7 @@
       {
          addToDestinationBindings(topicName, jndiBinding);
       }
-      managementService.registerTopic(jBossTopic, jndiBinding, postOffice,
-            storageManager);
+      managementService.registerTopic(jBossTopic, jndiBinding, postOffice, storageManager);
       return added;
    }
 
@@ -161,10 +162,8 @@
       }
       destinations.remove(name);
       managementService.unregisterQueue(name);
-      postOffice.removeDestination(JBossQueue.createAddressFromName(name),
-            false);
-      messagingServer.destroyQueue(JBossQueue.createAddressFromName(name)
-            .toString());
+      postOffice.removeDestination(JBossQueue.createAddressFromName(name), false);
+      messagingServer.destroyQueue(JBossQueue.createAddressFromName(name).toString());
 
       return true;
    }
@@ -182,8 +181,7 @@
       }
       destinations.remove(name);
       managementService.unregisterTopic(name);
-      postOffice.removeDestination(JBossTopic.createAddressFromName(name),
-            false);
+      postOffice.removeDestination(JBossTopic.createAddressFromName(name), false);
 
       return true;
    }
@@ -191,23 +189,33 @@
    public boolean createConnectionFactory(String name,
                                           TransportConfiguration connectorConfig,
                                           TransportConfiguration backupConnectorConfig,
-                                          long pingPeriod, long callTimeout, String clientID,
-                                          int dupsOKBatchSize, int consumerWindowSize, int consumerMaxRate,
-                                          int producerWindowSize, int producerMaxRate,
+                                          long pingPeriod,
+                                          long callTimeout,
+                                          String clientID,
+                                          int dupsOKBatchSize,
+                                          int consumerWindowSize,
+                                          int consumerMaxRate,
+                                          int producerWindowSize,
+                                          int producerMaxRate,
                                           boolean blockOnAcknowledge,
                                           boolean blockOnNonPersistentSend,
-                                          boolean blockOnPersistentSend, String jndiBinding)
-         throws Exception
+                                          boolean blockOnPersistentSend,
+                                          String jndiBinding) throws Exception
    {
       JBossConnectionFactory cf = connectionFactories.get(name);
       if (cf == null)
       {
          cf = new JBossConnectionFactory(connectorConfig,
                                          backupConnectorConfig,
-                                         pingPeriod, callTimeout,
-                                         clientID, dupsOKBatchSize,
-                                         consumerWindowSize, consumerMaxRate, producerWindowSize,
-                                         producerMaxRate, blockOnAcknowledge,
+                                         pingPeriod,
+                                         callTimeout,
+                                         clientID,
+                                         dupsOKBatchSize,
+                                         consumerWindowSize,
+                                         consumerMaxRate,
+                                         producerWindowSize,
+                                         producerMaxRate,
+                                         blockOnAcknowledge,
                                          blockOnNonPersistentSend,
                                          blockOnPersistentSend);
          connectionFactories.put(name, cf);
@@ -232,25 +240,35 @@
    public boolean createConnectionFactory(String name,
                                           TransportConfiguration connectorConfig,
                                           TransportConfiguration backupConnectorConfig,
-                                          long pingPeriod, long callTimeout, String clientID,
-                                          int dupsOKBatchSize, int consumerWindowSize, int consumerMaxRate,
-                                          int producerWindowSize, int producerMaxRate,
+                                          long pingPeriod,
+                                          long callTimeout,
+                                          String clientID,
+                                          int dupsOKBatchSize,
+                                          int consumerWindowSize,
+                                          int consumerMaxRate,
+                                          int producerWindowSize,
+                                          int producerMaxRate,
                                           boolean blockOnAcknowledge,
                                           boolean blockOnNonPersistentSend,
-                                          boolean blockOnPersistentSend, List<String> jndiBindings)
-                                       throws Exception
+                                          boolean blockOnPersistentSend,
+                                          List<String> jndiBindings) throws Exception
    {
       JBossConnectionFactory cf = connectionFactories.get(name);
       if (cf == null)
       {
          cf = new JBossConnectionFactory(connectorConfig,
-                  backupConnectorConfig,
-                  pingPeriod, callTimeout,
-                  clientID, dupsOKBatchSize,
-                  consumerWindowSize, consumerMaxRate, producerWindowSize,
-                  producerMaxRate, blockOnAcknowledge,
-                  blockOnNonPersistentSend,
-                  blockOnPersistentSend);
+                                         backupConnectorConfig,
+                                         pingPeriod,
+                                         callTimeout,
+                                         clientID,
+                                         dupsOKBatchSize,
+                                         consumerWindowSize,
+                                         consumerMaxRate,
+                                         producerWindowSize,
+                                         producerMaxRate,
+                                         blockOnAcknowledge,
+                                         blockOnNonPersistentSend,
+                                         blockOnPersistentSend);
       }
       for (String jndiBinding : jndiBindings)
       {
@@ -295,8 +313,7 @@
 
    // Private -------------------------------------------------------
 
-   private boolean bindToJndi(final String jndiName, final Object objectToBind)
-         throws NamingException
+   private boolean bindToJndi(final String jndiName, final Object objectToBind) throws NamingException
    {
       String parentContext;
       String jndiNameInContext;
@@ -328,8 +345,7 @@
       return true;
    }
 
-   private void addToDestinationBindings(final String destination,
-         final String jndiBinding)
+   private void addToDestinationBindings(final String destination, final String jndiBinding)
    {
       if (destinations.get(destination) == null)
       {

Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ReplicationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ReplicationTest.java	2008-09-17 08:45:41 UTC (rev 4964)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ReplicationTest.java	2008-09-17 11:18:49 UTC (rev 4965)
@@ -1,408 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
-
-package org.jboss.messaging.tests.integration.cluster;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import junit.framework.TestCase;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.client.impl.ClientSessionImpl;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.ConnectionRegistryImpl;
-import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
-import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
-import org.jboss.messaging.jms.client.JBossTextMessage;
-import org.jboss.messaging.util.SimpleString;
-
-public class ReplicationTest extends TestCase
-{
-   private static final Logger log = Logger.getLogger(ReplicationTest.class);
-      
-   // Constants -----------------------------------------------------
-  
-   // Attributes ----------------------------------------------------
-   
-   private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
-   
-   private MessagingService liveService;
-   
-   private MessagingService backupService;
-   
-   private Map<String, Object> backupParams = new HashMap<String, Object>();
-      
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   public void testReplication() throws Exception
-   {                                     
-      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
-
-      ClientSession session = sf.createSession(false, true, true, -1, false);
-        
-      session.createQueue(ADDRESS, ADDRESS, null, false, false);
-            
-      ClientProducer producer = session.createProducer(ADDRESS);     
-      
-      final int numMessages = 1000;
-      
-      log.info("starting");
-      
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
-               System.currentTimeMillis(), (byte) 1);  
-         message.putIntProperty(new SimpleString("count"), i);
-         message.getBody().putString("aardvarks");
-         message.getBody().flip();  
-         producer.send(message);
-      }
-      
-      log.info("Sent messages");
-                                
-      ClientConsumer consumer = session.createConsumer(ADDRESS);
-      
-      session.start();
-       
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message2 = consumer.receive();
-         
-        // log.info("Got message " + message2);
-
-         assertEquals("aardvarks", message2.getBody().getString());
-         assertEquals(i, message2.getProperty(new SimpleString("count")));
-         
-         session.acknowledge();
-      }
-      
-      log.info("done");
-      
-      ClientMessage message3 = consumer.receive(500);
-      
-      assertNull(message3);
-      
-      log.info("Got all messages");
-      
-      session.close();
-   }
-   
-      
-   public void testFailoverSameConnectionFactory() throws Exception
-   {                              
-      ClientSessionFactory sf =
-         new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
-                  new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory", backupParams));
-
-      ClientSession session = sf.createSession(false, true, true, -1, false);
-                  
-      session.createQueue(ADDRESS, ADDRESS, null, false, false);
-       
-      ClientProducer producer = session.createProducer(ADDRESS);     
-      
-      final int numMessages = 1000;
-      
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
-               System.currentTimeMillis(), (byte) 1);         
-         message.putIntProperty(new SimpleString("count"), i);
-         message.getBody().putString("aardvarks");
-         message.getBody().flip();  
-         producer.send(message);
-      }
-      
-      RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
-      
-      //Simulate failure on connection
-      conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
-                      
-      ClientConsumer consumer = session.createConsumer(ADDRESS);
-      
-      session.start();
-      
-      for (int i = 0; i < numMessages / 2; i++)
-      {
-         ClientMessage message2 = consumer.receive();
-
-         assertEquals("aardvarks", message2.getBody().getString());
-         
-         assertEquals(i, message2.getProperty(new SimpleString("count")));
-         
-         session.acknowledge();
-         
-         //log.info("got message " + message2.getProperty(new SimpleString("blah")));
-      }
-
-      session.close();
-                  
-      session = sf.createSession(false, true, true, -1, false);
-      
-      consumer = session.createConsumer(ADDRESS);
-      
-      session.start();
-      
-      for (int i = numMessages / 2 ; i < numMessages; i++)
-      {
-         ClientMessage message2 = consumer.receive();
-
-         assertEquals("aardvarks", message2.getBody().getString());
-         
-         assertEquals(i, message2.getProperty(new SimpleString("count")));
-         
-         session.acknowledge();
-         
-        //log.info("got message " + message2.getProperty(new SimpleString("blah")));
-      }
-      
-      ClientMessage message3 = consumer.receive(500);
-      
-      session.close();
-      
-      assertNull(message3);
-   }
-   
-   public void testFailoverChangeConnectionFactory() throws Exception
-   {                                     
-      ClientSessionFactory sf =
-         new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
-                  new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory", backupParams));
-
-      ClientSession session = sf.createSession(false, true, true, -1, false);
-                  
-      session.createQueue(ADDRESS, ADDRESS, null, false, false);
-       
-      ClientProducer producer = session.createProducer(ADDRESS);     
-      
-      final int numMessages = 1000;
-      
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
-               System.currentTimeMillis(), (byte) 1);         
-         message.putIntProperty(new SimpleString("count"), i);
-         message.getBody().putString("aardvarks");
-         message.getBody().flip();  
-         producer.send(message);
-     //    log.info("sent " + i);
-      }
-      
-      RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
-      
-      //Simulate failure on connection
-      conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
-                      
-      ClientConsumer consumer = session.createConsumer(ADDRESS);
-      
-      session.start();
-            
-      for (int i = 0; i < numMessages / 2; i++)
-      {
-         ClientMessage message2 = consumer.receive();
-
-         assertEquals("aardvarks", message2.getBody().getString());
-         
-         assertEquals(i, message2.getProperty(new SimpleString("count")));
-         
-         session.acknowledge();
-         
-         //log.info("got message " + message2.getProperty(new SimpleString("blah")));
-      }
-
-      session.close();
-                  
-      sf =
-         new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory", backupParams));                  
-      
-      log.info("** creating new one");
-      
-      session = sf.createSession(false, true, true, -1, false);
-      
-      consumer = session.createConsumer(ADDRESS);
-      
-      session.start();
-      
-      for (int i = numMessages / 2 ; i < numMessages; i++)      
-      {
-         ClientMessage message2 = consumer.receive();
-
-         assertEquals("aardvarks", message2.getBody().getString());
-         
-         assertEquals(i, message2.getProperty(new SimpleString("count")));
-         
-         session.acknowledge();
-         
-        //log.info("got message " + message2.getProperty(new SimpleString("blah")));
-      }
-      
-      ClientMessage message3 = consumer.receive(500);
-      
-      assertNull(message3);
-      
-      session.close();      
-   }
-   
-   public void testFailoverMultipleSessions() throws Exception
-   {                                     
-      ClientSessionFactory sf =
-         new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
-                  new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory", backupParams));
-
-      final int numSessions = 10;
-      
-      List<ClientSession> sessions = new ArrayList<ClientSession>();
-      
-      List<ClientConsumer> consumers = new ArrayList<ClientConsumer>();
-      
-      for (int i = 0; i < numSessions; i++)
-      {
-         ClientSession sess = sf.createSession(false, true, true, -1, false);
-         
-         SimpleString queueName = new SimpleString("subscription" + i);
-         
-         sess.createQueue(ADDRESS, queueName, null, false, false);
-         
-         ClientConsumer consumer = sess.createConsumer(queueName);
-                          
-         sess.start();
-         
-         sessions.add(sess);
-         
-         consumers.add(consumer);
-      }
-      
-      log.info("Created consumers");
-            
-      ClientSession session = sf.createSession(false, true, true, -1, false);
-                  
-      ClientProducer producer = session.createProducer(ADDRESS);     
-      
-      final int numMessages = 100;
-      
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
-               System.currentTimeMillis(), (byte) 1);         
-         message.putIntProperty(new SimpleString("count"), i);
-         message.getBody().putString("aardvarks");
-         message.getBody().flip();  
-         producer.send(message);
-     //    log.info("sent " + i);
-      }
-                  
-      RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
-      
-      //Simulate failure on connection
-      conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
-      
-      for (int i = 0; i < numSessions; i++)
-      {      
-         ClientConsumer cons = consumers.get(i);
-         
-         ClientSession sess = sessions.get(i);
-         
-         for (int j = 0; j < numMessages; j++)
-         {
-            ClientMessage message2 = cons.receive();
-   
-            assertEquals("aardvarks", message2.getBody().getString());
-            
-            //log.info("got message " + i + ":" + message2.getProperty(new SimpleString("count")));
-            
-            assertEquals(j, message2.getProperty(new SimpleString("count")));
-            
-            sess.acknowledge();
-         }
-      }
-      
-      session.close();
-      
-      for (int i = 0; i < numSessions; i++)
-      {      
-         ClientSession sess = sessions.get(i);
-         
-         sess.close();
-      }
-   }
-   
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-   
-   protected void setUp() throws Exception
-   {
-      Configuration backupConf = new ConfigurationImpl();      
-      backupConf.setSecurityEnabled(false);        
-      backupConf.setPacketConfirmationBatchSize(1);
-      backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-      backupConf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory", backupParams));
-      backupConf.setBackup(true);                  
-      backupService = MessagingServiceImpl.newNullStorageMessagingServer(backupConf);              
-      backupService.start();
-            
-      Configuration liveConf = new ConfigurationImpl();      
-      liveConf.setSecurityEnabled(false);    
-      liveConf.setPacketConfirmationBatchSize(1);
-      liveConf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
-      liveConf.setBackupConnectorConfiguration(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory", backupParams));
-      liveService = MessagingServiceImpl.newNullStorageMessagingServer(liveConf);              
-      liveService.start();
-   }
-   
-   protected void tearDown() throws Exception
-   {                
-      assertEquals(0, ConnectionRegistryImpl.instance.size());
-      
-      assertEquals(0, backupService.getServer().getRemotingService().getConnections().size());
-      
-      backupService.stop();
-      
-      assertEquals(0, liveService.getServer().getRemotingService().getConnections().size());
-      
-      liveService.stop();
-      
-      assertEquals(0, InVMRegistry.instance.size());
-   }
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleAutomaticFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleAutomaticFailoverTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleAutomaticFailoverTest.java	2008-09-17 11:18:49 UTC (rev 4965)
@@ -0,0 +1,714 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.FailureListener;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.ConnectionRegistryImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * 
+ * A SimpleAutomaticFailoverTest
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class SimpleAutomaticFailoverTest extends TestCase
+{
+   private static final Logger log = Logger.getLogger(SimpleAutomaticFailoverTest.class);
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+   private MessagingService liveService;
+
+   private MessagingService backupService;
+
+   private Map<String, Object> backupParams = new HashMap<String, Object>();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testReplication() throws Exception
+   {
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      ClientSession session = sf.createSession(false, true, true, -1, false);
+
+      session.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      final int numMessages = 1000;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().putString("aardvarks");
+         message.getBody().flip();
+         producer.send(message);
+      }
+
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+      session.start();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message2 = consumer.receive();
+
+         // log.info("Got message " + message2);
+
+         assertEquals("aardvarks", message2.getBody().getString());
+         assertEquals(i, message2.getProperty(new SimpleString("count")));
+
+         session.acknowledge();
+      }
+
+      ClientMessage message3 = consumer.receive(250);
+
+      assertNull(message3);
+
+      session.close();
+   }
+
+   public void testFailoverSameConnectionFactory() throws Exception
+   {
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                             new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                        backupParams));
+
+      ClientSession session = sf.createSession(false, true, true, -1, false);
+
+      session.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      final int numMessages = 1000;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().putString("aardvarks");
+         message.getBody().flip();
+         producer.send(message);
+      }
+
+      RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+
+      // Simulate failure on connection
+      conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+      session.start();
+
+      for (int i = 0; i < numMessages / 2; i++)
+      {
+         ClientMessage message2 = consumer.receive();
+
+         assertEquals("aardvarks", message2.getBody().getString());
+
+         assertEquals(i, message2.getProperty(new SimpleString("count")));
+
+         session.acknowledge();
+      }
+
+      session.close();
+
+      session = sf.createSession(false, true, true, -1, false);
+
+      consumer = session.createConsumer(ADDRESS);
+
+      session.start();
+
+      for (int i = numMessages / 2; i < numMessages; i++)
+      {
+         ClientMessage message2 = consumer.receive();
+
+         assertEquals("aardvarks", message2.getBody().getString());
+
+         assertEquals(i, message2.getProperty(new SimpleString("count")));
+
+         session.acknowledge();
+      }
+
+      ClientMessage message3 = consumer.receive(250);
+
+      session.close();
+
+      assertNull(message3);
+   }
+
+   public void testFailoverChangeConnectionFactory() throws Exception
+   {
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                             new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                        backupParams));
+
+      ClientSession session = sf.createSession(false, true, true, -1, false);
+
+      session.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      final int numMessages = 1000;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().putString("aardvarks");
+         message.getBody().flip();
+         producer.send(message);
+      }
+
+      RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+
+      // Simulate failure on connection
+      conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+      session.start();
+
+      for (int i = 0; i < numMessages / 2; i++)
+      {
+         ClientMessage message2 = consumer.receive();
+
+         assertEquals("aardvarks", message2.getBody().getString());
+
+         assertEquals(i, message2.getProperty(new SimpleString("count")));
+
+         session.acknowledge();
+      }
+
+      session.close();
+
+      sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                   backupParams));
+
+      session = sf.createSession(false, true, true, -1, false);
+
+      consumer = session.createConsumer(ADDRESS);
+
+      session.start();
+
+      for (int i = numMessages / 2; i < numMessages; i++)
+      {
+         ClientMessage message2 = consumer.receive();
+
+         assertEquals("aardvarks", message2.getBody().getString());
+
+         assertEquals(i, message2.getProperty(new SimpleString("count")));
+
+         session.acknowledge();
+      }
+
+      ClientMessage message3 = consumer.receive(250);
+
+      assertNull(message3);
+
+      session.close();
+   }
+
+   public void testNoMessagesLeftAfterFailoverNewSession() throws Exception
+   {
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                             new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                        backupParams));
+
+      ClientSession session = sf.createSession(false, true, true, -1, false);
+
+      session.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      final int numMessages = 1000;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().putString("aardvarks");
+         message.getBody().flip();
+         producer.send(message);
+      }
+
+      RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+
+      // Simulate failure on connection
+      conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+      session.start();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message2 = consumer.receive();
+
+         assertEquals("aardvarks", message2.getBody().getString());
+
+         assertEquals(i, message2.getProperty(new SimpleString("count")));
+
+         session.acknowledge();
+      }
+
+      session.close();
+
+      sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                   backupParams));
+
+      session = sf.createSession(false, true, true, -1, false);
+
+      consumer = session.createConsumer(ADDRESS);
+
+      ClientMessage message3 = consumer.receive(250);
+
+      assertNull(message3);
+
+      session.close();
+   }
+   
+   /**
+    * Registers a FailureListener on the session's connection, simulates a failure of that connection and
+    * checks that the listener is notified within one second. The test then consumes and acknowledges all
+    * messages on the same session and verifies that a new session created from a backup-only factory
+    * receives nothing.
+    */
+   public void testFailureListenerCalledOnFailure() throws Exception
+   {
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                             new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                        backupParams));
+
+      ClientSession session = sf.createSession(false, true, true, -1, false);
+
+      session.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      final int numMessages = 1000;
+
+      // Each message carries its sequence number in the "count" property so that ordering can be checked
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().putString("aardvarks");
+         message.getBody().flip();
+         producer.send(message);
+      }
+
+      RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+      
+      // Released by the listener; lets the test wait for the notification with a bounded timeout
+      final CountDownLatch latch = new CountDownLatch(1);
+      
+      class MyListener implements FailureListener
+      {
+         public void connectionFailed(MessagingException me)
+         {
+            latch.countDown();
+         }
+      }
+      
+      // The listener has to be registered before the failure is triggered
+      conn.addFailureListener(new MyListener());
+
+      // Simulate failure on connection
+      conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+      
+      // await returns false if the listener has not been called within one second
+      boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
+      
+      assertTrue(ok);
+
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+      session.start();
+
+      // All messages are consumed, in order, on the session whose connection was failed
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message2 = consumer.receive();
+
+         assertEquals("aardvarks", message2.getBody().getString());
+
+         assertEquals(i, message2.getProperty(new SimpleString("count")));
+
+         session.acknowledge();
+      }
+
+      session.close();
+
+      // New factory configured with the backup connector only
+      sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                   backupParams));
+
+      session = sf.createSession(false, true, true, -1, false);
+
+      consumer = session.createConsumer(ADDRESS);
+
+      // Everything was acknowledged above, so nothing should be delivered here
+      ClientMessage message3 = consumer.receive(250);
+
+      assertNull(message3);
+
+      session.close();
+   }   
+
+   public void testFailoverMultipleSessions() throws Exception
+   {
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                             new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                        backupParams));
+
+      final int numSessions = 10;
+
+      List<ClientSession> sessions = new ArrayList<ClientSession>();
+
+      List<ClientConsumer> consumers = new ArrayList<ClientConsumer>();
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         ClientSession sess = sf.createSession(false, true, true, -1, false);
+
+         SimpleString queueName = new SimpleString("subscription" + i);
+
+         sess.createQueue(ADDRESS, queueName, null, false, false);
+
+         ClientConsumer consumer = sess.createConsumer(queueName);
+
+         sess.start();
+
+         sessions.add(sess);
+
+         consumers.add(consumer);
+      }
+
+      ClientSession session = sf.createSession(false, true, true, -1, false);
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().putString("aardvarks");
+         message.getBody().flip();
+         producer.send(message);
+      }
+
+      RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+
+      // Simulate failure on connection
+      conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         ClientConsumer cons = consumers.get(i);
+
+         ClientSession sess = sessions.get(i);
+
+         for (int j = 0; j < numMessages; j++)
+         {
+            ClientMessage message2 = cons.receive();
+
+            assertEquals("aardvarks", message2.getBody().getString());
+
+            assertEquals(j, message2.getProperty(new SimpleString("count")));
+
+            sess.acknowledge();
+         }
+
+         ClientMessage message3 = cons.receive(250);
+
+         assertNull(message3);
+      }
+
+      session.close();
+
+      for (int i = 0; i < numSessions; i++)
+      {
+         ClientSession sess = sessions.get(i);
+
+         sess.close();
+      }
+   }
+
+   /**
+    * Checks when the connector settings of a ClientSessionFactory may be changed: the four setters are
+    * accepted before any session exists, each of them is expected to throw IllegalStateException while a
+    * session is open, and all of them are accepted again once that session has been closed.
+    */
+   public void testCantSetConnectorsAfterCreateSession() throws Exception
+   {
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                             new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                        backupParams));
+
+      // No session created yet - all setters must succeed
+      sf.setConnectorFactory(new InVMConnectorFactory());
+      sf.setTransportParams(new HashMap<String, Object>());
+      sf.setBackupConnectorFactory(new InVMConnectorFactory());
+      sf.setBackupTransportParams(new HashMap<String, Object>());
+
+      ClientSession sess = null;
+
+      try
+      {
+         sess = sf.createSession(false, true, true, -1, false);
+
+         try
+         {
+            sf.setConnectorFactory(new InVMConnectorFactory());
+
+            fail("Should throw exception");
+         }
+         catch (IllegalStateException e)
+         {
+            // Ok
+         }
+
+         try
+         {
+            sf.setTransportParams(new HashMap<String, Object>());
+
+            fail("Should throw exception");
+         }
+         catch (IllegalStateException e)
+         {
+            // Ok
+         }
+
+         try
+         {
+            sf.setBackupConnectorFactory(new InVMConnectorFactory());
+
+            fail("Should throw exception");
+         }
+         catch (IllegalStateException e)
+         {
+            // Ok
+         }
+
+         try
+         {
+            sf.setBackupTransportParams(new HashMap<String, Object>());
+
+            fail("Should throw exception");
+         }
+         catch (IllegalStateException e)
+         {
+            // Ok
+         }
+      }
+      finally
+      {
+         // createSession may have thrown, in which case sess is still null; closing unconditionally
+         // would replace the real failure with a NullPointerException
+         if (sess != null)
+         {
+            sess.close();
+         }
+      }
+
+      // The only session has been closed - all setters must succeed again
+      sf.setConnectorFactory(new InVMConnectorFactory());
+      sf.setTransportParams(new HashMap<String, Object>());
+      sf.setBackupConnectorFactory(new InVMConnectorFactory());
+      sf.setBackupTransportParams(new HashMap<String, Object>());
+   }
+
+   /**
+    * Simulates two consecutive connection failures on the same session. After the first failure the
+    * session is still used to consume half of the messages and is expected to be backed by a different
+    * RemotingConnection instance. That second connection is then failed as well; the registered
+    * FailureListener must be notified within one second and a subsequent createQueue call on the session
+    * is expected to throw a MessagingException with code NOT_CONNECTED.
+    */
+   public void testFailureAfterFailover() throws Exception
+   {
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                             new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                        backupParams));
+
+      ClientSession session = sf.createSession(false, true, true, -1, false);
+
+      session.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      final int numMessages = 1000;
+
+      // Each message carries its sequence number in the "count" property so that ordering can be checked
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().putString("aardvarks");
+         message.getBody().flip();
+         producer.send(message);
+      }
+
+      // Connection in use before the first failure; compared with conn2 further down
+      RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+
+      // Simulate failure on connection
+      conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+      session.start();
+
+      // Consume half of them
+
+      for (int i = 0; i < numMessages / 2; i++)
+      {
+         ClientMessage message2 = consumer.receive();
+
+         assertEquals("aardvarks", message2.getBody().getString());
+
+         assertEquals(i, message2.getProperty(new SimpleString("count")));
+
+         session.acknowledge();
+      }
+
+      // Connection the session reports after the first failure
+      RemotingConnection conn2 = ((ClientSessionImpl)session).getConnection();
+
+      // Released by the listener; lets the test wait for the notification with a bounded timeout
+      final CountDownLatch latch = new CountDownLatch(1);
+      
+      class MyListener implements FailureListener
+      {
+         public void connectionFailed(MessagingException me)
+         {
+            latch.countDown();
+         }
+      }
+      
+      // The listener has to be registered before the second failure is triggered
+      conn2.addFailureListener(new MyListener());
+
+      // The session must have switched to a different connection instance after the first failure
+      assertFalse(conn == conn2);
+
+      // Simulate a second failure, this time on the connection the session is using now
+      conn2.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+      // await returns false if the listener has not been called within one second
+      boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
+      
+      assertTrue(ok);
+
+      // After the second failure the session is expected to reject further operations
+      try
+      {               
+         session.createQueue(new SimpleString("blah"), new SimpleString("blah"), null, false, false);
+         
+         fail("Should throw exception");
+      }
+      catch (MessagingException me)
+      {
+         assertEquals(MessagingException.NOT_CONNECTED, me.getCode());
+      }
+      
+      session.close();
+   }
+   
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   protected void setUp() throws Exception
+   {
+      Configuration backupConf = new ConfigurationImpl();
+      backupConf.setSecurityEnabled(false);
+      backupConf.setPacketConfirmationBatchSize(10);
+      backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+      backupConf.getAcceptorConfigurations()
+                .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+                                                backupParams));
+      backupConf.setBackup(true);
+      backupService = MessagingServiceImpl.newNullStorageMessagingServer(backupConf);
+      backupService.start();
+
+      Configuration liveConf = new ConfigurationImpl();
+      liveConf.setSecurityEnabled(false);
+      liveConf.setPacketConfirmationBatchSize(10);
+      liveConf.getAcceptorConfigurations()
+              .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+      liveConf.setBackupConnectorConfiguration(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                          backupParams));
+      liveService = MessagingServiceImpl.newNullStorageMessagingServer(liveConf);
+      liveService.start();
+   }
+
+   /**
+    * Verifies that the test left no connections behind and stops both servers.
+    *
+    * On the success path the order is unchanged: client registry check, backup connection check, backup
+    * stop, live connection check, live stop, in-VM registry check. The stops are placed in finally
+    * blocks so that a failing leak check can no longer leave a server running for the following tests.
+    * If more than one step fails, the exception thrown last is the one that is reported.
+    */
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         assertEquals(0, ConnectionRegistryImpl.instance.size());
+
+         assertEquals(0, backupService.getServer().getRemotingService().getConnections().size());
+      }
+      finally
+      {
+         try
+         {
+            backupService.stop();
+
+            assertEquals(0, liveService.getServer().getRemotingService().getConnections().size());
+         }
+         finally
+         {
+            // Always reached, even if one of the checks above or the backup stop failed
+            liveService.stop();
+         }
+      }
+
+      // Only meaningful once both servers have been stopped
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}




More information about the jboss-cvs-commits mailing list