[hornetq-commits] JBoss hornetq SVN: r7986 - in branches/Branch_Replication_Changes: src/config/common/schema and 20 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Sep 24 05:25:46 EDT 2009


Author: timfox
Date: 2009-09-24 05:25:44 -0400 (Thu, 24 Sep 2009)
New Revision: 7986

Added:
   branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSConnectionFactoryControlImpl.java
   branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSTopicControlImpl.java
Removed:
   branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/ConnectionFactoryControlImpl.java
   branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/TopicControlImpl.java
   branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/jms/server/JMSServerStartStopWithReplicationTest.java
Modified:
   branches/Branch_Replication_Changes/docs/user-manual/en/configuration-index.xml
   branches/Branch_Replication_Changes/src/config/common/schema/hornetq-jms.xsd
   branches/Branch_Replication_Changes/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   branches/Branch_Replication_Changes/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java
   branches/Branch_Replication_Changes/src/main/org/hornetq/core/config/impl/FileConfiguration.java
   branches/Branch_Replication_Changes/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java
   branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java
   branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
   branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
   branches/Branch_Replication_Changes/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
   branches/Branch_Replication_Changes/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/HornetQServer.java
   branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/DistributorImpl.java
   branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/Branch_Replication_Changes/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
   branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
   branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
   branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSQueueControlImpl.java
   branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java
   branches/Branch_Replication_Changes/tests/config/ConfigurationTest-full-config.xml
   branches/Branch_Replication_Changes/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
   branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java
   branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
   branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
   branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java
   branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
   branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor.java
   branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor2.java
   branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor3.java
   branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
   branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
   branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
replication changes

Modified: branches/Branch_Replication_Changes/docs/user-manual/en/configuration-index.xml
===================================================================
--- branches/Branch_Replication_Changes/docs/user-manual/en/configuration-index.xml	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/docs/user-manual/en/configuration-index.xml	2009-09-24 09:25:44 UTC (rev 7986)
@@ -325,14 +325,6 @@
                             <entry>true</entry>
                         </row>
                         <row>
-                            <entry><link linkend="queue.activation.timeout"
-                                    >queue-activation-timeout</link></entry>
-                            <entry>Long</entry>
-                            <entry>after failover occurs, this timeout specifies how long (in ms) to
-                                wait for consumers to re-attach before starting delivery</entry>
-                            <entry>30000</entry>
-                        </row>
-                        <row>
                             <entry><link linkend="server.scheduled.thread.pool"
                                     >scheduled-thread-pool-max-size</link></entry>
                             <entry>Integer</entry>

Modified: branches/Branch_Replication_Changes/src/config/common/schema/hornetq-jms.xsd
===================================================================
--- branches/Branch_Replication_Changes/src/config/common/schema/hornetq-jms.xsd	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/config/common/schema/hornetq-jms.xsd	2009-09-24 09:25:44 UTC (rev 7986)
@@ -102,6 +102,9 @@
             <xsd:element name="reconnect-attempts" type="xsd:int"
                 maxOccurs="1" minOccurs="0">
             </xsd:element>
+            <xsd:element name="use-reattach" type="xsd:boolean"
+                maxOccurs="1" minOccurs="0">
+            </xsd:element>
             <xsd:element name="failover-on-server-shutdown" type="xsd:boolean"
                 maxOccurs="1" minOccurs="0">
             </xsd:element>

Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -861,12 +861,8 @@
          return true;
       }
       
-      log.info("session handling failover");
-
       boolean ok = false;
 
-      log.info("Failover occurring");
-      
       // Need to stop all consumers outside the lock
       for (ClientConsumerInternal consumer : consumers.values())
       {
@@ -882,21 +878,13 @@
          consumer.clearAtFailover();
       }
       
-      log.info("stopped consumers");
-
       // We lock the channel to prevent any packets being sent during the failover process
       channel.lock();
       
-      log.info("got lock");
-
       try
       {
-         log.info("transferring connection");
-         
          channel.transferConnection(backupConnection);
          
-         log.info("transferred connection");
-
          remotingConnection = backupConnection;
 
          Packet request = new CreateSessionMessage(name,
@@ -913,15 +901,10 @@
 
          Channel channel1 = backupConnection.getChannel(1, -1, false);
 
-         log.info("sending create session");
-         
          CreateSessionResponseMessage response = (CreateSessionResponseMessage)channel1.sendBlocking(request);
 
-         log.info("got response from create session");
-         
          if (response.isCreated())
          {
-            log.info("craeted ok");
             // Session was created ok
 
             // Now we need to recreate the consumers
@@ -959,7 +942,7 @@
                   conn.write(buffer, false);                  
                }
             }
-
+            
             if ((!autoCommitAcks || !autoCommitSends) && workDone)
             {
                // Session is transacted - set for rollback only
@@ -990,18 +973,14 @@
             }
 
             ok = true;
-            
-            log.info("session created ok");
          }
          else
          {
-            log.info("not created ok");
             // This means the server we failed onto is not ready to take new sessions - perhaps it hasn't actually
             // failed over
          }
 
          // We cause any blocking calls to return - since they won't get responses.
-         log.info("calling returnblocking");
          channel.returnBlocking();
       }
       catch (Throwable t)
@@ -1012,7 +991,7 @@
       {
          channel.unlock();
       }
-
+      
       return ok;
    }
 

Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -551,9 +551,9 @@
          // until failover is complete
 
          boolean serverShutdown = me.getCode() == HornetQException.DISCONNECTED;
-
+         
          boolean attemptFailoverOrReconnect = (backupConnectorFactory != null || reconnectAttempts != 0) && (failoverOnServerShutdown || !serverShutdown);
-
+         
          if (attemptFailoverOrReconnect)
          {
             lockAllChannel1s();
@@ -784,6 +784,8 @@
                ok = false;
             }
          }
+         
+         log.info("Reconnected ok");
       }
 
       return ok;
@@ -792,7 +794,7 @@
    private RemotingConnection getConnectionWithRetry(final int initialRefCount, final int reconnectAttempts)
    {
       long interval = retryInterval;
-
+      
       int count = 0;
 
       while (true)
@@ -1121,7 +1123,7 @@
                public void run()
                {
                   conn.fail(new HornetQException(HornetQException.DISCONNECTED,
-                                                 "The connection was exitLoop by the server"));
+                                                 "The connection was disconnected because of server shutdown"));
                }
             });
          }

Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/config/impl/FileConfiguration.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/config/impl/FileConfiguration.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -92,7 +92,7 @@
       }
        
       URL url = getClass().getClassLoader().getResource(configurationUrl);
-      log.info("Loading server configuration from " + url);
+      log.debug("Loading server configuration from " + url);
 
       Reader reader = new InputStreamReader(url.openStream());
       String xml = org.hornetq.utils.XMLUtil.readerToString(reader);

Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -125,7 +125,7 @@
 
                try
                {
-                  log.info("Deploying " + url + " for " + deployer.getClass().getSimpleName());
+                  log.debug("Deploying " + url + " for " + deployer.getClass().getSimpleName());
                   deployer.deploy(url);
                }
                catch (Exception e)

Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -32,7 +32,7 @@
    // Constants -----------------------------------------------------
 
    // Attributes ----------------------------------------------------
- 
+
    private final Bridge bridge;
 
    private final BridgeConfiguration configuration;
@@ -41,8 +41,7 @@
 
    // Constructors --------------------------------------------------
 
-   public BridgeControlImpl(final Bridge bridge, final BridgeConfiguration configuration)
-      throws Exception
+   public BridgeControlImpl(final Bridge bridge, final BridgeConfiguration configuration) throws Exception
    {
       super(BridgeControl.class);
       this.bridge = bridge;
@@ -54,10 +53,10 @@
    public String[] getConnectorPair() throws Exception
    {
       String[] pair = new String[2];
-      
+
       pair[0] = configuration.getConnectorPair().a;
       pair[1] = configuration.getConnectorPair().b != null ? configuration.getConnectorPair().b : null;
-      
+
       return pair;
    }
 
@@ -70,7 +69,7 @@
    {
       return configuration.getQueueName();
    }
-   
+
    public String getDiscoveryGroupName()
    {
       return configuration.getDiscoveryGroupName();

Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -389,15 +389,12 @@
 
    public String[] listRemoteAddresses()
    {
-      log.info("listing remote addresses");
       Set<RemotingConnection> connections = remotingService.getConnections();
 
       String[] remoteAddresses = new String[connections.size()];
       int i = 0;
       for (RemotingConnection connection : connections)
       {
-         log.info("connection " + connection + " is on server");
-         
          remoteAddresses[i++] = connection.getRemoteAddress();
       }
       return remoteAddresses;

Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -169,16 +169,16 @@
    }
 
    public HornetQServerControlImpl registerServer(final PostOffice postOffice,
-                                                    final StorageManager storageManager,
-                                                    final Configuration configuration,
-                                                    final HierarchicalRepository<AddressSettings> addressSettingsRepository,
-                                                    final HierarchicalRepository<Set<Role>> securityRepository,
-                                                    final ResourceManager resourceManager,
-                                                    final RemotingService remotingService,
-                                                    final HornetQServer messagingServer,
-                                                    final QueueFactory queueFactory,
-                                                    final ScheduledExecutorService scheduledThreadPool,
-                                                    final boolean backup) throws Exception
+                                                  final StorageManager storageManager,
+                                                  final Configuration configuration,
+                                                  final HierarchicalRepository<AddressSettings> addressSettingsRepository,
+                                                  final HierarchicalRepository<Set<Role>> securityRepository,
+                                                  final ResourceManager resourceManager,
+                                                  final RemotingService remotingService,
+                                                  final HornetQServer messagingServer,
+                                                  final QueueFactory queueFactory,
+                                                  final ScheduledExecutorService scheduledThreadPool,
+                                                  final boolean backup) throws Exception
    {
       this.postOffice = postOffice;
       this.addressSettingsRepository = addressSettingsRepository;
@@ -191,12 +191,12 @@
       messageCounterManager.reschedule(configuration.getMessageCounterSamplePeriod());
 
       messagingServerControl = new HornetQServerControlImpl(postOffice,
-                                                              configuration,
-                                                              resourceManager,
-                                                              remotingService,
-                                                              messagingServer,
-                                                              messageCounterManager,
-                                                              broadcaster);
+                                                            configuration,
+                                                            resourceManager,
+                                                            remotingService,
+                                                            messagingServer,
+                                                            messageCounterManager,
+                                                            broadcaster);
       ObjectName objectName = ObjectNames.getHornetQServerObjectName();
       registerInJMX(objectName, messagingServerControl);
       registerInRegistry(ResourceNames.CORE_SERVER, messagingServerControl);
@@ -298,7 +298,7 @@
 
    public void unregisterAcceptors()
    {
-      List<String> acceptors = new ArrayList<String>();      
+      List<String> acceptors = new ArrayList<String>();
       for (String resourceName : registry.keySet())
       {
          if (resourceName.startsWith(ResourceNames.CORE_ACCEPTOR))
@@ -306,7 +306,7 @@
             acceptors.add(resourceName);
          }
       }
-      
+
       for (String acceptor : acceptors)
       {
          String name = acceptor.substring(ResourceNames.CORE_ACCEPTOR.length());
@@ -320,7 +320,7 @@
          }
       }
    }
-   
+
    public synchronized void unregisterAcceptor(final String name) throws Exception
    {
       ObjectName objectName = ObjectNames.getAcceptorObjectName(name);
@@ -478,7 +478,7 @@
    }
 
    private Set<ObjectName> registeredNames = new HashSet<ObjectName>();
-   
+
    public void registerInJMX(final ObjectName objectName, final Object managedResource) throws Exception
    {
       if (!jmxManagementEnabled)
@@ -595,14 +595,15 @@
             }
             if (!unexpectedResourceNames.isEmpty())
             {
-               log.warn("On ManagementService stop, there are " + unexpectedResourceNames.size() + " unexpected registered MBeans");
+               log.warn("On ManagementService stop, there are " + unexpectedResourceNames.size() +
+                        " unexpected registered MBeans");
             }
 
             for (ObjectName on : this.registeredNames)
             {
                try
                {
-                  mbeanServer.unregisterMBean(on);                                   
+                  mbeanServer.unregisterMBean(on);
                }
                catch (Exception ignore)
                {
@@ -611,16 +612,19 @@
          }
       }
 
-      messageCounterManager.stop();
+      if (messageCounterManager != null)
+      {
+         messageCounterManager.stop();
 
-      messageCounterManager.resetAllCounters();
+         messageCounterManager.resetAllCounters();
 
-      messageCounterManager.resetAllCounterHistories();
+         messageCounterManager.resetAllCounterHistories();
 
-      messageCounterManager.clear();
-      
+         messageCounterManager.clear();
+      }
+
       registeredNames.clear();
-      
+
       started = false;
    }
 

Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -403,7 +403,7 @@
             }
 
             for (Bindable bindable : chosen)
-            {
+            {               
                bindable.route(message, tx);
             }
          }

Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -85,8 +85,6 @@
 
    private volatile boolean started;
 
-  // private volatile boolean backup;
-
    private final ManagementService managementService;
 
    private final Reaper reaperRunnable = new Reaper();
@@ -127,7 +125,7 @@
                          final ManagementService managementService,
                          final long reaperPeriod,
                          final int reaperPriority,
-                         final boolean enableWildCardRouting,             
+                         final boolean enableWildCardRouting,
                          final int idCacheSize,
                          final boolean persistIDCache,
                          final ExecutorFactory orderedExecutorFactory,
@@ -182,7 +180,7 @@
       // This is to avoid thread leakages where the Reaper would run beyong the life cycle of the PostOffice
       started = true;
 
-      startExpiryScanner();      
+      startExpiryScanner();
    }
 
    public synchronized void stop() throws Exception
@@ -344,8 +342,7 @@
 
                      if (redistributionDelay != -1)
                      {
-                        queue.addRedistributor(redistributionDelay,
-                                               redistributorExecutorFactory.getExecutor());
+                        queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
                      }
                   }
                }
@@ -415,8 +412,7 @@
 
                      if (redistributionDelay != -1)
                      {
-                        queue.addRedistributor(redistributionDelay,
-                                               redistributorExecutorFactory.getExecutor());
+                        queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
                      }
                   }
                }
@@ -870,7 +866,7 @@
             log.warn("Reaper thread being restarted");
             closed = false;
          }
-         
+
          // The reaper thread should be finished case the PostOffice is gone
          // This is to avoid leaks on PostOffice between stops and starts
          while (PostOfficeImpl.this.isStarted())

Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/HornetQServer.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/HornetQServer.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -63,7 +63,11 @@
    Version getVersion();
 
    HornetQServerControlImpl getHornetQServerControl();
+   
+   void registerActivateCallback(ActivateCallback callback);
 
+   void unregisterActivateCallback(ActivateCallback callback);
+
    ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastReceivedCommandID) throws Exception;
 
    CreateSessionResponseMessage createSession(String name,

Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -241,7 +241,7 @@
    }
 
    private void cancelRefs() throws Exception
-   {
+   {      
       MessageReference ref;
 
       LinkedList<MessageReference> list = new LinkedList<MessageReference>();
@@ -251,10 +251,17 @@
          list.addFirst(ref);
       }
 
+      Queue queue = null;
       for (MessageReference ref2 : list)
       {
-         ref2.getQueue().cancel(ref2);
+         queue = ref2.getQueue();
+         queue.cancel(ref2);
       }
+      
+      if (queue != null)
+      {
+         queue.deliverAsync(executor);
+      }
    }
 
    public void stop() throws Exception
@@ -495,10 +502,116 @@
    {
       if (started)
       {
-         executor.execute(new FailRunnable());
+         //executor.execute(new FailRunnable());
+         
+         try
+         {
+            cancelRefs();
+            
+            //setupNotificationConsumer();
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to handle failure", e);
+         }                 
       }
    }
+   
+   private ClientConsumer notifConsumer;
+      
+   // TODO - we should move this code to the ClusterConnectorImpl - and just execute it when the bridge
+   // connection is opened and closed - we can use
+   // a callback to tell us that
+   private void setupNotificationConsumer() throws Exception
+   {           
+      if (flowRecord != null)
+      {
+         if (notifConsumer != null)
+         {
+            try
+            {
+               notifConsumer.close();
+               
+               notifConsumer = null;
+            }
+            catch (HornetQException e)
+            {
+               log.error("Failed to close consumer", e);
+            }
+         }
+         
+         // Get the queue data
 
+         // Create a queue to catch the notifications - the name must be deterministic on live and backup, but
+         // different each time this is called
+         // Otherwise it may already exist if server is restarted before it has been deleted on backup
+
+         String qName = "notif." + nodeUUID.toString() + "." + name.toString();
+
+         SimpleString notifQueueName = new SimpleString(qName);
+
+         SimpleString filter = new SimpleString(ManagementHelper.HDR_BINDING_TYPE + "<>" +
+                                                BindingType.DIVERT.toInt() +
+                                                " AND " +
+                                                ManagementHelper.HDR_NOTIFICATION_TYPE +
+                                                " IN ('" +
+                                                NotificationType.BINDING_ADDED +
+                                                "','" +
+                                                NotificationType.BINDING_REMOVED +
+                                                "','" +
+                                                NotificationType.CONSUMER_CREATED +
+                                                "','" +
+                                                NotificationType.CONSUMER_CLOSED +
+                                                "') AND " +
+                                                ManagementHelper.HDR_DISTANCE +
+                                                "<" +
+                                                flowRecord.getMaxHops() +
+                                                " AND (" +
+                                                ManagementHelper.HDR_ADDRESS +
+                                                " LIKE '" +
+                                                flowRecord.getAddress() +
+                                                "%')");
+
+         // The queue can't be temporary, since if the node with the bridge crashes then is restarted quickly
+         // it might get deleted on the target when it does connection cleanup
+
+         // When the backup activates the queue might already exist, so we catch this and ignore
+         try
+         {
+            session.createQueue(managementNotificationAddress, notifQueueName, filter, false);
+         }
+         catch (HornetQException me)
+         {
+            if (me.getCode() == HornetQException.QUEUE_EXISTS)
+            {
+               // Ok
+            }
+            else
+            {
+               throw me;
+            }
+         }
+
+         notifConsumer = session.createConsumer(notifQueueName);
+
+         notifConsumer.setMessageHandler(flowRecord);
+
+         session.start();
+
+         ClientMessage message = session.createClientMessage(false);
+
+         ManagementHelper.putOperationInvocation(message,
+                                                 ResourceNames.CORE_SERVER,
+                                                 "sendQueueInfoToQueue",
+                                                 notifQueueName.toString(),
+                                                 flowRecord.getAddress());
+
+         ClientProducer prod = session.createProducer(managementAddress);
+
+         prod.send(message);
+      }
+   }
+
    private synchronized boolean createObjects()
    {
       if (!started)
@@ -507,9 +620,7 @@
       }
 
       try
-      {
-         queue.addConsumer(BridgeImpl.this);
-
+      {                 
          csf = null;
          if (discoveryAddress != null)
          {
@@ -540,92 +651,22 @@
 
          session.setSendAcknowledgementHandler(BridgeImpl.this);
 
-         // TODO - we should move this code to the ClusterConnectorImpl - and just execute it when the bridge
-         // connection is opened and closed - we can use
-         // a callback to tell us that
-         if (flowRecord != null)
-         {
-            // Get the queue data
-
-            // Create a queue to catch the notifications - the name must be deterministic on live and backup, but
-            // different each time this is called
-            // Otherwise it may already exist if server is restarted before it has been deleted on backup
-
-            String qName = "notif." + nodeUUID.toString() + "." + name.toString();
-
-            SimpleString notifQueueName = new SimpleString(qName);
-
-            SimpleString filter = new SimpleString(ManagementHelper.HDR_BINDING_TYPE + "<>" +
-                                                   BindingType.DIVERT.toInt() +
-                                                   " AND " +
-                                                   ManagementHelper.HDR_NOTIFICATION_TYPE +
-                                                   " IN ('" +
-                                                   NotificationType.BINDING_ADDED +
-                                                   "','" +
-                                                   NotificationType.BINDING_REMOVED +
-                                                   "','" +
-                                                   NotificationType.CONSUMER_CREATED +
-                                                   "','" +
-                                                   NotificationType.CONSUMER_CLOSED +
-                                                   "') AND " +
-                                                   ManagementHelper.HDR_DISTANCE +
-                                                   "<" +
-                                                   flowRecord.getMaxHops() +
-                                                   " AND (" +
-                                                   ManagementHelper.HDR_ADDRESS +
-                                                   " LIKE '" +
-                                                   flowRecord.getAddress() +
-                                                   "%')");
-
-            // The queue can't be temporary, since if the node with the bridge crashes then is restarted quickly
-            // it might get deleted on the target when it does connection cleanup
-
-            // When the backup activates the queue might already exist, so we catch this and ignore
-            try
-            {
-               session.createQueue(managementNotificationAddress, notifQueueName, filter, false);
-            }
-            catch (HornetQException me)
-            {
-               if (me.getCode() == HornetQException.QUEUE_EXISTS)
-               {
-                  // Ok
-               }
-               else
-               {
-                  throw me;
-               }
-            }
-
-            ClientConsumer notifConsumer = session.createConsumer(notifQueueName);
-
-            notifConsumer.setMessageHandler(flowRecord);
-
-            session.start();
-
-            ClientMessage message = session.createClientMessage(false);
-
-            ManagementHelper.putOperationInvocation(message,
-                                                    ResourceNames.CORE_SERVER,
-                                                    "sendQueueInfoToQueue",
-                                                    notifQueueName.toString(),
-                                                    flowRecord.getAddress());
-
-            ClientProducer prod = session.createProducer(managementAddress);
-
-            prod.send(message);
-         }
-
+         setupNotificationConsumer();
+                  
          active = true;
+                 
+         queue.addConsumer(BridgeImpl.this);
 
          queue.deliverAsync(executor);
+         
+         log.info("Bridge " + name + " is now connected to destination ");
 
          return true;
       }
       catch (Exception e)
       {
-         log.warn("Unable to connect. Bridge is now disabled.", e);
-
+         log.warn("Bridge " + name + " is unable to connect to destination. It will be disabled.", e);
+         
          return false;
       }
    }
@@ -667,54 +708,56 @@
       }
    }
 
-   private class FailRunnable implements Runnable
-   {
-      public void run()
-      {
-         synchronized (BridgeImpl.this)
-         {
-            if (!started)
-            {
-               return;
-            }
+//   private class FailRunnable implements Runnable
+//   {
+//      public void run()
+//      {
+//         synchronized (BridgeImpl.this)
+//         {         
+//            
+//            if (!started)
+//            {
+//               return;
+//            }
+//
+//            if (flowRecord != null)
+//            {
+//               try
+//               {
+//                  // flowRecord.reset();
+//               }
+//               catch (Exception e)
+//               {
+//                  log.error("Failed to reset", e);
+//               }
+//            }
+//
+//            active = false;
+//         }
+//
+//         try
+//         {
+//            queue.removeConsumer(BridgeImpl.this);
+//
+//            session.cleanUp();
+//
+//            cancelRefs();
+//
+//            csf.close();
+//         }
+//         catch (Exception e)
+//         {
+//            log.error("Failed to stop", e);
+//         }
+//
+//         if (!createObjects())
+//         {
+//            started = false;
+//         }
+//         }
+//      }
+//   }
 
-            if (flowRecord != null)
-            {
-               try
-               {
-                  // flowRecord.reset();
-               }
-               catch (Exception e)
-               {
-                  log.error("Failed to reset", e);
-               }
-            }
-
-            active = false;
-         }
-
-         try
-         {
-            queue.removeConsumer(BridgeImpl.this);
-
-            session.cleanUp();
-
-            cancelRefs();
-
-            csf.close();
-         }
-         catch (Exception e)
-         {
-            log.error("Failed to stop", e);
-         }
-
-         if (!createObjects())
-         {
-            started = false;
-         }
-      }
-   }
-
    private class CreateObjectsRunnable implements Runnable
    {
       public synchronized void run()

Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/DistributorImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/DistributorImpl.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/DistributorImpl.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -15,6 +15,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.hornetq.core.logging.Logger;
 import org.hornetq.core.server.Consumer;
 import org.hornetq.core.server.Distributor;
 
@@ -23,6 +24,8 @@
  */
 public abstract class DistributorImpl implements Distributor
 {
+   private static final Logger log = Logger.getLogger(DistributorImpl.class);
+
    protected final List<Consumer> consumers = new ArrayList<Consumer>();
 
    public void addConsumer(Consumer consumer)

Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -75,6 +75,7 @@
 import org.hornetq.core.security.Role;
 import org.hornetq.core.security.SecurityStore;
 import org.hornetq.core.security.impl.SecurityStoreImpl;
+import org.hornetq.core.server.ActivateCallback;
 import org.hornetq.core.server.Divert;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.MemoryManager;
@@ -179,17 +180,19 @@
    private Deployer securityDeployer;
 
    private final Map<String, ServerSession> sessions = new ConcurrentHashMap<String, ServerSession>();
- 
+
    private final Object initialiseLock = new Object();
 
    private boolean initialised;
-   
+
    private int managementConnectorID;
 
    private static AtomicInteger managementConnectorSequence = new AtomicInteger(0);
-   
+
    private ConnectionManager replicatingConnectionManager;
 
+   private final Set<ActivateCallback> activateCallbacks = new HashSet<ActivateCallback>();
+
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -273,6 +276,8 @@
       // so it can be initialised by the live node
       remotingService.start();
 
+      started = true;
+
       log.info("HornetQ Server version " + getVersion().getFullVersion() + " started");
    }
 
@@ -334,16 +339,26 @@
 
       managementService.stop();
 
-      storageManager.stop();
+      if (storageManager != null)
+      {
+         storageManager.stop();
+      }
 
       if (securityManager != null)
       {
          securityManager.stop();
       }
-      
-      resourceManager.stop();
-      postOffice.stop();
 
+      if (resourceManager != null)
+      {
+         resourceManager.stop();
+      }
+
+      if (postOffice != null)
+      {
+         postOffice.stop();
+      }
+
       // Need to shutdown pools before shutting down paging manager to make sure everything is written ok
 
       List<Runnable> tasks = scheduledPool.shutdownNow();
@@ -369,7 +384,10 @@
       scheduledPool = null;
       threadPool = null;
 
-      pagingManager.stop();
+      if (pagingManager != null)
+      {
+         pagingManager.stop();
+      }
 
       memoryManager.stop();
 
@@ -390,7 +408,7 @@
       initialised = false;
       uuid = null;
       nodeID = null;
-      
+
       log.info("HornetQ Server version " + getVersion().getFullVersion() + " stopped");
    }
 
@@ -487,7 +505,7 @@
    }
 
    public CreateSessionResponseMessage createSession(final String name,
-                                                     final long channelID,                                                     
+                                                     final long channelID,
                                                      final String username,
                                                      final String password,
                                                      final int minLargeMessageSize,
@@ -498,7 +516,7 @@
                                                      final boolean preAcknowledge,
                                                      final boolean xa,
                                                      final int sendWindowSize) throws Exception
-   {      
+   {
       if (version.getIncrementingVersion() != incrementingVersion)
       {
          log.warn("Client with version " + incrementingVersion +
@@ -511,11 +529,11 @@
                   "interoperate properly");
          return null;
       }
-      
+
       if (!checkActivate())
       {
-         //Backup server is not ready to accept connections
-         
+         // Backup server is not ready to accept connections
+
          return new CreateSessionResponseMessage(false, version.getIncrementingVersion());
       }
 
@@ -532,7 +550,7 @@
 
       Channel channel = connection.getChannel(channelID, sendWindowSize, false);
 
-      final ServerSessionImpl session = new ServerSessionImpl(name,                                                              
+      final ServerSessionImpl session = new ServerSessionImpl(name,
                                                               username,
                                                               password,
                                                               minLargeMessageSize,
@@ -552,7 +570,7 @@
                                                               queueFactory,
                                                               this,
                                                               configuration.getManagementAddress());
-      
+
       sessions.put(name, session);
 
       ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session);
@@ -602,120 +620,119 @@
       }
    }
 
-//   public void initialiseBackup(final UUID theUUID, final long liveUniqueID) throws Exception
-//   {
-//      if (theUUID == null)
-//      {
-//         throw new IllegalArgumentException("node id is null");
-//      }
-//
-//      synchronized (initialiseLock)
-//      {
-//         if (initialised)
-//         {
-//            throw new IllegalStateException("Server is already initialised");
-//         }
-//
-//         this.uuid = theUUID;
-//
-//         this.nodeID = new SimpleString(uuid.toString());
-//
-//         initialisePart2();
-//
-//         long backupID = storageManager.getCurrentUniqueID();
-//
-//         if (liveUniqueID != backupID)
-//         {
-//            initialised = false;
-//
-//            throw new IllegalStateException("Live and backup unique ids different (" + liveUniqueID +
-//                                            ":" +
-//                                            backupID +
-//                                            "). You're probably trying to restart a live backup pair after a crash");
-//         }
-//
-//         log.info("Backup server is now operational");
-//      }
-//   }
-   
-//   private boolean setupReplicatingConnection() throws Exception
-//   {
-//      String backupConnectorName = configuration.getBackupConnectorName();
-//
-//      if (backupConnectorName != null)
-//      {
-//         TransportConfiguration backupConnector = configuration.getConnectorConfigurations().get(backupConnectorName);
-//
-//         if (backupConnector == null)
-//         {
-//            log.warn("connector with name '" + backupConnectorName + "' is not defined in the configuration.");
-//         }
-//         else
-//         {
-//            replicatingConnectionManager = new ConnectionManagerImpl(null,
-//                                                                     backupConnector,
-//                                                                     null,
-//                                                                     false,
-//                                                                     1,
-//                                                                     ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
-//                                                                     ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
-//                                                                     ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
-//                                                                     0,
-//                                                                     1.0d,
-//                                                                     0,
-//                                                                     threadPool,
-//                                                                     scheduledPool);
-//
-//            replicatingConnection = replicatingConnectionManager.getConnection(1);
-//
-//            if (replicatingConnection != null)
-//            {
-//               replicatingChannel = replicatingConnection.getChannel(2, -1, false);
-//
-//               replicatingConnection.addFailureListener(new FailureListener()
-//               {
-//                  public void connectionFailed(HornetQException me)
-//                  {
-//                     replicatingChannel.executeOutstandingDelayedResults();
-//                  }
-//               });
-//
-//               // First time we get channel we send a message down it informing the backup of our node id -
-//               // backup and live must have the same node id
-//
-//               Packet packet = new ReplicateStartupInfoMessage(uuid, storageManager.getCurrentUniqueID());
-//
-//               final Future future = new Future();
-//
-//               replicatingChannel.replicatePacket(packet, 1, new Runnable()
-//               {
-//                  public void run()
-//                  {
-//                     future.run();
-//                  }
-//               });
-//
-//               // This may take a while especially if the journal is large
-//               boolean ok = future.await(60000);
-//
-//               if (!ok)
-//               {
-//                  throw new IllegalStateException("Timed out waiting for response from backup for initialisation");
-//               }
-//            }
-//            else
-//            {
-//               log.warn("Backup server MUST be started before live server. Initialisation will not proceed.");
-//
-//               return false;
-//            }
-//         }
-//      }
-//
-//      return true;
-//   }
-   
-      
+   // public void initialiseBackup(final UUID theUUID, final long liveUniqueID) throws Exception
+   // {
+   // if (theUUID == null)
+   // {
+   // throw new IllegalArgumentException("node id is null");
+   // }
+   //
+   // synchronized (initialiseLock)
+   // {
+   // if (initialised)
+   // {
+   // throw new IllegalStateException("Server is already initialised");
+   // }
+   //
+   // this.uuid = theUUID;
+   //
+   // this.nodeID = new SimpleString(uuid.toString());
+   //
+   // initialisePart2();
+   //
+   // long backupID = storageManager.getCurrentUniqueID();
+   //
+   // if (liveUniqueID != backupID)
+   // {
+   // initialised = false;
+   //
+   // throw new IllegalStateException("Live and backup unique ids different (" + liveUniqueID +
+   // ":" +
+   // backupID +
+   // "). You're probably trying to restart a live backup pair after a crash");
+   // }
+   //
+   // log.info("Backup server is now operational");
+   // }
+   // }
+
+   // private boolean setupReplicatingConnection() throws Exception
+   // {
+   // String backupConnectorName = configuration.getBackupConnectorName();
+   //
+   // if (backupConnectorName != null)
+   // {
+   // TransportConfiguration backupConnector = configuration.getConnectorConfigurations().get(backupConnectorName);
+   //
+   // if (backupConnector == null)
+   // {
+   // log.warn("connector with name '" + backupConnectorName + "' is not defined in the configuration.");
+   // }
+   // else
+   // {
+   // replicatingConnectionManager = new ConnectionManagerImpl(null,
+   // backupConnector,
+   // null,
+   // false,
+   // 1,
+   // ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+   // ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+   // ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
+   // 0,
+   // 1.0d,
+   // 0,
+   // threadPool,
+   // scheduledPool);
+   //
+   // replicatingConnection = replicatingConnectionManager.getConnection(1);
+   //
+   // if (replicatingConnection != null)
+   // {
+   // replicatingChannel = replicatingConnection.getChannel(2, -1, false);
+   //
+   // replicatingConnection.addFailureListener(new FailureListener()
+   // {
+   // public void connectionFailed(HornetQException me)
+   // {
+   // replicatingChannel.executeOutstandingDelayedResults();
+   // }
+   // });
+   //
+   // // First time we get channel we send a message down it informing the backup of our node id -
+   // // backup and live must have the same node id
+   //
+   // Packet packet = new ReplicateStartupInfoMessage(uuid, storageManager.getCurrentUniqueID());
+   //
+   // final Future future = new Future();
+   //
+   // replicatingChannel.replicatePacket(packet, 1, new Runnable()
+   // {
+   // public void run()
+   // {
+   // future.run();
+   // }
+   // });
+   //
+   // // This may take a while especially if the journal is large
+   // boolean ok = future.await(60000);
+   //
+   // if (!ok)
+   // {
+   // throw new IllegalStateException("Timed out waiting for response from backup for initialisation");
+   // }
+   // }
+   // else
+   // {
+   // log.warn("Backup server MUST be started before live server. Initialisation will not proceed.");
+   //
+   // return false;
+   // }
+   // }
+   // }
+   //
+   // return true;
+   // }
+
    public HornetQServerControlImpl getHornetQServerControl()
    {
       return messagingServerControl;
@@ -827,6 +844,16 @@
       postOffice.removeBinding(queueName);
    }
 
+   public synchronized void registerActivateCallback(final ActivateCallback callback)
+   {
+      activateCallbacks.add(callback);
+   }
+
+   public synchronized void unregisterActivateCallback(final ActivateCallback callback)
+   {
+      activateCallbacks.remove(callback);
+   }
+
    public ExecutorFactory getExecutorFactory()
    {
       return executorFactory;
@@ -852,26 +879,37 @@
    // Private
    // --------------------------------------------------------------------------------------
 
+   private synchronized void callActivateCallbacks()
+   {
+      for (ActivateCallback callback : activateCallbacks)
+      {
+         callback.activated();
+      }
+   }
+
    private synchronized boolean checkActivate() throws Exception
-   { 
+   {
       if (configuration.isBackup())
       {
-         //Handle backup server activation
-         
+         // Handle backup server activation
+
          if (configuration.isSharedStore())
          {
-            //Complete the startup procedure
-            
+            // Complete the startup procedure
+
+            log.info("Activating server");
+
             configuration.setBackup(false);
-            
-            initialisePart2();                                    
+
+            initialisePart2();
          }
          else
          {
-            //just load journal
+            // TODO
+            // just load journal
          }
       }
-      
+
       return true;
    }
 
@@ -951,7 +989,7 @@
                                       managementService,
                                       configuration.getMessageExpiryScanPeriod(),
                                       configuration.getMessageExpiryThreadPriority(),
-                                      configuration.isWildcardRoutingEnabled(),                                     
+                                      configuration.isWildcardRoutingEnabled(),
                                       configuration.getIDCacheSize(),
                                       configuration.isPersistIDCache(),
                                       executorFactory,
@@ -1024,6 +1062,10 @@
          queueDeployer.start();
       }
 
+      // We need to call this here: it gives any dependent server a chance to deploy its own destinations,
+      // which needs to be done before clustering is initialised
+      callActivateCallbacks();
+
       // Deply any pre-defined diverts
       deployDiverts();
 
@@ -1036,7 +1078,7 @@
                                                  scheduledPool,
                                                  managementService,
                                                  configuration,
-                                                 uuid,                                                 
+                                                 uuid,
                                                  configuration.isBackup());
 
          clusterManager.start();
@@ -1050,9 +1092,9 @@
       pagingManager.resumeDepages();
 
       final ServerInfo dumper = new ServerInfo(this, pagingManager);
-      
+
       long dumpInfoInterval = configuration.getServerDumpInterval();
-      
+
       if (dumpInfoInterval > 0)
       {
          scheduledPool.scheduleWithFixedDelay(new Runnable()
@@ -1063,10 +1105,8 @@
             }
          }, 0, dumpInfoInterval, TimeUnit.MILLISECONDS);
       }
-      
+
       initialised = true;
-
-      started = true;
    }
 
    private void deployQueuesFromConfiguration() throws Exception
@@ -1079,7 +1119,7 @@
                                             config.isDurable());
       }
    }
-   
+
    private void loadJournal() throws Exception
    {
       List<QueueBindingInfo> queueBindingInfos = new ArrayList<QueueBindingInfo>();
@@ -1294,5 +1334,5 @@
    }
 
    // Inner classes
-   // --------------------------------------------------------------------------------  
+   // --------------------------------------------------------------------------------
 }

Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -1260,6 +1260,7 @@
                promptDelivery = false;
                return;
             }
+            
             continue;
          }
          else
@@ -1307,7 +1308,7 @@
          }
 
          HandleStatus status = handle(reference, consumer);
-
+         
          if (status == HandleStatus.HANDLED)
          {
             if (iterator == null)
@@ -1335,6 +1336,7 @@
             {
                groups.remove(consumer);
             }
+
             continue;
          }
       }

Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -1377,11 +1377,19 @@
    }
 
    public void handleReceiveConsumerCredits(final SessionConsumerFlowCreditMessage packet)
-   {      
-      try
+   {   
+      ServerConsumer consumer = consumers.get(packet.getConsumerID());
+      
+      if (consumer == null)
       {
-         consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
+         log.error("There is no consumer with id " + packet.getConsumerID());
+         return;
       }
+      
+      try
+      {                 
+         consumer.receiveCredits(packet.getCredits());
+      }
       catch (Exception e)
       {
          log.error("Failed to receive credits " + this.server.getConfiguration().isBackup(), e);

Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -654,7 +654,17 @@
    {
       sessionFactory.setReconnectAttempts(reconnectAttempts);
    }
+   
+   public synchronized boolean isUseReattach()
+   {
+      return sessionFactory.isUseReattach();
+   }
 
+   public synchronized void setUseReattach(boolean reattach)
+   {
+      sessionFactory.setUseReattach(reattach);
+   }
+
    public synchronized boolean isFailoverOnServerShutdown()
    {
       return sessionFactory.isFailoverOnServerShutdown();

Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -133,7 +133,7 @@
       {
          return;
       }
-
+      
       if (!contextSet)
       {
          context = new InitialContext();
@@ -141,6 +141,8 @@
 
       deploymentManager = new FileDeploymentManager(server.getConfiguration().getFileDeployerScanPeriod());
 
+      server.registerActivateCallback(this);
+      
       server.start();
 
       started = true;

Deleted: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/ConnectionFactoryControlImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/ConnectionFactoryControlImpl.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/ConnectionFactoryControlImpl.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -1,178 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.jms.server.management.impl;
-
-import java.util.List;
-
-import javax.management.NotCompliantMBeanException;
-
-import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.server.management.ConnectionFactoryControl;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * 
- * @version <tt>$Revision$</tt>
- * 
- */
-public class ConnectionFactoryControlImpl implements ConnectionFactoryControl
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private final HornetQConnectionFactory cf;
-
-   private final List<String> bindings;
-
-   private final String name;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public ConnectionFactoryControlImpl(final HornetQConnectionFactory cf, final String name, final List<String> bindings) throws NotCompliantMBeanException
-   {
-      this.cf = cf;
-      this.name = name;
-      this.bindings = bindings;
-   }
-
-   // Public --------------------------------------------------------
-
-   // ManagedConnectionFactoryMBean implementation ------------------
-
-   public List<String> getBindings()
-   {
-      return bindings;
-   }
-
-   public String getClientID()
-   {
-      return cf.getClientID();
-   }
-
-   public long getClientFailureCheckPeriod()
-   {
-      return cf.getClientFailureCheckPeriod();
-   }
-
-   public long getCallTimeout()
-   {
-      return cf.getCallTimeout();
-   }
-
-   public int getConsumerMaxRate()
-   {
-      return cf.getConsumerMaxRate();
-   }
-
-   public int getConsumerWindowSize()
-   {
-      return cf.getConsumerWindowSize();
-   }
-
-   public int getProducerMaxRate()
-   {
-      return cf.getProducerMaxRate();
-   }
-
-   public int getProducerWindowSize()
-   {
-      return cf.getProducerWindowSize();
-   }
-
-   public int getDupsOKBatchSize()
-   {
-      return cf.getDupsOKBatchSize();
-   }
-
-   public boolean isBlockOnAcknowledge()
-   {
-      return cf.isBlockOnAcknowledge();
-   }
-
-   public boolean isBlockOnNonPersistentSend()
-   {
-      return cf.isBlockOnNonPersistentSend();
-   }
-
-   public boolean isBlockOnPersistentSend()
-   {
-      return cf.isBlockOnPersistentSend();
-   }
-
-   public boolean isPreAcknowledge()
-   {
-      return cf.isPreAcknowledge();
-   }
-
-   public String getName()
-   {
-      return name;
-   }
-
-   public long getConnectionTTL()
-   {
-      return cf.getConnectionTTL();
-   }
-
-   public int getMaxConnections()
-   {
-      return cf.getMaxConnections();
-   }
-
-   public int getReconnectAttempts()
-   {
-      return cf.getReconnectAttempts();
-   }
-
-   public boolean isFailoverOnNodeShutdown()
-   {
-      return cf.isFailoverOnServerShutdown();
-   }
-
-   public long getMinLargeMessageSize()
-   {
-      return cf.getMinLargeMessageSize();
-   }
-
-   public long getRetryInterval()
-   {
-      return cf.getRetryInterval();
-   }
-
-   public double getRetryIntervalMultiplier()
-   {
-      return cf.getRetryIntervalMultiplier();
-   }
-
-   public long getTransactionBatchSize()
-   {
-      return cf.getTransactionBatchSize();
-   }
-
-   public boolean isAutoGroup()
-   {
-      return cf.isAutoGroup();
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Copied: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSConnectionFactoryControlImpl.java (from rev 7946, branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/ConnectionFactoryControlImpl.java)
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSConnectionFactoryControlImpl.java	                        (rev 0)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSConnectionFactoryControlImpl.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.server.management.impl;
+
+import java.util.List;
+
+import javax.management.NotCompliantMBeanException;
+import javax.management.StandardMBean;
+
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.management.ConnectionFactoryControl;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ * 
+ */
+public class JMSConnectionFactoryControlImpl extends StandardMBean implements ConnectionFactoryControl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final HornetQConnectionFactory cf;
+
+   private final List<String> bindings;
+
+   private final String name;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public JMSConnectionFactoryControlImpl(final HornetQConnectionFactory cf, final String name, final List<String> bindings) throws NotCompliantMBeanException
+   {
+      super(ConnectionFactoryControl.class);
+      this.cf = cf;
+      this.name = name;
+      this.bindings = bindings;
+   }
+
+   // Public --------------------------------------------------------
+
+   // ManagedConnectionFactoryMBean implementation ------------------
+
+   public List<String> getBindings()
+   {
+      return bindings;
+   }
+
+   public String getClientID()
+   {
+      return cf.getClientID();
+   }
+
+   public long getClientFailureCheckPeriod()
+   {
+      return cf.getClientFailureCheckPeriod();
+   }
+
+   public long getCallTimeout()
+   {
+      return cf.getCallTimeout();
+   }
+
+   public int getConsumerMaxRate()
+   {
+      return cf.getConsumerMaxRate();
+   }
+
+   public int getConsumerWindowSize()
+   {
+      return cf.getConsumerWindowSize();
+   }
+
+   public int getProducerMaxRate()
+   {
+      return cf.getProducerMaxRate();
+   }
+
+   public int getProducerWindowSize()
+   {
+      return cf.getProducerWindowSize();
+   }
+
+   public int getDupsOKBatchSize()
+   {
+      return cf.getDupsOKBatchSize();
+   }
+
+   public boolean isBlockOnAcknowledge()
+   {
+      return cf.isBlockOnAcknowledge();
+   }
+
+   public boolean isBlockOnNonPersistentSend()
+   {
+      return cf.isBlockOnNonPersistentSend();
+   }
+
+   public boolean isBlockOnPersistentSend()
+   {
+      return cf.isBlockOnPersistentSend();
+   }
+
+   public boolean isPreAcknowledge()
+   {
+      return cf.isPreAcknowledge();
+   }
+
+   public String getName()
+   {
+      return name;
+   }
+
+   public long getConnectionTTL()
+   {
+      return cf.getConnectionTTL();
+   }
+
+   public int getMaxConnections()
+   {
+      return cf.getMaxConnections();
+   }
+
+   public int getReconnectAttempts()
+   {
+      return cf.getReconnectAttempts();
+   }
+
+   public boolean isFailoverOnNodeShutdown()
+   {
+      return cf.isFailoverOnServerShutdown();
+   }
+
+   public long getMinLargeMessageSize()
+   {
+      return cf.getMinLargeMessageSize();
+   }
+
+   public long getRetryInterval()
+   {
+      return cf.getRetryInterval();
+   }
+
+   public double getRetryIntervalMultiplier()
+   {
+      return cf.getRetryIntervalMultiplier();
+   }
+
+   public long getTransactionBatchSize()
+   {
+      return cf.getTransactionBatchSize();
+   }
+
+   public boolean isAutoGroup()
+   {
+      return cf.isAutoGroup();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -106,7 +106,7 @@
    {
       ObjectName objectName = ObjectNames.getJMSTopicObjectName(topic.getTopicName());
       AddressControl addressControl = (AddressControl)managementService.getResource(ResourceNames.CORE_ADDRESS + topic.getAddress());
-      TopicControlImpl control = new TopicControlImpl(topic, addressControl, jndiBinding, managementService);
+      JMSTopicControlImpl control = new JMSTopicControlImpl(topic, addressControl, jndiBinding, managementService);
       managementService.registerInJMX(objectName, control);
       managementService.registerInRegistry(ResourceNames.JMS_TOPIC + topic.getTopicName(), control);
    }
@@ -123,7 +123,7 @@
                                          final List<String> bindings) throws Exception
    {
       ObjectName objectName = ObjectNames.getConnectionFactoryObjectName(name);
-      ConnectionFactoryControlImpl control = new ConnectionFactoryControlImpl(connectionFactory, name, bindings);
+      JMSConnectionFactoryControlImpl control = new JMSConnectionFactoryControlImpl(connectionFactory, name, bindings);
       managementService.registerInJMX(objectName, control);
       managementService.registerInRegistry(ResourceNames.JMS_CONNECTION_FACTORY + name, control);
    }

Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSQueueControlImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSQueueControlImpl.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSQueueControlImpl.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -15,6 +15,8 @@
 
 import java.util.Map;
 
+import javax.management.StandardMBean;
+
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.management.MessageCounterInfo;
@@ -34,7 +36,7 @@
  * @version <tt>$Revision$</tt>
  * 
  */
-public class JMSQueueControlImpl implements JMSQueueControl
+public class JMSQueueControlImpl extends StandardMBean implements JMSQueueControl
 {
    // Constants -----------------------------------------------------
 
@@ -57,7 +59,8 @@
     */
    public static String createFilterFromJMSSelector(final String selectorStr) throws HornetQException
    {
-      return (selectorStr == null || selectorStr.trim().length() == 0) ? null : SelectorTranslator.convertToHornetQFilterString(selectorStr);
+      return (selectorStr == null || selectorStr.trim().length() == 0) ? null
+                                                                      : SelectorTranslator.convertToHornetQFilterString(selectorStr);
    }
 
    private static String createFilterForJMSMessageID(String jmsMessageID) throws Exception
@@ -71,7 +74,7 @@
       for (int i = 0; i < messages.length; i++)
       {
          Map<String, Object> message = messages[i];
-         array.put(new JSONObject(message));         
+         array.put(new JSONObject(message));
       }
       return array.toString();
    }
@@ -79,10 +82,11 @@
    // Constructors --------------------------------------------------
 
    public JMSQueueControlImpl(final HornetQQueue managedQueue,
-                          final QueueControl coreQueueControl,
-                          final String jndiBinding,
-                          final MessageCounter counter)
+                              final QueueControl coreQueueControl,
+                              final String jndiBinding,
+                              final MessageCounter counter) throws Exception
    {
+      super(JMSQueueControl.class);
       this.managedQueue = managedQueue;
       this.coreQueueControl = coreQueueControl;
       this.binding = jndiBinding;
@@ -187,10 +191,10 @@
          String filter = createFilterFromJMSSelector(filterStr);
          Map<String, Object>[] coreMessages = coreQueueControl.listMessages(filter);
 
-         Map<String, Object>[] jmsMessages = new Map[coreMessages.length]; 
-         
+         Map<String, Object>[] jmsMessages = new Map[coreMessages.length];
+
          int i = 0;
-         
+
          for (Map<String, Object> coreMessage : coreMessages)
          {
             Map<String, Object> jmsMessage = HornetQMessage.coreMaptoJMSMap(coreMessage);
@@ -203,7 +207,7 @@
          throw new IllegalStateException(e.getMessage());
       }
    }
-   
+
    public String listMessagesAsJSON(String filter) throws Exception
    {
       return toJSON(listMessages(filter));

Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -25,6 +25,7 @@
 import javax.management.NotificationEmitter;
 import javax.management.NotificationFilter;
 import javax.management.NotificationListener;
+import javax.management.StandardMBean;
 
 import org.hornetq.core.client.management.impl.ManagementHelper;
 import org.hornetq.core.config.TransportConfiguration;
@@ -39,7 +40,7 @@
  * @version <tt>$Revision$</tt>
  * 
  */
-public class JMSServerControlImpl implements JMSServerControl, NotificationEmitter
+public class JMSServerControlImpl extends StandardMBean implements JMSServerControl, NotificationEmitter
 {
 
    // Constants -----------------------------------------------------
@@ -132,8 +133,9 @@
 
    // Constructors --------------------------------------------------
 
-   public JMSServerControlImpl(final JMSServerManager server)
+   public JMSServerControlImpl(final JMSServerManager server) throws Exception
    {
+      super(JMSServerControl.class);
       this.server = server;
       broadcaster = new NotificationBroadcasterSupport();
    }

Copied: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSTopicControlImpl.java (from rev 7946, branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/TopicControlImpl.java)
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSTopicControlImpl.java	                        (rev 0)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSTopicControlImpl.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -0,0 +1,354 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.server.management.impl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import javax.management.StandardMBean;
+
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.management.AddressControl;
+import org.hornetq.core.management.HornetQServerControl;
+import org.hornetq.core.management.ManagementService;
+import org.hornetq.core.management.QueueControl;
+import org.hornetq.core.management.ResourceNames;
+import org.hornetq.jms.HornetQTopic;
+import org.hornetq.jms.client.HornetQMessage;
+import org.hornetq.jms.client.SelectorTranslator;
+import org.hornetq.jms.server.management.TopicControl;
+import org.hornetq.utils.Pair;
+import org.hornetq.utils.json.JSONArray;
+import org.hornetq.utils.json.JSONObject;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ * 
+ */
+public class JMSTopicControlImpl extends StandardMBean implements TopicControl
+{
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(JMSTopicControlImpl.class);
+
+   // Attributes ----------------------------------------------------
+
+   private final HornetQTopic managedTopic;
+
+   private final String binding;
+
+   private AddressControl addressControl;
+
+   private ManagementService managementService;
+
+   // Static --------------------------------------------------------
+
+   public static String createFilterFromJMSSelector(final String selectorStr) throws HornetQException
+   {
+      return (selectorStr == null || selectorStr.trim().length() == 0) ? null
+                                                                      : SelectorTranslator.convertToHornetQFilterString(selectorStr);
+   }
+
+   // Constructors --------------------------------------------------
+
+   public JMSTopicControlImpl(final HornetQTopic topic,
+                           final AddressControl addressControl,
+                           final String jndiBinding,
+                           final ManagementService managementService) throws Exception
+   {
+      super(TopicControl.class);
+      this.managedTopic = topic;
+      this.addressControl = addressControl;
+      this.binding = jndiBinding;
+      this.managementService = managementService;
+   }
+
+   // TopicControlMBean implementation ------------------------------
+
+   public String getName()
+   {
+      return managedTopic.getName();
+   }
+
+   public boolean isTemporary()
+   {
+      return managedTopic.isTemporary();
+   }
+
+   public String getAddress()
+   {
+      return managedTopic.getAddress();
+   }
+
+   public String getJNDIBinding()
+   {
+      return binding;
+   }
+
+   public int getMessageCount()
+   {
+      return getMessageCount(DurabilityType.ALL);
+   }
+
+   public int getDurableMessageCount()
+   {
+      return getMessageCount(DurabilityType.DURABLE);
+   }
+
+   public int getNonDurableMessageCount()
+   {
+      return getMessageCount(DurabilityType.NON_DURABLE);
+   }
+
+   public int getSubscriptionCount()
+   {
+      return getQueues(DurabilityType.ALL).size();
+   }
+
+   public int getDurableSubscriptionCount()
+   {
+      return getQueues(DurabilityType.DURABLE).size();
+   }
+
+   public int getNonDurableSubscriptionCount()
+   {
+      return getQueues(DurabilityType.NON_DURABLE).size();
+   }
+
+   public Object[] listAllSubscriptions()
+   {
+      return listSubscribersInfos(DurabilityType.ALL);
+   }
+
+   public String listAllSubscriptionsAsJSON() throws Exception
+   {
+      return listSubscribersInfosAsJSON(DurabilityType.ALL);
+   }
+
+   public Object[] listDurableSubscriptions()
+   {
+      return listSubscribersInfos(DurabilityType.DURABLE);
+   }
+
+   public String listDurableSubscriptionsAsJSON() throws Exception
+   {
+      return listSubscribersInfosAsJSON(DurabilityType.DURABLE);
+   }
+
+   public Object[] listNonDurableSubscriptions()
+   {
+      return listSubscribersInfos(DurabilityType.NON_DURABLE);
+   }
+
+   public String listNonDurableSubscriptionsAsJSON() throws Exception
+   {
+      return listSubscribersInfosAsJSON(DurabilityType.NON_DURABLE);
+   }
+
+   public Map<String, Object>[] listMessagesForSubscription(final String queueName) throws Exception
+   {
+      QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queueName);
+      if (coreQueueControl == null)
+      {
+         throw new IllegalArgumentException("No subscriptions with name " + queueName);
+      }
+
+      Map<String, Object>[] coreMessages = coreQueueControl.listMessages(null);
+
+      Map<String, Object>[] jmsMessages = new Map[coreMessages.length];
+
+      int i = 0;
+
+      for (Map<String, Object> coreMessage : coreMessages)
+      {
+         jmsMessages[i++] = HornetQMessage.coreMaptoJMSMap(coreMessage);
+      }
+      return jmsMessages;
+   }
+
+   public String listMessagesForSubscriptionAsJSON(String queueName) throws Exception
+   {
+      return JMSQueueControlImpl.toJSON(listMessagesForSubscription(queueName));
+   }
+
+   public int countMessagesForSubscription(final String clientID, final String subscriptionName, final String filterStr) throws Exception
+   {
+      String queueName = HornetQTopic.createQueueNameForDurableSubscription(clientID, subscriptionName);
+      QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queueName);
+      if (coreQueueControl == null)
+      {
+         throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID);
+      }
+      String filter = createFilterFromJMSSelector(filterStr);
+      return coreQueueControl.listMessages(filter).length;
+   }
+
+   public int removeMessages(String filterStr) throws Exception
+   {
+      String filter = createFilterFromJMSSelector(filterStr);
+      int count = 0;
+      String[] queues = addressControl.getQueueNames();
+      for (String queue : queues)
+      {
+         QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue);
+         count += coreQueueControl.removeMessages(filter);
+      }
+
+      return count;
+   }
+
+   public void dropDurableSubscription(String clientID, String subscriptionName) throws Exception
+   {
+      String queueName = HornetQTopic.createQueueNameForDurableSubscription(clientID, subscriptionName);
+      QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queueName);
+      if (coreQueueControl == null)
+      {
+         throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID);
+      }
+      HornetQServerControl serverControl = (HornetQServerControl)managementService.getResource(ResourceNames.CORE_SERVER);
+      serverControl.destroyQueue(queueName);
+   }
+
+   public void dropAllSubscriptions() throws Exception
+   {
+      HornetQServerControl serverControl = (HornetQServerControl)managementService.getResource(ResourceNames.CORE_SERVER);
+      String[] queues = addressControl.getQueueNames();
+      for (String queue : queues)
+      {
+         serverControl.destroyQueue(queue);
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   private Object[] listSubscribersInfos(final DurabilityType durability)
+   {
+      List<QueueControl> queues = getQueues(durability);
+      List<Object[]> subInfos = new ArrayList<Object[]>(queues.size());
+
+      for (QueueControl queue : queues)
+      {
+         String clientID = null;
+         String subName = null;
+
+         if (queue.isDurable())
+         {
+            Pair<String, String> pair = HornetQTopic.decomposeQueueNameForDurableSubscription(queue.getName()
+                                                                                                   .toString());
+            clientID = pair.a;
+            subName = pair.b;
+         }
+
+         String filter = queue.getFilter() != null ? queue.getFilter() : null;
+
+         Object[] subscriptionInfo = new Object[6];
+         subscriptionInfo[0] = queue.getName();
+         subscriptionInfo[1] = clientID;
+         subscriptionInfo[2] = subName;
+         subscriptionInfo[3] = queue.isDurable();
+         subscriptionInfo[4] = queue.getMessageCount();
+
+         subInfos.add(subscriptionInfo);
+      }
+      return subInfos.toArray(new Object[subInfos.size()]);
+   }
+
+   private String listSubscribersInfosAsJSON(final DurabilityType durability) throws Exception
+   {
+      List<QueueControl> queues = getQueues(durability);
+      JSONArray array = new JSONArray();
+
+      for (QueueControl queue : queues)
+      {
+         String clientID = null;
+         String subName = null;
+
+         if (queue.isDurable())
+         {
+            Pair<String, String> pair = HornetQTopic.decomposeQueueNameForDurableSubscription(queue.getName()
+                                                                                                   .toString());
+            clientID = pair.a;
+            subName = pair.b;
+         }
+
+         String filter = queue.getFilter() != null ? queue.getFilter() : null;
+
+         JSONObject info = new JSONObject();
+         info.put("queueName", queue.getName());
+         info.put("clientID", clientID);
+         info.put("selector", filter);
+         info.put("name", subName);
+         info.put("durable", queue.isDurable());
+         info.put("messageCount", queue.getMessageCount());
+         array.put(info);
+      }
+
+      return array.toString();
+   }
+
+   private int getMessageCount(final DurabilityType durability)
+   {
+      List<QueueControl> queues = getQueues(durability);
+      int count = 0;
+      for (QueueControl queue : queues)
+      {
+         count += queue.getMessageCount();
+      }
+      return count;
+   }
+
+   private List<QueueControl> getQueues(final DurabilityType durability)
+   {
+      try
+      {
+         List<QueueControl> matchingQueues = new ArrayList<QueueControl>();
+         String[] queues = addressControl.getQueueNames();
+         for (String queue : queues)
+         {
+            QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue);
+
+            // Ignore the "special" subscription
+            if (!coreQueueControl.getName().equals(addressControl.getAddress()))
+            {
+               if (durability == DurabilityType.ALL || (durability == DurabilityType.DURABLE && coreQueueControl.isDurable()) ||
+                   (durability == DurabilityType.NON_DURABLE && !coreQueueControl.isDurable()))
+               {
+                  matchingQueues.add(coreQueueControl);
+               }
+            }
+         }
+         return matchingQueues;
+      }
+      catch (Exception e)
+      {
+         return Collections.emptyList();
+      }
+   }
+
+   // Inner classes -------------------------------------------------
+
+   private enum DurabilityType
+   {
+      ALL, DURABLE, NON_DURABLE
+   }
+}

Deleted: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/TopicControlImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/TopicControlImpl.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/TopicControlImpl.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -1,348 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.jms.server.management.impl;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.management.AddressControl;
-import org.hornetq.core.management.HornetQServerControl;
-import org.hornetq.core.management.ManagementService;
-import org.hornetq.core.management.QueueControl;
-import org.hornetq.core.management.ResourceNames;
-import org.hornetq.jms.HornetQTopic;
-import org.hornetq.jms.client.HornetQMessage;
-import org.hornetq.jms.client.SelectorTranslator;
-import org.hornetq.jms.server.management.TopicControl;
-import org.hornetq.utils.Pair;
-import org.hornetq.utils.json.JSONArray;
-import org.hornetq.utils.json.JSONObject;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * 
- * @version <tt>$Revision$</tt>
- * 
- */
-public class TopicControlImpl implements TopicControl
-{
-   // Constants -----------------------------------------------------
-
-   private static final Logger log = Logger.getLogger(TopicControlImpl.class);
-
-   // Attributes ----------------------------------------------------
-
-   private final HornetQTopic managedTopic;
-
-   private final String binding;
-
-   private AddressControl addressControl;
-
-   private ManagementService managementService;
-
-   // Static --------------------------------------------------------
-
-   public static String createFilterFromJMSSelector(final String selectorStr) throws HornetQException
-   {
-      return (selectorStr == null || selectorStr.trim().length() == 0) ? null : SelectorTranslator.convertToHornetQFilterString(selectorStr);
-   }
-
-   // Constructors --------------------------------------------------
-
-   public TopicControlImpl(final HornetQTopic topic,
-                       final AddressControl addressControl,
-                       final String jndiBinding,
-                       final ManagementService managementService)
-   {
-      this.managedTopic = topic;
-      this.addressControl = addressControl;
-      this.binding = jndiBinding;
-      this.managementService = managementService;
-   }
-
-   // TopicControlMBean implementation ------------------------------
-
-   public String getName()
-   {
-      return managedTopic.getName();
-   }
-
-   public boolean isTemporary()
-   {
-      return managedTopic.isTemporary();
-   }
-
-   public String getAddress()
-   {
-      return managedTopic.getAddress();
-   }
-
-   public String getJNDIBinding()
-   {
-      return binding;
-   }
-
-   public int getMessageCount()
-   {
-      return getMessageCount(DurabilityType.ALL);
-   }
-
-   public int getDurableMessageCount()
-   {
-      return getMessageCount(DurabilityType.DURABLE);
-   }
-
-   public int getNonDurableMessageCount()
-   {
-      return getMessageCount(DurabilityType.NON_DURABLE);
-   }
-
-   public int getSubscriptionCount()
-   {
-      return getQueues(DurabilityType.ALL).size();
-   }
-
-   public int getDurableSubscriptionCount()
-   {
-      return getQueues(DurabilityType.DURABLE).size();
-   }
-
-   public int getNonDurableSubscriptionCount()
-   {
-      return getQueues(DurabilityType.NON_DURABLE).size();
-   }
-
-   public Object[] listAllSubscriptions()
-   {
-      return listSubscribersInfos(DurabilityType.ALL);
-   }
-   
-   public String listAllSubscriptionsAsJSON() throws Exception
-   {
-      return listSubscribersInfosAsJSON(DurabilityType.ALL);
-   }
-
-   public Object[] listDurableSubscriptions()
-   {
-      return listSubscribersInfos(DurabilityType.DURABLE);
-   }
-   
-   public String listDurableSubscriptionsAsJSON() throws Exception
-   {
-      return listSubscribersInfosAsJSON(DurabilityType.DURABLE);
-   }
-
-   public Object[] listNonDurableSubscriptions()
-   {
-      return listSubscribersInfos(DurabilityType.NON_DURABLE);
-   }
-   
-   public String listNonDurableSubscriptionsAsJSON() throws Exception
-   {
-      return listSubscribersInfosAsJSON(DurabilityType.NON_DURABLE);
-   }
-
-   public Map<String, Object>[] listMessagesForSubscription(final String queueName) throws Exception
-   {
-      QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queueName);
-      if (coreQueueControl == null)
-      {
-         throw new IllegalArgumentException("No subscriptions with name " + queueName);
-      }
-
-      Map<String, Object>[] coreMessages = coreQueueControl.listMessages(null);
-
-      Map<String, Object>[] jmsMessages = new Map[coreMessages.length];
-
-      int i = 0;
-
-      for (Map<String, Object> coreMessage : coreMessages)
-      {
-         jmsMessages[i++] = HornetQMessage.coreMaptoJMSMap(coreMessage);
-      }
-      return jmsMessages;
-   }
-   
-   public String listMessagesForSubscriptionAsJSON(String queueName) throws Exception
-   {
-      return JMSQueueControlImpl.toJSON(listMessagesForSubscription(queueName));
-   }
-
-   public int countMessagesForSubscription(final String clientID, final String subscriptionName, final String filterStr) throws Exception
-   {
-      String queueName = HornetQTopic.createQueueNameForDurableSubscription(clientID, subscriptionName);
-      QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queueName);
-      if (coreQueueControl == null)
-      {
-         throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID);
-      }
-      String filter = createFilterFromJMSSelector(filterStr);
-      return coreQueueControl.listMessages(filter).length;
-   }
-
-   public int removeMessages(String filterStr) throws Exception
-   {
-      String filter = createFilterFromJMSSelector(filterStr);
-      int count = 0;
-      String[] queues = addressControl.getQueueNames();
-      for (String queue : queues)
-      {
-         QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue);
-         count += coreQueueControl.removeMessages(filter);
-      }
-
-      return count;
-   }
-
-   public void dropDurableSubscription(String clientID, String subscriptionName) throws Exception
-   {
-      String queueName = HornetQTopic.createQueueNameForDurableSubscription(clientID, subscriptionName);
-      QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queueName);
-      if (coreQueueControl == null)
-      {
-         throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID);
-      }
-      HornetQServerControl serverControl = (HornetQServerControl)managementService.getResource(ResourceNames.CORE_SERVER);
-      serverControl.destroyQueue(queueName);
-   }
-
-   public void dropAllSubscriptions() throws Exception
-   {
-      HornetQServerControl serverControl = (HornetQServerControl)managementService.getResource(ResourceNames.CORE_SERVER);
-      String[] queues = addressControl.getQueueNames();
-      for (String queue : queues)
-      {
-         serverControl.destroyQueue(queue);
-      }
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   private Object[] listSubscribersInfos(final DurabilityType durability)
-   {
-      List<QueueControl> queues = getQueues(durability);
-      List<Object[]> subInfos = new ArrayList<Object[]>(queues.size());
-
-      for (QueueControl queue : queues)
-      {
-         String clientID = null;
-         String subName = null;
-
-         if (queue.isDurable())
-         {
-            Pair<String, String> pair = HornetQTopic.decomposeQueueNameForDurableSubscription(queue.getName().toString());
-            clientID = pair.a;
-            subName = pair.b;
-         }
-
-         String filter = queue.getFilter() != null ? queue.getFilter() : null;
-
-         Object[] subscriptionInfo = new Object[6];
-         subscriptionInfo[0] = queue.getName();
-         subscriptionInfo[1] = clientID;
-         subscriptionInfo[2] = subName;
-         subscriptionInfo[3] = queue.isDurable();
-         subscriptionInfo[4] = queue.getMessageCount();
-
-         subInfos.add(subscriptionInfo);
-      }
-      return subInfos.toArray(new Object[subInfos.size()]);
-   }
-   
-   private String listSubscribersInfosAsJSON(final DurabilityType durability) throws Exception
-   {
-      List<QueueControl> queues = getQueues(durability);
-      JSONArray array = new JSONArray();
-
-      for (QueueControl queue : queues)
-      {
-         String clientID = null;
-         String subName = null;
-
-         if (queue.isDurable())
-         {
-            Pair<String, String> pair = HornetQTopic.decomposeQueueNameForDurableSubscription(queue.getName().toString());
-            clientID = pair.a;
-            subName = pair.b;
-         }
-
-         String filter = queue.getFilter() != null ? queue.getFilter() : null;
-
-         JSONObject info = new JSONObject();
-         info.put("queueName", queue.getName());
-         info.put("clientID", clientID);
-         info.put("selector", filter);
-         info.put("name", subName);
-         info.put("durable", queue.isDurable());
-         info.put("messageCount", queue.getMessageCount());
-         array.put(info);
-      }
-
-      return array.toString();
-   }
-
-   private int getMessageCount(final DurabilityType durability)
-   {
-      List<QueueControl> queues = getQueues(durability);
-      int count = 0;
-      for (QueueControl queue : queues)
-      {
-         count += queue.getMessageCount();
-      }
-      return count;
-   }
-
-   private List<QueueControl> getQueues(final DurabilityType durability)
-   {
-      try
-      {
-         List<QueueControl> matchingQueues = new ArrayList<QueueControl>();
-         String[] queues = addressControl.getQueueNames();
-         for (String queue : queues)
-         {
-            QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue);
-
-            // Ignore the "special" subscription
-            if (!coreQueueControl.getName().equals(addressControl.getAddress()))
-            {
-               if (durability == DurabilityType.ALL || (durability == DurabilityType.DURABLE && coreQueueControl.isDurable()) ||
-                   (durability == DurabilityType.NON_DURABLE && !coreQueueControl.isDurable()))
-               {
-                  matchingQueues.add(coreQueueControl);
-               }
-            }
-         }
-         return matchingQueues;
-      }
-      catch (Exception e)
-      {
-         return Collections.emptyList();
-      }
-   }
-
-   // Inner classes -------------------------------------------------
-
-   private enum DurabilityType
-   {
-      ALL, DURABLE, NON_DURABLE
-   }
-}

Modified: branches/Branch_Replication_Changes/tests/config/ConfigurationTest-full-config.xml
===================================================================
--- branches/Branch_Replication_Changes/tests/config/ConfigurationTest-full-config.xml	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/config/ConfigurationTest-full-config.xml	2009-09-24 09:25:44 UTC (rev 7986)
@@ -25,8 +25,7 @@
       <message-expiry-scan-period>10111213</message-expiry-scan-period>
       <message-expiry-thread-priority>8</message-expiry-thread-priority>
       <id-cache-size>127</id-cache-size>
-      <persist-id-cache>true</persist-id-cache>
-      <queue-activation-timeout>12456</queue-activation-timeout>
+      <persist-id-cache>true</persist-id-cache>      
       <backup>true</backup>
       <shared-store>true</shared-store>
       <persist-delivery-count-before-delivery>true</persist-delivery-count-before-delivery>      

Modified: branches/Branch_Replication_Changes/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
===================================================================
--- branches/Branch_Replication_Changes/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -120,7 +120,9 @@
    protected void tearDown() throws Exception
    {
       super.tearDown();
+      
       getJmsServerManager().destroyConnectionFactory("testsuitecf");
+      
       cf = null;
 
       assertRemainingMessages(0);

Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -31,6 +31,7 @@
 import org.hornetq.core.config.cluster.BroadcastGroupConfiguration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.impl.invm.TransportConstants;
 import org.hornetq.core.server.HornetQ;
 import org.hornetq.core.server.HornetQServer;
@@ -49,6 +50,8 @@
  */
 public class SessionFactoryTest extends ServiceTestBase
 {
+   private static final Logger log = Logger.getLogger(SessionFactoryTest.class);
+
    private final String groupAddress = "230.1.2.3";
 
    private final int groupPort = 8765;
@@ -62,14 +65,14 @@
    private TransportConfiguration backupTC;
    
    protected void tearDown() throws Exception
-   {
+   {      
       if (liveService != null && liveService.isStarted())
-      {
+      {         
          liveService.stop();
-      }
+      }     
       if (backupService != null && backupService.isStarted())
-      {
-         liveService.stop();
+      {         
+         backupService.stop();
       }
       liveService = null;
       backupService = null;
@@ -106,7 +109,7 @@
    {
       try
       {
-         startLiveAndBackup();
+         startLiveAndBackup();         
          ClientSessionFactory cf = new ClientSessionFactoryImpl();
          assertFactoryParams(cf,
                              null,
@@ -136,7 +139,7 @@
                              ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL,
                              ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
                              ClientSessionFactoryImpl.DEFAULT_RECONNECT_ATTEMPTS,
-                             ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN);
+                             ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN);         
          try
          {
             ClientSession session = cf.createSession(false, true, true);
@@ -144,8 +147,9 @@
          }
          catch (HornetQException e)
          {
+            e.printStackTrace();
             // Ok
-         }
+         }    
          final List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
          Pair<TransportConfiguration, TransportConfiguration> pair0 = new Pair<TransportConfiguration, TransportConfiguration>(this.liveTC,
                                                                                                                                this.backupTC);
@@ -854,10 +858,12 @@
    {
       if (liveService.isStarted())
       {
+         log.info("stopping live");
          liveService.stop();
       }
       if (backupService.isStarted())
       {
+         log.info("stopping backup");
          backupService.stop();
       }
    }

Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -50,8 +50,7 @@
 {
    private static final Logger log = Logger.getLogger(BridgeReconnectTest.class);
 
-
-   //Fail bridge and reconnecting immediately
+   // Fail bridge and reconnecting immediately
    public void testFailoverAndReconnectImmediately() throws Exception
    {
       Map<String, Object> server0Params = new HashMap<String, Object>();
@@ -94,16 +93,16 @@
       final long retryInterval = 50;
       final double retryIntervalMultiplier = 1d;
       final int reconnectAttempts = 1;
-   
+
       Pair<String, String> connectorPair = new Pair<String, String>(server1tc.getName(), server2tc.getName());
 
       BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
                                                                         queueName0,
                                                                         forwardAddress,
-                                                                        null,                                                            
                                                                         null,
+                                                                        null,
                                                                         retryInterval,
-                                                                        retryIntervalMultiplier,                                                               
+                                                                        retryIntervalMultiplier,
                                                                         reconnectAttempts,
                                                                         true,
                                                                         false,
@@ -151,7 +150,7 @@
 
       for (int i = 0; i < numMessages; i++)
       {
-         ClientMessage message = session0.createClientMessage(false);
+         ClientMessage message = session0.createClientMessage(true);
          message.putIntProperty(propKey, i);
 
          prod0.send(message);
@@ -175,7 +174,7 @@
       assertEquals(0, server1.getRemotingService().getConnections().size());
       assertEquals(0, service2.getRemotingService().getConnections().size());
    }
-   
+
    // Fail bridge and attempt failover a few times before succeeding
    public void testFailoverAndReconnectAfterAFewTries() throws Exception
    {
@@ -225,8 +224,8 @@
       BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
                                                                         queueName0,
                                                                         forwardAddress,
-                                                                        null,                                                               
                                                                         null,
+                                                                        null,
                                                                         retryInterval,
                                                                         retryIntervalMultiplier,
                                                                         reconnectAttempts,
@@ -272,7 +271,7 @@
       InVMConnector.numberOfFailures = reconnectAttempts - 1;
       forwardingConnection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
 
-      forwardingConnection = getForwardingConnection(bridge);      
+      forwardingConnection = getForwardingConnection(bridge);
       forwardingConnection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
 
       final int numMessages = 10;
@@ -305,7 +304,7 @@
       assertEquals(0, server1.getRemotingService().getConnections().size());
       assertEquals(0, service2.getRemotingService().getConnections().size());
    }
-   
+
    // Fail bridge and reconnect same node, no backup specified
    public void testReconnectSameNode() throws Exception
    {
@@ -344,8 +343,8 @@
       BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
                                                                         queueName0,
                                                                         forwardAddress,
-                                                                        null,                                                               
                                                                         null,
+                                                                        null,
                                                                         retryInterval,
                                                                         retryIntervalMultiplier,
                                                                         reconnectAttempts,
@@ -389,7 +388,7 @@
       InVMConnector.numberOfFailures = reconnectAttempts - 1;
       forwardingConnection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
 
-      forwardingConnection = getForwardingConnection(bridge);      
+      forwardingConnection = getForwardingConnection(bridge);
       forwardingConnection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
 
       final int numMessages = 10;
@@ -420,7 +419,7 @@
       assertEquals(0, server0.getRemotingService().getConnections().size());
       assertEquals(0, server1.getRemotingService().getConnections().size());
    }
-      
+
    public void testShutdownServerCleanlyAndReconnectSameNode() throws Exception
    {
       Map<String, Object> server0Params = new HashMap<String, Object>();
@@ -458,8 +457,8 @@
       BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
                                                                         queueName0,
                                                                         forwardAddress,
-                                                                        null,                                                               
                                                                         null,
+                                                                        null,
                                                                         retryInterval,
                                                                         retryIntervalMultiplier,
                                                                         reconnectAttempts,
@@ -491,7 +490,7 @@
 
       server1.stop();
       server1.start();
-      
+
       ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
       ClientSession session1 = csf1.createSession(false, true, true);
 
@@ -527,7 +526,7 @@
       assertEquals(0, server0.getRemotingService().getConnections().size());
       assertEquals(0, server1.getRemotingService().getConnections().size());
    }
-   
+
    public void testFailoverThenFailAgainAndReconnect() throws Exception
    {
       Map<String, Object> server0Params = new HashMap<String, Object>();
@@ -564,8 +563,8 @@
       BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
                                                                         queueName0,
                                                                         forwardAddress,
-                                                                        null,                                                           
                                                                         null,
+                                                                        null,
                                                                         retryInterval,
                                                                         retryIntervalMultiplier,
                                                                         reconnectAttempts,
@@ -626,13 +625,13 @@
          assertNotNull(r1);
          assertEquals(i, r1.getProperty(propKey));
       }
-      
-      //Fail again - should reconnect
+
+      // Fail again - should reconnect
       forwardingConnection = ((BridgeImpl)bridge).getForwardingConnection();
       InVMConnector.failOnCreateConnection = true;
       InVMConnector.numberOfFailures = reconnectAttempts - 1;
       forwardingConnection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
-      
+
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = session0.createClientMessage(false);
@@ -657,24 +656,24 @@
       assertEquals(0, server0.getRemotingService().getConnections().size());
       assertEquals(0, server1.getRemotingService().getConnections().size());
    }
-   
+
    private RemotingConnection getForwardingConnection(final Bridge bridge) throws Exception
    {
       long start = System.currentTimeMillis();
-      
+
       do
       {
          RemotingConnection forwardingConnection = ((BridgeImpl)bridge).getForwardingConnection();
-         
+
          if (forwardingConnection != null)
          {
             return forwardingConnection;
          }
-                  
+
          Thread.sleep(10);
       }
       while (System.currentTimeMillis() - start < 50000);
-      
+
       throw new IllegalStateException("Failed to get forwarding connection");
    }
 

Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -224,11 +224,11 @@
                                                                         forwardAddress,
                                                                         null,
                                                                         null,
-                                                                        1000,
+                                                                        500,
                                                                         1d,
                                                                         -1,
-                                                                        false,
                                                                         true,
+                                                                        true,
                                                                         connectorPair);
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -322,7 +322,11 @@
 
          sf1.close();
 
+         log.info("stopping server 1");
+         
          server1.stop();
+         
+         log.info("stopped server 1");
 
          for (int i = 0; i < numMessages; i++)
          {
@@ -332,8 +336,12 @@
 
             producer0.send(message);
          }
+         
+         log.info("sent some more messages");
 
          server1.start();
+         
+         log.info("started server1");
 
          sf1 = new ClientSessionFactoryImpl(server1tc);
 
@@ -342,6 +350,8 @@
          consumer1 = session1.createConsumer(queueName1);
 
          session1.start();
+         
+         log.info("started session");
 
          for (int i = 0; i < numMessages; i++)
          {

Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -13,7 +13,6 @@
 
 package org.hornetq.tests.integration.cluster.bridge;
 
-import java.util.HashMap;
 import java.util.Map;
 
 import org.hornetq.core.config.Configuration;
@@ -35,29 +34,6 @@
  */
 public abstract class BridgeTestBase extends UnitTestCase
 {
-   protected HornetQServer createHornetQServerNIO(final int id, final Map<String, Object> params)
-   {
-      return createHornetQServerNIO(id, params, false);
-   }
-
-   protected HornetQServer createHornetQServerNIO(final int id,
-                                                      final Map<String, Object> params,
-                                                      final boolean backup)
-   {
-      Configuration serviceConf = new ConfigurationImpl();
-      serviceConf.setClustered(true);
-      serviceConf.setSecurityEnabled(false);
-      serviceConf.setBackup(backup);
-      serviceConf.setJournalMinFiles(2);
-      serviceConf.setJournalFileSize(100 * 1024);
-      params.put(TransportConstants.SERVER_ID_PROP_NAME, id);
-      serviceConf.getAcceptorConfigurations()
-                 .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory",
-                                                 params));
-      HornetQServer service = HornetQ.newHornetQServer(serviceConf);
-      return service;
-   }
-
    protected HornetQServer createHornetQServer(final int id, final Map<String, Object> params)
    {
       return createHornetQServer(id, params, false);
@@ -69,16 +45,18 @@
       serviceConf.setClustered(true);
       serviceConf.setSecurityEnabled(false);
       serviceConf.setBackup(backup);
+      serviceConf.setSharedStore(true);
+      serviceConf.setBindingsDirectory(getBindingsDir(id, false));
+      serviceConf.setJournalMinFiles(2);
+      serviceConf.setJournalDirectory(getJournalDir(id, false));
+      serviceConf.setPagingDirectory(getPageDir(id, false));
+      serviceConf.setLargeMessagesDirectory(getLargeMessagesDir(id, false));
+      
       params.put(TransportConstants.SERVER_ID_PROP_NAME, id);
       serviceConf.getAcceptorConfigurations()
-                 .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory",
-                                                 params));
-      HornetQServer service = HornetQ.newHornetQServer(serviceConf, false);
+                 .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory", params));
+      HornetQServer service = HornetQ.newHornetQServer(serviceConf, true);
       return service;
    }
 
-   protected HornetQServer createHornetQServer(final int id)
-   {
-      return this.createHornetQServer(id, new HashMap<String, Object>());
-   }
 }

Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -1421,8 +1421,13 @@
       closeSessionFactory(3);
 
       stopServers(0, 3);
+      log.info("stopped servers");
 
       startServers(3, 0);
+      
+      log.info("restarted servers");
+      
+      Thread.sleep(2000);
 
       setupSessionFactory(0, isNetty());
       setupSessionFactory(3, isNetty());

Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -32,15 +32,12 @@
    {
       if (packet.getType() == PacketImpl.SESS_SEND)
       {
-         try
-         {
-            Thread.sleep(2000);
-         }
-         catch (Exception e)
-         {                  
-         }                  
+         //Lose the send
+         return false;             
       }
-      
-      return true;
+      else
+      {
+         return true;
+      }
    }
 }

Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor2.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor2.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor2.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -18,6 +18,7 @@
 import org.hornetq.core.remoting.Interceptor;
 import org.hornetq.core.remoting.Packet;
 import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
 
 /**
  * A DelayInterceptor2
@@ -32,16 +33,15 @@
 
    public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
    {
-      try
+      if (packet.getType() == PacketImpl.NULL_RESPONSE)
       {
-         Thread.sleep(2000);
+         //Lose the response from the commit
+
+         return false;
       }
-      catch (InterruptedException e)
+      else
       {
+         return true;
       }
-
-      log.info("proceeding");
-
-      return true;
    }
 }

Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor3.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor3.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor3.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -35,16 +35,12 @@
    {
       if (packet.getType() == PacketImpl.SESS_COMMIT)
       {
-         log.info("got sess commit, delaying");
-         try
-         {
-            Thread.sleep(2000);
-         }
-         catch (Exception e)
-         {                  
-         }                  
+         //lose the commit
+         return false;
       }
-      
-      return false;
+      else
+      {      
+         return true;
+      }      
    }
 }

Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -472,8 +472,6 @@
 
       RemotingConnection conn = ((ClientSessionInternal)session2).getConnection();
 
-      log.info("Failing connection**");
-
       // Simulate failure on connection
       conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
 
@@ -483,16 +481,12 @@
 
       assertTrue(ok);
 
-      log.info("** creating the consumer");
-
       consumer = session2.createConsumer(ADDRESS);
 
       for (int i = numMessages / 2; i < numMessages; i++)
       {
          ClientMessage message = consumer.receive(1000);
 
-         log.info("got message " + message);
-
          assertNotNull(message);
 
          assertEquals("message" + i, message.getBody().readString());
@@ -1173,9 +1167,7 @@
 
       assertTrue(ok);
 
-      log.info("closing session");
       session.close();
-      log.info("closed session");
 
       sf = new ClientSessionFactoryImpl(getConnectorTransportConfiguration(false));
 
@@ -1477,8 +1469,6 @@
 
       assertTrue(ok);
 
-      log.info("after failover");
-
       for (int i = 0; i < numMessages; i++)
       {
          // Only the persistent messages will survive
@@ -1487,8 +1477,6 @@
          {
             ClientMessage message = consumer.receive(1000);
 
-            log.info("got message " + i);
-
             assertNotNull(message);
 
             assertEquals("message" + i, message.getBody().readString());
@@ -1788,9 +1776,7 @@
       sf.setBlockOnPersistentSend(true);
       sf.setBlockOnAcknowledge(true);
 
-      log.info("creating session");
       final ClientSession session = sf.createSession(false, false);
-      log.info("created session");
 
       session.createQueue(ADDRESS, ADDRESS, null, true);
 
@@ -1840,23 +1826,18 @@
                sf.addInterceptor(interceptor);
 
                session.commit();
-
-               log.info("Initial commit succeeded");
             }
             catch (HornetQException e)
             {
                if (e.getCode() == HornetQException.UNBLOCKED)
                {
-                  log.info("commit unblocked");
-
                   // Ok - now we retry the commit after removing the interceptor
 
                   sf.removeInterceptor(interceptor);
 
                   try
                   {
-                     log.info("retrying commit");
-                     session.commit();
+                     session.commit();                     
                   }
                   catch (HornetQException e2)
                   {
@@ -1958,9 +1939,7 @@
       sf.setBlockOnPersistentSend(true);
       sf.setBlockOnAcknowledge(true);
 
-      log.info("creating session");
       final ClientSession session = sf.createSession(false, false);
-      log.info("created session");
 
       session.createQueue(ADDRESS, ADDRESS, null, true);
 
@@ -2002,15 +1981,11 @@
                server0Service.getRemotingService().addInterceptor(interceptor);
 
                session.commit();
-
-               log.info("Initial commit succeeded");
             }
             catch (HornetQException e)
             {
                if (e.getCode() == HornetQException.UNBLOCKED)
                {
-                  log.info("commit unblocked");
-
                   // Ok - now we retry the commit after removing the interceptor
 
                   server0Service.getRemotingService().removeInterceptor(interceptor);

Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -17,6 +17,7 @@
 import java.util.Map;
 
 import javax.jms.Connection;
+import javax.jms.DeliveryMode;
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
@@ -79,12 +80,12 @@
    public void testAutomaticFailover() throws Exception
    {
       HornetQConnectionFactory jbcf = new HornetQConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"),
-                                                               new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
-                                                                                          backupParams));
-      
+                                                                   new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                              backupParams));
+
       jbcf.setBlockOnPersistentSend(true);
       jbcf.setBlockOnNonPersistentSend(true);
-      
+
       Connection conn = jbcf.createConnection();
 
       MyExceptionListener listener = new MyExceptionListener();
@@ -99,7 +100,7 @@
 
       SimpleString jmsQueueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + "myqueue");
 
-      coreSession.createQueue(jmsQueueName, jmsQueueName, null, false);
+      coreSession.createQueue(jmsQueueName, jmsQueueName, null, true);
 
       Queue queue = sess.createQueue("myqueue");
 
@@ -107,6 +108,8 @@
 
       MessageProducer producer = sess.createProducer(queue);
 
+      producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
       MessageConsumer consumer = sess.createConsumer(queue);
 
       for (int i = 0; i < numMessages; i++)
@@ -137,19 +140,20 @@
 
       conn.close();
 
-      assertNull(listener.e);
+      assertNotNull(listener.e);
+      
+      assertTrue(me == listener.e.getCause());
    }
 
    public void testManualFailover() throws Exception
    {
       HornetQConnectionFactory jbcfLive = new HornetQConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
-      
+
       jbcfLive.setBlockOnNonPersistentSend(true);
       jbcfLive.setBlockOnPersistentSend(true);
 
-
       HornetQConnectionFactory jbcfBackup = new HornetQConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
-                                                                                                backupParams));
+                                                                                                    backupParams));
       jbcfBackup.setBlockOnNonPersistentSend(true);
       jbcfBackup.setBlockOnPersistentSend(true);
 
@@ -167,7 +171,7 @@
 
       SimpleString jmsQueueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + "myqueue");
 
-      coreSessionLive.createQueue(jmsQueueName, jmsQueueName, null, false);
+      coreSessionLive.createQueue(jmsQueueName, jmsQueueName, null, true);
 
       Queue queue = sessLive.createQueue("myqueue");
 
@@ -200,11 +204,8 @@
 
       Connection connBackup = jbcfBackup.createConnection();
 
-      log.info("creating session on backup");
       Session sessBackup = connBackup.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-      log.info("created on backup");
-
       MessageConsumer consumerBackup = sessBackup.createConsumer(queue);
 
       connBackup.start();
@@ -238,24 +239,29 @@
       backupConf.setSecurityEnabled(false);
       backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
       backupConf.getAcceptorConfigurations()
-                .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory",
-                                                backupParams));
+                .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory", backupParams));
       backupConf.setBackup(true);
-      backupService = HornetQ.newHornetQServer(backupConf, false);
+      backupConf.setSharedStore(true);
+      backupConf.setBindingsDirectory(getBindingsDir());
+      backupConf.setJournalMinFiles(2);
+      backupConf.setJournalDirectory(getJournalDir());
+      backupConf.setPagingDirectory(getPageDir());
+      backupConf.setLargeMessagesDirectory(getLargeMessagesDir());
+      backupService = HornetQ.newHornetQServer(backupConf, true);
       backupService.start();
 
       Configuration liveConf = new ConfigurationImpl();
       liveConf.setSecurityEnabled(false);
       liveConf.getAcceptorConfigurations()
               .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory"));
-      Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
-      TransportConfiguration backupTC = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
-                                                                   backupParams,
-                                                                   "backup-connector");
-      connectors.put(backupTC.getName(), backupTC);
-      liveConf.setConnectorConfigurations(connectors);
-      liveConf.setBackupConnectorName(backupTC.getName());
-      liveService = HornetQ.newHornetQServer(liveConf, false);
+      liveConf.setSharedStore(true);
+      liveConf.setBindingsDirectory(getBindingsDir());
+      liveConf.setJournalMinFiles(2);
+      liveConf.setJournalDirectory(getJournalDir());
+      liveConf.setPagingDirectory(getPageDir());
+      liveConf.setLargeMessagesDirectory(getLargeMessagesDir());
+
+      liveService = HornetQ.newHornetQServer(liveConf, true);
       liveService.start();
    }
 
@@ -269,11 +275,11 @@
       assertEquals(0, InVMRegistry.instance.size());
 
       liveService = null;
-      
+
       backupService = null;
-      
+
       backupParams = null;
-      
+
       super.tearDown();
    }
 

Deleted: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/jms/server/JMSServerStartStopWithReplicationTest.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/jms/server/JMSServerStartStopWithReplicationTest.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/jms/server/JMSServerStartStopWithReplicationTest.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -1,241 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.jms.server;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.hornetq.core.config.TransportConfiguration;
-import org.hornetq.core.config.impl.FileConfiguration;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.security.HornetQSecurityManager;
-import org.hornetq.core.security.impl.HornetQSecurityManagerImpl;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.impl.HornetQServerImpl;
-import org.hornetq.integration.transports.netty.NettyConnectorFactory;
-import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.server.JMSServerManager;
-import org.hornetq.jms.server.impl.JMSServerManagerImpl;
-import org.hornetq.tests.util.UnitTestCase;
-
-/**
- * 
- * A JMSServerStartStopWithReplicationTest
- * 
- * Make sure live backup pair can be stopped and started ok multiple times with predefined queues etc
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- *
- */
-public class JMSServerStartStopWithReplicationTest extends UnitTestCase
-{
-   private static final Logger log = Logger.getLogger(JMSServerStartStopWithReplicationTest.class);
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private final Map<String, Object> backupParams = new HashMap<String, Object>();
-
-   private JMSServerManager liveJMSServer;
-
-   private JMSServerManager backupJMSServer;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   public void testStopStartBackupBeforeLive() throws Exception
-   {
-      testStopStart1(true);
-   }
-
-   public void testStopStartLiveBeforeBackup() throws Exception
-   {
-      testStopStart1(false);
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   @Override
-   protected void setUp() throws Exception
-   {
-      super.setUp();
-   }
-
-   @Override
-   protected void tearDown() throws Exception
-   {
-      this.liveJMSServer = null;
-      this.backupJMSServer = null;
-      
-      super.tearDown();
-   }
-
-   // Private -------------------------------------------------------
-
-   private void testStopStart1(final boolean backupBeforeLive) throws Exception
-   {
-      final int numMessages = 5;
-
-      for (int j = 0; j < numMessages; j++)
-      {
-         log.info("Iteration " + j);
-
-         startBackup();
-         startLive();
-
-         HornetQConnectionFactory jbcf = new HornetQConnectionFactory(new TransportConfiguration(NettyConnectorFactory.class.getCanonicalName()),
-                                                                  new TransportConfiguration(NettyConnectorFactory.class.getCanonicalName(),
-                                                                                             backupParams));
-
-         jbcf.setBlockOnPersistentSend(true);
-         jbcf.setBlockOnNonPersistentSend(true);
-
-         Connection conn = jbcf.createConnection();
-
-         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         Queue queue = sess.createQueue("myJMSQueue");
-
-         MessageProducer producer = sess.createProducer(queue);
-
-         TextMessage tm = sess.createTextMessage("message" + j);
-
-         producer.send(tm);
-
-         conn.close();
-
-         jbcf.close();
-
-         if (backupBeforeLive)
-         {
-            stopBackup();
-            stopLive();
-         }
-         else
-         {
-            stopLive();
-            stopBackup();
-         }
-      }
-
-      startBackup();
-      startLive();
-
-      HornetQConnectionFactory jbcf = new HornetQConnectionFactory(new TransportConfiguration(NettyConnectorFactory.class.getCanonicalName()),
-                                                               new TransportConfiguration(NettyConnectorFactory.class.getCanonicalName(),
-                                                                                          backupParams));
-
-      jbcf.setBlockOnPersistentSend(true);
-      jbcf.setBlockOnNonPersistentSend(true);
-
-      Connection conn = jbcf.createConnection();
-
-      Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-      Queue queue = sess.createQueue("myJMSQueue");
-
-      MessageConsumer consumer = sess.createConsumer(queue);
-
-      conn.start();
-
-      for (int i = 0; i < numMessages; i++)
-      {
-         TextMessage tm = (TextMessage)consumer.receive(1000);
-
-         assertNotNull(tm);
-
-         assertEquals("message" + i, tm.getText());
-      }
-
-      conn.close();
-
-      jbcf.close();
-
-      if (backupBeforeLive)
-      {
-         stopBackup();
-         stopLive();
-      }
-      else
-      {
-         stopLive();
-         stopBackup();
-      }
-   }
-
-   private void stopLive() throws Exception
-   {
-      liveJMSServer.stop();
-   }
-
-   private void stopBackup() throws Exception
-   {
-      backupJMSServer.stop();
-   }
-
-   private void startLive() throws Exception
-   {
-      FileConfiguration fcLive = new FileConfiguration();
-
-      fcLive.setConfigurationUrl("server-start-stop-live-config1.xml");
-
-      fcLive.start();
-
-      HornetQSecurityManager smLive = new HornetQSecurityManagerImpl();
-
-      HornetQServer liveServer = new HornetQServerImpl(fcLive, smLive);
-
-      liveJMSServer = new JMSServerManagerImpl(liveServer, "server-start-stop-live-jms-config1.xml");
-
-      liveJMSServer.setContext(null);
-
-      liveJMSServer.start();
-   }
-
-   private void startBackup() throws Exception
-   {
-      FileConfiguration fcBackup = new FileConfiguration();
-
-      fcBackup.setConfigurationUrl("server-start-stop-backup-config1.xml");
-
-      fcBackup.start();
-
-      HornetQSecurityManager smBackup = new HornetQSecurityManagerImpl();
-
-      HornetQServer liveServer = new HornetQServerImpl(fcBackup, smBackup);
-
-      backupJMSServer = new JMSServerManagerImpl(liveServer, "server-start-stop-backup-jms-config1.xml");
-
-      backupJMSServer.setContext(null);
-
-      backupJMSServer.start();
-   }
-
-   // Inner classes -------------------------------------------------
-
-}

Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/util/UnitTestCase.java	2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/util/UnitTestCase.java	2009-09-24 09:25:44 UTC (rev 7986)
@@ -270,7 +270,7 @@
       }
    }
 
-   protected static  Object checkBinding(Context context, String binding) throws Exception
+   protected static Object checkBinding(Context context, String binding) throws Exception
    {
       Object o = context.lookup(binding);
       assertNotNull(o);



More information about the hornetq-commits mailing list