[jboss-cvs] JBoss Messaging SVN: r1892 - in trunk: src/etc src/etc/META-INF src/etc/server/default/deploy src/etc/xmdesc src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/postoffice/cluster src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory tests tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/tools tests/src/org/jboss/test/messaging/tools/jmx/rmi
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jan 4 17:02:26 EST 2007
Author: clebert.suconic at jboss.com
Date: 2007-01-04 17:02:16 -0500 (Thu, 04 Jan 2007)
New Revision: 1892
Added:
trunk/src/etc/META-INF/
trunk/src/etc/META-INF/multiplexer-stacks.xml
trunk/src/etc/server/default/deploy/multiplexer-service.xml
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/ChannelFactory.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/MultiplexorChannelFactory.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/NameChannelFactory.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/XMLChannelFactory.java
Modified:
trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
trunk/tests/build.xml
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java
trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-578 - I have created an interface ChannelFactory to delegate creation of JGroups channels according to configuration options.
Our configuration files will always have a reference to the multiplexor but if the multiplexor is not available we will create JChannel as we always used to do (using XML elements on the constructor).
Added: trunk/src/etc/META-INF/multiplexer-stacks.xml
===================================================================
--- trunk/src/etc/META-INF/multiplexer-stacks.xml 2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/src/etc/META-INF/multiplexer-stacks.xml 2007-01-04 22:02:16 UTC (rev 1892)
@@ -0,0 +1,330 @@
+
+<!--
+ Sample file that defines a number of stacks, used by the multiplexer
+ Author: Bela Ban
+ Version: $Id: multiplexer-stacks.xml 56961 2006-09-19 03:55:11 +0000 (Tue, 19 Sep 2006) bstansberry at jboss.com $
+-->
+<protocol_stacks>
+ <stack name="udp"
+ description="Default: IP multicast based stack, with flow control and message bundling">
+ <config>
+ <UDP
+ mcast_port="${jgroups.udp.mcast_port:45688}"
+ mcast_addr="${jgroups.udp.mcast_addr:228.11.11.11}"
+ tos="8"
+ ucast_recv_buf_size="20000000"
+ ucast_send_buf_size="640000"
+ mcast_recv_buf_size="25000000"
+ mcast_send_buf_size="640000"
+ loopback="false"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ use_incoming_packet_handler="true"
+ use_outgoing_packet_handler="false"
+ ip_ttl="${jgroups.udp.ip_ttl:2}"
+ down_thread="false" up_thread="false"
+ enable_bundling="true"/>
+ <PING timeout="2000"
+ down_thread="false" up_thread="false" num_initial_members="3"/>
+ <MERGE2 max_interval="100000"
+ down_thread="false" up_thread="false" min_interval="20000"/>
+ <FD_SOCK down_thread="false" up_thread="false"/>
+ <FD timeout="10000" max_tries="5" down_thread="false" up_thread="false" shun="true"/>
+ <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
+ <pbcast.NAKACK max_xmit_size="60000"
+ use_mcast_xmit="false" gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ down_thread="false" up_thread="false"
+ discard_delivered_msgs="true"/>
+ <UNICAST timeout="300,600,1200,2400,3600"
+ down_thread="false" up_thread="false"/>
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ down_thread="false" up_thread="false"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+ down_thread="false" up_thread="false"
+ join_retry_timeout="2000" shun="true"
+ view_bundling="true"/>
+ <FC max_credits="2000000" down_thread="false" up_thread="false"
+ min_threshold="0.10"/>
+ <FRAG2 frag_size="60000" down_thread="false" up_thread="false"/>
+ <!-- pbcast.STREAMING_STATE_TRANSFER down_thread="false" up_thread="false"
+ use_flush="true" use_reading_thread="true"/ -->
+ <pbcast.STATE_TRANSFER down_thread="false" up_thread="false" use_flush="false"/>
+ <pbcast.FLUSH down_thread="false" up_thread="false"/>
+ </config>
+ </stack>
+
+
+ <stack name="udp-sync"
+ description="IP multicast based stack, without flow control and without message bundling. This should be used
+ instead of udp if (1) synchronous calls are used and (2) the message volume (rate and size)
+ is not that large. Don't use this configuration if you send messages at a high sustained rate, or you might
+ run out of memory">
+ <config>
+ <UDP
+ mcast_port="${jgroups.udp.mcast_port:45699}"
+ mcast_addr="${jgroups.udp.mcast_addr:229.11.11.11}"
+ tos="8"
+ ucast_recv_buf_size="20000000"
+ ucast_send_buf_size="640000"
+ mcast_recv_buf_size="25000000"
+ mcast_send_buf_size="640000"
+ loopback="false"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ use_incoming_packet_handler="true"
+ use_outgoing_packet_handler="false"
+ ip_ttl="${jgroups.udp.ip_ttl:2}"
+ down_thread="false" up_thread="false"
+ enable_bundling="false"/>
+ <PING timeout="2000"
+ down_thread="false" up_thread="false" num_initial_members="3"/>
+ <MERGE2 max_interval="100000"
+ down_thread="false" up_thread="false" min_interval="20000"/>
+ <FD_SOCK down_thread="false" up_thread="false"/>
+ <FD timeout="10000" max_tries="5" down_thread="false" up_thread="false" shun="true"/>
+ <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
+ <pbcast.NAKACK max_xmit_size="60000"
+ use_mcast_xmit="false" gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ down_thread="false" up_thread="false"
+ discard_delivered_msgs="true"/>
+ <UNICAST timeout="300,600,1200,2400,3600"
+ down_thread="false" up_thread="false"/>
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ down_thread="false" up_thread="false"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+ down_thread="false" up_thread="false"
+ join_retry_timeout="2000" shun="true"
+ view_bundling="true"/>
+ <FRAG2 frag_size="60000" down_thread="false" up_thread="false"/>
+ <!--pbcast.STREAMING_STATE_TRANSFER down_thread="false" up_thread="false"
+ use_flush="true" use_reading_thread="true"/ -->
+ <pbcast.STATE_TRANSFER down_thread="false" up_thread="false" use_flush="false"/>
+ <pbcast.FLUSH down_thread="false" up_thread="false"/>
+ </config>
+ </stack>
+
+
+ <stack name="tcp"
+ description="TCP based stack, with flow control and message bundling. This is usually used when IP
+ multicasting cannot be used in a network, e.g. because it is disabled (routers discard multicast)">
+ <config>
+ <TCP start_port="7600"
+ loopback="true"
+ recv_buf_size="20000000"
+ send_buf_size="640000"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ use_incoming_packet_handler="true"
+ use_outgoing_packet_handler="false"
+ down_thread="false" up_thread="false"
+ enable_bundling="true"
+ use_send_queues="false"
+ sock_conn_timeout="300"
+ skip_suspected_members="true"/>
+ <TCPPING timeout="3000"
+ down_thread="false" up_thread="false"
+ initial_hosts="${jgroups.tcpping.initial_hosts:localhost[7600],localhost[7601]}"
+ port_range="1"
+ num_initial_members="3"/>
+ <MERGE2 max_interval="100000"
+ down_thread="false" up_thread="false" min_interval="20000"/>
+ <FD_SOCK down_thread="false" up_thread="false"/>
+ <FD timeout="10000" max_tries="5" down_thread="false" up_thread="false" shun="true"/>
+ <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
+ <pbcast.NAKACK max_xmit_size="60000"
+ use_mcast_xmit="false" gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ down_thread="false" up_thread="false"
+ discard_delivered_msgs="true"/>
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ down_thread="false" up_thread="false"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+ down_thread="false" up_thread="false"
+ join_retry_timeout="2000" shun="true"
+ view_bundling="true"/>
+ <FC max_credits="2000000" down_thread="false" up_thread="false"
+ min_threshold="0.10"/>
+ <FRAG2 frag_size="60000" down_thread="false" up_thread="false"/>
+ <!-- pbcast.STREAMING_STATE_TRANSFER down_thread="false" up_thread="false"
+ use_flush="true" use_reading_thread="true"/ -->
+ <pbcast.STATE_TRANSFER down_thread="false" up_thread="false" use_flush="false"/>
+ <pbcast.FLUSH down_thread="false" up_thread="false"/>
+ </config>
+ </stack>
+
+
+ <stack name="tcp-sync"
+ description="TCP based stack, without flow control and without message bundling. This is usually used when IP
+ multicasting cannot be used in a network, e.g. because it is disabled (routers discard multicast). This
+ configuration should be used instead of tcp when (1) synchronous calls are used and (2) the message volume
+ (rate and size) is not that large">
+ <config>
+ <TCP start_port="7650"
+ loopback="true"
+ recv_buf_size="20000000"
+ send_buf_size="640000"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ use_incoming_packet_handler="true"
+ use_outgoing_packet_handler="false"
+ down_thread="false" up_thread="false"
+ enable_bundling="false"
+ use_send_queues="false"
+ sock_conn_timeout="300"
+ skip_suspected_members="true"/>
+ <TCPPING timeout="3000"
+ down_thread="false" up_thread="false"
+ initial_hosts="${jgroups.tcpping.initial_hosts:localhost[7650],localhost[7651]}"
+ port_range="1"
+ num_initial_members="3"/>
+ <MERGE2 max_interval="100000"
+ down_thread="false" up_thread="false" min_interval="20000"/>
+ <FD_SOCK down_thread="false" up_thread="false"/>
+ <FD timeout="10000" max_tries="5" down_thread="false" up_thread="false" shun="true"/>
+ <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
+ <pbcast.NAKACK max_xmit_size="60000"
+ use_mcast_xmit="false" gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ down_thread="false" up_thread="false"
+ discard_delivered_msgs="true"/>
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ down_thread="false" up_thread="false"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+ down_thread="false" up_thread="false"
+ join_retry_timeout="2000" shun="true"
+ view_bundling="true"/>
+ <!-- pbcast.STREAMING_STATE_TRANSFER down_thread="false" up_thread="false"
+ use_flush="true" use_reading_thread="true"/ -->
+ <pbcast.STATE_TRANSFER down_thread="false" up_thread="false" use_flush="false"/>
+ <pbcast.FLUSH down_thread="false" up_thread="false"/>
+ </config>
+ </stack>
+
+
+ <stack name="tcp_nio"
+ description="TCP_NIO based stack, with flow control and message bundling. This is usually used when IP
+ multicasting cannot be used in a network, e.g. because it is disabled (routers discard multicast)">
+ <config>
+ <TCP_NIO
+ recv_buf_size="20000000"
+ send_buf_size="640000"
+ loopback="false"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ use_incoming_packet_handler="true"
+ use_outgoing_packet_handler="false"
+ down_thread="false" up_thread="false"
+ enable_bundling="true"
+ start_port="7700"
+ use_send_queues="false"
+ sock_conn_timeout="300" skip_suspected_members="true"
+ reader_threads="8"
+ writer_threads="8"
+ processor_threads="8"
+ processor_minThreads="8"
+ processor_maxThreads="8"
+ processor_queueSize="100"
+ processor_keepAliveTime="-1"/>
+ <TCPPING timeout="3000"
+ initial_hosts="${jgroups.tcpping.initial_hosts:localhost[7700],localhost[7701]}"
+ port_range="1"
+ num_initial_members="3"
+ down_thread="false" up_thread="false"/>
+ <MERGE2 max_interval="100000"
+ down_thread="false" up_thread="false" min_interval="20000"/>
+ <FD_SOCK down_thread="false" up_thread="false"/>
+ <FD timeout="10000" max_tries="5" down_thread="false" up_thread="false" shun="true"/>
+ <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
+ <pbcast.NAKACK max_xmit_size="60000"
+ use_mcast_xmit="false" gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ down_thread="false" up_thread="false"
+ discard_delivered_msgs="true"/>
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ down_thread="false" up_thread="false"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+ down_thread="false" up_thread="false"
+ join_retry_timeout="2000" shun="true"
+ view_bundling="true"/>
+ <FC max_credits="2000000" down_thread="false" up_thread="false"
+ min_threshold="0.10"/>
+ <FRAG2 frag_size="60000" down_thread="false" up_thread="false"/>
+ <!-- pbcast.STREAMING_STATE_TRANSFER down_thread="false" up_thread="false"
+ use_flush="true" use_reading_thread="true"/ -->
+ <pbcast.STATE_TRANSFER down_thread="false" up_thread="false" use_flush="false"/>
+ <pbcast.FLUSH down_thread="false" up_thread="false"/>
+ </config>
+ </stack>
+
+
+ <stack name="tcp_nio-sync"
+ description="TCP_NIO based stack, without flow control and without message bundling. This is usually used when IP
+ multicasting cannot be used in a network, e.g. because it is disabled (routers discard multicast). This
+ configuration should be used instead of tcp_nio when (1) synchronous calls are used and (2) the message volume
+ (rate and size) is not that large">
+ <config>
+ <TCP_NIO
+ recv_buf_size="20000000"
+ send_buf_size="640000"
+ loopback="false"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ use_incoming_packet_handler="true"
+ use_outgoing_packet_handler="false"
+ down_thread="false" up_thread="false"
+ enable_bundling="false"
+ start_port="7750"
+ use_send_queues="false"
+ sock_conn_timeout="300" skip_suspected_members="true"
+ reader_threads="8"
+ writer_threads="8"
+ processor_threads="8"
+ processor_minThreads="8"
+ processor_maxThreads="8"
+ processor_queueSize="100"
+ processor_keepAliveTime="-1"/>
+ <TCPPING timeout="3000"
+ initial_hosts="${jgroups.tcpping.initial_hosts:localhost[7750],localhost[7751]}"
+ port_range="1"
+ num_initial_members="3"
+ down_thread="false" up_thread="false"/>
+ <MERGE2 max_interval="100000"
+ down_thread="false" up_thread="false" min_interval="20000"/>
+ <FD_SOCK down_thread="false" up_thread="false"/>
+ <FD timeout="10000" max_tries="5" down_thread="false" up_thread="false" shun="true"/>
+ <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
+ <pbcast.NAKACK max_xmit_size="60000"
+ use_mcast_xmit="false" gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ down_thread="false" up_thread="false"
+ discard_delivered_msgs="true"/>
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ down_thread="false" up_thread="false"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+ down_thread="false" up_thread="false"
+ join_retry_timeout="2000" shun="true"
+ view_bundling="true"/>
+ <!-- pbcast.STREAMING_STATE_TRANSFER down_thread="false" up_thread="false"
+ use_flush="true" use_reading_thread="true"/ -->
+ <pbcast.STATE_TRANSFER down_thread="false" up_thread="false" use_flush="false"/>
+ <pbcast.FLUSH down_thread="false" up_thread="false"/>
+ </config>
+ </stack>
+
+</protocol_stacks>
+
+
Modified: trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml 2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml 2007-01-04 22:02:16 UTC (rev 1892)
@@ -82,6 +82,12 @@
<attribute name="MessagePullPolicy">org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy</attribute>
<attribute name="ClusterRouterFactory">org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory</attribute>
+
+ <attribute name="ChannelFactoryName">jgroups.mux:name=Multiplexer</attribute>
+ <attribute name="SyncChannelName">udp-sync</attribute>
+ <attribute name="AsyncChannelName">udp</attribute>
+ <attribute name="ChannelPartitionName">${jboss.partition.name:DefaultPartition}-JMS</attribute>
+
<attribute name="AsyncChannelConfig">
<config>
<UDP mcast_recv_buf_size="500000" down_thread="false" ip_mcast="true" mcast_send_buf_size="32000"
Added: trunk/src/etc/server/default/deploy/multiplexer-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/multiplexer-service.xml 2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/src/etc/server/default/deploy/multiplexer-service.xml 2007-01-04 22:02:16 UTC (rev 1892)
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE server
+ PUBLIC "-//JBoss//DTD MBean Service 3.2//EN"
+ "http://www.jboss.org/j2ee/dtd/jboss-service_3_2.dtd">
+
+
+<server>
+
+
+ <!--
+ An example for setting up the JChannelFactory MBean
+ Author: Bela Ban
+ Version: $Id$
+ -->
+ <mbean code="org.jgroups.jmx.JChannelFactory" name="jgroups.mux:name=Multiplexer">
+ <!--mbean code="org.jgroups.JChannelFactory" name="jgroups.mux:name=Multiplexer" xmbean-dd="resource:META-INF/multiplexer-xmbean.xml" -->
+ <attribute name="Domain">jgroups.mux</attribute>
+ <attribute name="MultiplexerConfig">META-INF/multiplexer-stacks.xml</attribute>
+ <attribute name="ExposeChannels">true</attribute>
+ <attribute name="ExposeProtocols">true</attribute>
+
+ <!-- The address used to determine the node name -->
+ <!-- <attribute name="NodeAddress">${jboss.bind.address}</attribute> -->
+
+ </mbean>
+
+
+</server>
Modified: trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml 2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml 2007-01-04 22:02:16 UTC (rev 1892)
@@ -111,8 +111,32 @@
<description>The ObjectName of the server peer this destination was deployed on</description>
<name>ServerPeer</name>
<type>javax.management.ObjectName</type>
- </attribute>
+ </attribute>
+ <attribute access="read-write" getMethod="getChannelFactoryName" setMethod="setChannelFactoryName">
+ <description>The ObjectName of the JGroups Multiplexer used to create JChannels</description>
+ <name>ChannelFactoryName</name>
+ <type>javax.management.ObjectName</type>
+ </attribute>
+
+ <attribute access="read-write" getMethod="getSyncChannelName" setMethod="setSyncChannelName">
+ <description>The name of the stack used on multiplexer for Sync Channels</description>
+ <name>SyncChannelName</name>
+ <type>java.lang.String</type>
+ </attribute>
+
+ <attribute access="read-write" getMethod="getAsyncChannelName" setMethod="setAsyncChannelName">
+ <description>The name of the stack used on multiplexer for Async Channels</description>
+ <name>AsyncChannelName</name>
+ <type>java.lang.String</type>
+ </attribute>
+
+ <attribute access="read-write" getMethod="getChannelPartitionName" setMethod="setChannelPartitionName">
+ <description>The partition name used to identify JMS/Postoffice on the JGroups Multiplexor</description>
+ <name>ChannelPartitionName</name>
+ <type>java.lang.String</type>
+ </attribute>
+
<!-- Managed operations -->
<operation>
Modified: trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java 2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java 2007-01-04 22:02:16 UTC (rev 1892)
@@ -43,6 +43,9 @@
import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultFailoverMapper;
import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
import org.jboss.messaging.core.plugin.postoffice.cluster.Peer;
+import org.jboss.messaging.core.plugin.postoffice.cluster.channelfactory.ChannelFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.channelfactory.MultiplexorChannelFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.channelfactory.XMLChannelFactory;
import org.jboss.messaging.core.tx.TransactionRepository;
import org.w3c.dom.Element;
@@ -71,8 +74,13 @@
private boolean started;
+ // This group of properties is used on JGroups Channel configuration
private Element syncChannelConfig;
private Element asyncChannelConfig;
+ private ObjectName channelFactoryName;
+ private String syncChannelName;
+ private String asyncChannelName;
+ private String channelPartitionName;
private ObjectName serverPeerObjectName;
@@ -160,6 +168,47 @@
this.officeName = name;
}
+
+ public ObjectName getChannelFactoryName()
+ {
+ return channelFactoryName;
+ }
+
+ public void setChannelFactoryName(ObjectName channelFactoryName)
+ {
+ this.channelFactoryName = channelFactoryName;
+ }
+
+ public String getSyncChannelName()
+ {
+ return syncChannelName;
+ }
+
+ public void setSyncChannelName(String syncChannelName)
+ {
+ this.syncChannelName = syncChannelName;
+ }
+
+ public String getAsyncChannelName()
+ {
+ return asyncChannelName;
+ }
+
+ public void setAsyncChannelName(String asyncChannelName)
+ {
+ this.asyncChannelName = asyncChannelName;
+ }
+
+ public String getChannelPartitionName()
+ {
+ return channelPartitionName;
+ }
+
+ public void setChannelPartitionName(String channelPartitionName)
+ {
+ this.channelPartitionName = channelPartitionName;
+ }
+
public void setSyncChannelConfig(Element config) throws Exception
{
syncChannelConfig = config;
@@ -282,13 +331,43 @@
FilterFactory ff = new SelectorFactory();
FailoverMapper mapper = new DefaultFailoverMapper();
+ ChannelFactory channelFactory = null;
+
+ if (this.channelFactoryName != null)
+ {
+ Object info = null;
+ try
+ {
+ info = server.getMBeanInfo(channelFactoryName);
+ }
+ catch (Exception e)
+ {
+ log.error("Error", e);
+ // noop... means we couldn't find the channel hence we should use regular XMLChannelFactories
+ }
+ if (info!=null)
+ {
+ log.debug("*********************************** Using Multiplexor");
+ channelFactory = new MultiplexorChannelFactory(server, channelFactoryName, channelPartitionName, syncChannelName, asyncChannelName);
+ }
+ else
+ {
+ log.debug("*********************************** Using XMLChannelFactory");
+ channelFactory = new XMLChannelFactory(syncChannelConfig, asyncChannelConfig);
+ }
+ }
+ else
+ {
+ log.debug("*********************************** Using XMLChannelFactory");
+ channelFactory = new XMLChannelFactory(syncChannelConfig, asyncChannelConfig);
+ }
+
postOffice = new DefaultClusteredPostOffice(ds, tm, sqlProperties,
createTablesOnStartup,
nodeId, officeName, ms,
pm, tr, ff, cf, pool,
groupName,
- syncChannelConfig,
- asyncChannelConfig,
+ channelFactory,
stateTimeout, castTimeout,
pullPolicy, rf,
mapper,
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2007-01-04 22:02:16 UTC (rev 1892)
@@ -68,12 +68,12 @@
import org.jboss.messaging.core.plugin.postoffice.Binding;
import org.jboss.messaging.core.plugin.postoffice.DefaultBinding;
import org.jboss.messaging.core.plugin.postoffice.DefaultPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.cluster.channelfactory.ChannelFactory;
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.messaging.core.tx.TransactionRepository;
import org.jboss.messaging.util.StreamUtils;
import org.jgroups.Address;
import org.jgroups.Channel;
-import org.jgroups.JChannel;
import org.jgroups.MembershipListener;
import org.jgroups.Message;
import org.jgroups.MessageListener;
@@ -82,7 +82,6 @@
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.MessageDispatcher;
import org.jgroups.blocks.RequestHandler;
-import org.w3c.dom.Element;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
@@ -163,12 +162,10 @@
private boolean started;
- private Element syncChannelConfigElement;
- private String syncChannelConfig;
+ private ChannelFactory channelFactory;
+
private Channel syncChannel;
- private Element asyncChannelConfigElement;
- private String asyncChannelConfig;
private Channel asyncChannel;
private MessageDispatcher controlMessageDispatcher;
@@ -233,8 +230,7 @@
ConditionFactory conditionFactory,
QueuedExecutorPool pool,
String groupName,
- Element syncChannelConfig,
- Element asyncChannelConfig,
+ ChannelFactory channelFactory,
long stateTimeout, long castTimeout,
MessagePullPolicy redistributionPolicy,
ClusterRouterFactory rf,
@@ -242,65 +238,6 @@
long statsSendPeriod)
throws Exception
{
- this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
- pm, tr, filterFactory, conditionFactory, pool, groupName, stateTimeout, castTimeout,
- redistributionPolicy, rf, failoverMapper, statsSendPeriod);
-
- this.syncChannelConfigElement = syncChannelConfig;
- this.asyncChannelConfigElement = asyncChannelConfig;
- }
-
- /*
- * Constructor using String for configuration
- */
- public DefaultClusteredPostOffice(DataSource ds,
- TransactionManager tm,
- Properties sqlProperties,
- boolean createTablesOnStartup,
- int nodeId,
- String officeName,
- MessageStore ms,
- PersistenceManager pm,
- TransactionRepository tr,
- FilterFactory filterFactory,
- ConditionFactory conditionFactory,
- QueuedExecutorPool pool,
- String groupName,
- String syncChannelConfig,
- String asyncChannelConfig,
- long stateTimeout, long castTimeout,
- MessagePullPolicy redistributionPolicy,
- ClusterRouterFactory rf,
- FailoverMapper failoverMapper,
- long statsSendPeriod) throws Exception
- {
- this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
- pm, tr, filterFactory, conditionFactory, pool, groupName, stateTimeout, castTimeout,
- redistributionPolicy, rf, failoverMapper, statsSendPeriod);
-
- this.syncChannelConfig = syncChannelConfig;
- this.asyncChannelConfig = asyncChannelConfig;
- }
-
- private DefaultClusteredPostOffice(DataSource ds,
- TransactionManager tm,
- Properties sqlProperties,
- boolean createTablesOnStartup,
- int nodeId,
- String officeName,
- MessageStore ms,
- PersistenceManager pm,
- TransactionRepository tr,
- FilterFactory filterFactory,
- ConditionFactory conditionFactory,
- QueuedExecutorPool pool,
- String groupName,
- long stateTimeout, long castTimeout,
- MessagePullPolicy redistributionPolicy,
- ClusterRouterFactory rf,
- FailoverMapper failoverMapper,
- long statsSendPeriod)
- {
super (ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms, pm, tr,
filterFactory, conditionFactory, pool);
@@ -335,6 +272,8 @@
nbSupport = new NotificationBroadcasterSupport();
viewExecutor = new QueuedExecutor();
+
+ this.channelFactory = channelFactory;
}
// MessagingComponent overrides ----------------------------------
@@ -348,16 +287,8 @@
if (trace) { log.trace(this + " starting"); }
- if (syncChannelConfigElement != null)
- {
- this.syncChannel = new JChannel(syncChannelConfigElement);
- this.asyncChannel = new JChannel(asyncChannelConfigElement);
- }
- else
- {
- this.syncChannel = new JChannel(syncChannelConfig);
- this.asyncChannel = new JChannel(asyncChannelConfig);
- }
+ this.syncChannel = channelFactory.createSyncChannel();
+ this.asyncChannel = channelFactory.createASyncChannel();
// We don't want to receive local messages on any of the channels
syncChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/ChannelFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/ChannelFactory.java 2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/ChannelFactory.java 2007-01-04 22:02:16 UTC (rev 1892)
@@ -0,0 +1,15 @@
+package org.jboss.messaging.core.plugin.postoffice.cluster.channelfactory;
+
+import org.jgroups.JChannel;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision:$</tt>
+ * <p/>
+ * $Id$
+ */
+public interface ChannelFactory
+{
+ public JChannel createSyncChannel() throws Exception;
+ public JChannel createASyncChannel() throws Exception;
+}
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/MultiplexorChannelFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/MultiplexorChannelFactory.java 2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/MultiplexorChannelFactory.java 2007-01-04 22:02:16 UTC (rev 1892)
@@ -0,0 +1,138 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.plugin.postoffice.cluster.channelfactory;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import org.jgroups.JChannel;
+
+/**
+ * A ChannelFactory that obtains its channels from an MBean through JMX
+ * instead of constructing them directly. Each create method calls
+ * server.invoke on the configured ObjectName with the operation
+ * "createMultiplexerChannel" and casts the result to JChannel.
+ *
+ * The invocation arguments are, in order: the stack name (syncStack or
+ * asyncStack), uniqueID, Boolean.TRUE and uniqueID again. Both channels are
+ * therefore requested with the same uniqueID and differ only in the stack
+ * name passed as the first argument.
+ * NOTE(review): the last two arguments presumably correspond to a
+ * "register for state transfer" flag and a substate id on the JGroups
+ * channel factory MBean -- confirm against the MBean actually deployed.
+ *
+ * The constructor stores its arguments without any null checks, every field
+ * can be replaced later through its setter, and the class performs no
+ * synchronization of its own.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision:$</tt>
+ * <p/>
+ * $Id$
+ */
+public class MultiplexorChannelFactory implements ChannelFactory
+{
+
+ // Constants: parameter types of the JMX operation, matching the argument array built in the create methods
+ private static final String[] MUX_SIGNATURE = new String[]{"java.lang.String", "java.lang.String", "boolean", "java.lang.String"};
+
+ // Attributes: package-private and mutable; all of them are read by the create methods at invocation time
+ MBeanServer server;
+ ObjectName channelFactory;
+ String asyncStack;
+ String syncStack;
+ String uniqueID;
+ private static final String MUX_OPERATION = "createMultiplexerChannel";
+
+ // Static
+
+ // Constructors: note the parameter order is (uniqueID, syncStack, asyncStack)
+
+ public MultiplexorChannelFactory(MBeanServer server,
+ ObjectName channelFactory,
+ String uniqueID,
+ String syncStack,
+ String asyncStack)
+ {
+ this.server = server;
+ this.channelFactory = channelFactory;
+ this.uniqueID = uniqueID;
+ this.asyncStack = asyncStack;
+ this.syncStack = syncStack;
+ }
+
+ // Public: plain accessors; each setter overwrites the stored value without validation
+
+ public MBeanServer getServer()
+ {
+ return server;
+ }
+
+ public void setServer(MBeanServer server)
+ {
+ this.server = server;
+ }
+
+ public ObjectName getChannelFactory()
+ {
+ return channelFactory;
+ }
+
+ public void setChannelFactory(ObjectName channelFactory)
+ {
+ this.channelFactory = channelFactory;
+ }
+
+ public String getAsyncStack()
+ {
+ return asyncStack;
+ }
+
+ public void setAsyncStack(String asyncStack)
+ {
+ this.asyncStack = asyncStack;
+ }
+
+ public String getSyncStack()
+ {
+ return syncStack;
+ }
+
+ public void setSyncStack(String syncStack)
+ {
+ this.syncStack = syncStack;
+ }
+
+ public String getUniqueID()
+ {
+ return uniqueID;
+ }
+
+ public void setUniqueID(String uniqueID)
+ {
+ this.uniqueID = uniqueID;
+ }
+
+ public JChannel createSyncChannel() throws Exception // invokes MUX_OPERATION with syncStack as the first argument
+ {
+ return (JChannel) server.invoke(this.channelFactory, MUX_OPERATION, new Object[]{syncStack, uniqueID, Boolean.TRUE, uniqueID}, MUX_SIGNATURE);
+ }
+
+ public JChannel createASyncChannel() throws Exception // same invocation as above, but with asyncStack
+ {
+ return (JChannel) server.invoke(this.channelFactory, MUX_OPERATION, new Object[]{asyncStack, uniqueID, Boolean.TRUE, uniqueID}, MUX_SIGNATURE);
+ }
+
+ // Package protected
+
+ // Protected
+
+ // Private
+
+ // Inner classes
+
+}
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/NameChannelFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/NameChannelFactory.java 2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/NameChannelFactory.java 2007-01-04 22:02:16 UTC (rev 1892)
@@ -0,0 +1,96 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.plugin.postoffice.cluster.channelfactory;
+
+import org.jgroups.JChannel;
+
+/**
+ * A ChannelFactory that builds each channel by passing a String to the
+ * JChannel(String) constructor: syncConfig for the sync channel and
+ * asyncConfig for the async channel. A new JChannel is constructed on every
+ * call; nothing is cached.
+ * NOTE(review): how JGroups interprets the String (inline protocol stack
+ * properties, a file name, a URL) is not visible here -- confirm for the
+ * JGroups version in use.
+ *
+ * Neither the constructor nor the setters validate the configuration
+ * values, and the class performs no synchronization of its own.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision:$</tt>
+ * <p/>
+ * $Id$
+ */
+public class NameChannelFactory implements ChannelFactory
+{
+
+ // Constants
+
+ // Attributes: package-private and mutable; read when a channel is created
+
+ String asyncConfig;
+ String syncConfig;
+
+ // Static
+
+ // Constructors: note the parameter order is (syncConfig, asyncConfig)
+
+ public NameChannelFactory(String syncConfig, String asyncConfig)
+ {
+ this.syncConfig = syncConfig;
+ this.asyncConfig = asyncConfig;
+ }
+
+ // Public: plain accessors; each setter overwrites the stored value without validation
+
+ public String getAsyncConfig()
+ {
+ return asyncConfig;
+ }
+
+ public void setAsyncConfig(String asyncConfig)
+ {
+ this.asyncConfig = asyncConfig;
+ }
+
+ public String getSyncConfig()
+ {
+ return syncConfig;
+ }
+
+ public void setSyncConfig(String syncConfig)
+ {
+ this.syncConfig = syncConfig;
+ }
+
+ // ChannelFactory implementation: each method constructs a fresh JChannel from the matching String
+
+ public JChannel createSyncChannel() throws Exception
+ {
+ return new JChannel(syncConfig);
+ }
+
+ public JChannel createASyncChannel() throws Exception
+ {
+ return new JChannel(asyncConfig);
+ }
+
+ // Package protected
+
+ // Protected
+
+ // Private
+
+ // Inner classes
+
+}
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/XMLChannelFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/XMLChannelFactory.java 2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/channelfactory/XMLChannelFactory.java 2007-01-04 22:02:16 UTC (rev 1892)
@@ -0,0 +1,95 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.plugin.postoffice.cluster.channelfactory;
+
+import org.w3c.dom.Element;
+import org.jgroups.JChannel;
+
+/**
+ * A ChannelFactory that builds each channel by passing a DOM Element to the
+ * JChannel(Element) constructor: syncConfig for the sync channel and
+ * asyncConfig for the async channel. A new JChannel is constructed on every
+ * call; nothing is cached.
+ *
+ * Neither the constructor nor the setters validate the elements, and the
+ * class performs no synchronization of its own.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision:$</tt>
+ * <p/>
+ * $Id:$
+ */
+public class XMLChannelFactory implements ChannelFactory
+{
+
+ // Constants
+
+ // Attributes: package-private and mutable; read when a channel is created
+ Element syncConfig;
+ Element asyncConfig;
+
+ // Static
+
+ // Constructors: note the parameter order is (syncConfig, asyncConfig)
+
+ public XMLChannelFactory(Element syncConfig, Element asyncConfig)
+ {
+ this.syncConfig = syncConfig;
+ this.asyncConfig = asyncConfig;
+ }
+
+ // Public: plain accessors; each setter overwrites the stored value without validation
+
+ public Element getSyncConfig()
+ {
+ return syncConfig;
+ }
+
+ public void setSyncConfig(Element syncConfig)
+ {
+ this.syncConfig = syncConfig;
+ }
+
+ public Element getAsyncConfig()
+ {
+ return asyncConfig;
+ }
+
+ public void setAsyncConfig(Element asyncConfig)
+ {
+ this.asyncConfig = asyncConfig;
+ }
+
+ // implementation of ChannelFactory: each method constructs a fresh JChannel from the matching Element
+ public JChannel createSyncChannel() throws Exception
+ {
+ return new JChannel(syncConfig);
+ }
+
+ public JChannel createASyncChannel() throws Exception
+ {
+ return new JChannel(asyncConfig);
+ }
+
+ // Package protected
+
+ // Protected
+
+ // Private
+
+ // Inner classes
+
+}
Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml 2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/tests/build.xml 2007-01-04 22:02:16 UTC (rev 1892)
@@ -287,6 +287,7 @@
<sysproperty key="test.database" value="${functional.tests.database}"/>
<sysproperty key="test.serialization" value="${functional.tests.serialization}"/>
<sysproperty key="test.remoting" value="${test.remoting}"/>
+ <sysproperty key="java.net.preferIPv4Stack" value="true"/>
<!--
<jvmarg line="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_shmem,server=y,suspend=y,address=rmiserver"/>
-->
@@ -305,6 +306,7 @@
<sysproperty key="test.database" value="${clustering.tests.database}"/>
<sysproperty key="test.serialization" value="${functional.tests.serialization}"/>
<sysproperty key="test.clustered" value="true"/>
+ <sysproperty key="java.net.preferIPv4Stack" value="true"/>
<!--
<jvmarg line="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_shmem,server=y,suspend=y,address=rmiserver"/>
-->
@@ -783,6 +785,7 @@
<sysproperty key="test.clustered" value="true"/>
<sysproperty key="test.logfile.suffix" value="clustering-client"/>
<jvmarg value="-Xmx512M"/>
+ <jvmarg value="-Djava.net.preferIPv4Stack=true"/>
<!--
<jvmarg line="-Xmx512M -Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_shmem,server=n,suspend=n,address=antjunit"/>
-->
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java 2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java 2007-01-04 22:02:16 UTC (rev 1892)
@@ -40,6 +40,7 @@
import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.channelfactory.NameChannelFactory;
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.test.messaging.core.SimpleCondition;
import org.jboss.test.messaging.core.SimpleConditionFactory;
@@ -2275,16 +2276,16 @@
FailoverMapper mapper = new DefaultFailoverMapper();
ConditionFactory cf = new SimpleConditionFactory();
-
- DefaultClusteredPostOffice postOffice =
+
+ DefaultClusteredPostOffice postOffice =
new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
- sc.getClusteredPostOfficeSQLProperties(), true, nodeId,
- "Clustered", ms, pm, tr, ff, cf, pool,
- groupName,
- JGroupsUtil.getControlStackProperties(),
- JGroupsUtil.getDataStackProperties(),
- 5000, 5000, pullPolicy, rf, mapper, 1000);
-
+ sc.getClusteredPostOfficeSQLProperties(), true, nodeId,
+ "Clustered", ms, pm, tr, ff, cf, pool,
+ groupName,
+ new NameChannelFactory(JGroupsUtil.getControlStackProperties(),
+ JGroupsUtil.getDataStackProperties()),
+ 5000, 5000, pullPolicy, rf, mapper, 1000);
+
postOffice.start();
return postOffice;
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java 2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java 2007-01-04 22:02:16 UTC (rev 1892)
@@ -35,6 +35,7 @@
import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.channelfactory.NameChannelFactory;
import org.jboss.test.messaging.core.SimpleCondition;
import org.jboss.test.messaging.core.SimpleConditionFactory;
import org.jboss.test.messaging.core.SimpleFilterFactory;
@@ -385,17 +386,17 @@
FailoverMapper mapper = new DefaultFailoverMapper();
- ConditionFactory cf = new SimpleConditionFactory();
-
- DefaultClusteredPostOffice postOffice =
+ ConditionFactory cf = new SimpleConditionFactory();
+
+ DefaultClusteredPostOffice postOffice =
new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
- sc.getClusteredPostOfficeSQLProperties(), true, nodeId,
- "Clustered", ms, pm, tr, ff, cf, pool,
- groupName,
- JGroupsUtil.getControlStackProperties(),
- JGroupsUtil.getDataStackProperties(),
- 5000, 5000, redistPolicy, rf, mapper, 1000);
-
+ sc.getClusteredPostOfficeSQLProperties(), true, nodeId,
+ "Clustered", ms, pm, tr, ff, cf, pool,
+ groupName,
+ new NameChannelFactory(JGroupsUtil.getControlStackProperties(),
+ JGroupsUtil.getDataStackProperties()),
+ 5000, 5000, redistPolicy, rf, mapper, 1000);
+
postOffice.start();
return postOffice;
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java 2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java 2007-01-04 22:02:16 UTC (rev 1892)
@@ -47,6 +47,7 @@
import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
import org.jboss.messaging.core.plugin.postoffice.cluster.QueueStats;
+import org.jboss.messaging.core.plugin.postoffice.cluster.channelfactory.NameChannelFactory;
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.test.messaging.core.SimpleConditionFactory;
import org.jboss.test.messaging.core.SimpleFilterFactory;
@@ -359,17 +360,17 @@
FailoverMapper mapper = new DefaultFailoverMapper();
- ConditionFactory cf = new SimpleConditionFactory();
-
- DefaultClusteredPostOffice postOffice =
+ ConditionFactory cf = new SimpleConditionFactory();
+
+ DefaultClusteredPostOffice postOffice =
new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
- sc.getClusteredPostOfficeSQLProperties(), true, nodeId,
- "Clustered", ms, pm, tr, ff, cf, pool,
- groupName,
- JGroupsUtil.getControlStackProperties(),
- JGroupsUtil.getDataStackProperties(),
- 5000, 5000, redistPolicy, rf, mapper, 1000);
-
+ sc.getClusteredPostOfficeSQLProperties(), true, nodeId,
+ "Clustered", ms, pm, tr, ff, cf, pool,
+ groupName,
+ new NameChannelFactory(JGroupsUtil.getControlStackProperties(),
+ JGroupsUtil.getDataStackProperties()),
+ 5000, 5000, redistPolicy, rf, mapper, 1000);
+
postOffice.start();
return postOffice;
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java 2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java 2007-01-04 22:02:16 UTC (rev 1892)
@@ -37,6 +37,7 @@
import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.channelfactory.NameChannelFactory;
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.messaging.core.tx.TransactionException;
import org.jboss.test.messaging.core.SimpleCondition;
@@ -49,23 +50,21 @@
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
/**
- *
* A RecoveryTest
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
+ * <p/>
+ * $Id$
*/
public class RecoveryTest extends PostOfficeTestBase
{
// Constants -----------------------------------------------------
// Static --------------------------------------------------------
-
+
// Attributes ----------------------------------------------------
-
+
// Constructors --------------------------------------------------
public RecoveryTest(String name)
@@ -81,305 +80,303 @@
}
public void tearDown() throws Exception
- {
+ {
super.tearDown();
}
-
+
public void testCrashBeforePersist() throws Exception
{
DefaultClusteredPostOffice office1 = null;
-
+
DefaultClusteredPostOffice office2 = null;
-
+
DefaultClusteredPostOffice office3 = null;
-
+
try
- {
- office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
-
- office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
-
- office3 = (DefaultClusteredPostOffice)createClusteredPostOffice(3, "testgroup");
-
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ {
+ office1 = (DefaultClusteredPostOffice) createClusteredPostOffice(1, "testgroup");
+
+ office2 = (DefaultClusteredPostOffice) createClusteredPostOffice(2, "testgroup");
+
+ office3 = (DefaultClusteredPostOffice) createClusteredPostOffice(3, "testgroup");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
Binding binding1 =
office1.bindClusteredQueue(new SimpleCondition("topic1"), queue1);
-
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue2", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue2", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
Binding binding2 =
office2.bindClusteredQueue(new SimpleCondition("topic1"), queue2);
-
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue3", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue3", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
Binding binding3 =
office3.bindClusteredQueue(new SimpleCondition("topic1"), queue3);
-
+
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue1.add(receiver1);
SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue2.add(receiver2);
SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue3.add(receiver3);
-
+
//This will make it fail after casting but before persisting the message in the db
office1.setFail(true, false, false);
-
+
Transaction tx = tr.createTransaction();
-
+
final int NUM_MESSAGES = 10;
-
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
- Message msg = CoreMessageFactory.createCoreMessage(i);
+ Message msg = CoreMessageFactory.createCoreMessage(i);
msg.setReliable(true);
-
- MessageReference ref = ms.reference(msg);
-
+
+ MessageReference ref = ms.reference(msg);
+
office1.route(ref, new SimpleCondition("topic1"), tx);
}
-
+
Thread.sleep(1000);
-
+
List msgs = receiver1.getMessages();
assertTrue(msgs.isEmpty());
-
+
msgs = receiver2.getMessages();
assertTrue(msgs.isEmpty());
-
+
msgs = receiver3.getMessages();
assertTrue(msgs.isEmpty());
-
+
try
{
//An exception should be thrown
tx.commit();
- fail();
+ fail();
}
catch (TransactionException e)
{
//Ok
}
-
+
Thread.sleep(1000);
-
+
msgs = receiver1.getMessages();
assertTrue(msgs.isEmpty());
-
+
msgs = receiver2.getMessages();
assertTrue(msgs.isEmpty());
-
+
msgs = receiver3.getMessages();
assertTrue(msgs.isEmpty());
-
+
//Nodes 2 and 3 should have a held tx
-
+
assertTrue(office1.getHoldingTransactions().isEmpty());
assertEquals(1, office2.getHoldingTransactions().size());
-
+
assertEquals(1, office3.getHoldingTransactions().size());
-
+
//We now kill the office - this should make the other offices do their transaction check
office1.stop();
-
+
Thread.sleep(1000);
-
+
//This should result in the held txs being rolled back
-
+
assertTrue(office1.getHoldingTransactions().isEmpty());
-
+
assertTrue(office2.getHoldingTransactions().isEmpty());
-
+
assertTrue(office3.getHoldingTransactions().isEmpty());
-
+
//The tx should be removed from the holding area and nothing should be received
//remember node1 has now crashed so no point checking receiver1
-
+
msgs = receiver2.getMessages();
assertTrue(msgs.isEmpty());
-
+
msgs = receiver3.getMessages();
assertTrue(msgs.isEmpty());
-
+
}
finally
{
if (office1 != null)
- {
+ {
office1.stop();
}
-
+
if (office2 != null)
- {
+ {
office2.stop();
}
-
- if (office3!= null)
- {
+
+ if (office3 != null)
+ {
office3.stop();
}
}
}
-
+
public void testCrashAfterPersist() throws Exception
{
DefaultClusteredPostOffice office1 = null;
-
+
DefaultClusteredPostOffice office2 = null;
-
+
DefaultClusteredPostOffice office3 = null;
-
+
try
- {
- office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
-
- office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
-
- office3 = (DefaultClusteredPostOffice)createClusteredPostOffice(3, "testgroup");
-
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ {
+ office1 = (DefaultClusteredPostOffice) createClusteredPostOffice(1, "testgroup");
+
+ office2 = (DefaultClusteredPostOffice) createClusteredPostOffice(2, "testgroup");
+
+ office3 = (DefaultClusteredPostOffice) createClusteredPostOffice(3, "testgroup");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
Binding binding1 =
office1.bindClusteredQueue(new SimpleCondition("topic1"), queue1);
-
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue2", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue2", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
Binding binding2 =
office2.bindClusteredQueue(new SimpleCondition("topic1"), queue2);
-
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue3", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue3", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
Binding binding3 =
office3.bindClusteredQueue(new SimpleCondition("topic1"), queue3);
-
+
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue1.add(receiver1);
SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue2.add(receiver2);
SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue3.add(receiver3);
-
+
//This will make it fail after casting and persisting the message in the db
office1.setFail(false, true, false);
-
+
Transaction tx = tr.createTransaction();
-
+
final int NUM_MESSAGES = 10;
-
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
- Message msg = CoreMessageFactory.createCoreMessage(i);
+ Message msg = CoreMessageFactory.createCoreMessage(i);
msg.setReliable(true);
-
- MessageReference ref = ms.reference(msg);
-
+
+ MessageReference ref = ms.reference(msg);
+
office1.route(ref, new SimpleCondition("topic1"), tx);
}
-
+
Thread.sleep(1000);
-
+
List msgs = receiver1.getMessages();
assertTrue(msgs.isEmpty());
-
+
msgs = receiver2.getMessages();
assertTrue(msgs.isEmpty());
-
+
msgs = receiver3.getMessages();
assertTrue(msgs.isEmpty());
-
+
try
{
//An exception should be thrown
tx.commit();
- fail();
+ fail();
}
catch (TransactionException e)
{
//Ok
}
-
+
Thread.sleep(1000);
-
+
msgs = receiver1.getMessages();
assertTrue(msgs.isEmpty());
-
+
msgs = receiver2.getMessages();
assertTrue(msgs.isEmpty());
-
+
msgs = receiver3.getMessages();
assertTrue(msgs.isEmpty());
-
+
//There should be held tx in 2 and 3 but not in 1
-
+
assertTrue(office1.getHoldingTransactions().isEmpty());
-
+
assertEquals(1, office2.getHoldingTransactions().size());
-
+
assertEquals(1, office3.getHoldingTransactions().size());
-
+
//We now kill the office - this should make the other office do its transaction check
office1.stop();
-
+
Thread.sleep(1000);
-
+
assertTrue(office1.getHoldingTransactions().isEmpty());
-
+
assertTrue(office2.getHoldingTransactions().isEmpty());
-
+
assertTrue(office3.getHoldingTransactions().isEmpty());
-
+
//The tx should be removed from the holding area and messages be received
//no point checking receiver1 since node1 has crashed
-
+
msgs = receiver2.getMessages();
assertEquals(NUM_MESSAGES, msgs.size());
-
+
msgs = receiver3.getMessages();
assertEquals(NUM_MESSAGES, msgs.size());
-
-
+
+
}
finally
{
if (office1 != null)
- {
+ {
office1.stop();
}
-
+
if (office2 != null)
- {
+ {
office2.stop();
}
}
}
-
-
-
-
+
+
protected ClusteredPostOffice createClusteredPostOffice(int nodeId, String groupName) throws Exception
{
MessagePullPolicy redistPolicy = new NullMessagePullPolicy();
-
+
FilterFactory ff = new SimpleFilterFactory();
-
+
ClusterRouterFactory rf = new DefaultRouterFactory();
-
+
FailoverMapper mapper = new DefaultFailoverMapper();
-
- ConditionFactory cf = new SimpleConditionFactory();
-
- DefaultClusteredPostOffice postOffice =
+
+ ConditionFactory cf = new SimpleConditionFactory();
+
+ DefaultClusteredPostOffice postOffice =
new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
- sc.getClusteredPostOfficeSQLProperties(), true, nodeId, "Clustered",
- ms, pm, tr, ff, cf, pool,
- groupName,
- JGroupsUtil.getControlStackProperties(),
- JGroupsUtil.getDataStackProperties(),
- 5000, 5000, redistPolicy, rf, mapper, 1000);
-
- postOffice.start();
-
+ sc.getClusteredPostOfficeSQLProperties(), true, nodeId, "Clustered",
+ ms, pm, tr, ff, cf, pool,
+ groupName,
+ new NameChannelFactory(JGroupsUtil.getControlStackProperties(),
+ JGroupsUtil.getDataStackProperties()),
+ 5000, 5000, redistPolicy, rf, mapper, 1000);
+
+ postOffice.start();
+
return postOffice;
}
// Private -------------------------------------------------------
-
+
// Inner classes -------------------------------------------------
}
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java 2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java 2007-01-04 22:02:16 UTC (rev 1892)
@@ -44,6 +44,7 @@
import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.channelfactory.NameChannelFactory;
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.test.messaging.core.SimpleCondition;
import org.jboss.test.messaging.core.SimpleConditionFactory;
@@ -56,21 +57,19 @@
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
/**
- *
* A RedistributionWithDefaultMessagePullPolicyTest
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
+ * <p/>
+ * $Id$
*/
public class RedistributionWithDefaultMessagePullPolicyTest extends PostOfficeTestBase
{
// Constants -----------------------------------------------------
// Static --------------------------------------------------------
-
+
// Attributes ----------------------------------------------------
// Constructors --------------------------------------------------
@@ -88,970 +87,970 @@
}
public void tearDown() throws Exception
- {
+ {
super.tearDown();
}
-
+
public void testConsumeAllNonPersistentNonRecoverable() throws Throwable
{
consumeAll(false, false);
}
-
+
public void testConsumeAllPersistentNonRecoverable() throws Throwable
{
consumeAll(true, false);
}
-
+
public void testConsumeAllNonPersistentRecoverable() throws Throwable
{
consumeAll(false, true);
}
-
+
public void testConsumeAllPersistentRecoverable() throws Throwable
{
consumeAll(true, true);
}
-
+
public void testConsumeBitByBitNonPersistentNonRecoverable() throws Throwable
{
consumeBitByBit(false, false);
}
-
+
public void testConsumeBitByBitPersistentNonRecoverable() throws Throwable
{
consumeBitByBit(true, false);
}
-
+
public void testConsumeBitByBitNonPersistentRecoverable() throws Throwable
{
consumeBitByBit(false, true);
}
-
+
public void testConsumeBitByBitPersistentRecoverable() throws Throwable
{
consumeBitByBit(true, true);
}
-
+
public void testSimpleMessagePull() throws Throwable
{
DefaultClusteredPostOffice office1 = null;
-
+
DefaultClusteredPostOffice office2 = null;
-
+
try
- {
- office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
-
- office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
-
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ {
+ office1 = (DefaultClusteredPostOffice) createClusteredPostOffice(1, "testgroup");
+
+ office2 = (DefaultClusteredPostOffice) createClusteredPostOffice(2, "testgroup");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
Binding binding1 =
office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
-
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
Binding binding2 =
office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
-
- Message msg = CoreMessageFactory.createCoreMessage(1);
+
+ Message msg = CoreMessageFactory.createCoreMessage(1);
msg.setReliable(true);
-
- MessageReference ref = ms.reference(msg);
-
+
+ MessageReference ref = ms.reference(msg);
+
office1.route(ref, new SimpleCondition("queue1"), null);
-
+
Thread.sleep(2000);
-
+
//Messages should all be in queue1
-
+
List msgs = queue1.browse();
assertEquals(1, msgs.size());
-
+
msgs = queue2.browse();
assertTrue(msgs.isEmpty());
-
+
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
receiver1.setMaxRefs(0);
queue1.add(receiver1);
SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
receiver2.setMaxRefs(0);
queue2.add(receiver2);
-
+
//Prompt delivery so the channels know if the receivers are ready
queue1.deliver(false);
Thread.sleep(2000);
-
+
//Pull from 1 to 2
-
+
receiver2.setMaxRefs(1);
-
+
log.info("delivering");
- queue2.deliver(false);
-
+ queue2.deliver(false);
+
Thread.sleep(3000);
-
- assertTrue(office1.getHoldingTransactions().isEmpty());
+
+ assertTrue(office1.getHoldingTransactions().isEmpty());
assertTrue(office2.getHoldingTransactions().isEmpty());
-
+
log.info("r2 " + receiver2.getMessages().size());
-
+
log.info("queue1 refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
log.info("queue2 refs:" + queue2.memoryRefCount() + " dels:" + queue2.getDeliveringCount());
-
+
assertEquals(0, queue1.memoryRefCount());
assertEquals(0, queue1.getDeliveringCount());
-
+
assertEquals(0, queue2.memoryRefCount());
assertEquals(1, queue2.getDeliveringCount());
-
+
this.acknowledgeAll(receiver2);
-
+
assertEquals(0, queue2.memoryRefCount());
assertEquals(0, queue2.getDeliveringCount());
-
- assertTrue(office1.getHoldingTransactions().isEmpty());
+
+ assertTrue(office1.getHoldingTransactions().isEmpty());
assertTrue(office2.getHoldingTransactions().isEmpty());
-
+
}
finally
{
if (office1 != null)
- {
+ {
office1.stop();
}
-
+
if (office2 != null)
- {
+ {
office2.stop();
}
}
}
-
+
public void testSimpleMessagePullCrashBeforeCommit() throws Throwable
{
DefaultClusteredPostOffice office1 = null;
-
+
DefaultClusteredPostOffice office2 = null;
-
+
try
- {
- office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
-
- office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
-
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ {
+ office1 = (DefaultClusteredPostOffice) createClusteredPostOffice(1, "testgroup");
+
+ office2 = (DefaultClusteredPostOffice) createClusteredPostOffice(2, "testgroup");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
Binding binding1 =
office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
-
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
Binding binding2 =
office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
-
- Message msg = CoreMessageFactory.createCoreMessage(1);
+
+ Message msg = CoreMessageFactory.createCoreMessage(1);
msg.setReliable(true);
-
- MessageReference ref = ms.reference(msg);
-
+
+ MessageReference ref = ms.reference(msg);
+
office1.route(ref, new SimpleCondition("queue1"), null);
-
+
Thread.sleep(2000);
-
+
//Messages should all be in queue1
-
+
List msgs = queue1.browse();
assertEquals(1, msgs.size());
-
+
msgs = queue2.browse();
assertTrue(msgs.isEmpty());
-
+
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
receiver1.setMaxRefs(0);
queue1.add(receiver1);
SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
receiver2.setMaxRefs(0);
queue2.add(receiver2);
-
+
//Prompt delivery so the channels know if the receivers are ready
queue1.deliver(false);
Thread.sleep(2000);
-
+
//Pull from 1 to 2
-
+
receiver2.setMaxRefs(1);
-
+
//Force a failure before commit
office2.setFail(true, false, false);
-
+
log.info("delivering");
- queue2.deliver(false);
-
+ queue2.deliver(false);
+
Thread.sleep(3000);
-
- assertEquals(1, office1.getHoldingTransactions().size());
+
+ assertEquals(1, office1.getHoldingTransactions().size());
assertTrue(office2.getHoldingTransactions().isEmpty());
-
+
log.info("queue1 refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
log.info("queue2 refs:" + queue2.memoryRefCount() + " dels:" + queue2.getDeliveringCount());
-
+
assertEquals(0, queue1.memoryRefCount());
assertEquals(1, queue1.getDeliveringCount());
-
+
assertEquals(0, queue2.memoryRefCount());
assertEquals(0, queue2.getDeliveringCount());
-
+
//Now kill office 2 - this should cause office1 to remove the dead held transaction
-
- office2.stop();
+
+ office2.stop();
Thread.sleep(2000);
-
- assertTrue(office1.getHoldingTransactions().isEmpty());
-
+
+ assertTrue(office1.getHoldingTransactions().isEmpty());
+
//The delivery should be cancelled back to the queue too
-
+
assertEquals(1, queue1.memoryRefCount());
assertEquals(0, queue1.getDeliveringCount());
-
-
+
+
}
finally
{
if (office1 != null)
- {
+ {
office1.stop();
}
-
+
if (office2 != null)
- {
+ {
office2.stop();
}
}
}
-
+
public void testSimpleMessagePullCrashAfterCommit() throws Throwable
{
DefaultClusteredPostOffice office1 = null;
-
+
DefaultClusteredPostOffice office2 = null;
-
+
try
- {
- office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
-
- office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
-
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ {
+ office1 = (DefaultClusteredPostOffice) createClusteredPostOffice(1, "testgroup");
+
+ office2 = (DefaultClusteredPostOffice) createClusteredPostOffice(2, "testgroup");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
Binding binding1 =
office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
-
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
Binding binding2 =
office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
-
- Message msg = CoreMessageFactory.createCoreMessage(1);
+
+ Message msg = CoreMessageFactory.createCoreMessage(1);
msg.setReliable(true);
-
- MessageReference ref = ms.reference(msg);
-
+
+ MessageReference ref = ms.reference(msg);
+
office1.route(ref, new SimpleCondition("queue1"), null);
-
+
Thread.sleep(2000);
-
+
//Messages should all be in queue1
-
+
List msgs = queue1.browse();
assertEquals(1, msgs.size());
-
+
msgs = queue2.browse();
assertTrue(msgs.isEmpty());
-
+
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
receiver1.setMaxRefs(0);
queue1.add(receiver1);
SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
receiver2.setMaxRefs(0);
queue2.add(receiver2);
-
+
//Prompt delivery so the channels know if the receivers are ready
queue1.deliver(false);
Thread.sleep(2000);
-
+
//Pull from 1 to 2
-
+
receiver2.setMaxRefs(1);
-
+
//Force a failure after commit the ack to storage
office2.setFail(false, true, false);
-
+
log.info("delivering");
- queue2.deliver(false);
-
+ queue2.deliver(false);
+
Thread.sleep(3000);
-
- assertEquals(1, office1.getHoldingTransactions().size());
+
+ assertEquals(1, office1.getHoldingTransactions().size());
assertTrue(office2.getHoldingTransactions().isEmpty());
-
+
log.info("queue1 refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
log.info("queue2 refs:" + queue2.memoryRefCount() + " dels:" + queue2.getDeliveringCount());
-
+
assertEquals(0, queue1.memoryRefCount());
assertEquals(1, queue1.getDeliveringCount());
-
+
//Now kill office 2 - this should cause office1 to remove the dead held transaction
-
- office2.stop();
+
+ office2.stop();
Thread.sleep(2000);
-
- assertTrue(office1.getHoldingTransactions().isEmpty());
-
+
+ assertTrue(office1.getHoldingTransactions().isEmpty());
+
//The delivery should be committed
-
+
assertEquals(0, queue1.memoryRefCount());
assertEquals(0, queue1.getDeliveringCount());
-
+
}
finally
{
if (office1 != null)
- {
+ {
office1.stop();
}
-
+
if (office2 != null)
- {
+ {
office2.stop();
}
}
}
-
+
public void testFailHandleMessagePullResult() throws Throwable
{
DefaultClusteredPostOffice office1 = null;
-
+
DefaultClusteredPostOffice office2 = null;
-
+
try
- {
- office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
-
- office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
-
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+ {
+ office1 = (DefaultClusteredPostOffice) createClusteredPostOffice(1, "testgroup");
+
+ office2 = (DefaultClusteredPostOffice) createClusteredPostOffice(2, "testgroup");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
Binding binding1 =
office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
-
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, true, (QueuedExecutor) pool.get(), null, tr);
Binding binding2 =
office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
-
- Message msg = CoreMessageFactory.createCoreMessage(1);
+
+ Message msg = CoreMessageFactory.createCoreMessage(1);
msg.setReliable(true);
-
- MessageReference ref = ms.reference(msg);
-
+
+ MessageReference ref = ms.reference(msg);
+
office1.route(ref, new SimpleCondition("queue1"), null);
-
+
Thread.sleep(2000);
-
+
//Messages should all be in queue1
-
+
List msgs = queue1.browse();
assertEquals(1, msgs.size());
-
+
msgs = queue2.browse();
assertTrue(msgs.isEmpty());
-
+
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
receiver1.setMaxRefs(0);
queue1.add(receiver1);
SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
receiver2.setMaxRefs(0);
queue2.add(receiver2);
-
+
//Prompt delivery so the channels know if the receivers are ready
queue1.deliver(false);
Thread.sleep(2000);
-
+
//Pull from 1 to 2
-
+
receiver2.setMaxRefs(1);
-
+
office2.setFail(false, false, true);
-
+
log.info("delivering");
- queue2.deliver(false);
-
+ queue2.deliver(false);
+
Thread.sleep(3000);
-
+
//The delivery should be rolled back
-
- assertTrue(office2.getHoldingTransactions().isEmpty());
+
assertTrue(office2.getHoldingTransactions().isEmpty());
-
+ assertTrue(office2.getHoldingTransactions().isEmpty());
+
log.info("queue1 refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
log.info("queue2 refs:" + queue2.memoryRefCount() + " dels:" + queue2.getDeliveringCount());
-
+
assertEquals(1, queue1.memoryRefCount());
assertEquals(0, queue1.getDeliveringCount());
-
+
assertEquals(0, queue2.memoryRefCount());
assertEquals(0, queue2.getDeliveringCount());
}
finally
{
if (office1 != null)
- {
+ {
office1.stop();
}
-
+
if (office2 != null)
- {
+ {
office2.stop();
}
}
}
-
+
protected void consumeAll(boolean persistent, boolean recoverable) throws Throwable
{
DefaultClusteredPostOffice office1 = null;
-
+
DefaultClusteredPostOffice office2 = null;
-
+
DefaultClusteredPostOffice office3 = null;
-
+
DefaultClusteredPostOffice office4 = null;
-
+
DefaultClusteredPostOffice office5 = null;
-
+
try
- {
- office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
-
- office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
-
- office3 = (DefaultClusteredPostOffice)createClusteredPostOffice(3, "testgroup");
-
- office4 = (DefaultClusteredPostOffice)createClusteredPostOffice(4, "testgroup");
-
- office5 = (DefaultClusteredPostOffice)createClusteredPostOffice(5, "testgroup");
-
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ {
+ office1 = (DefaultClusteredPostOffice) createClusteredPostOffice(1, "testgroup");
+
+ office2 = (DefaultClusteredPostOffice) createClusteredPostOffice(2, "testgroup");
+
+ office3 = (DefaultClusteredPostOffice) createClusteredPostOffice(3, "testgroup");
+
+ office4 = (DefaultClusteredPostOffice) createClusteredPostOffice(4, "testgroup");
+
+ office5 = (DefaultClusteredPostOffice) createClusteredPostOffice(5, "testgroup");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), null, tr);
Binding binding1 = office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
-
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), null, tr);
Binding binding2 = office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
-
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding3 = office3.bindClusteredQueue(new SimpleCondition("queue1"), queue3);
-
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), null, tr);
+ Binding binding3 = office3.bindClusteredQueue(new SimpleCondition("queue1"), queue3);
+
+ LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), null, tr);
Binding binding4 = office4.bindClusteredQueue(new SimpleCondition("queue1"), queue4);
-
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+
+ LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), null, tr);
Binding binding5 = office5.bindClusteredQueue(new SimpleCondition("queue1"), queue5);
-
+
final int NUM_MESSAGES = 100;
-
+
this.sendMessages("queue1", persistent, office1, NUM_MESSAGES, null);
this.sendMessages("queue1", persistent, office2, NUM_MESSAGES, null);
this.sendMessages("queue1", persistent, office3, NUM_MESSAGES, null);
this.sendMessages("queue1", persistent, office4, NUM_MESSAGES, null);
this.sendMessages("queue1", persistent, office5, NUM_MESSAGES, null);
-
+
Thread.sleep(2000);
-
+
//Check the sizes
-
- log.info("Here are the sizes:");
+
+ log.info("Here are the sizes:");
log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.getDeliveringCount());
log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.getDeliveringCount());
log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.getDeliveringCount());
log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.getDeliveringCount());
-
+
assertEquals(NUM_MESSAGES, queue1.memoryRefCount());
assertEquals(0, queue1.getDeliveringCount());
-
+
assertEquals(NUM_MESSAGES, queue2.memoryRefCount());
assertEquals(0, queue2.getDeliveringCount());
-
+
assertEquals(NUM_MESSAGES, queue3.memoryRefCount());
assertEquals(0, queue3.getDeliveringCount());
-
+
assertEquals(NUM_MESSAGES, queue4.memoryRefCount());
assertEquals(0, queue4.getDeliveringCount());
-
+
assertEquals(NUM_MESSAGES, queue5.memoryRefCount());
assertEquals(0, queue5.getDeliveringCount());
-
+
SimpleReceiver receiver = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-
+
queue1.add(receiver);
-
+
queue1.deliver(false);
-
+
Thread.sleep(7000);
-
- log.info("Here are the sizes:");
+
+ log.info("Here are the sizes:");
log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.getDeliveringCount());
log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.getDeliveringCount());
log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.getDeliveringCount());
log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.getDeliveringCount());
-
+
assertEquals(0, queue1.memoryRefCount());
assertEquals(NUM_MESSAGES * 5, queue1.getDeliveringCount());
-
+
assertEquals(0, queue2.memoryRefCount());
assertEquals(0, queue2.getDeliveringCount());
-
+
assertEquals(0, queue3.memoryRefCount());
assertEquals(0, queue3.getDeliveringCount());
-
+
assertEquals(0, queue4.memoryRefCount());
assertEquals(0, queue4.getDeliveringCount());
-
+
assertEquals(0, queue5.memoryRefCount());
assertEquals(0, queue5.getDeliveringCount());
-
+
List messages = receiver.getMessages();
-
+
assertNotNull(messages);
-
+
assertEquals(NUM_MESSAGES * 5, messages.size());
-
+
Iterator iter = messages.iterator();
-
+
while (iter.hasNext())
{
- Message msg = (Message)iter.next();
-
+ Message msg = (Message) iter.next();
+
receiver.acknowledge(msg, null);
}
-
+
receiver.clear();
-
+
assertEquals(0, queue1.memoryRefCount());
assertEquals(0, queue1.getDeliveringCount());
-
+
assertTrue(office1.getHoldingTransactions().isEmpty());
assertTrue(office2.getHoldingTransactions().isEmpty());
assertTrue(office3.getHoldingTransactions().isEmpty());
assertTrue(office4.getHoldingTransactions().isEmpty());
assertTrue(office5.getHoldingTransactions().isEmpty());
-
+
if (checkNoMessageData())
{
fail("Message data still in database");
}
}
finally
- {
+ {
if (office1 != null)
{
office1.stop();
}
-
+
if (office2 != null)
- {
+ {
office2.stop();
}
-
+
if (office3 != null)
{
office3.stop();
}
-
+
if (office4 != null)
- {
+ {
office4.stop();
}
-
+
if (office5 != null)
{
office5.stop();
}
}
}
-
+
protected void consumeBitByBit(boolean persistent, boolean recoverable) throws Throwable
{
DefaultClusteredPostOffice office1 = null;
-
+
DefaultClusteredPostOffice office2 = null;
-
+
DefaultClusteredPostOffice office3 = null;
-
+
DefaultClusteredPostOffice office4 = null;
-
+
DefaultClusteredPostOffice office5 = null;
-
+
try
- {
- office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
-
- office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
-
- office3 = (DefaultClusteredPostOffice)createClusteredPostOffice(3, "testgroup");
-
- office4 = (DefaultClusteredPostOffice)createClusteredPostOffice(4, "testgroup");
-
- office5 = (DefaultClusteredPostOffice)createClusteredPostOffice(5, "testgroup");
-
- LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+ {
+ office1 = (DefaultClusteredPostOffice) createClusteredPostOffice(1, "testgroup");
+
+ office2 = (DefaultClusteredPostOffice) createClusteredPostOffice(2, "testgroup");
+
+ office3 = (DefaultClusteredPostOffice) createClusteredPostOffice(3, "testgroup");
+
+ office4 = (DefaultClusteredPostOffice) createClusteredPostOffice(4, "testgroup");
+
+ office5 = (DefaultClusteredPostOffice) createClusteredPostOffice(5, "testgroup");
+
+ LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), null, tr);
Binding binding1 = office1.bindClusteredQueue(new SimpleCondition("queue1"), queue1);
-
- LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+
+ LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), null, tr);
Binding binding2 = office2.bindClusteredQueue(new SimpleCondition("queue1"), queue2);
-
- LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
- Binding binding3 = office3.bindClusteredQueue(new SimpleCondition("queue1"), queue3);
-
- LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+
+ LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), null, tr);
+ Binding binding3 = office3.bindClusteredQueue(new SimpleCondition("queue1"), queue3);
+
+ LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), null, tr);
Binding binding4 = office4.bindClusteredQueue(new SimpleCondition("queue1"), queue4);
-
- LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
+
+ LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIDManager.getID(), ms, pm, true, recoverable, (QueuedExecutor) pool.get(), null, tr);
Binding binding5 = office5.bindClusteredQueue(new SimpleCondition("queue1"), queue5);
-
+
final int NUM_MESSAGES = 100;
-
+
this.sendMessages("queue1", persistent, office1, NUM_MESSAGES, null);
this.sendMessages("queue1", persistent, office2, NUM_MESSAGES, null);
this.sendMessages("queue1", persistent, office3, NUM_MESSAGES, null);
this.sendMessages("queue1", persistent, office4, NUM_MESSAGES, null);
this.sendMessages("queue1", persistent, office5, NUM_MESSAGES, null);
-
+
Thread.sleep(2000);
-
+
//Check the sizes
-
- log.info("Here are the sizes 1:");
+
+ log.info("Here are the sizes 1:");
log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.getDeliveringCount());
log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.getDeliveringCount());
log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.getDeliveringCount());
log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.getDeliveringCount());
-
+
assertEquals(NUM_MESSAGES, queue1.memoryRefCount());
assertEquals(0, queue1.getDeliveringCount());
-
+
assertEquals(NUM_MESSAGES, queue2.memoryRefCount());
assertEquals(0, queue2.getDeliveringCount());
-
+
assertEquals(NUM_MESSAGES, queue3.memoryRefCount());
assertEquals(0, queue3.getDeliveringCount());
-
+
assertEquals(NUM_MESSAGES, queue4.memoryRefCount());
assertEquals(0, queue4.getDeliveringCount());
-
+
assertEquals(NUM_MESSAGES, queue5.memoryRefCount());
assertEquals(0, queue5.getDeliveringCount());
-
+
assertTrue(office1.getHoldingTransactions().isEmpty());
assertTrue(office2.getHoldingTransactions().isEmpty());
assertTrue(office3.getHoldingTransactions().isEmpty());
assertTrue(office4.getHoldingTransactions().isEmpty());
assertTrue(office5.getHoldingTransactions().isEmpty());
-
- SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
+
+ SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
queue1.add(receiver1);
- SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
+ SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
queue2.add(receiver2);
- SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
+ SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
queue3.add(receiver3);
- SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
+ SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
queue4.add(receiver4);
- SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
+ SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING_TO_MAX);
queue5.add(receiver5);
-
- receiver1.setMaxRefs(5);
- queue1.deliver(false);
- Thread.sleep(1000);
+
+ receiver1.setMaxRefs(5);
+ queue1.deliver(false);
+ Thread.sleep(1000);
assertEquals(NUM_MESSAGES - 5, queue1.memoryRefCount());
assertEquals(5, queue1.getDeliveringCount());
-
+
acknowledgeAll(receiver1);
assertEquals(0, queue1.getDeliveringCount());
receiver1.setMaxRefs(0);
-
- receiver2.setMaxRefs(10);
- queue2.deliver(false);
+
+ receiver2.setMaxRefs(10);
+ queue2.deliver(false);
Thread.sleep(1000);
assertEquals(NUM_MESSAGES - 10, queue2.memoryRefCount());
assertEquals(10, queue2.getDeliveringCount());
acknowledgeAll(receiver2);
receiver2.setMaxRefs(0);
-
- receiver3.setMaxRefs(15);
- queue3.deliver(false);
+
+ receiver3.setMaxRefs(15);
+ queue3.deliver(false);
Thread.sleep(1000);
assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
assertEquals(15, queue3.getDeliveringCount());
acknowledgeAll(receiver3);
receiver3.setMaxRefs(0);
-
- receiver4.setMaxRefs(20);
- queue4.deliver(false);
+
+ receiver4.setMaxRefs(20);
+ queue4.deliver(false);
Thread.sleep(1000);
assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
assertEquals(20, queue4.getDeliveringCount());
acknowledgeAll(receiver4);
receiver4.setMaxRefs(0);
-
- receiver5.setMaxRefs(25);
- queue5.deliver(false);
+
+ receiver5.setMaxRefs(25);
+ queue5.deliver(false);
Thread.sleep(1000);
assertEquals(NUM_MESSAGES - 25, queue5.memoryRefCount());
assertEquals(25, queue5.getDeliveringCount());
acknowledgeAll(receiver5);
receiver5.setMaxRefs(0);
-
+
Thread.sleep(1000);
-
+
assertTrue(office1.getHoldingTransactions().isEmpty());
assertTrue(office2.getHoldingTransactions().isEmpty());
assertTrue(office3.getHoldingTransactions().isEmpty());
assertTrue(office4.getHoldingTransactions().isEmpty());
assertTrue(office5.getHoldingTransactions().isEmpty());
-
- log.info("Here are the sizes 2:");
+
+ log.info("Here are the sizes 2:");
log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.getDeliveringCount());
log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.getDeliveringCount());
log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.getDeliveringCount());
log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.getDeliveringCount());
-
+
//Consume the rest from queue 5
receiver5.setMaxRefs(NUM_MESSAGES - 25);
queue5.deliver(false);
- Thread.sleep(5000);
-
+ Thread.sleep(5000);
+
log.info("receiver5 msgs:" + receiver5.getMessages().size());
-
- log.info("Here are the sizes 3:");
+
+ log.info("Here are the sizes 3:");
log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.getDeliveringCount());
log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.getDeliveringCount());
log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.getDeliveringCount());
log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.getDeliveringCount());
-
+
//This will result in an extra one being pulled from queue1 - we cannot avoid this
//This is because the channel does not know that the receiver is full unless it tries
//with a ref so it needs to retrieve one
-
+
assertEquals(NUM_MESSAGES - 6, queue1.memoryRefCount());
assertEquals(0, queue1.getDeliveringCount());
-
+
assertEquals(NUM_MESSAGES - 10, queue2.memoryRefCount());
assertEquals(0, queue2.getDeliveringCount());
-
+
assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
assertEquals(0, queue3.getDeliveringCount());
-
+
assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
assertEquals(0, queue4.getDeliveringCount());
-
- assertEquals(1, queue5.memoryRefCount());
+
+ assertEquals(1, queue5.memoryRefCount());
assertEquals(NUM_MESSAGES - 25, queue5.getDeliveringCount());
-
+
acknowledgeAll(receiver5);
-
+
assertEquals(0, queue5.getDeliveringCount());
-
+
receiver5.setMaxRefs(0);
-
+
assertTrue(office1.getHoldingTransactions().isEmpty());
assertTrue(office2.getHoldingTransactions().isEmpty());
assertTrue(office3.getHoldingTransactions().isEmpty());
assertTrue(office4.getHoldingTransactions().isEmpty());
assertTrue(office5.getHoldingTransactions().isEmpty());
-
+
//Now consume 5 more from queue5, they should come from queue1 which has the most messages
-
+
log.info("Consume 5 more from queue 5");
-
+
receiver5.setMaxRefs(5);
queue5.deliver(false);
Thread.sleep(5000);
-
- log.info("Here are the sizes 4:");
+
+ log.info("Here are the sizes 4:");
log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.getDeliveringCount());
log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.getDeliveringCount());
log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.getDeliveringCount());
log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.getDeliveringCount());
-
+
assertEquals(NUM_MESSAGES - 11, queue1.memoryRefCount());
-
+
assertEquals(0, queue1.getDeliveringCount());
-
+
assertEquals(NUM_MESSAGES - 10, queue2.memoryRefCount());
assertEquals(0, queue2.getDeliveringCount());
-
+
assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
assertEquals(0, queue3.getDeliveringCount());
-
+
assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
assertEquals(0, queue4.getDeliveringCount());
-
- assertEquals(1, queue5.memoryRefCount());
+
+ assertEquals(1, queue5.memoryRefCount());
assertEquals(5, queue5.getDeliveringCount());
-
+
acknowledgeAll(receiver5);
assertEquals(0, queue5.getDeliveringCount());
-
+
receiver1.setMaxRefs(0);
-
+
assertTrue(office1.getHoldingTransactions().isEmpty());
assertTrue(office2.getHoldingTransactions().isEmpty());
assertTrue(office3.getHoldingTransactions().isEmpty());
assertTrue(office4.getHoldingTransactions().isEmpty());
assertTrue(office5.getHoldingTransactions().isEmpty());
-
+
//Consume 1 more - should pull one from queue2
-
+
receiver5.setMaxRefs(1);
queue5.deliver(false);
Thread.sleep(2000);
-
- log.info("Here are the sizes 5:");
+
+ log.info("Here are the sizes 5:");
log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.getDeliveringCount());
log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.getDeliveringCount());
log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.getDeliveringCount());
log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.getDeliveringCount());
-
+
assertEquals(NUM_MESSAGES - 11, queue1.memoryRefCount());
assertEquals(0, queue1.getDeliveringCount());
-
+
assertEquals(NUM_MESSAGES - 11, queue2.memoryRefCount());
assertEquals(0, queue2.getDeliveringCount());
-
+
assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
assertEquals(0, queue3.getDeliveringCount());
-
+
assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
assertEquals(0, queue4.getDeliveringCount());
-
- assertEquals(1, queue5.memoryRefCount());
+
+ assertEquals(1, queue5.memoryRefCount());
assertEquals(1, queue5.getDeliveringCount());
-
+
acknowledgeAll(receiver5);
-
+
assertEquals(0, queue5.getDeliveringCount());
-
+
receiver5.setMaxRefs(0);
-
+
assertTrue(office1.getHoldingTransactions().isEmpty());
assertTrue(office2.getHoldingTransactions().isEmpty());
assertTrue(office3.getHoldingTransactions().isEmpty());
assertTrue(office4.getHoldingTransactions().isEmpty());
assertTrue(office5.getHoldingTransactions().isEmpty());
-
+
//From queue 4 consume everything else
-
+
receiver4.setMaxRefs(NUM_MESSAGES - 15 + NUM_MESSAGES - 20 + NUM_MESSAGES - 11 + NUM_MESSAGES - 11 + 1);
queue4.deliver(false);
Thread.sleep(7000);
-
- log.info("Here are the sizes 6:");
+
+ log.info("Here are the sizes 6:");
log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.getDeliveringCount());
log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.getDeliveringCount());
log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.getDeliveringCount());
log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.getDeliveringCount());
-
+
assertEquals(0, queue1.memoryRefCount());
assertEquals(0, queue1.getDeliveringCount());
-
+
assertEquals(0, queue2.memoryRefCount());
assertEquals(0, queue2.getDeliveringCount());
-
+
assertEquals(0, queue3.memoryRefCount());
assertEquals(0, queue3.getDeliveringCount());
-
+
assertEquals(0, queue4.memoryRefCount());
assertEquals(NUM_MESSAGES - 15 + NUM_MESSAGES - 20 + NUM_MESSAGES - 11 + NUM_MESSAGES - 11 + 1, queue4.getDeliveringCount());
-
- assertEquals(0, queue5.memoryRefCount());
+
+ assertEquals(0, queue5.memoryRefCount());
assertEquals(0, queue5.getDeliveringCount());
-
+
acknowledgeAll(receiver4);
-
+
assertEquals(0, queue4.getDeliveringCount());
-
+
assertTrue(office1.getHoldingTransactions().isEmpty());
assertTrue(office2.getHoldingTransactions().isEmpty());
assertTrue(office3.getHoldingTransactions().isEmpty());
assertTrue(office4.getHoldingTransactions().isEmpty());
assertTrue(office5.getHoldingTransactions().isEmpty());
-
+
if (checkNoMessageData())
{
fail("Message data still in database");
}
}
finally
- {
+ {
if (office1 != null)
{
office1.stop();
}
-
+
if (office2 != null)
- {
+ {
office2.stop();
}
-
+
if (office3 != null)
{
office3.stop();
}
-
+
if (office4 != null)
- {
+ {
office4.stop();
}
-
+
if (office5 != null)
{
office5.stop();
}
}
- }
-
+ }
+
class ThrottleReceiver implements Receiver, Runnable
{
long pause;
-
+
volatile int totalCount;
-
+
int count;
-
+
int maxSize;
-
+
volatile boolean full;
-
+
Executor executor;
-
+
List dels;
-
+
Channel queue;
-
+
int getTotalCount()
{
return totalCount;
}
-
+
ThrottleReceiver(Channel queue, long pause, int maxSize)
{
this.queue = queue;
-
+
this.pause = pause;
-
+
this.maxSize = maxSize;
-
+
this.executor = new QueuedExecutor();
-
+
this.dels = new ArrayList();
}
@@ -1061,25 +1060,25 @@
{
return null;
}
-
+
//log.info(this + " got ref");
-
+
//log.info("cnt:" + totalCount);
-
+
SimpleDelivery del = new SimpleDelivery(observer, reference);
-
+
dels.add(del);
-
+
count++;
-
+
totalCount++;
-
+
if (count == maxSize)
{
full = true;
-
+
count = 0;
-
+
try
{
executor.execute(this);
@@ -1089,15 +1088,15 @@
//Ignore
}
}
-
+
return del;
-
+
}
-
+
public void run()
{
//Simulate processing of messages
-
+
try
{
Thread.sleep(pause);
@@ -1106,13 +1105,13 @@
{
//Ignore
}
-
+
Iterator iter = dels.iterator();
-
+
while (iter.hasNext())
{
- Delivery del = (Delivery)iter.next();
-
+ Delivery del = (Delivery) iter.next();
+
try
{
del.acknowledge(null);
@@ -1122,63 +1121,63 @@
//Ignore
}
}
-
+
dels.clear();
-
+
full = false;
-
+
queue.deliver(false);
}
-
+
}
-
+
private void acknowledgeAll(SimpleReceiver receiver) throws Throwable
{
List messages = receiver.getMessages();
-
+
Iterator iter = messages.iterator();
-
+
while (iter.hasNext())
{
- Message msg = (Message)iter.next();
-
+ Message msg = (Message) iter.next();
+
receiver.acknowledge(msg, null);
}
-
+
receiver.clear();
}
-
-
+
+
protected ClusteredPostOffice createClusteredPostOffice(int nodeId, String groupName) throws Exception
{
MessagePullPolicy pullPolicy = new DefaultMessagePullPolicy();
-
+
FilterFactory ff = new SimpleFilterFactory();
-
+
ClusterRouterFactory rf = new DefaultRouterFactory();
-
+
FailoverMapper mapper = new DefaultFailoverMapper();
-
- ConditionFactory cf = new SimpleConditionFactory();
-
- DefaultClusteredPostOffice postOffice =
+
+ ConditionFactory cf = new SimpleConditionFactory();
+
+ DefaultClusteredPostOffice postOffice =
new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
- sc.getClusteredPostOfficeSQLProperties(), true, nodeId,
- "Clustered", ms, pm, tr, ff, cf, pool,
- groupName,
- JGroupsUtil.getControlStackProperties(),
- JGroupsUtil.getDataStackProperties(),
- 10000, 10000, pullPolicy, rf, mapper, 1000);
-
- postOffice.start();
-
+ sc.getClusteredPostOfficeSQLProperties(), true, nodeId,
+ "Clustered", ms, pm, tr, ff, cf, pool,
+ groupName,
+ new NameChannelFactory(JGroupsUtil.getControlStackProperties(),
+ JGroupsUtil.getDataStackProperties()),
+ 10000, 10000, pullPolicy, rf, mapper, 1000);
+
+ postOffice.start();
+
return postOffice;
}
-
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
-
+
}
Modified: trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2007-01-04 22:02:16 UTC (rev 1892)
@@ -412,6 +412,13 @@
sb.append("-cp").append(" \"").append(classPath).append("\" ");
}
+ // Multicast with JGroups is problematic on Linux under certain JDKs, and the stack introduced
+ // by the multiplexer may fail there unless the JVM is launched with java.net.preferIPv4Stack=true
+ if (System.getProperty("os.name").equals("Linux"))
+ {
+ sb.append(" -Djava.net.preferIPv4Stack=true ");
+ }
+
sb.append("org.jboss.test.messaging.tools.jmx.rmi.RMITestServer");
String commandLine = sb.toString();
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2007-01-04 16:08:37 UTC (rev 1891)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2007-01-04 22:02:16 UTC (rev 1892)
@@ -297,6 +297,33 @@
throw new Exception("Cannot find " + mainConfigFile + " in the classpath");
}
+ // When clustered, register, create and start the Multiplexer MBean from multiplexer-service.xml
+ if (clustered)
+ {
+ log.info("Starting multiplexer");
+ String multiplexerConfigFile = "server/default/deploy/multiplexer-service.xml";
+ URL multiplexerConfigURL = getClass().getClassLoader().getResource(multiplexerConfigFile);
+ if (multiplexerConfigURL == null)
+ {
+ // Report the resource name: the URL is null on this path and would print as "null"
+ throw new Exception("Cannot find " + multiplexerConfigFile + " in the classpath");
+ }
+ ServiceDeploymentDescriptor multiplexerDD = new ServiceDeploymentDescriptor(multiplexerConfigURL);
+ List services = multiplexerDD.query("name","Multiplexer");
+ if (services.isEmpty())
+ {
+ // Fail with a clear message instead of a NoSuchElementException from iterator().next() below
+ throw new Exception("Cannot find the Multiplexer service in " + multiplexerConfigFile);
+ }
+ log.info("Could find multiplexer config");
+
+ MBeanConfigurationElement multiplexerConfig = (MBeanConfigurationElement) services.iterator().next();
+ ObjectName nameMultiplexer = sc.registerAndConfigureService(multiplexerConfig);
+ sc.invoke(nameMultiplexer,"create", new Object[0], new String[0]);
+ sc.invoke(nameMultiplexer,"start", new Object[0], new String[0]);
+
+ }
+
String databaseType = sc.getDatabaseType();
String persistenceConfigFile;
More information about the jboss-cvs-commits
mailing list