[jboss-cvs] JBoss Messaging SVN: r1892 - in trunk: src/etc src/etc/META-INF src/etc/server/default/deploy src/etc/xmdesc src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/postoffice/cluster src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory tests tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/tools tests/src/org/jboss/test/messaging/tools/jmx/rmi

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jan 4 17:02:26 EST 2007


Author: clebert.suconic at jboss.com
Date: 2007-01-04 17:02:16 -0500 (Thu, 04 Jan 2007)
New Revision: 1892

Added:
   trunk/src/etc/META-INF/
   trunk/src/etc/META-INF/multiplexer-stacks.xml
   trunk/src/etc/server/default/deploy/multiplexer-service.xml
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/ChannelFactory.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/MultiplexorChannelFactory.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/NameChannelFactory.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/XMLChannelFactory.java
Modified:
   trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
   trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
   trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   trunk/tests/build.xml
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java
   trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
   trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-578 - I have created an interface ChannelFactory to delegate creation of JGroups channels according to configuration options.

Our configuration files will always have a reference to the multiplexor but if the multiplexor is not available we will create JChannel as we always used to do (using XML elements on the constructor).

Added: trunk/src/etc/META-INF/multiplexer-stacks.xml
===================================================================
--- trunk/src/etc/META-INF/multiplexer-stacks.xml	2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/src/etc/META-INF/multiplexer-stacks.xml	2007-01-04 22:02:16 UTC (rev 1892)
@@ -0,0 +1,330 @@
+
+<!--
+  Sample file that defines a number of stacks, used by the multiplexer
+  Author: Bela Ban
+  Version: $Id: multiplexer-stacks.xml 56961 2006-09-19 03:55:11 +0000 (Tue, 19 Sep 2006) bstansberry at jboss.com $
+-->
+<protocol_stacks>
+    <stack name="udp"
+           description="Default: IP multicast based stack, with flow control and message bundling">
+        <config>
+          <UDP
+             mcast_port="${jgroups.udp.mcast_port:45688}"
+             mcast_addr="${jgroups.udp.mcast_addr:228.11.11.11}"
+             tos="8"
+             ucast_recv_buf_size="20000000"
+             ucast_send_buf_size="640000"
+             mcast_recv_buf_size="25000000"
+             mcast_send_buf_size="640000"
+             loopback="false"
+             discard_incompatible_packets="true"
+             max_bundle_size="64000"
+             max_bundle_timeout="30"
+             use_incoming_packet_handler="true"
+             use_outgoing_packet_handler="false"
+             ip_ttl="${jgroups.udp.ip_ttl:2}"
+             down_thread="false" up_thread="false"
+             enable_bundling="true"/>
+          <PING timeout="2000"
+             down_thread="false" up_thread="false" num_initial_members="3"/>
+          <MERGE2 max_interval="100000"
+             down_thread="false" up_thread="false" min_interval="20000"/>
+          <FD_SOCK down_thread="false" up_thread="false"/>
+          <FD timeout="10000" max_tries="5" down_thread="false" up_thread="false" shun="true"/>
+          <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
+          <pbcast.NAKACK max_xmit_size="60000"
+                   use_mcast_xmit="false" gc_lag="0"
+                   retransmit_timeout="300,600,1200,2400,4800"
+                   down_thread="false" up_thread="false"
+                   discard_delivered_msgs="true"/>
+          <UNICAST timeout="300,600,1200,2400,3600"
+             down_thread="false" up_thread="false"/>
+          <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+                   down_thread="false" up_thread="false"
+                   max_bytes="400000"/>
+          <pbcast.GMS print_local_addr="true" join_timeout="3000"
+                   down_thread="false" up_thread="false"
+                   join_retry_timeout="2000" shun="true"
+                   view_bundling="true"/>
+          <FC max_credits="2000000" down_thread="false" up_thread="false"
+              min_threshold="0.10"/>
+          <FRAG2 frag_size="60000" down_thread="false" up_thread="false"/>
+          <!-- pbcast.STREAMING_STATE_TRANSFER down_thread="false" up_thread="false"
+                   use_flush="true" use_reading_thread="true"/ -->
+          <pbcast.STATE_TRANSFER down_thread="false" up_thread="false" use_flush="false"/>
+          <pbcast.FLUSH down_thread="false" up_thread="false"/>
+        </config>
+    </stack>
+
+
+    <stack name="udp-sync"
+           description="IP multicast based stack, without flow control and without message bundling. This should be used
+           instead of udp if (1) synchronous calls are used and (2) the message volume (rate and size)
+            is not that large. Don't use this configuration if you send messages at a high sustained rate, or you might
+            run out of memory">
+        <config>
+            <UDP
+                 mcast_port="${jgroups.udp.mcast_port:45699}"
+                 mcast_addr="${jgroups.udp.mcast_addr:229.11.11.11}"
+                 tos="8"
+                 ucast_recv_buf_size="20000000"
+                 ucast_send_buf_size="640000"
+                 mcast_recv_buf_size="25000000"
+                 mcast_send_buf_size="640000"
+                 loopback="false"
+                 discard_incompatible_packets="true"
+                 max_bundle_size="64000"
+                 max_bundle_timeout="30"
+                 use_incoming_packet_handler="true"
+                 use_outgoing_packet_handler="false"
+                 ip_ttl="${jgroups.udp.ip_ttl:2}"
+                 down_thread="false" up_thread="false"
+                 enable_bundling="false"/>
+            <PING timeout="2000"
+                  down_thread="false" up_thread="false" num_initial_members="3"/>
+            <MERGE2 max_interval="100000"
+                    down_thread="false" up_thread="false" min_interval="20000"/>
+            <FD_SOCK down_thread="false" up_thread="false"/>
+            <FD timeout="10000" max_tries="5" down_thread="false" up_thread="false" shun="true"/>
+            <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
+            <pbcast.NAKACK max_xmit_size="60000"
+                    use_mcast_xmit="false" gc_lag="0"
+                    retransmit_timeout="300,600,1200,2400,4800"
+                    down_thread="false" up_thread="false"
+                    discard_delivered_msgs="true"/>
+            <UNICAST timeout="300,600,1200,2400,3600"
+                    down_thread="false" up_thread="false"/>
+            <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+                    down_thread="false" up_thread="false"
+                    max_bytes="400000"/>
+            <pbcast.GMS print_local_addr="true" join_timeout="3000"
+                    down_thread="false" up_thread="false"
+                    join_retry_timeout="2000" shun="true"
+                    view_bundling="true"/>
+            <FRAG2 frag_size="60000" down_thread="false" up_thread="false"/>
+            <!--pbcast.STREAMING_STATE_TRANSFER down_thread="false" up_thread="false"
+                    use_flush="true" use_reading_thread="true"/ -->
+            <pbcast.STATE_TRANSFER down_thread="false" up_thread="false" use_flush="false"/>
+            <pbcast.FLUSH down_thread="false" up_thread="false"/>
+        </config>
+    </stack>
+
+
+    <stack name="tcp"
+           description="TCP based stack, with flow control and message bundling. This is usually used when IP
+           multicasting cannot be used in a network, e.g. because it is disabled (routers discard multicast)">
+        <config>
+            <TCP start_port="7600"
+                 loopback="true"
+                 recv_buf_size="20000000"
+                 send_buf_size="640000"
+                 discard_incompatible_packets="true"
+                 max_bundle_size="64000"
+                 max_bundle_timeout="30"
+                 use_incoming_packet_handler="true"
+                 use_outgoing_packet_handler="false"
+                 down_thread="false" up_thread="false"
+                 enable_bundling="true"
+                 use_send_queues="false"
+                 sock_conn_timeout="300"
+                 skip_suspected_members="true"/>
+            <TCPPING timeout="3000"
+                     down_thread="false" up_thread="false"
+                     initial_hosts="${jgroups.tcpping.initial_hosts:localhost[7600],localhost[7601]}"
+                     port_range="1"
+                     num_initial_members="3"/>
+            <MERGE2 max_interval="100000"
+                    down_thread="false" up_thread="false" min_interval="20000"/>
+            <FD_SOCK down_thread="false" up_thread="false"/>
+            <FD timeout="10000" max_tries="5" down_thread="false" up_thread="false" shun="true"/>
+            <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
+            <pbcast.NAKACK max_xmit_size="60000"
+                           use_mcast_xmit="false" gc_lag="0"
+                           retransmit_timeout="300,600,1200,2400,4800"
+                           down_thread="false" up_thread="false"
+                           discard_delivered_msgs="true"/>
+            <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+                           down_thread="false" up_thread="false"
+                           max_bytes="400000"/>
+            <pbcast.GMS print_local_addr="true" join_timeout="3000"
+                        down_thread="false" up_thread="false"
+                        join_retry_timeout="2000" shun="true"
+                        view_bundling="true"/>
+            <FC max_credits="2000000" down_thread="false" up_thread="false"
+                min_threshold="0.10"/>
+            <FRAG2 frag_size="60000" down_thread="false" up_thread="false"/>
+            <!-- pbcast.STREAMING_STATE_TRANSFER down_thread="false" up_thread="false"
+                      use_flush="true" use_reading_thread="true"/ -->
+            <pbcast.STATE_TRANSFER down_thread="false" up_thread="false" use_flush="false"/>
+            <pbcast.FLUSH down_thread="false" up_thread="false"/>
+        </config>
+    </stack>
+
+
+    <stack name="tcp-sync"
+           description="TCP based stack, without flow control and without message bundling. This is usually used when IP
+           multicasting cannot be used in a network, e.g. because it is disabled (routers discard multicast). This
+           configuration should be used instead of tcp when (1) synchronous calls are used and (2) the message volume
+           (rate and size) is not that large">
+        <config>
+            <TCP start_port="7650"
+                 loopback="true"
+                 recv_buf_size="20000000"
+                 send_buf_size="640000"
+                 discard_incompatible_packets="true"
+                 max_bundle_size="64000"
+                 max_bundle_timeout="30"
+                 use_incoming_packet_handler="true"
+                 use_outgoing_packet_handler="false"
+                 down_thread="false" up_thread="false"
+                 enable_bundling="false"
+                 use_send_queues="false"
+                 sock_conn_timeout="300"
+                 skip_suspected_members="true"/>
+            <TCPPING timeout="3000"
+                     down_thread="false" up_thread="false"
+                     initial_hosts="${jgroups.tcpping.initial_hosts:localhost[7650],localhost[7651]}"
+                     port_range="1"
+                     num_initial_members="3"/>
+            <MERGE2 max_interval="100000"
+                    down_thread="false" up_thread="false" min_interval="20000"/>
+            <FD_SOCK down_thread="false" up_thread="false"/>
+            <FD timeout="10000" max_tries="5" down_thread="false" up_thread="false" shun="true"/>
+            <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
+            <pbcast.NAKACK max_xmit_size="60000"
+                           use_mcast_xmit="false" gc_lag="0"
+                           retransmit_timeout="300,600,1200,2400,4800"
+                           down_thread="false" up_thread="false"
+                           discard_delivered_msgs="true"/>
+            <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+                           down_thread="false" up_thread="false"
+                           max_bytes="400000"/>
+            <pbcast.GMS print_local_addr="true" join_timeout="3000"
+                        down_thread="false" up_thread="false"
+                        join_retry_timeout="2000" shun="true"
+                        view_bundling="true"/>
+            <!-- pbcast.STREAMING_STATE_TRANSFER down_thread="false" up_thread="false"
+                         use_flush="true" use_reading_thread="true"/ -->
+            <pbcast.STATE_TRANSFER down_thread="false" up_thread="false" use_flush="false"/>
+            <pbcast.FLUSH down_thread="false" up_thread="false"/>
+        </config>
+    </stack>
+
+
+    <stack name="tcp_nio"
+           description="TCP_NIO based stack, with flow control and message bundling. This is usually used when IP
+           multicasting cannot be used in a network, e.g. because it is disabled (routers discard multicast)">
+        <config>
+            <TCP_NIO
+                   recv_buf_size="20000000"
+                   send_buf_size="640000"
+                   loopback="false"
+                   discard_incompatible_packets="true"
+                   max_bundle_size="64000"
+                   max_bundle_timeout="30"
+                   use_incoming_packet_handler="true"
+                   use_outgoing_packet_handler="false"
+                   down_thread="false" up_thread="false"
+                   enable_bundling="true"
+                   start_port="7700"
+                   use_send_queues="false"
+                   sock_conn_timeout="300" skip_suspected_members="true"
+                   reader_threads="8"
+                   writer_threads="8"
+                   processor_threads="8"
+                   processor_minThreads="8"
+                   processor_maxThreads="8"
+                   processor_queueSize="100"
+                   processor_keepAliveTime="-1"/>
+            <TCPPING timeout="3000"
+                     initial_hosts="${jgroups.tcpping.initial_hosts:localhost[7700],localhost[7701]}"
+                     port_range="1"
+                     num_initial_members="3"
+                     down_thread="false" up_thread="false"/>
+            <MERGE2 max_interval="100000"
+                  down_thread="false" up_thread="false" min_interval="20000"/>
+            <FD_SOCK down_thread="false" up_thread="false"/>
+            <FD timeout="10000" max_tries="5" down_thread="false" up_thread="false" shun="true"/>
+            <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
+            <pbcast.NAKACK max_xmit_size="60000"
+                   use_mcast_xmit="false" gc_lag="0"
+                   retransmit_timeout="300,600,1200,2400,4800"
+                   down_thread="false" up_thread="false"
+                   discard_delivered_msgs="true"/>
+            <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+                   down_thread="false" up_thread="false"
+                   max_bytes="400000"/>
+            <pbcast.GMS print_local_addr="true" join_timeout="3000"
+                   down_thread="false" up_thread="false"
+                   join_retry_timeout="2000" shun="true"
+                   view_bundling="true"/>
+            <FC max_credits="2000000" down_thread="false" up_thread="false"
+                min_threshold="0.10"/>
+            <FRAG2 frag_size="60000" down_thread="false" up_thread="false"/>
+            <!-- pbcast.STREAMING_STATE_TRANSFER down_thread="false" up_thread="false"
+                   use_flush="true" use_reading_thread="true"/ -->
+            <pbcast.STATE_TRANSFER down_thread="false" up_thread="false" use_flush="false"/>
+            <pbcast.FLUSH down_thread="false" up_thread="false"/>
+        </config>
+    </stack>
+
+
+    <stack name="tcp_nio-sync"
+           description="TCP_NIO based stack, with flow control and message bundling. This is usually used when IP
+           multicasting cannot be used in a network, e.g. because it is disabled (routers discard multicast). This
+           configuration should be used instead of tcp when (1) synchronous calls are used and (2) the message volume
+           (rate and size) is not that large">
+        <config>
+            <TCP_NIO
+                     recv_buf_size="20000000"
+                     send_buf_size="640000"
+                     loopback="false"
+                     discard_incompatible_packets="true"
+                     max_bundle_size="64000"
+                     max_bundle_timeout="30"
+                     use_incoming_packet_handler="true"
+                     use_outgoing_packet_handler="false"
+                     down_thread="false" up_thread="false"
+                     enable_bundling="false"
+                     start_port="7750"
+                     use_send_queues="false"
+                     sock_conn_timeout="300" skip_suspected_members="true"
+                     reader_threads="8"
+                     writer_threads="8"
+                     processor_threads="8"
+                     processor_minThreads="8"
+                     processor_maxThreads="8"
+                     processor_queueSize="100"
+                     processor_keepAliveTime="-1"/>
+            <TCPPING timeout="3000"
+                     initial_hosts="${jgroups.tcpping.initial_hosts:localhost[7750],localhost[7751]}"
+                     port_range="1"
+                     num_initial_members="3"
+                     down_thread="false" up_thread="false"/>
+            <MERGE2 max_interval="100000"
+                    down_thread="false" up_thread="false" min_interval="20000"/>
+            <FD_SOCK down_thread="false" up_thread="false"/>
+            <FD timeout="10000" max_tries="5" down_thread="false" up_thread="false" shun="true"/>
+            <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
+            <pbcast.NAKACK max_xmit_size="60000"
+                        use_mcast_xmit="false" gc_lag="0"
+                        retransmit_timeout="300,600,1200,2400,4800"
+                        down_thread="false" up_thread="false"
+                        discard_delivered_msgs="true"/>
+            <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+                        down_thread="false" up_thread="false"
+                        max_bytes="400000"/>
+            <pbcast.GMS print_local_addr="true" join_timeout="3000"
+                        down_thread="false" up_thread="false"
+                        join_retry_timeout="2000" shun="true"
+                        view_bundling="true"/>
+            <!-- pbcast.STREAMING_STATE_TRANSFER down_thread="false" up_thread="false"
+                        use_flush="true" use_reading_thread="true"/ -->
+            <pbcast.STATE_TRANSFER down_thread="false" up_thread="false" use_flush="false"/>
+            <pbcast.FLUSH down_thread="false" up_thread="false"/>
+        </config>
+    </stack>
+
+</protocol_stacks>
+
+

Modified: trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml	2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml	2007-01-04 22:02:16 UTC (rev 1892)
@@ -82,6 +82,12 @@
       <attribute name="MessagePullPolicy">org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy</attribute>
       <attribute name="ClusterRouterFactory">org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory</attribute>
 
+
+      <attribute name="ChannelFactoryName">jgroups.mux:name=Multiplexer</attribute>
+      <attribute name="SyncChannelName">udp-sync</attribute>
+      <attribute name="AsyncChannelName">udp</attribute>
+      <attribute name="ChannelPartitionName">${jboss.partition.name:DefaultPartition}-JMS</attribute>
+
       <attribute name="AsyncChannelConfig">
          <config>
             <UDP mcast_recv_buf_size="500000" down_thread="false" ip_mcast="true" mcast_send_buf_size="32000"

Added: trunk/src/etc/server/default/deploy/multiplexer-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/multiplexer-service.xml	2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/src/etc/server/default/deploy/multiplexer-service.xml	2007-01-04 22:02:16 UTC (rev 1892)
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE server
+        PUBLIC "-//JBoss//DTD MBean Service 3.2//EN"
+        "http://www.jboss.org/j2ee/dtd/jboss-service_3_2.dtd">
+
+
+<server>
+
+
+    <!--
+    An example for setting up the JChannelFactory MBean
+    Author: Bela Ban
+    Version: $Id$
+    -->
+    <mbean code="org.jgroups.jmx.JChannelFactory" name="jgroups.mux:name=Multiplexer">
+    <!--mbean code="org.jgroups.JChannelFactory" name="jgroups.mux:name=Multiplexer" xmbean-dd="resource:META-INF/multiplexer-xmbean.xml" -->
+        <attribute name="Domain">jgroups.mux</attribute>
+        <attribute name="MultiplexerConfig">META-INF/multiplexer-stacks.xml</attribute>
+        <attribute name="ExposeChannels">true</attribute>
+        <attribute name="ExposeProtocols">true</attribute>
+
+        <!-- The address used to determine the node name  -->
+        <!-- <attribute name="NodeAddress">${jboss.bind.address}</attribute> -->
+
+    </mbean>
+
+
+</server>

Modified: trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml	2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml	2007-01-04 22:02:16 UTC (rev 1892)
@@ -111,8 +111,32 @@
       <description>The ObjectName of the server peer this destination was deployed on</description>
       <name>ServerPeer</name>
       <type>javax.management.ObjectName</type>
-   </attribute>   
+   </attribute>
 
+   <attribute access="read-write" getMethod="getChannelFactoryName" setMethod="setChannelFactoryName">
+      <description>The ObjectName of the JGroups Multiplexer used to create JChannels</description>
+      <name>ChannelFactoryName</name>
+      <type>javax.management.ObjectName</type>
+   </attribute>
+
+   <attribute access="read-write" getMethod="getSyncChannelName" setMethod="setSyncChannelName">
+      <description>The name of the stack used on multiplexer for Sync Channels</description>
+      <name>SyncChannelName</name>
+      <type>java.lang.String</type>
+   </attribute>
+
+   <attribute access="read-write" getMethod="getAsyncChannelName" setMethod="setAsyncChannelName">
+      <description>The name of the stack used on multiplexer for Async Channels</description>
+      <name>AsyncChannelName</name>
+      <type>java.lang.String</type>
+   </attribute>
+
+   <attribute access="read-write" getMethod="getChannelPartitionName" setMethod="setChannelPartitionName">
+      <description>The partition name used to identify JMS/Postoffice on the JGroups Multiplexor</description>
+      <name>ChannelPartitionName</name>
+      <type>java.lang.String</type>
+   </attribute>
+
    <!-- Managed operations -->
 
     <operation>

Modified: trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java	2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java	2007-01-04 22:02:16 UTC (rev 1892)
@@ -43,6 +43,9 @@
 import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultFailoverMapper;
 import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
 import org.jboss.messaging.core.plugin.postoffice.cluster.Peer;
+import org.jboss.messaging.core.plugin.postoffice.cluster.channelfactory.ChannelFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.channelfactory.MultiplexorChannelFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.channelfactory.XMLChannelFactory;
 import org.jboss.messaging.core.tx.TransactionRepository;
 import org.w3c.dom.Element;
 
@@ -71,8 +74,13 @@
 
    private boolean started;
 
+   // This group of properties is used on JGroups Channel configuration
    private Element syncChannelConfig;
    private Element asyncChannelConfig;
+   private ObjectName channelFactoryName;
+   private String syncChannelName;
+   private String asyncChannelName;
+   private String channelPartitionName;
 
    private ObjectName serverPeerObjectName;
 
@@ -160,6 +168,47 @@
       this.officeName = name;
    }
 
+
+   public ObjectName getChannelFactoryName()
+   {
+      return channelFactoryName;
+   }
+
+   public void setChannelFactoryName(ObjectName channelFactoryName)
+   {
+      this.channelFactoryName = channelFactoryName;
+   }
+
+   public String getSyncChannelName()
+   {
+      return syncChannelName;
+   }
+
+   public void setSyncChannelName(String syncChannelName)
+   {
+      this.syncChannelName = syncChannelName;
+   }
+
+   public String getAsyncChannelName()
+   {
+      return asyncChannelName;
+   }
+
+   public void setAsyncChannelName(String asyncChannelName)
+   {
+      this.asyncChannelName = asyncChannelName;
+   }
+
+   public String getChannelPartitionName()
+   {
+      return channelPartitionName;
+   }
+
+   public void setChannelPartitionName(String channelPartitionName)
+   {
+      this.channelPartitionName = channelPartitionName;
+   }
+
    public void setSyncChannelConfig(Element config) throws Exception
    {
       syncChannelConfig = config;
@@ -282,13 +331,43 @@
          FilterFactory ff = new SelectorFactory();
          FailoverMapper mapper = new DefaultFailoverMapper();
 
+         ChannelFactory channelFactory = null;
+
+         if (this.channelFactoryName != null)
+         {
+            Object info = null;
+            try
+            {
+               info = server.getMBeanInfo(channelFactoryName);
+            }
+            catch (Exception e)
+            {
+               log.error("Error", e);
+               // noop... means we couldn't find the channel hence we should use regular XMLChannelFactories
+            }
+            if (info!=null)
+            {
+               log.debug("*********************************** Using Multiplexor");
+               channelFactory = new MultiplexorChannelFactory(server, channelFactoryName, channelPartitionName, syncChannelName, asyncChannelName);
+            }
+            else
+            {
+               log.debug("*********************************** Using XMLChannelFactory");
+               channelFactory = new XMLChannelFactory(syncChannelConfig, asyncChannelConfig);
+            }
+         }
+         else
+         {
+            log.debug("*********************************** Using XMLChannelFactory");
+            channelFactory = new XMLChannelFactory(syncChannelConfig, asyncChannelConfig);
+         }
+
          postOffice =  new DefaultClusteredPostOffice(ds, tm, sqlProperties,
                                                       createTablesOnStartup,
                                                       nodeId, officeName, ms,
                                                       pm, tr, ff, cf, pool,
                                                       groupName,
-                                                      syncChannelConfig,
-                                                      asyncChannelConfig,
+                                                      channelFactory,
                                                       stateTimeout, castTimeout,
                                                       pullPolicy, rf,
                                                       mapper,

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2007-01-04 22:02:16 UTC (rev 1892)
@@ -68,12 +68,12 @@
 import org.jboss.messaging.core.plugin.postoffice.Binding;
 import org.jboss.messaging.core.plugin.postoffice.DefaultBinding;
 import org.jboss.messaging.core.plugin.postoffice.DefaultPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.cluster.channelfactory.ChannelFactory;
 import org.jboss.messaging.core.tx.Transaction;
 import org.jboss.messaging.core.tx.TransactionRepository;
 import org.jboss.messaging.util.StreamUtils;
 import org.jgroups.Address;
 import org.jgroups.Channel;
-import org.jgroups.JChannel;
 import org.jgroups.MembershipListener;
 import org.jgroups.Message;
 import org.jgroups.MessageListener;
@@ -82,7 +82,6 @@
 import org.jgroups.blocks.GroupRequest;
 import org.jgroups.blocks.MessageDispatcher;
 import org.jgroups.blocks.RequestHandler;
-import org.w3c.dom.Element;
 
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 
@@ -163,12 +162,10 @@
 
    private boolean started;
 
-   private Element syncChannelConfigElement;
-   private String syncChannelConfig;
+   private ChannelFactory channelFactory;
+
    private Channel syncChannel;
 
-   private Element asyncChannelConfigElement;
-   private String asyncChannelConfig;
    private Channel asyncChannel;
 
    private MessageDispatcher controlMessageDispatcher;
@@ -233,8 +230,7 @@
                                      ConditionFactory conditionFactory,
                                      QueuedExecutorPool pool,
                                      String groupName,
-                                     Element syncChannelConfig,
-                                     Element asyncChannelConfig,
+                                     ChannelFactory channelFactory,
                                      long stateTimeout, long castTimeout,
                                      MessagePullPolicy redistributionPolicy,
                                      ClusterRouterFactory rf,
@@ -242,65 +238,6 @@
                                      long statsSendPeriod)
       throws Exception
    {
-      this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
-           pm, tr, filterFactory, conditionFactory, pool, groupName, stateTimeout, castTimeout,
-           redistributionPolicy, rf, failoverMapper, statsSendPeriod);
-
-      this.syncChannelConfigElement = syncChannelConfig;
-      this.asyncChannelConfigElement = asyncChannelConfig;
-   }
-
-   /*
-    * Constructor using String for configuration
-    */
-   public DefaultClusteredPostOffice(DataSource ds,
-                                     TransactionManager tm,
-                                     Properties sqlProperties,
-                                     boolean createTablesOnStartup,
-                                     int nodeId,
-                                     String officeName,
-                                     MessageStore ms,
-                                     PersistenceManager pm,
-                                     TransactionRepository tr,
-                                     FilterFactory filterFactory,
-                                     ConditionFactory conditionFactory,
-                                     QueuedExecutorPool pool,
-                                     String groupName,
-                                     String syncChannelConfig,
-                                     String asyncChannelConfig,
-                                     long stateTimeout, long castTimeout,
-                                     MessagePullPolicy redistributionPolicy,
-                                     ClusterRouterFactory rf,
-                                     FailoverMapper failoverMapper,
-                                     long statsSendPeriod) throws Exception
-   {
-      this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
-           pm, tr, filterFactory, conditionFactory, pool, groupName, stateTimeout, castTimeout,
-           redistributionPolicy, rf, failoverMapper, statsSendPeriod);
-
-      this.syncChannelConfig = syncChannelConfig;
-      this.asyncChannelConfig = asyncChannelConfig;
-   }
-
-   private DefaultClusteredPostOffice(DataSource ds,
-                                      TransactionManager tm,
-                                      Properties sqlProperties,
-                                      boolean createTablesOnStartup,
-                                      int nodeId,
-                                      String officeName,
-                                      MessageStore ms,
-                                      PersistenceManager pm,
-                                      TransactionRepository tr,
-                                      FilterFactory filterFactory,
-                                      ConditionFactory conditionFactory,
-                                      QueuedExecutorPool pool,
-                                      String groupName,
-                                      long stateTimeout, long castTimeout,
-                                      MessagePullPolicy redistributionPolicy,
-                                      ClusterRouterFactory rf,
-                                      FailoverMapper failoverMapper,
-                                      long statsSendPeriod)
-   {
       super (ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms, pm, tr,
              filterFactory, conditionFactory, pool);
 
@@ -335,6 +272,8 @@
       nbSupport = new NotificationBroadcasterSupport();
 
       viewExecutor = new QueuedExecutor();
+
+      this.channelFactory = channelFactory;
    }
 
    // MessagingComponent overrides ----------------------------------
@@ -348,16 +287,8 @@
 
       if (trace) { log.trace(this + " starting"); }
 
-      if (syncChannelConfigElement != null)
-      {
-         this.syncChannel = new JChannel(syncChannelConfigElement);
-         this.asyncChannel = new JChannel(asyncChannelConfigElement);
-      }
-      else
-      {
-         this.syncChannel = new JChannel(syncChannelConfig);
-         this.asyncChannel = new JChannel(asyncChannelConfig);
-      }
+      this.syncChannel = channelFactory.createSyncChannel();
+      this.asyncChannel = channelFactory.createASyncChannel();
 
       // We don't want to receive local messages on any of the channels
       syncChannel.setOpt(Channel.LOCAL, Boolean.FALSE);

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/ChannelFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/ChannelFactory.java	2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/ChannelFactory.java	2007-01-04 22:02:16 UTC (rev 1892)
@@ -0,0 +1,15 @@
+package org.jboss.messaging.core.plugin.postoffice.cluster.channelfactory;
+
+import org.jgroups.JChannel;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision:$</tt>
+ *          <p/>
+ *          $Id$
+ */
+public interface ChannelFactory
+{
+   public JChannel createSyncChannel() throws Exception;
+   public JChannel createASyncChannel() throws Exception;
+}

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/MultiplexorChannelFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/MultiplexorChannelFactory.java	2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/MultiplexorChannelFactory.java	2007-01-04 22:02:16 UTC (rev 1892)
@@ -0,0 +1,138 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.messaging.core.plugin.postoffice.cluster.channelfactory;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import org.jgroups.JChannel;
+
+/**
+ * A ChannelFactory that will use the MBean JChannelFactory interface
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision:$</tt>
+ *          <p/>
+ *          $Id$
+ */
+public class MultiplexorChannelFactory implements ChannelFactory
+{
+
+   // Constants
+   private static final String[] MUX_SIGNATURE = new String[]{"java.lang.String", "java.lang.String", "boolean", "java.lang.String"};
+
+   // Attributes
+   MBeanServer server;
+   ObjectName channelFactory;
+   String asyncStack;
+   String syncStack;
+   String uniqueID;
+   private static final String MUX_OPERATION = "createMultiplexerChannel";
+
+   // Static
+
+   // Constructors
+
+   public MultiplexorChannelFactory(MBeanServer server,
+                                    ObjectName channelFactory,
+                                    String uniqueID,
+                                    String syncStack,
+                                    String asyncStack)
+   {
+      this.server = server;
+      this.channelFactory = channelFactory;
+      this.uniqueID = uniqueID;
+      this.asyncStack = asyncStack;
+      this.syncStack = syncStack;
+   }
+
+   // Public
+
+   public MBeanServer getServer()
+   {
+      return server;
+   }
+
+   public void setServer(MBeanServer server)
+   {
+      this.server = server;
+   }
+
+   public ObjectName getChannelFactory()
+   {
+      return channelFactory;
+   }
+
+   public void setChannelFactory(ObjectName channelFactory)
+   {
+      this.channelFactory = channelFactory;
+   }
+
+   public String getAsyncStack()
+   {
+      return asyncStack;
+   }
+
+   public void setAsyncStack(String asyncStack)
+   {
+      this.asyncStack = asyncStack;
+   }
+
+   public String getSyncStack()
+   {
+      return syncStack;
+   }
+
+   public void setSyncStack(String syncStack)
+   {
+      this.syncStack = syncStack;
+   }
+
+   public String getUniqueID()
+   {
+      return uniqueID;
+   }
+
+   public void setUniqueID(String uniqueID)
+   {
+      this.uniqueID = uniqueID;
+   }
+
+   public JChannel createSyncChannel() throws Exception
+   {
+      return (JChannel) server.invoke(this.channelFactory, MUX_OPERATION, new Object[]{syncStack, uniqueID, Boolean.TRUE, uniqueID}, MUX_SIGNATURE);
+   }
+
+   public JChannel createASyncChannel() throws Exception
+   {
+      return (JChannel) server.invoke(this.channelFactory, MUX_OPERATION, new Object[]{asyncStack, uniqueID, Boolean.TRUE, uniqueID}, MUX_SIGNATURE);
+   }
+
+   // Package protected
+
+   // Protected
+
+   // Private
+
+   // Inner classes
+
+}

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/NameChannelFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/NameChannelFactory.java	2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/NameChannelFactory.java	2007-01-04 22:02:16 UTC (rev 1892)
@@ -0,0 +1,96 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.messaging.core.plugin.postoffice.cluster.channelfactory;
+
+import org.jgroups.JChannel;
+
+/**
+ * A ChannelFactory that will use config names (from channelfactory)
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision:$</tt>
+ *          <p/>
+ *          $Id$
+ */
+public class NameChannelFactory implements ChannelFactory
+{
+
+   // Constants
+
+   // Attributes
+
+   String asyncConfig;
+   String syncConfig;
+
+   // Static
+
+   // Constructors
+
+   public NameChannelFactory(String syncConfig, String asyncConfig)
+   {
+      this.syncConfig = syncConfig;
+      this.asyncConfig = asyncConfig;
+   }
+
+   // Public
+
+   public String getAsyncConfig()
+   {
+      return asyncConfig;
+   }
+
+   public void setAsyncConfig(String asyncConfig)
+   {
+      this.asyncConfig = asyncConfig;
+   }
+
+   public String getSyncConfig()
+   {
+      return syncConfig;
+   }
+
+   public void setSyncConfig(String syncConfig)
+   {
+      this.syncConfig = syncConfig;
+   }
+
+   // ChannelFactory implementation
+
+   public JChannel createSyncChannel() throws Exception
+   {
+      return new JChannel(syncConfig);
+   }
+
+   public JChannel createASyncChannel() throws Exception
+   {
+      return new JChannel(asyncConfig);
+   }
+
+   // Package protected
+
+   // Protected
+
+   // Private
+
+   // Inner classes
+
+}

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/XMLChannelFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/XMLChannelFactory.java	2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/XMLChannelFactory.java	2007-01-04 22:02:16 UTC (rev 1892)
@@ -0,0 +1,95 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.messaging.core.plugin.postoffice.cluster.channelfactory;
+
+import org.w3c.dom.Element;
+import org.jgroups.JChannel;
+
+/**
+ * A ChannelFactory that will use Elements to create channels.
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision:$</tt>
+ *          <p/>
+ *          $Id:$
+ */
+public class XMLChannelFactory implements ChannelFactory
+{
+
+   // Constants
+
+   // Attributes
+   Element syncConfig;
+   Element asyncConfig;
+
+   // Static
+
+   // Constructors
+
+   public XMLChannelFactory(Element syncConfig, Element asyncConfig)
+   {
+      this.syncConfig = syncConfig;
+      this.asyncConfig = asyncConfig;
+   }
+
+   // Public
+
+   public Element getSyncConfig()
+   {
+      return syncConfig;
+   }
+
+   public void setSyncConfig(Element syncConfig)
+   {
+      this.syncConfig = syncConfig;
+   }
+
+   public Element getAsyncConfig()
+   {
+      return asyncConfig;
+   }
+
+   public void setAsyncConfig(Element asyncConfig)
+   {
+      this.asyncConfig = asyncConfig;
+   }
+
+   // implementation of ChannelFactory
+   public JChannel createSyncChannel() throws Exception
+   {
+      return new JChannel(syncConfig);
+   }
+
+   public JChannel createASyncChannel() throws Exception
+   {
+      return new JChannel(asyncConfig);
+   }
+
+   // Package protected
+
+   // Protected
+
+   // Private
+
+   // Inner classes
+
+}

Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml	2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/tests/build.xml	2007-01-04 22:02:16 UTC (rev 1892)
@@ -287,6 +287,7 @@
          <sysproperty key="test.database" value="${functional.tests.database}"/>
          <sysproperty key="test.serialization" value="${functional.tests.serialization}"/>
          <sysproperty key="test.remoting" value="${test.remoting}"/>
+         <sysproperty key="java.net.preferIPv4Stack" value="true"/>
          <!--
          <jvmarg line="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_shmem,server=y,suspend=y,address=rmiserver"/>
          -->
@@ -305,6 +306,7 @@
          <sysproperty key="test.database" value="${clustering.tests.database}"/>
          <sysproperty key="test.serialization" value="${functional.tests.serialization}"/>
          <sysproperty key="test.clustered" value="true"/>
+         <sysproperty key="java.net.preferIPv4Stack" value="true"/>
          <!--
          <jvmarg line="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_shmem,server=y,suspend=y,address=rmiserver"/>
          -->
@@ -783,6 +785,7 @@
          <sysproperty key="test.clustered" value="true"/>
          <sysproperty key="test.logfile.suffix" value="clustering-client"/>
          <jvmarg value="-Xmx512M"/>
+         <jvmarg value="-Djava.net.preferIPv4Stack=true"/>
          <!--
          <jvmarg line="-Xmx512M -Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_shmem,server=n,suspend=n,address=antjunit"/>
           -->

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java	2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java	2007-01-04 22:02:16 UTC (rev 1892)
@@ -40,6 +40,7 @@
 import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
 import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
 import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.channelfactory.NameChannelFactory;
 import org.jboss.messaging.core.tx.Transaction;
 import org.jboss.test.messaging.core.SimpleCondition;
 import org.jboss.test.messaging.core.SimpleConditionFactory;
@@ -2275,16 +2276,16 @@
       FailoverMapper mapper = new DefaultFailoverMapper();
       
       ConditionFactory cf = new SimpleConditionFactory();
-      
-      DefaultClusteredPostOffice postOffice = 
+
+      DefaultClusteredPostOffice postOffice =
          new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
-                                 sc.getClusteredPostOfficeSQLProperties(), true, nodeId,
-                                 "Clustered", ms, pm, tr, ff, cf, pool,
-                                 groupName,
-                                 JGroupsUtil.getControlStackProperties(),
-                                 JGroupsUtil.getDataStackProperties(),
-                                 5000, 5000, pullPolicy, rf, mapper, 1000);
-      
+            sc.getClusteredPostOfficeSQLProperties(), true, nodeId,
+            "Clustered", ms, pm, tr, ff, cf, pool,
+            groupName,
+            new NameChannelFactory(JGroupsUtil.getControlStackProperties(),
+               JGroupsUtil.getDataStackProperties()),
+            5000, 5000, pullPolicy, rf, mapper, 1000);
+
       postOffice.start();      
       
       return postOffice;

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java	2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java	2007-01-04 22:02:16 UTC (rev 1892)
@@ -35,6 +35,7 @@
 import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
 import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
 import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.channelfactory.NameChannelFactory;
 import org.jboss.test.messaging.core.SimpleCondition;
 import org.jboss.test.messaging.core.SimpleConditionFactory;
 import org.jboss.test.messaging.core.SimpleFilterFactory;
@@ -385,17 +386,17 @@
       
       FailoverMapper mapper = new DefaultFailoverMapper();
       
-      ConditionFactory cf = new SimpleConditionFactory();      
-      
-      DefaultClusteredPostOffice postOffice = 
+      ConditionFactory cf = new SimpleConditionFactory();
+
+      DefaultClusteredPostOffice postOffice =
          new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
-                                 sc.getClusteredPostOfficeSQLProperties(), true, nodeId,
-                                 "Clustered", ms, pm, tr, ff, cf, pool,
-                                 groupName,
-                                 JGroupsUtil.getControlStackProperties(),
-                                 JGroupsUtil.getDataStackProperties(),
-                                 5000, 5000, redistPolicy, rf, mapper, 1000);
-      
+            sc.getClusteredPostOfficeSQLProperties(), true, nodeId,
+            "Clustered", ms, pm, tr, ff, cf, pool,
+            groupName,
+            new NameChannelFactory(JGroupsUtil.getControlStackProperties(),
+               JGroupsUtil.getDataStackProperties()),
+            5000, 5000, redistPolicy, rf, mapper, 1000);
+
       postOffice.start();      
       
       return postOffice;

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java	2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java	2007-01-04 22:02:16 UTC (rev 1892)
@@ -47,6 +47,7 @@
 import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
 import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
 import org.jboss.messaging.core.plugin.postoffice.cluster.QueueStats;
+import org.jboss.messaging.core.plugin.postoffice.cluster.channelfactory.NameChannelFactory;
 import org.jboss.messaging.core.tx.Transaction;
 import org.jboss.test.messaging.core.SimpleConditionFactory;
 import org.jboss.test.messaging.core.SimpleFilterFactory;
@@ -359,17 +360,17 @@
       
       FailoverMapper mapper = new DefaultFailoverMapper();
       
-      ConditionFactory cf = new SimpleConditionFactory();      
-      
-      DefaultClusteredPostOffice postOffice = 
+      ConditionFactory cf = new SimpleConditionFactory();
+
+      DefaultClusteredPostOffice postOffice =
          new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
-                                 sc.getClusteredPostOfficeSQLProperties(), true, nodeId, 
-                                 "Clustered", ms, pm, tr, ff, cf, pool,
-                                 groupName,
-                                 JGroupsUtil.getControlStackProperties(),
-                                 JGroupsUtil.getDataStackProperties(),
-                                 5000, 5000, redistPolicy, rf, mapper, 1000);
-      
+            sc.getClusteredPostOfficeSQLProperties(), true, nodeId,
+            "Clustered", ms, pm, tr, ff, cf, pool,
+            groupName,
+            new NameChannelFactory(JGroupsUtil.getControlStackProperties(),
+               JGroupsUtil.getDataStackProperties()),
+            5000, 5000, redistPolicy, rf, mapper, 1000);
+
       postOffice.start();      
       
       return postOffice;

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java	2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java	2007-01-04 22:02:16 UTC (rev 1892)
@@ -37,6 +37,7 @@
 import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
 import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
 import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.channelfactory.NameChannelFactory;
 import org.jboss.messaging.core.tx.Transaction;
 import org.jboss.messaging.core.tx.TransactionException;
 import org.jboss.test.messaging.core.SimpleCondition;
@@ -49,23 +50,21 @@
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 
 /**
- * 
  * A RecoveryTest
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
+ *          <p/>
+ *          $Id$
  */
 public class RecoveryTest extends PostOfficeTestBase
 {
    // Constants -----------------------------------------------------
 
    // Static --------------------------------------------------------
-   
+
    // Attributes ----------------------------------------------------
-   
+
    // Constructors --------------------------------------------------
 
    public RecoveryTest(String name)
@@ -81,305 +80,303 @@
    }
 
    public void tearDown() throws Exception
-   {      
+   {
       super.tearDown();
    }
-   
+
    public void testCrashBeforePersist() throws Exception
    {
       DefaultClusteredPostOffice office1 = null;
-      
+
       DefaultClusteredPostOffice office2 = null;
-      
+
       DefaultClusteredPostOffice office3 = null;
-      
+
       try
-      {      
-         office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
-         
-         office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
-         
-         office3 = (DefaultClusteredPostOffice)createClusteredPostOffice(3, "testgroup");
-         
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+      {
+         office1 = (DefaultClusteredPostOffice) createClusteredPostOffice(1, "testgroup");
+
+         office2 = (DefaultClusteredPostOffice) createClusteredPostOffice(2, "testgroup");
+
+         office3 = (DefaultClusteredPostOffice) createClusteredPostOffice(3, "testgroup");
+
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
          Binding binding1 =
             office1.bindClusteredQueue(new SimpleCondition("topic1"), queue1);
-         
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue2", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue2", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
          Binding binding2 =
             office2.bindClusteredQueue(new SimpleCondition("topic1"), queue2);
-         
-         LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue3", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue3", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
          Binding binding3 =
             office3.bindClusteredQueue(new SimpleCondition("topic1"), queue3);
-         
+
          SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue1.add(receiver1);
          SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue2.add(receiver2);
          SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue3.add(receiver3);
-         
+
          //This will make it fail after casting but before persisting the message in the db
          office1.setFail(true, false, false);
-         
+
          Transaction tx = tr.createTransaction();
-         
+
          final int NUM_MESSAGES = 10;
-         
+
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
-            Message msg = CoreMessageFactory.createCoreMessage(i);   
+            Message msg = CoreMessageFactory.createCoreMessage(i);
             msg.setReliable(true);
-            
-            MessageReference ref = ms.reference(msg);  
-            
+
+            MessageReference ref = ms.reference(msg);
+
             office1.route(ref, new SimpleCondition("topic1"), tx);
          }
-         
+
          Thread.sleep(1000);
-         
+
          List msgs = receiver1.getMessages();
          assertTrue(msgs.isEmpty());
-         
+
          msgs = receiver2.getMessages();
          assertTrue(msgs.isEmpty());
-         
+
          msgs = receiver3.getMessages();
          assertTrue(msgs.isEmpty());
-         
+
          try
          {
             //An exception should be thrown            
             tx.commit();
-            fail();                        
+            fail();
          }
          catch (TransactionException e)
          {
             //Ok
          }
-         
+
          Thread.sleep(1000);
-         
+
          msgs = receiver1.getMessages();
          assertTrue(msgs.isEmpty());
-         
+
          msgs = receiver2.getMessages();
          assertTrue(msgs.isEmpty());
-         
+
          msgs = receiver3.getMessages();
          assertTrue(msgs.isEmpty());
-         
+
          //Nodes 2 and 3 should have a held tx
-         
+
          assertTrue(office1.getHoldingTransactions().isEmpty());
 
          assertEquals(1, office2.getHoldingTransactions().size());
-         
+
          assertEquals(1, office3.getHoldingTransactions().size());
-         
+
          //We now kill the office - this should make the other offices do their transaction check
          office1.stop();
-         
+
          Thread.sleep(1000);
-         
+
          //This should result in the held txs being rolled back
-         
+
          assertTrue(office1.getHoldingTransactions().isEmpty());
-         
+
          assertTrue(office2.getHoldingTransactions().isEmpty());
-         
+
          assertTrue(office3.getHoldingTransactions().isEmpty());
-         
+
          //The tx should be removed from the holding area and nothing should be received
          //remember node1 has now crashed so no point checking receiver1
-         
+
          msgs = receiver2.getMessages();
          assertTrue(msgs.isEmpty());
-         
+
          msgs = receiver3.getMessages();
          assertTrue(msgs.isEmpty());
-         
+
       }
       finally
       {
          if (office1 != null)
-         {           
+         {
             office1.stop();
          }
-         
+
          if (office2 != null)
-         {           
+         {
             office2.stop();
          }
-         
-         if (office3!= null)
-         {           
+
+         if (office3 != null)
+         {
             office3.stop();
          }
       }
    }
-   
+
    public void testCrashAfterPersist() throws Exception
    {
       DefaultClusteredPostOffice office1 = null;
-      
+
       DefaultClusteredPostOffice office2 = null;
-      
+
       DefaultClusteredPostOffice office3 = null;
-      
+
       try
-      {      
-         office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
-         
-         office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
-         
-         office3 = (DefaultClusteredPostOffice)createClusteredPostOffice(3, "testgroup");
-         
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+      {
+         office1 = (DefaultClusteredPostOffice) createClusteredPostOffice(1, "testgroup");
+
+         office2 = (DefaultClusteredPostOffice) createClusteredPostOffice(2, "testgroup");
+
+         office3 = (DefaultClusteredPostOffice) createClusteredPostOffice(3, "testgroup");
+
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
          Binding binding1 =
             office1.bindClusteredQueue(new SimpleCondition("topic1"), queue1);
-         
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue2", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue2", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
          Binding binding2 =
             office2.bindClusteredQueue(new SimpleCondition("topic1"), queue2);
-         
-         LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue3", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue3", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
          Binding binding3 =
             office3.bindClusteredQueue(new SimpleCondition("topic1"), queue3);
-         
+
          SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue1.add(receiver1);
          SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue2.add(receiver2);
          SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue3.add(receiver3);
-         
+
          //This will make it fail after casting and persisting the message in the db
          office1.setFail(false, true, false);
-         
+
          Transaction tx = tr.createTransaction();
-         
+
          final int NUM_MESSAGES = 10;
-         
+
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
-            Message msg = CoreMessageFactory.createCoreMessage(i);   
+            Message msg = CoreMessageFactory.createCoreMessage(i);
             msg.setReliable(true);
-            
-            MessageReference ref = ms.reference(msg);  
-            
+
+            MessageReference ref = ms.reference(msg);
+
             office1.route(ref, new SimpleCondition("topic1"), tx);
          }
-         
+
          Thread.sleep(1000);
-         
+
          List msgs = receiver1.getMessages();
          assertTrue(msgs.isEmpty());
-         
+
          msgs = receiver2.getMessages();
          assertTrue(msgs.isEmpty());
-         
+
          msgs = receiver3.getMessages();
          assertTrue(msgs.isEmpty());
-         
+
          try
          {
             //An exception should be thrown            
             tx.commit();
-            fail();                       
+            fail();
          }
          catch (TransactionException e)
          {
             //Ok
          }
-         
+
          Thread.sleep(1000);
-         
+
          msgs = receiver1.getMessages();
          assertTrue(msgs.isEmpty());
-         
+
          msgs = receiver2.getMessages();
          assertTrue(msgs.isEmpty());
-         
+
          msgs = receiver3.getMessages();
          assertTrue(msgs.isEmpty());
-         
+
          //There should be held tx in 2 and 3 but not in 1
-         
+
          assertTrue(office1.getHoldingTransactions().isEmpty());
-         
+
          assertEquals(1, office2.getHoldingTransactions().size());
-         
+
          assertEquals(1, office3.getHoldingTransactions().size());
-         
+
          //We now kill the office - this should make the other office do it's transaction check
          office1.stop();
-         
+
          Thread.sleep(1000);
-         
+
          assertTrue(office1.getHoldingTransactions().isEmpty());
-         
+
          assertTrue(office2.getHoldingTransactions().isEmpty());
-         
+
          assertTrue(office3.getHoldingTransactions().isEmpty());
-         
+
          //The tx should be removed from the holding area and messages be received
          //no point checking receiver1 since node1 has crashed
-         
+
          msgs = receiver2.getMessages();
          assertEquals(NUM_MESSAGES, msgs.size());
-         
+
          msgs = receiver3.getMessages();
          assertEquals(NUM_MESSAGES, msgs.size());
-         
-         
+
+
       }
       finally
       {
          if (office1 != null)
-         {           
+         {
             office1.stop();
          }
-         
+
          if (office2 != null)
-         {           
+         {
             office2.stop();
          }
       }
    }
-   
-   
-  
-   
+
+
    protected ClusteredPostOffice createClusteredPostOffice(int nodeId, String groupName) throws Exception
    {
       MessagePullPolicy redistPolicy = new NullMessagePullPolicy();
-      
+
       FilterFactory ff = new SimpleFilterFactory();
-      
+
       ClusterRouterFactory rf = new DefaultRouterFactory();
-      
+
       FailoverMapper mapper = new DefaultFailoverMapper();
-      
-      ConditionFactory cf = new SimpleConditionFactory();            
-      
-      DefaultClusteredPostOffice postOffice = 
+
+      ConditionFactory cf = new SimpleConditionFactory();
+
+      DefaultClusteredPostOffice postOffice =
          new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
-                                 sc.getClusteredPostOfficeSQLProperties(), true, nodeId, "Clustered",
-                                 ms, pm, tr, ff, cf, pool,
-                                 groupName,
-                                 JGroupsUtil.getControlStackProperties(),
-                                 JGroupsUtil.getDataStackProperties(),
-                                 5000, 5000, redistPolicy, rf, mapper, 1000);
-      
-      postOffice.start();      
-      
+            sc.getClusteredPostOfficeSQLProperties(), true, nodeId, "Clustered",
+            ms, pm, tr, ff, cf, pool,
+            groupName,
+            new NameChannelFactory(JGroupsUtil.getControlStackProperties(),
+               JGroupsUtil.getDataStackProperties()),
+            5000, 5000, redistPolicy, rf, mapper, 1000);
+
+      postOffice.start();
+
       return postOffice;
    }
 
    // Private -------------------------------------------------------
-      
+
    // Inner classes -------------------------------------------------   
 
 }

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java	2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java	2007-01-04 22:02:16 UTC (rev 1892)
@@ -44,6 +44,7 @@
 import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
 import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
 import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.channelfactory.NameChannelFactory;
 import org.jboss.messaging.core.tx.Transaction;
 import org.jboss.test.messaging.core.SimpleCondition;
 import org.jboss.test.messaging.core.SimpleConditionFactory;
@@ -56,21 +57,19 @@
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 
 /**
- * 
  * A RedistributionWithDefaultMessagePullPolicyTest
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
+ *          <p/>
+ *          $Id$
  */
 public class RedistributionWithDefaultMessagePullPolicyTest extends PostOfficeTestBase
 {
    // Constants -----------------------------------------------------
 
    // Static --------------------------------------------------------
-   
+
    // Attributes ----------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -88,970 +87,970 @@
    }
 
    public void tearDown() throws Exception
-   {      
+   {
       super.tearDown();
    }
-   
+
    public void testConsumeAllNonPersistentNonRecoverable() throws Throwable
    {
       consumeAll(false, false);
    }
-   
+
    public void testConsumeAllPersistentNonRecoverable() throws Throwable
    {
       consumeAll(true, false);
    }
-   
+
    public void testConsumeAllNonPersistentRecoverable() throws Throwable
    {
       consumeAll(false, true);
    }
-   
+
    public void testConsumeAllPersistentRecoverable() throws Throwable
    {
       consumeAll(true, true);
    }
-         
+
    public void testConsumeBitByBitNonPersistentNonRecoverable() throws Throwable
    {
       consumeBitByBit(false, false);
    }
-   
+
    public void testConsumeBitByBitPersistentNonRecoverable() throws Throwable
    {
       consumeBitByBit(true, false);
    }
-   
+
    public void testConsumeBitByBitNonPersistentRecoverable() throws Throwable
    {
       consumeBitByBit(false, true);
    }
-   
+
    public void testConsumeBitByBitPersistentRecoverable() throws Throwable
    {
       consumeBitByBit(true, true);
    }
-   
+
    public void testSimpleMessagePull() throws Throwable
    {
       DefaultClusteredPostOffice office1 = null;
-      
+
       DefaultClusteredPostOffice office2 = null;
-      
+
       try
-      {      
-         office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
-         
-         office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
-         
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+      {
+         office1 = (DefaultClusteredPostOffice) createClusteredPostOffice(1, "testgroup");
+
+         office2 = (DefaultClusteredPostOffice) createClusteredPostOffice(2, "testgroup");
+
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
          Binding binding1 =
             office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
-         
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
          Binding binding2 =
             office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
-                          
-         Message msg = CoreMessageFactory.createCoreMessage(1);   
+
+         Message msg = CoreMessageFactory.createCoreMessage(1);
          msg.setReliable(true);
-         
-         MessageReference ref = ms.reference(msg);  
-         
+
+         MessageReference ref = ms.reference(msg);
+
          office1.route(ref, new SimpleCondition("queue1"), null);
-                  
+
          Thread.sleep(2000);
-         
+
          //Messages should all be in queue1
-         
+
          List msgs = queue1.browse();
          assertEquals(1, msgs.size());
-         
+
          msgs = queue2.browse();
          assertTrue(msgs.isEmpty());
-         
+
          SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
          receiver1.setMaxRefs(0);
          queue1.add(receiver1);
          SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
          receiver2.setMaxRefs(0);
          queue2.add(receiver2);
-         
+
          //Prompt delivery so the channels know if the receivers are ready
          queue1.deliver(false);
          Thread.sleep(2000);
-           
+
          //Pull from 1 to 2
-         
+
          receiver2.setMaxRefs(1);
-         
+
          log.info("delivering");
-         queue2.deliver(false);                 
-         
+         queue2.deliver(false);
+
          Thread.sleep(3000);
-         
-         assertTrue(office1.getHoldingTransactions().isEmpty());         
+
+         assertTrue(office1.getHoldingTransactions().isEmpty());
          assertTrue(office2.getHoldingTransactions().isEmpty());
-         
+
          log.info("r2 " + receiver2.getMessages().size());
-         
+
          log.info("queue1 refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
          log.info("queue2 refs:" + queue2.memoryRefCount() + " dels:" + queue2.getDeliveringCount());
-         
+
          assertEquals(0, queue1.memoryRefCount());
          assertEquals(0, queue1.getDeliveringCount());
-         
+
          assertEquals(0, queue2.memoryRefCount());
          assertEquals(1, queue2.getDeliveringCount());
-         
+
          this.acknowledgeAll(receiver2);
-         
+
          assertEquals(0, queue2.memoryRefCount());
          assertEquals(0, queue2.getDeliveringCount());
-         
-         assertTrue(office1.getHoldingTransactions().isEmpty());         
+
+         assertTrue(office1.getHoldingTransactions().isEmpty());
          assertTrue(office2.getHoldingTransactions().isEmpty());
-           
+
       }
       finally
       {
          if (office1 != null)
-         {           
+         {
             office1.stop();
          }
-         
+
          if (office2 != null)
-         {           
+         {
             office2.stop();
          }
       }
    }
-   
+
    public void testSimpleMessagePullCrashBeforeCommit() throws Throwable
    {
       DefaultClusteredPostOffice office1 = null;
-      
+
       DefaultClusteredPostOffice office2 = null;
-      
+
       try
-      {      
-         office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
-         
-         office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
-         
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+      {
+         office1 = (DefaultClusteredPostOffice) createClusteredPostOffice(1, "testgroup");
+
+         office2 = (DefaultClusteredPostOffice) createClusteredPostOffice(2, "testgroup");
+
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
          Binding binding1 =
             office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
-         
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
          Binding binding2 =
             office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
-                          
-         Message msg = CoreMessageFactory.createCoreMessage(1);   
+
+         Message msg = CoreMessageFactory.createCoreMessage(1);
          msg.setReliable(true);
-         
-         MessageReference ref = ms.reference(msg);  
-         
+
+         MessageReference ref = ms.reference(msg);
+
          office1.route(ref, new SimpleCondition("queue1"), null);
-                  
+
          Thread.sleep(2000);
-         
+
          //Messages should all be in queue1
-         
+
          List msgs = queue1.browse();
          assertEquals(1, msgs.size());
-         
+
          msgs = queue2.browse();
          assertTrue(msgs.isEmpty());
-         
+
          SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
          receiver1.setMaxRefs(0);
          queue1.add(receiver1);
          SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
          receiver2.setMaxRefs(0);
          queue2.add(receiver2);
-         
+
          //Prompt delivery so the channels know if the receivers are ready
          queue1.deliver(false);
          Thread.sleep(2000);
-           
+
          //Pull from 1 to 2
-         
+
          receiver2.setMaxRefs(1);
-         
+
          //Force a failure before commit
          office2.setFail(true, false, false);
-         
+
          log.info("delivering");
-         queue2.deliver(false);                 
-         
+         queue2.deliver(false);
+
          Thread.sleep(3000);
-         
-         assertEquals(1, office1.getHoldingTransactions().size());         
+
+         assertEquals(1, office1.getHoldingTransactions().size());
          assertTrue(office2.getHoldingTransactions().isEmpty());
-         
+
          log.info("queue1 refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
          log.info("queue2 refs:" + queue2.memoryRefCount() + " dels:" + queue2.getDeliveringCount());
-         
+
          assertEquals(0, queue1.memoryRefCount());
          assertEquals(1, queue1.getDeliveringCount());
-         
+
          assertEquals(0, queue2.memoryRefCount());
          assertEquals(0, queue2.getDeliveringCount());
-         
+
          //Now kill office 2 - this should cause office1 to remove the dead held transaction
-         
-         office2.stop();         
+
+         office2.stop();
          Thread.sleep(2000);
-         
-         assertTrue(office1.getHoldingTransactions().isEmpty());        
- 
+
+         assertTrue(office1.getHoldingTransactions().isEmpty());
+
          //The delivery should be cancelled back to the queue too
-         
+
          assertEquals(1, queue1.memoryRefCount());
          assertEquals(0, queue1.getDeliveringCount());
-         
-            
+
+
       }
       finally
       {
          if (office1 != null)
-         {           
+         {
             office1.stop();
          }
-         
+
          if (office2 != null)
-         {           
+         {
             office2.stop();
          }
       }
    }
-   
+
    public void testSimpleMessagePullCrashAfterCommit() throws Throwable
    {
       DefaultClusteredPostOffice office1 = null;
-      
+
       DefaultClusteredPostOffice office2 = null;
-      
+
       try
-      {      
-         office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
-         
-         office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
-         
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+      {
+         office1 = (DefaultClusteredPostOffice) createClusteredPostOffice(1, "testgroup");
+
+         office2 = (DefaultClusteredPostOffice) createClusteredPostOffice(2, "testgroup");
+
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
          Binding binding1 =
             office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
-         
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
          Binding binding2 =
             office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
-                          
-         Message msg = CoreMessageFactory.createCoreMessage(1);   
+
+         Message msg = CoreMessageFactory.createCoreMessage(1);
          msg.setReliable(true);
-         
-         MessageReference ref = ms.reference(msg);  
-         
+
+         MessageReference ref = ms.reference(msg);
+
          office1.route(ref, new SimpleCondition("queue1"), null);
-                  
+
          Thread.sleep(2000);
-         
+
          //Messages should all be in queue1
-         
+
          List msgs = queue1.browse();
          assertEquals(1, msgs.size());
-         
+
          msgs = queue2.browse();
          assertTrue(msgs.isEmpty());
-         
+
          SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
          receiver1.setMaxRefs(0);
          queue1.add(receiver1);
          SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
          receiver2.setMaxRefs(0);
          queue2.add(receiver2);
-         
+
          //Prompt delivery so the channels know if the receivers are ready
          queue1.deliver(false);
          Thread.sleep(2000);
-           
+
          //Pull from 1 to 2
-         
+
          receiver2.setMaxRefs(1);
-         
+
          //Force a failure after commit the ack to storage
          office2.setFail(false, true, false);
-         
+
          log.info("delivering");
-         queue2.deliver(false);                 
-         
+         queue2.deliver(false);
+
          Thread.sleep(3000);
-         
-         assertEquals(1, office1.getHoldingTransactions().size());         
+
+         assertEquals(1, office1.getHoldingTransactions().size());
          assertTrue(office2.getHoldingTransactions().isEmpty());
-         
+
          log.info("queue1 refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
          log.info("queue2 refs:" + queue2.memoryRefCount() + " dels:" + queue2.getDeliveringCount());
-         
+
          assertEquals(0, queue1.memoryRefCount());
          assertEquals(1, queue1.getDeliveringCount());
-            
+
          //Now kill office 2 - this should cause office1 to remove the dead held transaction
-         
-         office2.stop();         
+
+         office2.stop();
          Thread.sleep(2000);
-         
-         assertTrue(office1.getHoldingTransactions().isEmpty());        
-         
+
+         assertTrue(office1.getHoldingTransactions().isEmpty());
+
          //The delivery should be committed
-         
+
          assertEquals(0, queue1.memoryRefCount());
          assertEquals(0, queue1.getDeliveringCount());
-         
+
       }
       finally
       {
          if (office1 != null)
-         {           
+         {
             office1.stop();
          }
-         
+
          if (office2 != null)
-         {           
+         {
             office2.stop();
          }
       }
    }
-   
+
    public void testFailHandleMessagePullResult() throws Throwable
    {
       DefaultClusteredPostOffice office1 = null;
-      
+
       DefaultClusteredPostOffice office2 = null;
-      
+
       try
-      {      
-         office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
-         
-         office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
-         
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+      {
+         office1 = (DefaultClusteredPostOffice) createClusteredPostOffice(1, "testgroup");
+
+         office2 = (DefaultClusteredPostOffice) createClusteredPostOffice(2, "testgroup");
+
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
          Binding binding1 =
             office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
-         
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
          Binding binding2 =
             office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
-                          
-         Message msg = CoreMessageFactory.createCoreMessage(1);   
+
+         Message msg = CoreMessageFactory.createCoreMessage(1);
          msg.setReliable(true);
-         
-         MessageReference ref = ms.reference(msg);  
-         
+
+         MessageReference ref = ms.reference(msg);
+
          office1.route(ref, new SimpleCondition("queue1"), null);
-                  
+
          Thread.sleep(2000);
-         
+
          //Messages should all be in queue1
-         
+
          List msgs = queue1.browse();
          assertEquals(1, msgs.size());
-         
+
          msgs = queue2.browse();
          assertTrue(msgs.isEmpty());
-         
+
          SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
          receiver1.setMaxRefs(0);
          queue1.add(receiver1);
          SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
          receiver2.setMaxRefs(0);
          queue2.add(receiver2);
-         
+
          //Prompt delivery so the channels know if the receivers are ready
          queue1.deliver(false);
          Thread.sleep(2000);
-           
+
          //Pull from 1 to 2
-         
+
          receiver2.setMaxRefs(1);
-         
+
          office2.setFail(false, false, true);
-         
+
          log.info("delivering");
-         queue2.deliver(false);                 
-         
+         queue2.deliver(false);
+
          Thread.sleep(3000);
-         
+
          //The delivery should be rolled back
-         
-         assertTrue(office2.getHoldingTransactions().isEmpty());        
+
          assertTrue(office2.getHoldingTransactions().isEmpty());
-         
+         assertTrue(office1.getHoldingTransactions().isEmpty());
+
          log.info("queue1 refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
          log.info("queue2 refs:" + queue2.memoryRefCount() + " dels:" + queue2.getDeliveringCount());
-         
+
          assertEquals(1, queue1.memoryRefCount());
          assertEquals(0, queue1.getDeliveringCount());
-         
+
          assertEquals(0, queue2.memoryRefCount());
          assertEquals(0, queue2.getDeliveringCount());
       }
       finally
       {
          if (office1 != null)
-         {           
+         {
             office1.stop();
          }
-         
+
          if (office2 != null)
-         {           
+         {
             office2.stop();
          }
       }
    }
-   
+
    protected void consumeAll(boolean persistent, boolean recoverable) throws Throwable
    {
       DefaultClusteredPostOffice office1 = null;
-      
+
       DefaultClusteredPostOffice office2 = null;
-      
+
       DefaultClusteredPostOffice office3 = null;
-      
+
       DefaultClusteredPostOffice office4 = null;
-      
+
       DefaultClusteredPostOffice office5 = null;
-          
+
       try
-      {   
-         office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
-         
-         office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
-         
-         office3 = (DefaultClusteredPostOffice)createClusteredPostOffice(3, "testgroup");
-         
-         office4 = (DefaultClusteredPostOffice)createClusteredPostOffice(4, "testgroup");
-         
-         office5 = (DefaultClusteredPostOffice)createClusteredPostOffice(5, "testgroup");
-         
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+      {
+         office1 = (DefaultClusteredPostOffice) createClusteredPostOffice(1, "testgroup");
+
+         office2 = (DefaultClusteredPostOffice) createClusteredPostOffice(2, "testgroup");
+
+         office3 = (DefaultClusteredPostOffice) createClusteredPostOffice(3, "testgroup");
+
+         office4 = (DefaultClusteredPostOffice) createClusteredPostOffice(4, "testgroup");
+
+         office5 = (DefaultClusteredPostOffice) createClusteredPostOffice(5, "testgroup");
+
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), null, tr);
          Binding binding1 = office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
-                  
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), null, tr);
          Binding binding2 = office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
-                  
-         LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
-         Binding binding3 = office3.bindClusteredQueue(new SimpleCondition("queue1"), queue3);         
-         
-         LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), null, tr);
+         Binding binding3 = office3.bindClusteredQueue(new SimpleCondition("queue1"), queue3);
+
+         LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), null, tr);
          Binding binding4 = office4.bindClusteredQueue(new SimpleCondition("queue1"), queue4);
-                  
-         LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+
+         LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), null, tr);
          Binding binding5 = office5.bindClusteredQueue(new SimpleCondition("queue1"), queue5);
-                   
+
          final int NUM_MESSAGES = 100;
-         
+
          this.sendMessages("queue1", persistent, office1, NUM_MESSAGES, null);
          this.sendMessages("queue1", persistent, office2, NUM_MESSAGES, null);
          this.sendMessages("queue1", persistent, office3, NUM_MESSAGES, null);
          this.sendMessages("queue1", persistent, office4, NUM_MESSAGES, null);
          this.sendMessages("queue1", persistent, office5, NUM_MESSAGES, null);
-                 
+
          Thread.sleep(2000);
-         
+
          //Check the sizes
-         
-         log.info("Here are the sizes:");         
+
+         log.info("Here are the sizes:");
          log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
          log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.getDeliveringCount());
          log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.getDeliveringCount());
          log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.getDeliveringCount());
          log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.getDeliveringCount());
-               
+
          assertEquals(NUM_MESSAGES, queue1.memoryRefCount());
          assertEquals(0, queue1.getDeliveringCount());
-         
+
          assertEquals(NUM_MESSAGES, queue2.memoryRefCount());
          assertEquals(0, queue2.getDeliveringCount());
-           
+
          assertEquals(NUM_MESSAGES, queue3.memoryRefCount());
          assertEquals(0, queue3.getDeliveringCount());
-         
+
          assertEquals(NUM_MESSAGES, queue4.memoryRefCount());
          assertEquals(0, queue4.getDeliveringCount());
-         
+
          assertEquals(NUM_MESSAGES, queue5.memoryRefCount());
          assertEquals(0, queue5.getDeliveringCount());
-         
+
          SimpleReceiver receiver = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-         
+
          queue1.add(receiver);
-         
+
          queue1.deliver(false);
-         
+
          Thread.sleep(7000);
-         
-         log.info("Here are the sizes:");         
+
+         log.info("Here are the sizes:");
          log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
          log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.getDeliveringCount());
          log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.getDeliveringCount());
          log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.getDeliveringCount());
          log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.getDeliveringCount());
-         
+
          assertEquals(0, queue1.memoryRefCount());
          assertEquals(NUM_MESSAGES * 5, queue1.getDeliveringCount());
-         
+
          assertEquals(0, queue2.memoryRefCount());
          assertEquals(0, queue2.getDeliveringCount());
-           
+
          assertEquals(0, queue3.memoryRefCount());
          assertEquals(0, queue3.getDeliveringCount());
-         
+
          assertEquals(0, queue4.memoryRefCount());
          assertEquals(0, queue4.getDeliveringCount());
-         
+
          assertEquals(0, queue5.memoryRefCount());
          assertEquals(0, queue5.getDeliveringCount());
-         
+
          List messages = receiver.getMessages();
-         
+
          assertNotNull(messages);
-         
+
          assertEquals(NUM_MESSAGES * 5, messages.size());
-         
+
          Iterator iter = messages.iterator();
-         
+
          while (iter.hasNext())
          {
-            Message msg = (Message)iter.next();
-            
+            Message msg = (Message) iter.next();
+
             receiver.acknowledge(msg, null);
          }
-         
+
          receiver.clear();
-         
+
          assertEquals(0, queue1.memoryRefCount());
          assertEquals(0, queue1.getDeliveringCount());
-         
+
          assertTrue(office1.getHoldingTransactions().isEmpty());
          assertTrue(office2.getHoldingTransactions().isEmpty());
          assertTrue(office3.getHoldingTransactions().isEmpty());
          assertTrue(office4.getHoldingTransactions().isEmpty());
          assertTrue(office5.getHoldingTransactions().isEmpty());
-         
+
          if (checkNoMessageData())
          {
             fail("Message data still in database");
          }
       }
       finally
-      { 
+      {
          if (office1 != null)
          {
             office1.stop();
          }
-         
+
          if (office2 != null)
-         {            
+         {
             office2.stop();
          }
-         
+
          if (office3 != null)
          {
             office3.stop();
          }
-         
+
          if (office4 != null)
-         {            
+         {
             office4.stop();
          }
-         
+
          if (office5 != null)
          {
             office5.stop();
          }
       }
    }
-   
+
    protected void consumeBitByBit(boolean persistent, boolean recoverable) throws Throwable
    {
       DefaultClusteredPostOffice office1 = null;
-      
+
       DefaultClusteredPostOffice office2 = null;
-      
+
       DefaultClusteredPostOffice office3 = null;
-      
+
       DefaultClusteredPostOffice office4 = null;
-      
+
       DefaultClusteredPostOffice office5 = null;
-          
+
       try
-      {   
-         office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
-         
-         office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
-         
-         office3 = (DefaultClusteredPostOffice)createClusteredPostOffice(3, "testgroup");
-         
-         office4 = (DefaultClusteredPostOffice)createClusteredPostOffice(4, "testgroup");
-         
-         office5 = (DefaultClusteredPostOffice)createClusteredPostOffice(5, "testgroup");
-         
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+      {
+         office1 = (DefaultClusteredPostOffice) createClusteredPostOffice(1, "testgroup");
+
+         office2 = (DefaultClusteredPostOffice) createClusteredPostOffice(2, "testgroup");
+
+         office3 = (DefaultClusteredPostOffice) createClusteredPostOffice(3, "testgroup");
+
+         office4 = (DefaultClusteredPostOffice) createClusteredPostOffice(4, "testgroup");
+
+         office5 = (DefaultClusteredPostOffice) createClusteredPostOffice(5, "testgroup");
+
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), null, tr);
          Binding binding1 = office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
-                  
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), null, tr);
          Binding binding2 = office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
-                  
-         LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
-         Binding binding3 = office3.bindClusteredQueue(new SimpleCondition("queue1"), queue3);         
-         
-         LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), null, tr);
+         Binding binding3 = office3.bindClusteredQueue(new SimpleCondition("queue1"), queue3);
+
+         LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), null, tr);
          Binding binding4 = office4.bindClusteredQueue(new SimpleCondition("queue1"), queue4);
-                  
-         LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+
+         LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), null, tr);
          Binding binding5 = office5.bindClusteredQueue(new SimpleCondition("queue1"), queue5);
-                  
+
          final int NUM_MESSAGES = 100;
-          
+
          this.sendMessages("queue1", persistent, office1, NUM_MESSAGES, null);
          this.sendMessages("queue1", persistent, office2, NUM_MESSAGES, null);
          this.sendMessages("queue1", persistent, office3, NUM_MESSAGES, null);
          this.sendMessages("queue1", persistent, office4, NUM_MESSAGES, null);
          this.sendMessages("queue1", persistent, office5, NUM_MESSAGES, null);
-                          
+
          Thread.sleep(2000);
-                
+
          //Check the sizes
-         
-         log.info("Here are the sizes 1:");         
+
+         log.info("Here are the sizes 1:");
          log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
          log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.getDeliveringCount());
          log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.getDeliveringCount());
          log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.getDeliveringCount());
          log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.getDeliveringCount());
-               
+
          assertEquals(NUM_MESSAGES, queue1.memoryRefCount());
          assertEquals(0, queue1.getDeliveringCount());
-         
+
          assertEquals(NUM_MESSAGES, queue2.memoryRefCount());
          assertEquals(0, queue2.getDeliveringCount());
-           
+
          assertEquals(NUM_MESSAGES, queue3.memoryRefCount());
          assertEquals(0, queue3.getDeliveringCount());
-         
+
          assertEquals(NUM_MESSAGES, queue4.memoryRefCount());
          assertEquals(0, queue4.getDeliveringCount());
-         
+
          assertEquals(NUM_MESSAGES, queue5.memoryRefCount());
          assertEquals(0, queue5.getDeliveringCount());
-         
+
          assertTrue(office1.getHoldingTransactions().isEmpty());
          assertTrue(office2.getHoldingTransactions().isEmpty());
          assertTrue(office3.getHoldingTransactions().isEmpty());
          assertTrue(office4.getHoldingTransactions().isEmpty());
          assertTrue(office5.getHoldingTransactions().isEmpty());
-                 
-         SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);         
+
+         SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
          queue1.add(receiver1);
-         SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);         
+         SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
          queue2.add(receiver2);
-         SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);         
+         SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
          queue3.add(receiver3);
-         SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);         
+         SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
          queue4.add(receiver4);
-         SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);         
+         SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
          queue5.add(receiver5);
-         
-         receiver1.setMaxRefs(5);         
-         queue1.deliver(false);         
-         Thread.sleep(1000);         
+
+         receiver1.setMaxRefs(5);
+         queue1.deliver(false);
+         Thread.sleep(1000);
          assertEquals(NUM_MESSAGES - 5, queue1.memoryRefCount());
          assertEquals(5, queue1.getDeliveringCount());
-         
+
          acknowledgeAll(receiver1);
          assertEquals(0, queue1.getDeliveringCount());
          receiver1.setMaxRefs(0);
-         
-         receiver2.setMaxRefs(10);         
-         queue2.deliver(false);         
+
+         receiver2.setMaxRefs(10);
+         queue2.deliver(false);
          Thread.sleep(1000);
          assertEquals(NUM_MESSAGES - 10, queue2.memoryRefCount());
          assertEquals(10, queue2.getDeliveringCount());
          acknowledgeAll(receiver2);
          receiver2.setMaxRefs(0);
-                  
-         receiver3.setMaxRefs(15);         
-         queue3.deliver(false);         
+
+         receiver3.setMaxRefs(15);
+         queue3.deliver(false);
          Thread.sleep(1000);
          assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
          assertEquals(15, queue3.getDeliveringCount());
          acknowledgeAll(receiver3);
          receiver3.setMaxRefs(0);
-         
-         receiver4.setMaxRefs(20);         
-         queue4.deliver(false);         
+
+         receiver4.setMaxRefs(20);
+         queue4.deliver(false);
          Thread.sleep(1000);
          assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
          assertEquals(20, queue4.getDeliveringCount());
          acknowledgeAll(receiver4);
          receiver4.setMaxRefs(0);
-         
-         receiver5.setMaxRefs(25);         
-         queue5.deliver(false);         
+
+         receiver5.setMaxRefs(25);
+         queue5.deliver(false);
          Thread.sleep(1000);
          assertEquals(NUM_MESSAGES - 25, queue5.memoryRefCount());
          assertEquals(25, queue5.getDeliveringCount());
          acknowledgeAll(receiver5);
          receiver5.setMaxRefs(0);
-         
+
          Thread.sleep(1000);
-         
+
          assertTrue(office1.getHoldingTransactions().isEmpty());
          assertTrue(office2.getHoldingTransactions().isEmpty());
          assertTrue(office3.getHoldingTransactions().isEmpty());
          assertTrue(office4.getHoldingTransactions().isEmpty());
          assertTrue(office5.getHoldingTransactions().isEmpty());
-         
-         log.info("Here are the sizes 2:");         
+
+         log.info("Here are the sizes 2:");
          log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
          log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.getDeliveringCount());
          log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.getDeliveringCount());
          log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.getDeliveringCount());
          log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.getDeliveringCount());
-     
+
          //Consume the rest from queue 5
          receiver5.setMaxRefs(NUM_MESSAGES - 25);
          queue5.deliver(false);
-         Thread.sleep(5000);         
-         
+         Thread.sleep(5000);
+
          log.info("receiver5 msgs:" + receiver5.getMessages().size());
-         
-         log.info("Here are the sizes 3:");         
+
+         log.info("Here are the sizes 3:");
          log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
          log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.getDeliveringCount());
          log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.getDeliveringCount());
          log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.getDeliveringCount());
          log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.getDeliveringCount());
-         
+
          //This will result in an extra one being pulled from queue1 - we cannot avoid this
          //This is because the channel does not know that the receiver is full unless it tries
          //with a ref so it needs to retrieve one
-     
+
          assertEquals(NUM_MESSAGES - 6, queue1.memoryRefCount());
          assertEquals(0, queue1.getDeliveringCount());
-         
+
          assertEquals(NUM_MESSAGES - 10, queue2.memoryRefCount());
          assertEquals(0, queue2.getDeliveringCount());
-          
+
          assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
          assertEquals(0, queue3.getDeliveringCount());
-         
+
          assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
          assertEquals(0, queue4.getDeliveringCount());
-         
-         assertEquals(1, queue5.memoryRefCount());         
+
+         assertEquals(1, queue5.memoryRefCount());
          assertEquals(NUM_MESSAGES - 25, queue5.getDeliveringCount());
-         
+
          acknowledgeAll(receiver5);
-         
+
          assertEquals(0, queue5.getDeliveringCount());
-         
+
          receiver5.setMaxRefs(0);
-         
+
          assertTrue(office1.getHoldingTransactions().isEmpty());
          assertTrue(office2.getHoldingTransactions().isEmpty());
          assertTrue(office3.getHoldingTransactions().isEmpty());
          assertTrue(office4.getHoldingTransactions().isEmpty());
          assertTrue(office5.getHoldingTransactions().isEmpty());
-         
+
          //Now consume 5 more from queue5, they should come from queue1 which has the most messages
-         
+
          log.info("Consume 5 more from queue 5");
-         
+
          receiver5.setMaxRefs(5);
          queue5.deliver(false);
          Thread.sleep(5000);
-           
-         log.info("Here are the sizes 4:");         
+
+         log.info("Here are the sizes 4:");
          log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
          log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.getDeliveringCount());
          log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.getDeliveringCount());
          log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.getDeliveringCount());
          log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.getDeliveringCount());
-         
+
          assertEquals(NUM_MESSAGES - 11, queue1.memoryRefCount());
-          
+
          assertEquals(0, queue1.getDeliveringCount());
-         
+
          assertEquals(NUM_MESSAGES - 10, queue2.memoryRefCount());
          assertEquals(0, queue2.getDeliveringCount());
-          
+
          assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
          assertEquals(0, queue3.getDeliveringCount());
-         
+
          assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
          assertEquals(0, queue4.getDeliveringCount());
-         
-         assertEquals(1, queue5.memoryRefCount());          
+
+         assertEquals(1, queue5.memoryRefCount());
          assertEquals(5, queue5.getDeliveringCount());
-               
+
          acknowledgeAll(receiver5);
 
          assertEquals(0, queue5.getDeliveringCount());
-         
+
          receiver1.setMaxRefs(0);
-           
+
          assertTrue(office1.getHoldingTransactions().isEmpty());
          assertTrue(office2.getHoldingTransactions().isEmpty());
          assertTrue(office3.getHoldingTransactions().isEmpty());
          assertTrue(office4.getHoldingTransactions().isEmpty());
          assertTrue(office5.getHoldingTransactions().isEmpty());
-          
+
          //Consume 1 more - should pull one from queue2
-         
+
          receiver5.setMaxRefs(1);
          queue5.deliver(false);
          Thread.sleep(2000);
-          
-         log.info("Here are the sizes 5:");         
+
+         log.info("Here are the sizes 5:");
          log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
          log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.getDeliveringCount());
          log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.getDeliveringCount());
          log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.getDeliveringCount());
          log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.getDeliveringCount());
-         
+
          assertEquals(NUM_MESSAGES - 11, queue1.memoryRefCount());
          assertEquals(0, queue1.getDeliveringCount());
-         
+
          assertEquals(NUM_MESSAGES - 11, queue2.memoryRefCount());
          assertEquals(0, queue2.getDeliveringCount());
-          
+
          assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
          assertEquals(0, queue3.getDeliveringCount());
-         
+
          assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
          assertEquals(0, queue4.getDeliveringCount());
-         
-         assertEquals(1, queue5.memoryRefCount());          
+
+         assertEquals(1, queue5.memoryRefCount());
          assertEquals(1, queue5.getDeliveringCount());
-                  
+
          acknowledgeAll(receiver5);
-         
+
          assertEquals(0, queue5.getDeliveringCount());
-         
+
          receiver5.setMaxRefs(0);
-         
+
          assertTrue(office1.getHoldingTransactions().isEmpty());
          assertTrue(office2.getHoldingTransactions().isEmpty());
          assertTrue(office3.getHoldingTransactions().isEmpty());
          assertTrue(office4.getHoldingTransactions().isEmpty());
          assertTrue(office5.getHoldingTransactions().isEmpty());
-         
+
          //From queue 4 consume everything else
-         
+
          receiver4.setMaxRefs(NUM_MESSAGES - 15 + NUM_MESSAGES - 20 + NUM_MESSAGES - 11 + NUM_MESSAGES - 11 + 1);
          queue4.deliver(false);
          Thread.sleep(7000);
-         
-         log.info("Here are the sizes 6:");         
+
+         log.info("Here are the sizes 6:");
          log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
          log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.getDeliveringCount());
          log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.getDeliveringCount());
          log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.getDeliveringCount());
          log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.getDeliveringCount());
-         
+
          assertEquals(0, queue1.memoryRefCount());
          assertEquals(0, queue1.getDeliveringCount());
-         
+
          assertEquals(0, queue2.memoryRefCount());
          assertEquals(0, queue2.getDeliveringCount());
-          
+
          assertEquals(0, queue3.memoryRefCount());
          assertEquals(0, queue3.getDeliveringCount());
-         
+
          assertEquals(0, queue4.memoryRefCount());
          assertEquals(NUM_MESSAGES - 15 + NUM_MESSAGES - 20 + NUM_MESSAGES - 11 + NUM_MESSAGES - 11 + 1, queue4.getDeliveringCount());
-         
-         assertEquals(0, queue5.memoryRefCount());          
+
+         assertEquals(0, queue5.memoryRefCount());
          assertEquals(0, queue5.getDeliveringCount());
-                  
+
          acknowledgeAll(receiver4);
-         
+
          assertEquals(0, queue4.getDeliveringCount());
-         
+
          assertTrue(office1.getHoldingTransactions().isEmpty());
          assertTrue(office2.getHoldingTransactions().isEmpty());
          assertTrue(office3.getHoldingTransactions().isEmpty());
          assertTrue(office4.getHoldingTransactions().isEmpty());
          assertTrue(office5.getHoldingTransactions().isEmpty());
-         
+
          if (checkNoMessageData())
          {
             fail("Message data still in database");
          }
       }
       finally
-      { 
+      {
          if (office1 != null)
          {
             office1.stop();
          }
-         
+
          if (office2 != null)
-         {            
+         {
             office2.stop();
          }
-         
+
          if (office3 != null)
          {
             office3.stop();
          }
-         
+
          if (office4 != null)
-         {            
+         {
             office4.stop();
          }
-         
+
          if (office5 != null)
          {
             office5.stop();
          }
       }
-   }      
-   
+   }
+
    class ThrottleReceiver implements Receiver, Runnable
    {
       long pause;
-      
+
       volatile int totalCount;
-      
+
       int count;
-      
+
       int maxSize;
-      
+
       volatile boolean full;
-      
+
       Executor executor;
-      
+
       List dels;
-      
+
       Channel queue;
-      
+
       int getTotalCount()
       {
          return totalCount;
       }
-      
+
       ThrottleReceiver(Channel queue, long pause, int maxSize)
       {
          this.queue = queue;
-         
+
          this.pause = pause;
-         
+
          this.maxSize = maxSize;
-         
+
          this.executor = new QueuedExecutor();
-         
+
          this.dels = new ArrayList();
       }
 
@@ -1061,25 +1060,25 @@
          {
             return null;
          }
-         
+
          //log.info(this + " got ref");
-         
+
          //log.info("cnt:" + totalCount);
-         
+
          SimpleDelivery del = new SimpleDelivery(observer, reference);
-         
+
          dels.add(del);
-         
+
          count++;
-         
+
          totalCount++;
-         
+
          if (count == maxSize)
          {
             full = true;
-            
+
             count = 0;
-            
+
             try
             {
                executor.execute(this);
@@ -1089,15 +1088,15 @@
                //Ignore
             }
          }
-         
+
          return del;
-          
+
       }
-      
+
       public void run()
       {
          //Simulate processing of messages
-         
+
          try
          {
             Thread.sleep(pause);
@@ -1106,13 +1105,13 @@
          {
             //Ignore
          }
-         
+
          Iterator iter = dels.iterator();
-         
+
          while (iter.hasNext())
          {
-            Delivery del = (Delivery)iter.next();
-            
+            Delivery del = (Delivery) iter.next();
+
             try
             {
                del.acknowledge(null);
@@ -1122,63 +1121,63 @@
                //Ignore
             }
          }
-         
+
          dels.clear();
-         
+
          full = false;
-         
+
          queue.deliver(false);
       }
-      
+
    }
-   
+
    private void acknowledgeAll(SimpleReceiver receiver) throws Throwable
    {
       List messages = receiver.getMessages();
-      
+
       Iterator iter = messages.iterator();
-      
+
       while (iter.hasNext())
       {
-         Message msg = (Message)iter.next();
-         
+         Message msg = (Message) iter.next();
+
          receiver.acknowledge(msg, null);
       }
-      
+
       receiver.clear();
    }
-   
-      
+
+
    protected ClusteredPostOffice createClusteredPostOffice(int nodeId, String groupName) throws Exception
    {
       MessagePullPolicy pullPolicy = new DefaultMessagePullPolicy();
-       
+
       FilterFactory ff = new SimpleFilterFactory();
-      
+
       ClusterRouterFactory rf = new DefaultRouterFactory();
-      
+
       FailoverMapper mapper = new DefaultFailoverMapper();
-      
-      ConditionFactory cf = new SimpleConditionFactory();           
-      
-      DefaultClusteredPostOffice postOffice = 
+
+      ConditionFactory cf = new SimpleConditionFactory();
+
+      DefaultClusteredPostOffice postOffice =
          new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
-                                 sc.getClusteredPostOfficeSQLProperties(), true, nodeId,
-                                 "Clustered", ms, pm, tr, ff, cf, pool,
-                                 groupName,
-                                 JGroupsUtil.getControlStackProperties(),
-                                 JGroupsUtil.getDataStackProperties(),
-                                 10000, 10000, pullPolicy, rf, mapper, 1000);
-      
-      postOffice.start();      
-      
+            sc.getClusteredPostOfficeSQLProperties(), true, nodeId,
+            "Clustered", ms, pm, tr, ff, cf, pool,
+            groupName,
+            new NameChannelFactory(JGroupsUtil.getControlStackProperties(),
+               JGroupsUtil.getDataStackProperties()),
+            10000, 10000, pullPolicy, rf, mapper, 1000);
+
+      postOffice.start();
+
       return postOffice;
    }
-   
+
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------
-   
+
 }
 
 

Modified: trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2007-01-04 22:02:16 UTC (rev 1892)
@@ -412,6 +412,13 @@
          sb.append("-cp").append(" \"").append(classPath).append("\" ");
       }
 
+      // Because of a known multicast/JGroups problem on Linux (with certain JDKs),
+      // the stack introduced by the multiplexer may fail under Linux unless we set this property.
+      if (System.getProperty("os.name").equals("Linux"))
+      {
+         sb.append(" -Djava.net.preferIPv4Stack=true ");
+      }
+
       sb.append("org.jboss.test.messaging.tools.jmx.rmi.RMITestServer");
 
       String commandLine = sb.toString();

Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java	2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java	2007-01-04 22:02:16 UTC (rev 1892)
@@ -297,6 +297,33 @@
             throw new Exception("Cannot find " + mainConfigFile + " in the classpath");
          }
 
+         if (clustered)
+         {
+            log.info("Starting multiplexer");
+            String multiplexerConfigFile = "server/default/deploy/multiplexer-service.xml";
+            URL multiplexerCofigURL = getClass().getClassLoader().getResource(multiplexerConfigFile);
+            if (multiplexerCofigURL == null)
+            {
+               throw new Exception("Cannot find " + multiplexerCofigURL + " in the classpath");
+            }
+            ServiceDeploymentDescriptor multiplexerDD = new ServiceDeploymentDescriptor(multiplexerCofigURL);
+            List services = multiplexerDD.query("name","Multiplexer");
+            if (services.isEmpty())
+            {
+               log.info("Couldn't find multiplexer config");
+            }
+            else
+            {
+               log.info("Could find multiplexer config");
+            }
+
+            MBeanConfigurationElement multiplexerConfig = (MBeanConfigurationElement) services.iterator().next();
+            ObjectName nameMultiplexer = sc.registerAndConfigureService(multiplexerConfig);
+            sc.invoke(nameMultiplexer,"create", new Object[0], new String[0]);
+            sc.invoke(nameMultiplexer,"start", new Object[0], new String[0]);
+
+         }
+
          String databaseType = sc.getDatabaseType();
          String persistenceConfigFile;
 




More information about the jboss-cvs-commits mailing list