[hornetq-commits] JBoss hornetq SVN: r8256 - in trunk: examples/jms/bridge/server0 and 17 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Nov 10 15:03:12 EST 2009


Author: timfox
Date: 2009-11-10 15:03:11 -0500 (Tue, 10 Nov 2009)
New Revision: 8256

Modified:
   trunk/examples/common/src/org/hornetq/common/example/HornetQExample.java
   trunk/examples/jms/bridge/server0/hornetq-configuration.xml
   trunk/examples/jms/bridge/server1/hornetq-configuration.xml
   trunk/examples/jms/bridge/src/org/hornetq/jms/example/BridgeExample.java
   trunk/examples/jms/bridge/src/org/hornetq/jms/example/HatColourChangeTransformer.java
   trunk/examples/jms/large-message/build.xml
   trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java
   trunk/src/config/common/schema/hornetq-configuration.xsd
   trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java
   trunk/src/main/org/hornetq/core/config/cluster/ClusterConnectionConfiguration.java
   trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
   trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
   trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/hornetq/core/server/ServerMessage.java
   trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/hornetq/utils/XMLUtil.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java
   trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java
   trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java
   trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
mainly https://jira.jboss.org/jira/browse/HORNETQ-182

Modified: trunk/examples/common/src/org/hornetq/common/example/HornetQExample.java
===================================================================
--- trunk/examples/common/src/org/hornetq/common/example/HornetQExample.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/examples/common/src/org/hornetq/common/example/HornetQExample.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -178,7 +178,10 @@
    {
       for (Process server : servers)
       {
-         stopServer(server);
+         if (server != null)
+         {
+            stopServer(server);
+         }
       }
    }
 

Modified: trunk/examples/jms/bridge/server0/hornetq-configuration.xml
===================================================================
--- trunk/examples/jms/bridge/server0/hornetq-configuration.xml	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/examples/jms/bridge/server0/hornetq-configuration.xml	2009-11-10 20:03:11 UTC (rev 8256)
@@ -42,10 +42,23 @@
           <filter string="name='aardvark'"/>
           <transformer-class-name>org.hornetq.jms.example.HatColourChangeTransformer</transformer-class-name>
           <reconnect-attempts>-1</reconnect-attempts>
+          
+          <confirmation-window-size>500000</confirmation-window-size>
+          
           <connector-ref connector-name="remote-connector"/>          
        </bridge>
    </bridges>
+
+
+   <address-settings>
+     <address-setting match="jms.queue.sausage-factory">
+         <max-size-bytes>10000000</max-size-bytes>
+         <page-size-bytes>1000000</page-size-bytes> 
+         <address-full-policy>PAGE</address-full-policy>
+      </address-setting>
+   </address-settings>
    
+   
    <!-- Other config -->
 
    <security-settings>

Modified: trunk/examples/jms/bridge/server1/hornetq-configuration.xml
===================================================================
--- trunk/examples/jms/bridge/server1/hornetq-configuration.xml	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/examples/jms/bridge/server1/hornetq-configuration.xml	2009-11-10 20:03:11 UTC (rev 8256)
@@ -23,6 +23,15 @@
    
    <!-- Other config -->
 
+ <address-settings>
+ 
+     <address-setting match="jms.queue.sausage-factory">
+         <max-size-bytes>10000</max-size-bytes>
+         <page-size-bytes>1000</page-size-bytes> 
+         <address-full-policy>PAGE</address-full-policy>
+      </address-setting>
+   </address-settings>
+   
    <security-settings>
       <!--security for example queue-->
       <security-setting match="jms.queue.#">

Modified: trunk/examples/jms/bridge/src/org/hornetq/jms/example/BridgeExample.java
===================================================================
--- trunk/examples/jms/bridge/src/org/hornetq/jms/example/BridgeExample.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/examples/jms/bridge/src/org/hornetq/jms/example/BridgeExample.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -12,6 +12,7 @@
  */
 package org.hornetq.jms.example;
 
+import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Message;
@@ -98,42 +99,61 @@
          // Step 12. We create a JMS MessageProducer object on server 0
          MessageProducer producer = session0.createProducer(sausageFactory);
 
-         // Step 13. We create and send a message representing an aardvark with a green hat to the sausage-factory
-         // on node 0
-         Message message = session0.createMessage();
+         final int numMessages = 100000;
+         
+         byte[] bytes = new byte[1000];
 
-         message.setStringProperty("name", "aardvark");
+         for (int i = 0; i < numMessages; i++)
+         {
 
-         message.setStringProperty("hat", "green");
+            // Step 13. We create and send a message representing an aardvark with a green hat to the sausage-factory
+            // on node 0
+            BytesMessage message = session0.createBytesMessage();
 
-         producer.send(message);
+            message.setStringProperty("name", "aardvark");
 
-         System.out.println("Sent " + message.getStringProperty("name") +
-                            " message with " +
-                            message.getStringProperty("hat") +
-                            " hat to sausage-factory on node 0");
+            message.setStringProperty("hat", "green");
+            
+            message.writeBytes(bytes);
 
+            producer.send(message);
+
+            if (i % 1000 == 0)
+            {
+               System.out.println("Sent " + i);
+            }
+         }
+
          // Step 14 - we successfully receive the aardvark message from the mincing-machine one node 1. The aardvark's
          // hat is now blue since it has been transformed!
 
-         Message receivedMessage = consumer.receive(5000);
+         for (int i = 0; i < numMessages; i++)
+         {
+            Message receivedMessage = consumer.receive(5000);
+            
+            if (receivedMessage == null)
+            {
+               throw new IllegalStateException("Did not receive message");
+            }
 
-         System.out.println("Received " + receivedMessage.getStringProperty("name") +
-                            " message with " +
-                            receivedMessage.getStringProperty("hat") +
-                            " hat from mincing-machine on node 1");
+            if (i % 1000 == 0)
+            {
+               System.out.println("Received " + i);
+            }
 
-         // Step 13. We create and send another message, this time representing a sasquatch with a mauve hat to the
-         // sausage-factory on node 0. This won't be bridged to the mincing-machine since we only want aardvarks, not sasquatches
+            // Step 13. We create and send another message, this time representing a sasquatch with a mauve hat to the
+            // sausage-factory on node 0. This won't be bridged to the mincing-machine since we only want aardvarks, not
+            // sasquatches
+         }
+         
+         Message message = session0.createMessage();
 
-         message = session0.createMessage();
-
          message.setStringProperty("name", "sasquatch");
 
-         message.setStringProperty("hat", "mauve");        
+         message.setStringProperty("hat", "mauve");
 
          producer.send(message);
-         
+
          System.out.println("Sent " + message.getStringProperty("name") +
                             " message with " +
                             message.getStringProperty("hat") +
@@ -141,7 +161,7 @@
 
          // Step 14. We don't receive the message since it has not been bridged.
 
-         receivedMessage = (TextMessage)consumer.receive(1000);
+         Message receivedMessage = (TextMessage)consumer.receive(1000);
 
          if (receivedMessage == null)
          {

Modified: trunk/examples/jms/bridge/src/org/hornetq/jms/example/HatColourChangeTransformer.java
===================================================================
--- trunk/examples/jms/bridge/src/org/hornetq/jms/example/HatColourChangeTransformer.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/examples/jms/bridge/src/org/hornetq/jms/example/HatColourChangeTransformer.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -33,7 +33,7 @@
       
       SimpleString oldProp = message.getSimpleStringProperty(propName);
       
-      System.out.println("Old hat colour is " + oldProp);
+      //System.out.println("Old hat colour is " + oldProp);
       
       //Change the colour
       message.putStringProperty(propName, new SimpleString("blue"));

Modified: trunk/examples/jms/large-message/build.xml
===================================================================
--- trunk/examples/jms/large-message/build.xml	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/examples/jms/large-message/build.xml	2009-11-10 20:03:11 UTC (rev 8256)
@@ -23,7 +23,7 @@
       <antcall target="runExample">
          <param name="example.classname" value="org.hornetq.jms.example.LargeMessageExample"/>
          
-         <!-- We limit the client to running in only 50MB of RAM -->
+           <!-- We limit the client to running in only 50MB of RAM -->
      	   <param name="java-min-memory" value="50M"/>
      	   <param name="java-max-memory" value="50M"/>
       </antcall>
@@ -36,7 +36,7 @@
       </antcall>
    </target>
    
-   <target name="delete-large-messages">
+   <target name="delete-large-messages" depends="clean">
       <delete file="huge_message_to_send.dat"/>
       <delete file="huge_message_received.dat"/>
    </target>

Modified: trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java
===================================================================
--- trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -45,11 +45,13 @@
       new LargeMessageExample().run(args);
    }
 
-   // The message we will send is size 256MB, even though we are only running in 50MB of RAM on both client and server.
-   // HornetQ will support much larger message sizes, but we use 512MB so the example runs in reasonable time.
-  // private final long FILE_SIZE = 256L * 1024 * 1024;
+   // The message we will send is size 10GiB, even though we are only running in 50MB of RAM on both client and server.
+   // This may take some considerable time to create, send and consume - if it takes too long or you don't have
+   // enough disk space just reduce the file size here
    
-   private final long FILE_SIZE = 2L * 1024 * 1024 * 1024; // 2 GiB message
+   // private final long FILE_SIZE = 256L * 1024 * 1024;
+   
+   private final long FILE_SIZE = 10L * 1024 * 1024 * 1024; // 10 GiB message
 
    public boolean runExample() throws Exception
    {

Modified: trunk/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-configuration.xsd	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/config/common/schema/hornetq-configuration.xsd	2009-11-10 20:03:11 UTC (rev 8256)
@@ -316,6 +316,8 @@
 			</xsd:element>
 			<xsd:element maxOccurs="1" minOccurs="0" name="use-duplicate-detection" type="xsd:boolean">
 			</xsd:element>
+			<xsd:element maxOccurs="1" minOccurs="0" name="confirmation-window-size" type="xsd:int">
+			</xsd:element>
 			<xsd:choice>
 				<xsd:element maxOccurs="1" minOccurs="1" name="connector-ref" type="connector-refType">
 				</xsd:element>
@@ -342,7 +344,8 @@
 			</xsd:element>
 			<xsd:element maxOccurs="1" minOccurs="0" name="max-hops" type="xsd:int">
 			</xsd:element>
-			
+			<xsd:element maxOccurs="1" minOccurs="0" name="confirmation-window-size" type="xsd:int">
+			</xsd:element>
 			<xsd:choice>
 				<xsd:element maxOccurs="unbounded" minOccurs="1" name="connector-ref" type="connector-refType">
 				</xsd:element>

Modified: trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -53,6 +53,8 @@
    private boolean failoverOnServerShutdown;
 
    private boolean useDuplicateDetection;
+   
+   private int confirmationWindowSize;
 
    public BridgeConfiguration(final String name,
                               final String queueName,
@@ -64,6 +66,7 @@
                               final int reconnectAttempts,
                               final boolean failoverOnServerShutdown,
                               final boolean useDuplicateDetection,
+                              final int confirmationWindowSize,
                               final Pair<String, String> connectorPair)
    {
       this.name = name;
@@ -76,6 +79,7 @@
       this.reconnectAttempts = reconnectAttempts;
       this.failoverOnServerShutdown = failoverOnServerShutdown;
       this.useDuplicateDetection = useDuplicateDetection;
+      this.confirmationWindowSize = confirmationWindowSize;
       this.connectorPair = connectorPair;
       this.discoveryGroupName = null;
    }
@@ -90,6 +94,7 @@
                               final int reconnectAttempts,
                               final boolean failoverOnServerShutdown,
                               final boolean useDuplicateDetection,
+                              final int confirmationWindowSize,
                               final String discoveryGroupName)
    {
       this.name = name;
@@ -102,6 +107,7 @@
       this.reconnectAttempts = reconnectAttempts;
       this.failoverOnServerShutdown = failoverOnServerShutdown;
       this.useDuplicateDetection = useDuplicateDetection;
+      this.confirmationWindowSize = confirmationWindowSize;
       this.connectorPair = null;
       this.discoveryGroupName = discoveryGroupName;
    }
@@ -165,6 +171,11 @@
    {
       return useDuplicateDetection;
    }
+   
+   public int getConfirmationWindowSize()
+   {
+      return confirmationWindowSize;
+   }
 
    /**
     * @param name the name to set
@@ -261,4 +272,12 @@
    {
       this.useDuplicateDetection = useDuplicateDetection;
    }
+   
+   /**
+    * @param confirmationWindowSize the confirmationWindowSize to set
+    */
+   public void setConfirmationWindowSize(int confirmationWindowSize)
+   {
+      this.confirmationWindowSize = confirmationWindowSize;
+   }
 }

Modified: trunk/src/main/org/hornetq/core/config/cluster/ClusterConnectionConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/cluster/ClusterConnectionConfiguration.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/config/cluster/ClusterConnectionConfiguration.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -47,12 +47,15 @@
 
    private final int maxHops;
 
+   private final int confirmationWindowSize;
+
    public ClusterConnectionConfiguration(final String name,
                                          final String address,
                                          final long retryInterval,
                                          final boolean duplicateDetection,
                                          final boolean forwardWhenNoConsumers,
                                          final int maxHops,
+                                         final int confirmationWindowSize,
                                          final List<Pair<String, String>> staticConnectorNamePairs)
    {
       this.name = name;
@@ -63,6 +66,7 @@
       this.forwardWhenNoConsumers = forwardWhenNoConsumers;
       this.discoveryGroupName = null;
       this.maxHops = maxHops;
+      this.confirmationWindowSize = confirmationWindowSize;
    }
 
    public ClusterConnectionConfiguration(final String name,
@@ -71,6 +75,7 @@
                                          final boolean duplicateDetection,
                                          final boolean forwardWhenNoConsumers,
                                          final int maxHops,
+                                         final int confirmationWindowSize,
                                          final String discoveryGroupName)
    {
       this.name = name;
@@ -81,6 +86,7 @@
       this.discoveryGroupName = discoveryGroupName;
       this.staticConnectorNamePairs = null;
       this.maxHops = maxHops;
+      this.confirmationWindowSize = confirmationWindowSize;
    }
 
    public String getName()
@@ -108,6 +114,11 @@
       return maxHops;
    }
 
+   public int getConfirmationWindowSize()
+   {
+      return confirmationWindowSize;
+   }
+
    public List<Pair<String, String>> getStaticConnectorNamePairs()
    {
       return staticConnectorNamePairs;

Modified: trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -154,7 +154,7 @@
 
    public static final int DEFAULT_CLUSTER_MAX_HOPS = 1;
 
-   public static final int DEFAULT_CLUSTER_RETRY_INTERVAL = 500;
+   public static final long DEFAULT_CLUSTER_RETRY_INTERVAL = 500;
 
    public static final boolean DEFAULT_DIVERT_EXCLUSIVE = false;
 
@@ -166,7 +166,7 @@
 
    public static final int DEFAULT_MEMORY_WARNING_THRESHOLD = 25;
 
-   public static final long DEFAULT_MEMORY_MEASURE_INTERVAL = 3000; // in milliseconds
+   public static final long DEFAULT_MEMORY_MEASURE_INTERVAL = -1; // in milliseconds
 
    public static final int DEFAULT_BACKUP_WINDOW_SIZE = 1024 * 1024;
 

Modified: trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -36,7 +36,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
 import org.hornetq.core.config.TransportConfiguration;
@@ -74,25 +73,28 @@
    private static final String DEFAULT_CONFIGURATION_URL = "hornetq-configuration.xml";
 
    private static final String CONFIGURATION_SCHEMA_URL = "schema/hornetq-configuration.xsd";
+   
+   //For a bridge confirmations must be activated or send acknowledgements won't return
+   
+   public static final int DEFAULT_CONFIRMATION_WINDOW_SIZE = 1024 * 1024;
 
    // Static --------------------------------------------------------------------------
-   
+
    // Attributes ----------------------------------------------------------------------
 
    private String configurationUrl = DEFAULT_CONFIGURATION_URL;
-   
 
    private boolean started;
-   
+
    // Public -------------------------------------------------------------------------
 
    public synchronized void start() throws Exception
-   {      
+   {
       if (started)
       {
          return;
       }
-       
+
       URL url = getClass().getClassLoader().getResource(configurationUrl);
       log.debug("Loading server configuration from " + url);
 
@@ -105,20 +107,22 @@
       clustered = getBoolean(e, "clustered", clustered);
 
       backup = getBoolean(e, "backup", backup);
-      
+
       sharedStore = getBoolean(e, "shared-store", sharedStore);
-      
-      //Defaults to true when using FileConfiguration
+
+      // Defaults to true when using FileConfiguration
       fileDeploymentEnabled = getBoolean(e, "file-deployment-enabled", true);
-      
+
       persistenceEnabled = getBoolean(e, "persistence-enabled", persistenceEnabled);
 
-      persistDeliveryCountBeforeDelivery = getBoolean(e, "persist-delivery-count-before-delivery", persistDeliveryCountBeforeDelivery);
-      
+      persistDeliveryCountBeforeDelivery = getBoolean(e,
+                                                      "persist-delivery-count-before-delivery",
+                                                      persistDeliveryCountBeforeDelivery);
+
       // NOTE! All the defaults come from the super class
 
       scheduledThreadPoolMaxSize = getInteger(e, "scheduled-thread-pool-max-size", scheduledThreadPoolMaxSize, GT_ZERO);
-      
+
       threadPoolMaxSize = getInteger(e, "thread-pool-max-size", threadPoolMaxSize, MINUS_ONE_OR_GT_ZERO);
 
       securityEnabled = getBoolean(e, "security-enabled", securityEnabled);
@@ -130,35 +134,53 @@
       securityInvalidationInterval = getLong(e, "security-invalidation-interval", securityInvalidationInterval, GT_ZERO);
 
       connectionTTLOverride = getLong(e, "connection-ttl-override", connectionTTLOverride, MINUS_ONE_OR_GT_ZERO);
-      
-      asyncConnectionExecutionEnabled = getBoolean(e, "async-connection-execution-enabled", asyncConnectionExecutionEnabled);
 
+      asyncConnectionExecutionEnabled = getBoolean(e,
+                                                   "async-connection-execution-enabled",
+                                                   asyncConnectionExecutionEnabled);
+
       transactionTimeout = getLong(e, "transaction-timeout", transactionTimeout, GT_ZERO);
 
-      transactionTimeoutScanPeriod = getLong(e, "transaction-timeout-scan-period", transactionTimeoutScanPeriod, GT_ZERO);
+      transactionTimeoutScanPeriod = getLong(e,
+                                             "transaction-timeout-scan-period",
+                                             transactionTimeoutScanPeriod,
+                                             GT_ZERO);
 
       messageExpiryScanPeriod = getLong(e, "message-expiry-scan-period", messageExpiryScanPeriod, GT_ZERO);
 
-      messageExpiryThreadPriority = getInteger(e, "message-expiry-thread-priority", messageExpiryThreadPriority, THREAD_PRIORITY_RANGE);
+      messageExpiryThreadPriority = getInteger(e,
+                                               "message-expiry-thread-priority",
+                                               messageExpiryThreadPriority,
+                                               THREAD_PRIORITY_RANGE);
 
       idCacheSize = getInteger(e, "id-cache-size", idCacheSize, GT_ZERO);
 
       persistIDCache = getBoolean(e, "persist-id-cache", persistIDCache);
 
-      managementAddress = new SimpleString(getString(e, "management-address", managementAddress.toString(), NOT_NULL_OR_EMPTY));
+      managementAddress = new SimpleString(getString(e,
+                                                     "management-address",
+                                                     managementAddress.toString(),
+                                                     NOT_NULL_OR_EMPTY));
 
       managementNotificationAddress = new SimpleString(getString(e,
                                                                  "management-notification-address",
-                                                                 managementNotificationAddress.toString(), NOT_NULL_OR_EMPTY));
+                                                                 managementNotificationAddress.toString(),
+                                                                 NOT_NULL_OR_EMPTY));
 
-      managementClusterPassword = getString(e, "management-cluster-password", managementClusterPassword, NOT_NULL_OR_EMPTY);
+      managementClusterPassword = getString(e,
+                                            "management-cluster-password",
+                                            managementClusterPassword,
+                                            NOT_NULL_OR_EMPTY);
 
       managementClusterUser = getString(e, "management-cluster-user", managementClusterUser, NOT_NULL_OR_EMPTY);
 
       managementRequestTimeout = getLong(e, "management-request-timeout", managementRequestTimeout, GT_ZERO);
-      
-      logDelegateFactoryClassName = getString(e, "log-delegate-factory-class-name", logDelegateFactoryClassName, NOT_NULL_OR_EMPTY);
 
+      logDelegateFactoryClassName = getString(e,
+                                              "log-delegate-factory-class-name",
+                                              logDelegateFactoryClassName,
+                                              NOT_NULL_OR_EMPTY);
+
       NodeList interceptorNodes = e.getElementsByTagName("remoting-interceptors");
 
       ArrayList<String> interceptorList = new ArrayList<String>();
@@ -259,7 +281,7 @@
 
       for (int i = 0; i < gaNodes.getLength(); i++)
       {
-         Element gaNode = (Element) gaNodes.item(i);
+         Element gaNode = (Element)gaNodes.item(i);
 
          parseGroupingHandlerConfiguration(gaNode);
       }
@@ -312,23 +334,23 @@
       journalSyncNonTransactional = getBoolean(e, "journal-sync-non-transactional", journalSyncNonTransactional);
 
       journalFileSize = getInteger(e, "journal-file-size", journalFileSize, GT_ZERO);
-      
+
       journalAIOFlushSync = getBoolean(e, "journal-aio-flush-on-sync", DEFAULT_JOURNAL_AIO_FLUSH_SYNC);
-      
+
       journalAIOBufferTimeout = getInteger(e, "journal-aio-buffer-timeout", DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT, GT_ZERO);
-      
+
       journalAIOBufferSize = getInteger(e, "journal-aio-buffer-size", DEFAULT_JOURNAL_AIO_BUFFER_SIZE, GT_ZERO);
 
       journalMinFiles = getInteger(e, "journal-min-files", journalMinFiles, GT_ZERO);
-      
+
       journalCompactMinFiles = getInteger(e, "journal-compact-min-files", journalCompactMinFiles, GE_ZERO);
 
       journalCompactPercentage = getInteger(e, "journal-compact-percentage", journalCompactPercentage, PERCENTAGE);
 
       journalMaxAIO = getInteger(e, "journal-max-aio", journalMaxAIO, GT_ZERO);
-      
+
       logJournalWriteRate = getBoolean(e, "log-journal-write-rate", DEFAULT_JOURNAL_LOG_WRITE_RATE);
-      
+
       journalPerfBlastPages = getInteger(e, "perf-blast-pages", DEFAULT_JOURNAL_PERF_BLAST_PAGES, MINUS_ONE_OR_GT_ZERO);
 
       runSyncSpeedTest = getBoolean(e, "run-sync-speed-test", runSyncSpeedTest);
@@ -339,23 +361,28 @@
 
       messageCounterSamplePeriod = getLong(e, "message-counter-sample-period", messageCounterSamplePeriod, GT_ZERO);
 
-      messageCounterMaxDayHistory = getInteger(e, "message-counter-max-day-history", messageCounterMaxDayHistory, GT_ZERO);
-      
-      serverDumpInterval = getLong(e, "server-dump-interval", serverDumpInterval, MINUS_ONE_OR_GT_ZERO); // in milliseconds
+      messageCounterMaxDayHistory = getInteger(e,
+                                               "message-counter-max-day-history",
+                                               messageCounterMaxDayHistory,
+                                               GT_ZERO);
 
+      serverDumpInterval = getLong(e, "server-dump-interval", serverDumpInterval, MINUS_ONE_OR_GT_ZERO); // in
+                                                                                                         // milliseconds
+
       memoryWarningThreshold = getInteger(e, "memory-warning-threshold", memoryWarningThreshold, PERCENTAGE);
-      
-      memoryMeasureInterval = getLong(e, "memory-measure-interval", memoryMeasureInterval, MINUS_ONE_OR_GT_ZERO); // in milliseconds
-      
+
+      memoryMeasureInterval = getLong(e, "memory-measure-interval", memoryMeasureInterval, MINUS_ONE_OR_GT_ZERO); // in
+                                                                                                                  // milliseconds
+
       backupWindowSize = getInteger(e, "backup-window-size", DEFAULT_BACKUP_WINDOW_SIZE, MINUS_ONE_OR_GT_ZERO);
-      
+
       started = true;
    }
-   
+
    public synchronized void stop() throws Exception
    {
       super.stop();
-      
+
       started = false;
    }
 
@@ -386,7 +413,7 @@
       for (int i = 0; i < paramsNodes.getLength(); i++)
       {
          Node paramNode = paramsNodes.item(i);
-         NamedNodeMap attributes =paramNode.getAttributes();
+         NamedNodeMap attributes = paramNode.getAttributes();
 
          Node nkey = attributes.getNamedItem("key");
 
@@ -405,7 +432,7 @@
       String name = e.getAttribute("name");
 
       String localAddress = getString(e, "local-bind-address", null, NO_CHECK);
-      
+
       int localBindPort = getInteger(e, "local-bind-port", -1, MINUS_ONE_OR_GT_ZERO);
 
       String groupAddress = getString(e, "group-address", null, NOT_NULL_OR_EMPTY);
@@ -487,12 +514,16 @@
 
       boolean duplicateDetection = getBoolean(e, "use-duplicate-detection", DEFAULT_CLUSTER_DUPLICATE_DETECTION);
 
-      boolean forwardWhenNoConsumers = getBoolean(e, "forward-when-no-consumers", DEFAULT_CLUSTER_FORWARD_WHEN_NO_CONSUMERS);
+      boolean forwardWhenNoConsumers = getBoolean(e,
+                                                  "forward-when-no-consumers",
+                                                  DEFAULT_CLUSTER_FORWARD_WHEN_NO_CONSUMERS);
 
       int maxHops = getInteger(e, "max-hops", DEFAULT_CLUSTER_MAX_HOPS, GE_ZERO);
 
-      long retryInterval = getLong(e, "retry-interval", (long)DEFAULT_CLUSTER_RETRY_INTERVAL, GT_ZERO);
+      long retryInterval = getLong(e, "retry-interval", DEFAULT_CLUSTER_RETRY_INTERVAL, GT_ZERO);
 
+      int confirmationWindowSize = getInteger(e, "confirmation-window-size", DEFAULT_CONFIRMATION_WINDOW_SIZE, GT_ZERO);
+      
       String discoveryGroupName = null;
 
       List<Pair<String, String>> connectorPairs = new ArrayList<Pair<String, String>>();
@@ -531,21 +562,23 @@
       if (discoveryGroupName == null)
       {
          config = new ClusterConnectionConfiguration(name,
-                                                     address,                                                   
-                                                     retryInterval,                                                     
+                                                     address,
+                                                     retryInterval,
                                                      duplicateDetection,
                                                      forwardWhenNoConsumers,
                                                      maxHops,
+                                                     confirmationWindowSize,
                                                      connectorPairs);
       }
       else
       {
          config = new ClusterConnectionConfiguration(name,
                                                      address,
-                                                     retryInterval,                                                     
+                                                     retryInterval,
                                                      duplicateDetection,
                                                      forwardWhenNoConsumers,
                                                      maxHops,
+                                                     confirmationWindowSize,
                                                      discoveryGroupName);
       }
 
@@ -556,15 +589,15 @@
    {
       String name = node.getAttribute("name");
       String type = getString(node, "type", null, NOT_NULL_OR_EMPTY);
-      String address = getString(node, "address",null, NOT_NULL_OR_EMPTY);
+      String address = getString(node, "address", null, NOT_NULL_OR_EMPTY);
       Integer timeout = getInteger(node, "timeout", GroupingHandlerConfiguration.DEFAULT_TIMEOUT, GT_ZERO);
       groupingHandlerConfiguration = new GroupingHandlerConfiguration(new SimpleString(name),
-                                  type.equals(GroupingHandlerConfiguration.TYPE.LOCAL.getType())? GroupingHandlerConfiguration.TYPE.LOCAL: GroupingHandlerConfiguration.TYPE.REMOTE,
-                                  new SimpleString(address),
-                                  timeout);
+                                                                      type.equals(GroupingHandlerConfiguration.TYPE.LOCAL.getType()) ? GroupingHandlerConfiguration.TYPE.LOCAL
+                                                                                                                                    : GroupingHandlerConfiguration.TYPE.REMOTE,
+                                                                      new SimpleString(address),
+                                                                      timeout);
    }
 
-
    private void parseBridgeConfiguration(final Element brNode)
    {
       String name = brNode.getAttribute("name");
@@ -576,17 +609,28 @@
       String transformerClassName = getString(brNode, "transformer-class-name", null, NO_CHECK);
 
       long retryInterval = getLong(brNode, "retry-interval", DEFAULT_RETRY_INTERVAL, GT_ZERO);
-
-      double retryIntervalMultiplier = getDouble(brNode, "retry-interval-multiplier", DEFAULT_RETRY_INTERVAL_MULTIPLIER, GT_ZERO);
       
-      int reconnectAttempts = getInteger(brNode, "reconnect-attempts", DEFAULT_BRIDGE_RECONNECT_ATTEMPTS, MINUS_ONE_OR_GE_ZERO);
+      //Default bridge conf
+      int confirmationWindowSize = getInteger(brNode, "confirmation-window-size", DEFAULT_CONFIRMATION_WINDOW_SIZE, GT_ZERO);
+      
+      double retryIntervalMultiplier = getDouble(brNode,
+                                                 "retry-interval-multiplier",
+                                                 DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+                                                 GT_ZERO);
 
-      boolean failoverOnServerShutdown = getBoolean(brNode, "failover-on-server-shutdown", ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN);
+      int reconnectAttempts = getInteger(brNode,
+                                         "reconnect-attempts",
+                                         DEFAULT_BRIDGE_RECONNECT_ATTEMPTS,
+                                         MINUS_ONE_OR_GE_ZERO);
 
+      boolean failoverOnServerShutdown = getBoolean(brNode,
+                                                    "failover-on-server-shutdown",
+                                                    ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN);
+
       boolean useDuplicateDetection = getBoolean(brNode, "use-duplicate-detection", DEFAULT_BRIDGE_DUPLICATE_DETECTION);
 
       String filterString = null;
-      
+
       Pair<String, String> connectorPair = null;
 
       String discoveryGroupName = null;
@@ -636,6 +680,7 @@
                                           reconnectAttempts,
                                           failoverOnServerShutdown,
                                           useDuplicateDetection,
+                                          confirmationWindowSize,
                                           connectorPair);
       }
       else
@@ -650,6 +695,7 @@
                                           reconnectAttempts,
                                           failoverOnServerShutdown,
                                           useDuplicateDetection,
+                                          confirmationWindowSize,
                                           discoveryGroupName);
       }
 

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -15,19 +15,17 @@
 
 import static org.hornetq.utils.DataConstants.SIZE_INT;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.message.BodyEncoder;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.message.BodyEncoder;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 
 /**
@@ -222,9 +220,9 @@
    }
 
    @Override
-   public synchronized int decrementRefCount(PagingStore pagingStore, MessageReference reference) throws Exception
+   public synchronized int decrementRefCount(MessageReference reference) throws Exception
    {
-      int currentRefCount = super.decrementRefCount(pagingStore, reference);
+      int currentRefCount = super.decrementRefCount(reference);
 
       // We use <= as this could be used by load.
       // because of a failure, no references were loaded, so we have 0... and we still need to delete the associated

Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -245,7 +245,7 @@
                {
                   throw new IllegalArgumentException("ID is null");
                }
-               
+
                long id = props.getLongProperty(ManagementHelper.HDR_BINDING_ID);
 
                SimpleString filterString = props.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);
@@ -254,7 +254,7 @@
                {
                   throw new IllegalArgumentException("Distance is null");
                }
-               
+
                int distance = props.getIntProperty(ManagementHelper.HDR_DISTANCE);
 
                QueueInfo info = new QueueInfo(routingName, clusterName, address, filterString, id, distance);
@@ -271,7 +271,7 @@
                {
                   throw new IllegalStateException("No cluster name");
                }
-               
+
                SimpleString clusterName = props.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
 
                QueueInfo info = queueInfos.remove(clusterName);
@@ -291,7 +291,7 @@
                {
                   throw new IllegalStateException("No cluster name");
                }
-               
+
                SimpleString clusterName = props.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
 
                SimpleString filterString = props.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);
@@ -606,7 +606,7 @@
 
          cache.addToCache(duplicateIDBytes, context.getTransaction());
       }
-      
+
       setPagingStore(message);
 
       if (context.getTransaction() == null)
@@ -679,7 +679,7 @@
    public MessageReference reroute(final ServerMessage message, final Queue queue, final Transaction tx) throws Exception
    {
       setPagingStore(message);
-      
+
       MessageReference reference = message.createReference(queue);
 
       if (message.containsProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME))
@@ -690,10 +690,8 @@
 
       message.incrementDurableRefCount();
 
-      PagingStore store = pagingManager.getPageStore(message.getDestination());
+      message.incrementRefCount(reference);
 
-      message.incrementRefCount(store, reference);
-
       if (tx == null)
       {
          queue.addLast(reference);
@@ -843,7 +841,7 @@
    {
       PagingStore store = pagingManager.getPageStore(message.getDestination());
 
-      message.setPagingStore(store);      
+      message.setPagingStore(store);
    }
 
    private void routeDirect(final ServerMessage message, final Queue queue, final boolean applyFilters) throws Exception
@@ -864,8 +862,6 @@
 
       Transaction tx = context.getTransaction();
 
-      PagingStore store = pagingManager.getPageStore(message.getDestination());
-
       for (Queue queue : context.getQueues())
       {
          MessageReference reference = message.createReference(queue);
@@ -918,7 +914,7 @@
             }
          }
 
-         message.incrementRefCount(store, reference);
+         message.incrementRefCount(reference);
       }
 
       if (tx != null)
@@ -1255,9 +1251,7 @@
                message.decrementDurableRefCount();
             }
 
-            PagingStore store = pagingManager.getPageStore(message.getDestination());
-
-            message.decrementRefCount(store, ref);
+            message.decrementRefCount(ref);
          }
       }
    }

Modified: trunk/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerMessage.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/server/ServerMessage.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -31,18 +31,16 @@
 
    MessageReference createReference(Queue queue);
 
-   int incrementRefCount(PagingStore pagingStore, MessageReference reference)
-      throws Exception;
+   int incrementRefCount(MessageReference reference) throws Exception;
 
-   int decrementRefCount(PagingStore pagingStore, MessageReference reference)
-      throws Exception;
-   
+   int decrementRefCount(MessageReference reference) throws Exception;
+
    int incrementDurableRefCount();
 
    int decrementDurableRefCount();
 
    ServerMessage copy(long newID) throws Exception;
-   
+
    ServerMessage copy() throws Exception;
 
    int getMemoryEstimate();
@@ -50,16 +48,16 @@
    int getRefCount();
 
    ServerMessage makeCopyForExpiryOrDLA(long newID, boolean expiry) throws Exception;
-   
-   void setOriginalHeaders(ServerMessage other, boolean expiry);   
-   
+
+   void setOriginalHeaders(ServerMessage other, boolean expiry);
+
    void setPagingStore(PagingStore store);
-   
+
    PagingStore getPagingStore();
-   
+
    boolean page(boolean duplicateDetection) throws Exception;
-   
+
    boolean page(long transactionID, boolean duplicateDetection) throws Exception;
-   
+
    boolean storeIsPaging();
 }

Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -117,6 +117,8 @@
 
    private final boolean failoverOnServerShutdown;
 
+   private final int confirmationWindowSize;
+
    private final SimpleString idsHeaderName;
 
    private MessageFlowRecord flowRecord;
@@ -133,6 +135,8 @@
 
    private NotificationService notificationService;
 
+   private ClientConsumer notifConsumer;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -160,6 +164,7 @@
                      final int reconnectAttempts,
                      final boolean failoverOnServerShutdown,
                      final boolean useDuplicateDetection,
+                     final int confirmationWindowSize,
                      final SimpleString managementAddress,
                      final SimpleString managementNotificationAddress,
                      final String clusterUser,
@@ -184,6 +189,13 @@
 
       this.useDuplicateDetection = useDuplicateDetection;
 
+      if (!(confirmationWindowSize > 0))
+      {
+         throw new IllegalStateException("confirmation-window-size must be > 0 for a bridge");
+      }
+
+      this.confirmationWindowSize = confirmationWindowSize;
+
       this.discoveryAddress = discoveryAddress;
 
       this.discoveryPort = discoveryPort;
@@ -253,9 +265,11 @@
       }
 
       Queue queue = null;
+
       for (MessageReference ref2 : list)
       {
          queue = ref2.getQueue();
+
          queue.cancel(ref2);
       }
 
@@ -381,7 +395,7 @@
       {
          return HandleStatus.NO_MATCH;
       }
-      
+
       synchronized (this)
       {
          if (!active)
@@ -521,15 +535,15 @@
                {
                   active = false;
                }
-               
+
                cancelRefs();
             }
             else
             {
                setupNotificationConsumer();
-               
+
                active = true;
-               
+
                if (queue != null)
                {
                   queue.deliverAsync(executor);
@@ -543,8 +557,6 @@
       }
    }
 
-   private ClientConsumer notifConsumer;
-
    // TODO - we should move this code to the ClusterConnectorImpl - and just execute it when the bridge
    // connection is opened and closed - we can use
    // a callback to tell us that
@@ -552,7 +564,6 @@
    {
       if (flowRecord != null)
       {
-
          if (notifConsumer != null)
          {
             try
@@ -600,7 +611,7 @@
                                                 "%')");
 
          session.createQueue(managementNotificationAddress, notifQueueName, filter, false);
-         
+
          notifConsumer = session.createConsumer(notifQueueName);
 
          notifConsumer.setMessageHandler(flowRecord);
@@ -645,6 +656,10 @@
          csf.setReconnectAttempts(reconnectAttempts);
          csf.setBlockOnPersistentSend(false);
 
+         // Must have confirmations enabled so we get send acks
+
+         csf.setConfirmationWindowSize(confirmationWindowSize);
+
          // Session is pre-acknowledge
          session = (ClientSessionInternal)csf.createSession(clusterUser, clusterPassword, false, true, true, true, 1);
 
@@ -732,41 +747,4 @@
          }
       }
    }
-
-   // private class FailRunnable implements Runnable
-   // {
-   // public void run()
-   // {
-   // synchronized (BridgeImpl.this)
-   // {
-   // if (!started)
-   // {
-   // return;
-   // }
-   //
-   // active = false;
-   // }
-   //
-   // try
-   // {
-   // queue.removeConsumer(BridgeImpl.this);
-   //
-   // session.cleanUp();
-   //
-   // cancelRefs();
-   //
-   // csf.close();
-   // }
-   // catch (Exception e)
-   // {
-   // log.error("Failed to stop", e);
-   // }
-   //         
-   // if (!createObjects())
-   // {
-   // started = false;
-   // }
-   // }
-   // }
-
 }

Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -81,6 +81,8 @@
    private final long retryInterval;
 
    private final boolean useDuplicateDetection;
+   
+   private final int confirmationWindowSize;
 
    private final boolean routeWhenNoConsumers;
 
@@ -108,6 +110,7 @@
                                 final long retryInterval,
                                 final boolean useDuplicateDetection,
                                 final boolean routeWhenNoConsumers,
+                                final int confirmationWindowSize,
                                 final org.hornetq.utils.ExecutorFactory executorFactory,
                                 final HornetQServer server,
                                 final PostOffice postOffice,
@@ -127,6 +130,8 @@
       this.useDuplicateDetection = useDuplicateDetection;
 
       this.routeWhenNoConsumers = routeWhenNoConsumers;
+      
+      this.confirmationWindowSize = confirmationWindowSize;
 
       this.executorFactory = executorFactory;
 
@@ -167,6 +172,7 @@
                                 final long retryInterval,
                                 final boolean useDuplicateDetection,
                                 final boolean routeWhenNoConsumers,
+                                final int confirmationWindowSize,
                                 final ExecutorFactory executorFactory,
                                 final HornetQServer server,
                                 final PostOffice postOffice,
@@ -198,6 +204,8 @@
       this.useDuplicateDetection = useDuplicateDetection;
 
       this.routeWhenNoConsumers = routeWhenNoConsumers;
+      
+      this.confirmationWindowSize = confirmationWindowSize;
 
       this.maxHops = maxHops;
 
@@ -221,12 +229,14 @@
       }
 
       started = true;
-      
+
       if (managementService != null)
       {
          TypedProperties props = new TypedProperties();
          props.putSimpleStringProperty(new SimpleString("name"), name);
-         Notification notification = new Notification(nodeUUID.toString(), NotificationType.CLUSTER_CONNECTION_STARTED, props);
+         Notification notification = new Notification(nodeUUID.toString(),
+                                                      NotificationType.CLUSTER_CONNECTION_STARTED,
+                                                      props);
          managementService.sendNotification(notification);
       }
    }
@@ -258,10 +268,12 @@
       {
          TypedProperties props = new TypedProperties();
          props.putSimpleStringProperty(new SimpleString("name"), name);
-         Notification notification = new Notification(nodeUUID.toString(), NotificationType.CLUSTER_CONNECTION_STOPPED, props);
+         Notification notification = new Notification(nodeUUID.toString(),
+                                                      NotificationType.CLUSTER_CONNECTION_STOPPED,
+                                                      props);
          managementService.sendNotification(notification);
       }
-      
+
       started = false;
    }
 
@@ -274,7 +286,7 @@
    {
       return name;
    }
-   
+
    public String getNodeID()
    {
       return nodeUUID.toString();
@@ -283,7 +295,7 @@
    public synchronized Map<String, String> getNodes()
    {
       Map<String, String> nodes = new HashMap<String, String>();
-      for (Entry<String, MessageFlowRecord> record : records.entrySet( ))
+      for (Entry<String, MessageFlowRecord> record : records.entrySet())
       {
          if (record.getValue().getBridge().getForwardingConnection() != null)
          {
@@ -292,7 +304,7 @@
       }
       return nodes;
    }
-   
+
    public synchronized void activate()
    {
       if (!started)
@@ -428,6 +440,7 @@
                                      -1,
                                      true,
                                      useDuplicateDetection,
+                                     confirmationWindowSize,
                                      managementService.getManagementAddress(),
                                      managementService.getManagementNotificationAddress(),
                                      managementService.getClusterUser(),
@@ -484,7 +497,7 @@
       {
          this.bridge = bridge;
       }
-      
+
       public Bridge getBridge()
       {
          return bridge;
@@ -570,7 +583,7 @@
          {
             throw new IllegalStateException("proposal type is null");
          }
-         
+
          SimpleString type = message.getSimpleStringProperty(ManagementHelper.HDR_PROPOSAL_GROUP_ID);
 
          SimpleString val = message.getSimpleStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE);
@@ -579,7 +592,7 @@
 
          Response response = server.getGroupingHandler().receive(new Proposal(type, val), hops + 1);
 
-         if(response != null)
+         if (response != null)
          {
             server.getGroupingHandler().send(response, 0);
          }
@@ -634,7 +647,7 @@
          {
             throw new IllegalStateException("routingName is null");
          }
-         
+
          if (!message.containsProperty(ManagementHelper.HDR_BINDING_ID))
          {
             throw new IllegalStateException("queueID is null");
@@ -697,7 +710,7 @@
          {
             throw new IllegalStateException("clusterName is null");
          }
-         
+
          SimpleString clusterName = message.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
 
          removeBinding(clusterName);
@@ -756,7 +769,7 @@
          {
             throw new IllegalStateException("distance is null");
          }
-         
+
          if (!message.containsProperty(ManagementHelper.HDR_CLUSTER_NAME))
          {
             throw new IllegalStateException("clusterName is null");
@@ -782,7 +795,7 @@
          // Need to propagate the consumer close
          Notification notification = new Notification(null, CONSUMER_CLOSED, message.getProperties());
 
-         managementService.sendNotification(notification);         
+         managementService.sendNotification(notification);
       }
 
    }
@@ -830,7 +843,7 @@
       theBindings.setRouteWhenNoConsumers(routeWhenNoConsumers);
    }
 
-   //for testing only
+   // for testing only
    public Map<String, MessageFlowRecord> getRecords()
    {
       return records;

Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -37,7 +37,6 @@
 import org.hornetq.core.management.ManagementService;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.PostOffice;
-import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.cluster.Bridge;
@@ -71,7 +70,7 @@
    private final Map<String, ClusterConnection> clusters = new HashMap<String, ClusterConnection>();
 
    private final org.hornetq.utils.ExecutorFactory executorFactory;
-   
+
    private final HornetQServer server;
 
    private final PostOffice postOffice;
@@ -83,9 +82,9 @@
    private final Configuration configuration;
 
    private final UUID nodeUUID;
-   
+
    private volatile boolean started;
-   
+
    private boolean backup;
 
    public ClusterManagerImpl(final org.hornetq.utils.ExecutorFactory executorFactory,
@@ -94,16 +93,16 @@
                              final ScheduledExecutorService scheduledExecutor,
                              final ManagementService managementService,
                              final Configuration configuration,
-                             final UUID nodeUUID,                            
+                             final UUID nodeUUID,
                              final boolean backup)
    {
       if (nodeUUID == null)
       {
          throw new IllegalArgumentException("Node uuid is null");
       }
-      
+
       this.executorFactory = executorFactory;
-      
+
       this.server = server;
 
       this.postOffice = postOffice;
@@ -115,7 +114,7 @@
       this.configuration = configuration;
 
       this.nodeUUID = nodeUUID;
-      
+
       this.backup = backup;
    }
 
@@ -203,34 +202,34 @@
    {
       return new HashSet<ClusterConnection>(clusters.values());
    }
-   
+
    public Set<BroadcastGroup> getBroadcastGroups()
    {
       return new HashSet<BroadcastGroup>(broadcastGroups.values());
    }
-   
+
    public ClusterConnection getClusterConnection(final SimpleString name)
    {
-      return clusters.get(name.toString()); 
+      return clusters.get(name.toString());
    }
-   
+
    public synchronized void activate()
-   {      
-      for (BroadcastGroup bg: broadcastGroups.values())
+   {
+      for (BroadcastGroup bg : broadcastGroups.values())
       {
          bg.activate();
       }
-      
-      for (Bridge bridge: bridges.values())
+
+      for (Bridge bridge : bridges.values())
       {
          bridge.activate();
       }
-      
-      for (ClusterConnection cc: clusters.values())
+
+      for (ClusterConnection cc : clusters.values())
       {
          cc.activate();
       }
-      
+
       backup = false;
    }
 
@@ -385,14 +384,16 @@
 
       if (config.getDiscoveryGroupName() != null)
       {
-         DiscoveryGroupConfiguration discoveryGroupConfiguration = configuration.getDiscoveryGroupConfigurations().get(config.getDiscoveryGroupName());
+         DiscoveryGroupConfiguration discoveryGroupConfiguration = configuration.getDiscoveryGroupConfigurations()
+                                                                                .get(config.getDiscoveryGroupName());
          if (discoveryGroupConfiguration == null)
          {
-            log.warn("No discovery group configured with name '" + config.getDiscoveryGroupName() + "'. The bridge will not be deployed.");
+            log.warn("No discovery group configured with name '" + config.getDiscoveryGroupName() +
+                     "'. The bridge will not be deployed.");
 
             return;
          }
-         
+
          bridge = new BridgeImpl(nodeUUID,
                                  new SimpleString(config.getName()),
                                  queue,
@@ -409,11 +410,12 @@
                                  config.getReconnectAttempts(),
                                  config.isFailoverOnServerShutdown(),
                                  config.isUseDuplicateDetection(),
+                                 config.getConfirmationWindowSize(),
                                  managementService.getManagementAddress(),
                                  managementService.getManagementNotificationAddress(),
                                  managementService.getClusterUser(),
                                  managementService.getClusterPassword(),
-                                 null,                        
+                                 null,
                                  !backup,
                                  server.getStorageManager());
       }
@@ -461,11 +463,12 @@
                                  config.getReconnectAttempts(),
                                  config.isFailoverOnServerShutdown(),
                                  config.isUseDuplicateDetection(),
+                                 config.getConfirmationWindowSize(),
                                  managementService.getManagementAddress(),
                                  managementService.getManagementNotificationAddress(),
                                  managementService.getClusterUser(),
                                  managementService.getClusterPassword(),
-                                 null,                        
+                                 null,
                                  !backup,
                                  server.getStorageManager());
       }
@@ -534,17 +537,18 @@
 
          clusterConnection = new ClusterConnectionImpl(new SimpleString(config.getName()),
                                                        new SimpleString(config.getAddress()),
-                                                       config.getRetryInterval(),                                                       
+                                                       config.getRetryInterval(),
                                                        config.isDuplicateDetection(),
                                                        config.isForwardWhenNoConsumers(),
+                                                       config.getConfirmationWindowSize(),
                                                        executorFactory,
-                                                       server,                                         
+                                                       server,
                                                        postOffice,
                                                        managementService,
-                                                       scheduledExecutor,                                            
+                                                       scheduledExecutor,
                                                        connectors,
                                                        config.getMaxHops(),
-                                                       nodeUUID,                                                       
+                                                       nodeUUID,
                                                        backup);
       }
       else
@@ -559,17 +563,18 @@
 
          clusterConnection = new ClusterConnectionImpl(new SimpleString(config.getName()),
                                                        new SimpleString(config.getAddress()),
-                                                       config.getRetryInterval(),                                                      
+                                                       config.getRetryInterval(),
                                                        config.isDuplicateDetection(),
                                                        config.isForwardWhenNoConsumers(),
+                                                       config.getConfirmationWindowSize(),
                                                        executorFactory,
-                                                       server,                                             
+                                                       server,
                                                        postOffice,
                                                        managementService,
-                                                       scheduledExecutor,                                               
+                                                       scheduledExecutor,
                                                        dg,
                                                        config.getMaxHops(),
-                                                       nodeUUID,                                                      
+                                                       nodeUUID,
                                                        backup);
       }
 

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -36,8 +36,6 @@
 import org.hornetq.core.list.impl.PriorityLinkedListImpl;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.core.paging.PagingManager;
-import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.Bindings;
 import org.hornetq.core.postoffice.PostOffice;
@@ -51,7 +49,6 @@
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.cluster.impl.Redistributor;
 import org.hornetq.core.settings.HierarchicalRepository;
-import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.TransactionOperation;
@@ -111,12 +108,8 @@
 
    private final Runnable deliverRunner = new DeliverRunner();
 
-   private final PagingManager pagingManager;
-
    private final Semaphore lock = new Semaphore(1);
 
-   private volatile PagingStore pagingStore;
-
    private final StorageManager storageManager;
 
    private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
@@ -172,15 +165,6 @@
 
       this.scheduledExecutor = scheduledExecutor;
 
-      if (postOffice == null)
-      {
-         pagingManager = null;
-      }
-      else
-      {
-         pagingManager = postOffice.getPagingManager();
-      }
-
       direct = true;
 
       scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor);
@@ -1091,12 +1075,14 @@
          if (reference == null)
          {
             nullReferences.add(consumer);
+
             if (nullReferences.size() + busyConsumers.size() == totalConsumers)
             {
-               startDepaging();
                // We delivered all the messages - go into direct delivery
                direct = true;
+
                promptDelivery = false;
+
                return;
             }
 
@@ -1133,8 +1119,6 @@
             }
          }
 
-         initPagingStore(reference.getMessage().getDestination());
-
          final SimpleString groupID = reference.getMessage().getSimpleStringProperty(MessageImpl.HDR_GROUP_ID);
 
          if (groupID != null)
@@ -1407,19 +1391,7 @@
 
       queue.deliveringCount.decrementAndGet();
 
-      PagingStore store;
-      if (pagingManager != null)
-      {
-         // TODO: We could optimize this by storing the paging-store for the address on the Queue. We would need to know
-         // the Address for the Queue
-         store = pagingManager.getPageStore(ref.getMessage().getDestination());
-      }
-      else
-      {
-         store = null;
-      }
-
-      message.decrementRefCount(store, ref);
+      message.decrementRefCount(ref);
    }
 
    void postRollback(final LinkedList<MessageReference> refs) throws Exception
@@ -1437,42 +1409,6 @@
       }
    }
 
-   private synchronized void initPagingStore(final SimpleString destination)
-   {
-      // PagingManager would be null only on testcases
-      if (pagingStore == null && pagingManager != null)
-      {
-         // TODO: It would be better if we could initialize the pagingStore during the construction
-         try
-         {
-            pagingStore = pagingManager.getPageStore(destination);
-         }
-         catch (Exception e)
-         {
-            // This shouldn't happen, and if it happens, this shouldn't abort the route
-         }
-      }
-   }
-
-   private synchronized void startDepaging()
-   {
-      if (pagingStore != null)
-      {
-         // If the queue is empty, we need to check if there are pending messages, and throw a warning
-         if (pagingStore.isPaging() && pagingStore.getAddressFullMessagePolicy() == AddressFullMessagePolicy.PAGE)
-         {
-            // This is just a *request* to depage. Depage will only happens if there is space on the Address
-            // and GlobalSize
-            pagingStore.startDepaging();
-
-            log.warn("The Queue " + name +
-                     " is empty, however there are pending messages on Paging for the address " +
-                     pagingStore.getStoreName() +
-                     " waiting message ACK before they could be routed");
-         }
-      }
-   }
-
    // Inner classes
    // --------------------------------------------------------------------------
 

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -102,7 +102,7 @@
       return ref;
    }
 
-   public int incrementRefCount(final PagingStore pagingStore, final MessageReference reference) throws Exception
+   public int incrementRefCount(final MessageReference reference) throws Exception
    {
       int count = refCount.incrementAndGet();
 
@@ -119,7 +119,7 @@
       return count;
    }
 
-   public int decrementRefCount(final PagingStore pagingStore, final MessageReference reference) throws Exception
+   public int decrementRefCount(final MessageReference reference) throws Exception
    {
       int count = refCount.decrementAndGet();
 

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -387,7 +387,7 @@
 
          if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE)
          {
-            throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "Binding " + name + " does not exist");
+            throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "Queue " + name + " does not exist");
          }
 
          securityStore.check(binding.getAddress(), CheckType.CONSUME, this);

Modified: trunk/src/main/org/hornetq/utils/XMLUtil.java
===================================================================
--- trunk/src/main/org/hornetq/utils/XMLUtil.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/utils/XMLUtil.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -493,6 +493,8 @@
       }
       catch (SAXException e)
       {
+         log.error("Invalid configuration", e);
+         
          throw new IllegalStateException("Invalid configuration", e);
       }
    }

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -93,6 +93,7 @@
       final long retryInterval = 50;
       final double retryIntervalMultiplier = 1d;
       final int reconnectAttempts = 1;
+      final int confirmationWindowSize = 1024;
 
       Pair<String, String> connectorPair = new Pair<String, String>(server1tc.getName(), server2tc.getName());
 
@@ -106,6 +107,7 @@
                                                                         reconnectAttempts,
                                                                         true,
                                                                         false,
+                                                                        confirmationWindowSize,
                                                                         connectorPair);
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -219,6 +221,7 @@
       final long retryInterval = 50;
       final double retryIntervalMultiplier = 1d;
       final int reconnectAttempts = 3;
+      final int confirmationWindowSize = 1024;
 
       Pair<String, String> connectorPair = new Pair<String, String>(server1tc.getName(), server2tc.getName());
 
@@ -232,6 +235,7 @@
                                                                         reconnectAttempts,
                                                                         true,
                                                                         false,
+                                                                        confirmationWindowSize,
                                                                         connectorPair);
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -338,6 +342,7 @@
       final long retryInterval = 50;
       final double retryIntervalMultiplier = 1d;
       final int reconnectAttempts = 3;
+      final int confirmationWindowSize = 1024;
 
       Pair<String, String> connectorPair = new Pair<String, String>(server1tc.getName(), null);
 
@@ -351,6 +356,7 @@
                                                                         reconnectAttempts,
                                                                         true,
                                                                         false,
+                                                                        confirmationWindowSize,
                                                                         connectorPair);
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -452,6 +458,7 @@
       final long retryInterval = 50;
       final double retryIntervalMultiplier = 1d;
       final int reconnectAttempts = -1;
+      final int confirmationWindowSize = 1024;
 
       Pair<String, String> connectorPair = new Pair<String, String>(server1tc.getName(), null);
 
@@ -465,6 +472,7 @@
                                                                         reconnectAttempts,
                                                                         true,
                                                                         false,
+                                                                        confirmationWindowSize,
                                                                         connectorPair);
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -513,7 +521,7 @@
 
          prod0.send(message);
       }
-      
+
       log.info("sent messages");
 
       for (int i = 0; i < numMessages; i++)
@@ -523,7 +531,7 @@
          assertEquals(i, r1.getObjectProperty(propKey));
          log.info("got message " + r1.getObjectProperty(propKey));
       }
-      
+
       log.info("got messages");
 
       session0.close();
@@ -566,6 +574,7 @@
       final long retryInterval = 50;
       final double retryIntervalMultiplier = 1d;
       final int reconnectAttempts = 3;
+      final int confirmationWindowSize = 1024;
 
       Pair<String, String> connectorPair = new Pair<String, String>(server1tc.getName(), null);
 
@@ -579,6 +588,7 @@
                                                                         reconnectAttempts,
                                                                         true,
                                                                         false,
+                                                                        confirmationWindowSize,
                                                                         connectorPair);
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -86,6 +86,7 @@
                                                                         0,
                                                                         true,
                                                                         true,
+                                                                        1024,
                                                                         connectorPair);
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -229,6 +230,7 @@
                                                                         -1,
                                                                         true,
                                                                         true,
+                                                                        1024,
                                                                         connectorPair);
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -419,6 +421,7 @@
                                                                         0,
                                                                         false,
                                                                         false,
+                                                                        1024,
                                                                         connectorPair);
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -547,6 +550,7 @@
                                                                         1,
                                                                         false,
                                                                         true,
+                                                                        1024,
                                                                         connectorPair);
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -76,7 +76,6 @@
 
       try
       {
-
          Map<String, Object> server0Params = new HashMap<String, Object>();
          server0 = createClusteredServerWithParams(0, useFiles, server0Params);
 
@@ -101,6 +100,10 @@
 
          Pair<String, String> connectorPair = new Pair<String, String>(server1tc.getName(), null);
 
+         final int messageSize = 1024;
+
+         final int numMessages = 10;
+
          BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
                                                                            queueName0,
                                                                            forwardAddress,
@@ -111,6 +114,9 @@
                                                                            -1,
                                                                            true,
                                                                            false,
+                                                                           // Choose confirmation size to make sure acks
+                                                                           // are sent
+                                                                           numMessages * messageSize / 2,
                                                                            connectorPair);
 
          List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -144,7 +150,7 @@
 
          session1.start();
 
-         final int numMessages = 10;
+         final byte[] bytes = new byte[messageSize];
 
          final SimpleString propKey = new SimpleString("testkey");
 
@@ -159,6 +165,8 @@
 
             message.putIntProperty(propKey, i);
 
+            message.getBody().writeBytes(bytes);
+
             producer0.send(message);
          }
 
@@ -286,6 +294,7 @@
                                                                            -1,
                                                                            true,
                                                                            false,
+                                                                           1024,
                                                                            connectorPair);
 
          List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -332,7 +341,7 @@
             message.putIntProperty(propKey, i);
 
             message.putStringProperty(selectorKey, new SimpleString("monkey"));
-            
+
             if (largeMessage)
             {
                message.setBodyInputStream(createFakeLargeStream(1024 * 1024));
@@ -368,7 +377,7 @@
             assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
 
             message.acknowledge();
-            
+
             if (largeMessage)
             {
                readMessages(message);
@@ -407,7 +416,7 @@
       }
 
    }
-   
+
    public void testWithTransformer() throws Exception
    {
       internaltestWithTransformer(false);
@@ -453,6 +462,7 @@
                                                                         -1,
                                                                         true,
                                                                         false,
+                                                                        1024,
                                                                         connectorPair);
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -497,7 +507,7 @@
          message.putStringProperty(propKey, new SimpleString("bing"));
 
          message.getBody().writeString("doo be doo be doo be doo");
-         
+
          producer0.send(message);
       }
 
@@ -516,8 +526,7 @@
          assertEquals("dee be dee be dee be dee", sval);
 
          message.acknowledge();
-         
-         
+
       }
 
       assertNull(consumer1.receiveImmediate());
@@ -534,10 +543,8 @@
 
       server1.stop();
    }
-   
 
-   // https://jira.jboss.org/jira/browse/HORNETQ-182
-   public void disabled_testBridgeWithPaging() throws Exception
+   public void testBridgeWithPaging() throws Exception
    {
       HornetQServer server0 = null;
       HornetQServer server1 = null;
@@ -583,6 +590,7 @@
                                                                            -1,
                                                                            true,
                                                                            false,
+                                                                           1024,
                                                                            connectorPair);
 
          List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -623,7 +631,7 @@
          for (int i = 0; i < numMessages; i++)
          {
             ClientMessage message = session0.createClientMessage(false);
-            
+
             message.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
 
             message.putIntProperty(propKey, i);
@@ -674,13 +682,12 @@
 
    }
 
-   
    protected void setUp() throws Exception
    {
       super.setUp();
       clearData();
    }
-   
+
    protected void tearDown() throws Exception
    {
       clearData();

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -99,6 +99,7 @@
                                                                         0,
                                                                         true,
                                                                         true,
+                                                                        1024,
                                                                         "dg1");
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -1407,6 +1407,7 @@
                                                                                       true,
                                                                                       forwardWhenNoConsumers,
                                                                                       maxHops,
+                                                                                      1024,
                                                                                       pairs);
       serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
    }
@@ -1458,6 +1459,7 @@
                                                                                       true,
                                                                                       forwardWhenNoConsumers,
                                                                                       maxHops,
+                                                                                      1024,
                                                                                       pairs);
 
       serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
@@ -1526,6 +1528,7 @@
                                                                                       true,
                                                                                       forwardWhenNoConsumers,
                                                                                       maxHops,
+                                                                                      1024,
                                                                                       pairs);
 
       serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
@@ -1552,6 +1555,7 @@
                                                                                       true,
                                                                                       forwardWhenNoConsumers,
                                                                                       maxHops,
+                                                                                      1024,
                                                                                       discoveryGroupName);
       List<ClusterConnectionConfiguration> clusterConfs = server.getConfiguration().getClusterConfigurations();
 

Modified: trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -162,6 +162,7 @@
                                              randomPositiveInt(),
                                              randomBoolean(),
                                              randomBoolean(),
+                                             randomPositiveInt(),
                                              connectorPair);
 
       Configuration conf_1 = new ConfigurationImpl();

Modified: trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -61,7 +61,7 @@
    private BridgeConfiguration bridgeConfig;
 
    private HornetQServer server_1;
-   
+
    private ClientSession session;
 
    // Constructors --------------------------------------------------
@@ -79,10 +79,14 @@
       assertEquals(bridgeConfig.getForwardingAddress(), (String)proxy.retrieveAttributeValue("forwardingAddress"));
       assertEquals(bridgeConfig.getFilterString(), (String)proxy.retrieveAttributeValue("filterString"));
       assertEquals(bridgeConfig.getRetryInterval(), ((Long)proxy.retrieveAttributeValue("retryInterval")).longValue());
-      assertEquals(bridgeConfig.getRetryIntervalMultiplier(), (Double)proxy.retrieveAttributeValue("retryIntervalMultiplier"));
-      assertEquals(bridgeConfig.getReconnectAttempts(), ((Integer)proxy.retrieveAttributeValue("reconnectAttempts")).intValue());
-      assertEquals(bridgeConfig.isFailoverOnServerShutdown(), ((Boolean)proxy.retrieveAttributeValue("failoverOnServerShutdown")).booleanValue());
-      assertEquals(bridgeConfig.isUseDuplicateDetection(), ((Boolean)proxy.retrieveAttributeValue("useDuplicateDetection")).booleanValue());
+      assertEquals(bridgeConfig.getRetryIntervalMultiplier(),
+                   (Double)proxy.retrieveAttributeValue("retryIntervalMultiplier"));
+      assertEquals(bridgeConfig.getReconnectAttempts(),
+                   ((Integer)proxy.retrieveAttributeValue("reconnectAttempts")).intValue());
+      assertEquals(bridgeConfig.isFailoverOnServerShutdown(),
+                   ((Boolean)proxy.retrieveAttributeValue("failoverOnServerShutdown")).booleanValue());
+      assertEquals(bridgeConfig.isUseDuplicateDetection(),
+                   ((Boolean)proxy.retrieveAttributeValue("useDuplicateDetection")).booleanValue());
 
       Object[] data = (Object[])proxy.retrieveAttributeValue("connectorPair");
       assertEquals(bridgeConfig.getConnectorPair().a, data[0]);
@@ -99,10 +103,10 @@
       // started by the server
       assertTrue((Boolean)proxy.retrieveAttributeValue("Started"));
 
-      proxy.invokeOperation("stop");      
+      proxy.invokeOperation("stop");
       assertFalse((Boolean)proxy.retrieveAttributeValue("Started"));
 
-      proxy.invokeOperation("start");      
+      proxy.invokeOperation("start");
       assertTrue((Boolean)proxy.retrieveAttributeValue("Started"));
    }
 
@@ -138,6 +142,7 @@
                                              randomPositiveInt(),
                                              randomBoolean(),
                                              randomBoolean(),
+                                             randomPositiveInt(),
                                              connectorPair);
 
       Configuration conf_1 = new ConfigurationImpl();
@@ -175,20 +180,18 @@
       server_1.stop();
 
       session = null;
-      
+
       server_0 = null;
-      
+
       server_1 = null;
-      
-      
+
       super.tearDown();
    }
-   
+
    protected CoreMessagingProxy createProxy(final String name) throws Exception
    {
-      CoreMessagingProxy proxy = new CoreMessagingProxy(session,
-                                                       ResourceNames.CORE_BRIDGE + name);
-      
+      CoreMessagingProxy proxy = new CoreMessagingProxy(session, ResourceNames.CORE_BRIDGE + name);
+
       return proxy;
    }
 

Modified: trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -136,6 +136,7 @@
                                                                    false,
                                                                    false,
                                                                    1,
+                                                                   1024,
                                                                    discoveryName);
       List<Pair<String, String>> connectorInfos = new ArrayList<Pair<String, String>>();
       connectorInfos.add(new Pair<String, String>("netty", null));

Modified: trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -204,6 +204,7 @@
                                                                    randomBoolean(),
                                                                    randomBoolean(),
                                                                    randomPositiveInt(),
+                                                                   randomPositiveInt(),
                                                                    pairs);
       
       clusterConnectionConfig2 = new ClusterConnectionConfiguration(randomString(),
@@ -212,6 +213,7 @@
                                                                     randomBoolean(),
                                                                     randomBoolean(),
                                                                     randomPositiveInt(),
+                                                                    randomPositiveInt(),
                                                                     randomString());
 
       Configuration conf_1 = new ConfigurationImpl();

Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2009-11-10 20:03:11 UTC (rev 8256)
@@ -1125,6 +1125,18 @@
          return null;
       }
 
+      public int decrementRefCount(MessageReference reference) throws Exception
+      {
+         // TODO Auto-generated method stub
+         return 0;
+      }
+
+      public int incrementRefCount(MessageReference reference) throws Exception
+      {
+         // TODO Auto-generated method stub
+         return 0;
+      }
+
    }
 
    class FakeFilter implements Filter



More information about the hornetq-commits mailing list