JBoss hornetq SVN: r9763 - in trunk/examples: jms/client-kickoff/server0 and 13 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-10-10 20:56:51 -0400 (Sun, 10 Oct 2010)
New Revision: 9763
Modified:
trunk/examples/javaee/xarecovery/server/hornetq-jms.xml
trunk/examples/jms/client-kickoff/server0/hornetq-jms.xml
trunk/examples/jms/jms-bridge/server0/hornetq-jms.xml
trunk/examples/jms/jms-bridge/server1/hornetq-jms.xml
trunk/examples/jms/jms-bridge/src/org/hornetq/jms/example/JMSBridgeExample.java
trunk/examples/jms/jmx/server0/hornetq-jms.xml
trunk/examples/jms/management/server0/hornetq-jms.xml
trunk/examples/jms/message-counters/server0/hornetq-jms.xml
trunk/examples/jms/queue-requestor/server0/hornetq-jms.xml
trunk/examples/jms/symmetric-cluster/src/org/hornetq/jms/example/SymmetricClusterExample.java
trunk/examples/jms/xa-heuristic/server0/hornetq-jms.xml
trunk/examples/jms/xa-receive/server0/hornetq-jms.xml
trunk/examples/jms/xa-send/server0/hornetq-jms.xml
trunk/examples/jms/xa-with-jta/server0/hornetq-jms.xml
trunk/examples/soak/tx-restarts/server0/hornetq-jms.xml
Log:
HORNETQ-515 -- example configuration changes
Modified: trunk/examples/javaee/xarecovery/server/hornetq-jms.xml
===================================================================
--- trunk/examples/javaee/xarecovery/server/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
+++ trunk/examples/javaee/xarecovery/server/hornetq-jms.xml 2010-10-11 00:56:51 UTC (rev 9763)
@@ -3,6 +3,7 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<connection-factory name="ConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
@@ -16,4 +17,4 @@
<queue name="testQueue">
<entry name="/queue/testQueue"/>
</queue>
-</configuration>
\ No newline at end of file
+</configuration>
Modified: trunk/examples/jms/client-kickoff/server0/hornetq-jms.xml
===================================================================
--- trunk/examples/jms/client-kickoff/server0/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
+++ trunk/examples/jms/client-kickoff/server0/hornetq-jms.xml 2010-10-11 00:56:51 UTC (rev 9763)
@@ -2,7 +2,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<!--the connection factory used by the example-->
- <connection-factory name="ConnectionFactory">
+ <connection-factory name="ConnectionFactory" signature="queue">
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
@@ -11,4 +11,4 @@
</entries>
</connection-factory>
-</configuration>
\ No newline at end of file
+</configuration>
Modified: trunk/examples/jms/jms-bridge/server0/hornetq-jms.xml
===================================================================
--- trunk/examples/jms/jms-bridge/server0/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
+++ trunk/examples/jms/jms-bridge/server0/hornetq-jms.xml 2010-10-11 00:56:51 UTC (rev 9763)
@@ -3,6 +3,7 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<connection-factory name="ConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
@@ -11,6 +12,15 @@
</entries>
</connection-factory>
+ <connection-factory name="ClientConnectionFactory">
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/client/ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
<topic name="topic">
<entry name="/source/topic"/>
</topic>
Modified: trunk/examples/jms/jms-bridge/server1/hornetq-jms.xml
===================================================================
--- trunk/examples/jms/jms-bridge/server1/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
+++ trunk/examples/jms/jms-bridge/server1/hornetq-jms.xml 2010-10-11 00:56:51 UTC (rev 9763)
@@ -3,6 +3,7 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<connection-factory name="ConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
@@ -11,6 +12,15 @@
</entries>
</connection-factory>
+ <connection-factory name="ClientConnectionFactory">
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/client/ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
<queue name="target">
<entry name="/target/queue"/>
</queue>
Modified: trunk/examples/jms/jms-bridge/src/org/hornetq/jms/example/JMSBridgeExample.java
===================================================================
--- trunk/examples/jms/jms-bridge/src/org/hornetq/jms/example/JMSBridgeExample.java 2010-10-10 12:15:41 UTC (rev 9762)
+++ trunk/examples/jms/jms-bridge/src/org/hornetq/jms/example/JMSBridgeExample.java 2010-10-11 00:56:51 UTC (rev 9763)
@@ -55,7 +55,7 @@
try
{
// Step 2. Lookup the *source* JMS resources
- ConnectionFactory sourceConnectionFactory = (ConnectionFactory)sourceContext.lookup("/source/ConnectionFactory");
+ ConnectionFactory sourceConnectionFactory = (ConnectionFactory)sourceContext.lookup("/client/ConnectionFactory");
Topic sourceTopic = (Topic)sourceContext.lookup("/source/topic");
// Step 3. Create a connection, a session and a message producer for the *source* topic
@@ -75,7 +75,7 @@
sourceConnection.close();
// Step 6. Lookup the *target* JMS resources
- ConnectionFactory targetConnectionFactory = (ConnectionFactory)targetContext.lookup("/target/ConnectionFactory");
+ ConnectionFactory targetConnectionFactory = (ConnectionFactory)targetContext.lookup("/client/ConnectionFactory");
Queue targetQueue = (Queue)targetContext.lookup("/target/queue");
// Step 7. Create a connection, a session and a message consumer for the *target* queue
Modified: trunk/examples/jms/jmx/server0/hornetq-jms.xml
===================================================================
--- trunk/examples/jms/jmx/server0/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
+++ trunk/examples/jms/jmx/server0/hornetq-jms.xml 2010-10-11 00:56:51 UTC (rev 9763)
@@ -2,7 +2,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<!--the connection factory used by the example-->
- <connection-factory name="ConnectionFactory">
+ <connection-factory name="ConnectionFactory" signature="queue">
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
@@ -16,4 +16,4 @@
<entry name="/queue/exampleQueue"/>
</queue>
-</configuration>
\ No newline at end of file
+</configuration>
Modified: trunk/examples/jms/management/server0/hornetq-jms.xml
===================================================================
--- trunk/examples/jms/management/server0/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
+++ trunk/examples/jms/management/server0/hornetq-jms.xml 2010-10-11 00:56:51 UTC (rev 9763)
@@ -2,7 +2,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<!--the connection factory used by the example-->
- <connection-factory name="ConnectionFactory">
+ <connection-factory name="ConnectionFactory" signature="queue">
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
@@ -15,4 +15,4 @@
<queue name="exampleQueue">
<entry name="/queue/exampleQueue"/>
</queue>
-</configuration>
\ No newline at end of file
+</configuration>
Modified: trunk/examples/jms/message-counters/server0/hornetq-jms.xml
===================================================================
--- trunk/examples/jms/message-counters/server0/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
+++ trunk/examples/jms/message-counters/server0/hornetq-jms.xml 2010-10-11 00:56:51 UTC (rev 9763)
@@ -2,7 +2,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<!--the connection factory used by the example-->
- <connection-factory name="ConnectionFactory">
+ <connection-factory name="ConnectionFactory" signature="queue">
<connectors>
<connector-ref connector-name="netty-connector"/>
</connectors>
@@ -21,4 +21,4 @@
<entry name="/queue/expiryQueue"/>
</queue>
-</configuration>
\ No newline at end of file
+</configuration>
Modified: trunk/examples/jms/queue-requestor/server0/hornetq-jms.xml
===================================================================
--- trunk/examples/jms/queue-requestor/server0/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
+++ trunk/examples/jms/queue-requestor/server0/hornetq-jms.xml 2010-10-11 00:56:51 UTC (rev 9763)
@@ -2,7 +2,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<!--the connection factory used by the example-->
- <connection-factory name="ConnectionFactory">
+ <connection-factory name="ConnectionFactory" signature="queue">
<connectors>
<connector-ref connector-name="netty-connector"/>
</connectors>
@@ -16,4 +16,4 @@
<entry name="/queue/exampleQueue"/>
</queue>
-</configuration>
\ No newline at end of file
+</configuration>
Modified: trunk/examples/jms/symmetric-cluster/src/org/hornetq/jms/example/SymmetricClusterExample.java
===================================================================
--- trunk/examples/jms/symmetric-cluster/src/org/hornetq/jms/example/SymmetricClusterExample.java 2010-10-10 12:15:41 UTC (rev 9762)
+++ trunk/examples/jms/symmetric-cluster/src/org/hornetq/jms/example/SymmetricClusterExample.java 2010-10-11 00:56:51 UTC (rev 9763)
@@ -23,6 +23,7 @@
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.common.example.HornetQExample;
+import org.hornetq.jms.server.impl.JMSFactoryType;
/**
* This example demonstrates a cluster of three nodes set up in a symmetric topology - i.e. each node
@@ -79,7 +80,7 @@
// connection factory directly we avoid having to worry about a JNDI look-up.
// In an app server environment you could use HA-JNDI to lookup from the clustered JNDI servers without
// having to know about a specific one.
- ConnectionFactory cf = HornetQJMSClient.createConnectionFactory("231.7.7.7", 9876);
+ ConnectionFactory cf = (ConnectionFactory)HornetQJMSClient.createConnectionFactory("231.7.7.7", 9876, JMSFactoryType.CF);
// We give a little while for each server to broadcast its whereabouts to the client
Thread.sleep(2000);
Modified: trunk/examples/jms/xa-heuristic/server0/hornetq-jms.xml
===================================================================
--- trunk/examples/jms/xa-heuristic/server0/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
+++ trunk/examples/jms/xa-heuristic/server0/hornetq-jms.xml 2010-10-11 00:56:51 UTC (rev 9763)
@@ -3,6 +3,7 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<!--the connection factory used by the example-->
<connection-factory name="ConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty-connector"/>
</connectors>
@@ -16,4 +17,4 @@
<entry name="/queue/exampleQueue"/>
</queue>
-</configuration>
\ No newline at end of file
+</configuration>
Modified: trunk/examples/jms/xa-receive/server0/hornetq-jms.xml
===================================================================
--- trunk/examples/jms/xa-receive/server0/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
+++ trunk/examples/jms/xa-receive/server0/hornetq-jms.xml 2010-10-11 00:56:51 UTC (rev 9763)
@@ -3,6 +3,7 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<!--the connection factory used by the example-->
<connection-factory name="ConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty-connector"/>
</connectors>
@@ -16,4 +17,4 @@
<entry name="/queue/exampleQueue"/>
</queue>
-</configuration>
\ No newline at end of file
+</configuration>
Modified: trunk/examples/jms/xa-send/server0/hornetq-jms.xml
===================================================================
--- trunk/examples/jms/xa-send/server0/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
+++ trunk/examples/jms/xa-send/server0/hornetq-jms.xml 2010-10-11 00:56:51 UTC (rev 9763)
@@ -3,6 +3,7 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<!--the connection factory used by the example-->
<connection-factory name="ConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty-connector"/>
</connectors>
@@ -16,4 +17,4 @@
<entry name="/queue/exampleQueue"/>
</queue>
-</configuration>
\ No newline at end of file
+</configuration>
Modified: trunk/examples/jms/xa-with-jta/server0/hornetq-jms.xml
===================================================================
--- trunk/examples/jms/xa-with-jta/server0/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
+++ trunk/examples/jms/xa-with-jta/server0/hornetq-jms.xml 2010-10-11 00:56:51 UTC (rev 9763)
@@ -3,6 +3,7 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<!--the connection factory used by the example-->
<connection-factory name="ConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty-connector"/>
</connectors>
@@ -16,4 +17,4 @@
<entry name="/queue/exampleQueue"/>
</queue>
-</configuration>
\ No newline at end of file
+</configuration>
Modified: trunk/examples/soak/tx-restarts/server0/hornetq-jms.xml
===================================================================
--- trunk/examples/soak/tx-restarts/server0/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
+++ trunk/examples/soak/tx-restarts/server0/hornetq-jms.xml 2010-10-11 00:56:51 UTC (rev 9763)
@@ -3,6 +3,7 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<!--the connection factory used by the example-->
<connection-factory name="ConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty-connector"/>
</connectors>
13 years, 6 months
JBoss hornetq SVN: r9762 - in branches/HORNETQ-515: examples/jms/client-kickoff/server0 and 45 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-10-10 08:15:41 -0400 (Sun, 10 Oct 2010)
New Revision: 9762
Added:
branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQJMSConnectionFactory.java
branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQQueueConnectionFactory.java
branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQTopicConnectionFactory.java
branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQXAConnectionFactory.java
branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQXAQueueConnectionFactory.java
branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQXATopicConnectionFactory.java
branches/HORNETQ-515/src/main/org/hornetq/jms/server/impl/JMSFactoryType.java
Modified:
branches/HORNETQ-515/examples/javaee/xarecovery/server/hornetq-jms.xml
branches/HORNETQ-515/examples/jms/client-kickoff/server0/hornetq-jms.xml
branches/HORNETQ-515/examples/jms/jms-bridge/server0/hornetq-jms.xml
branches/HORNETQ-515/examples/jms/jms-bridge/server1/hornetq-jms.xml
branches/HORNETQ-515/examples/jms/jms-bridge/src/org/hornetq/jms/example/JMSBridgeExample.java
branches/HORNETQ-515/examples/jms/jmx/server0/hornetq-jms.xml
branches/HORNETQ-515/examples/jms/management/server0/hornetq-jms.xml
branches/HORNETQ-515/examples/jms/message-counters/server0/hornetq-jms.xml
branches/HORNETQ-515/examples/jms/queue-requestor/server0/hornetq-jms.xml
branches/HORNETQ-515/examples/jms/symmetric-cluster/src/org/hornetq/jms/example/SymmetricClusterExample.java
branches/HORNETQ-515/examples/jms/xa-heuristic/server0/hornetq-jms.xml
branches/HORNETQ-515/examples/jms/xa-receive/server0/hornetq-jms.xml
branches/HORNETQ-515/examples/jms/xa-send/server0/hornetq-jms.xml
branches/HORNETQ-515/examples/jms/xa-with-jta/server0/hornetq-jms.xml
branches/HORNETQ-515/examples/soak/tx-restarts/server0/hornetq-jms.xml
branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/integration/HornetqBootstrapListener.java
branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/integration/RestMessagingBootstrapListener.java
branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/integration/ServletContextBindingRegistry.java
branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java
branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/FilePushStore.java
branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumer.java
branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/UriStrategy.java
branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/FileTopicPushStore.java
branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java
branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/CustomHeaderLinkStrategy.java
branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/TimeoutTask.java
branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/JMSTest.java
branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/RawRestartTest.java
branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/TransformTest.java
branches/HORNETQ-515/src/config/common/schema/hornetq-jms.xsd
branches/HORNETQ-515/src/main/org/hornetq/api/core/client/HornetQClient.java
branches/HORNETQ-515/src/main/org/hornetq/api/jms/HornetQJMSClient.java
branches/HORNETQ-515/src/main/org/hornetq/jms/bridge/ConnectionFactoryFactory.java
branches/HORNETQ-515/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
branches/HORNETQ-515/src/main/org/hornetq/jms/bridge/impl/JNDIConnectionFactoryFactory.java
branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
branches/HORNETQ-515/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
branches/HORNETQ-515/src/main/org/hornetq/jms/server/JMSServerManager.java
branches/HORNETQ-515/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
branches/HORNETQ-515/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
branches/HORNETQ-515/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java
branches/HORNETQ-515/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
branches/HORNETQ-515/src/main/org/hornetq/ra/HornetQResourceAdapter.java
branches/HORNETQ-515/tests/jms-tests/config/hornetq-jms.xml
branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionClosedTest.java
branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionFactoryTest.java
branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java
branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/HornetQServerTestCase.java
branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/ReferenceableTest.java
branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/message/JMSXDeliveryCountTest.java
branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
branches/HORNETQ-515/tests/joram-tests/src/org/hornetq/jms/HornetQAdmin.java
branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java
branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java
branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java
branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/client/AutoGroupingTest.java
branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/client/GroupIDTest.java
branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java
branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java
branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java
branches/HORNETQ-515/tests/src/org/hornetq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java
branches/HORNETQ-515/tests/src/org/hornetq/tests/util/JMSTestBase.java
Log:
full code changes
Modified: branches/HORNETQ-515/examples/javaee/xarecovery/server/hornetq-jms.xml
===================================================================
--- branches/HORNETQ-515/examples/javaee/xarecovery/server/hornetq-jms.xml 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/examples/javaee/xarecovery/server/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
@@ -3,6 +3,7 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<connection-factory name="ConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
@@ -16,4 +17,4 @@
<queue name="testQueue">
<entry name="/queue/testQueue"/>
</queue>
-</configuration>
\ No newline at end of file
+</configuration>
Modified: branches/HORNETQ-515/examples/jms/client-kickoff/server0/hornetq-jms.xml
===================================================================
--- branches/HORNETQ-515/examples/jms/client-kickoff/server0/hornetq-jms.xml 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/examples/jms/client-kickoff/server0/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
@@ -2,7 +2,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<!--the connection factory used by the example-->
- <connection-factory name="ConnectionFactory">
+ <connection-factory name="ConnectionFactory" signature="queue">
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
@@ -11,4 +11,4 @@
</entries>
</connection-factory>
-</configuration>
\ No newline at end of file
+</configuration>
Modified: branches/HORNETQ-515/examples/jms/jms-bridge/server0/hornetq-jms.xml
===================================================================
--- branches/HORNETQ-515/examples/jms/jms-bridge/server0/hornetq-jms.xml 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/examples/jms/jms-bridge/server0/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
@@ -3,6 +3,7 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<connection-factory name="ConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
@@ -11,6 +12,15 @@
</entries>
</connection-factory>
+ <connection-factory name="ClientConnectionFactory">
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/client/ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
<topic name="topic">
<entry name="/source/topic"/>
</topic>
Modified: branches/HORNETQ-515/examples/jms/jms-bridge/server1/hornetq-jms.xml
===================================================================
--- branches/HORNETQ-515/examples/jms/jms-bridge/server1/hornetq-jms.xml 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/examples/jms/jms-bridge/server1/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
@@ -3,6 +3,7 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<connection-factory name="ConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
@@ -11,6 +12,15 @@
</entries>
</connection-factory>
+ <connection-factory name="ClientConnectionFactory">
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/client/ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
<queue name="target">
<entry name="/target/queue"/>
</queue>
Modified: branches/HORNETQ-515/examples/jms/jms-bridge/src/org/hornetq/jms/example/JMSBridgeExample.java
===================================================================
--- branches/HORNETQ-515/examples/jms/jms-bridge/src/org/hornetq/jms/example/JMSBridgeExample.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/examples/jms/jms-bridge/src/org/hornetq/jms/example/JMSBridgeExample.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -55,7 +55,7 @@
try
{
// Step 2. Lookup the *source* JMS resources
- ConnectionFactory sourceConnectionFactory = (ConnectionFactory)sourceContext.lookup("/source/ConnectionFactory");
+ ConnectionFactory sourceConnectionFactory = (ConnectionFactory)sourceContext.lookup("/client/ConnectionFactory");
Topic sourceTopic = (Topic)sourceContext.lookup("/source/topic");
// Step 3. Create a connection, a session and a message producer for the *source* topic
@@ -75,7 +75,7 @@
sourceConnection.close();
// Step 6. Lookup the *target* JMS resources
- ConnectionFactory targetConnectionFactory = (ConnectionFactory)targetContext.lookup("/target/ConnectionFactory");
+ ConnectionFactory targetConnectionFactory = (ConnectionFactory)targetContext.lookup("/client/ConnectionFactory");
Queue targetQueue = (Queue)targetContext.lookup("/target/queue");
// Step 7. Create a connection, a session and a message consumer for the *target* queue
Modified: branches/HORNETQ-515/examples/jms/jmx/server0/hornetq-jms.xml
===================================================================
--- branches/HORNETQ-515/examples/jms/jmx/server0/hornetq-jms.xml 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/examples/jms/jmx/server0/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
@@ -2,7 +2,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<!--the connection factory used by the example-->
- <connection-factory name="ConnectionFactory">
+ <connection-factory name="ConnectionFactory" signature="queue">
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
@@ -16,4 +16,4 @@
<entry name="/queue/exampleQueue"/>
</queue>
-</configuration>
\ No newline at end of file
+</configuration>
Modified: branches/HORNETQ-515/examples/jms/management/server0/hornetq-jms.xml
===================================================================
--- branches/HORNETQ-515/examples/jms/management/server0/hornetq-jms.xml 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/examples/jms/management/server0/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
@@ -2,7 +2,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<!--the connection factory used by the example-->
- <connection-factory name="ConnectionFactory">
+ <connection-factory name="ConnectionFactory" signature="queue">
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
@@ -15,4 +15,4 @@
<queue name="exampleQueue">
<entry name="/queue/exampleQueue"/>
</queue>
-</configuration>
\ No newline at end of file
+</configuration>
Modified: branches/HORNETQ-515/examples/jms/message-counters/server0/hornetq-jms.xml
===================================================================
--- branches/HORNETQ-515/examples/jms/message-counters/server0/hornetq-jms.xml 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/examples/jms/message-counters/server0/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
@@ -2,7 +2,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<!--the connection factory used by the example-->
- <connection-factory name="ConnectionFactory">
+ <connection-factory name="ConnectionFactory" signature="queue">
<connectors>
<connector-ref connector-name="netty-connector"/>
</connectors>
@@ -21,4 +21,4 @@
<entry name="/queue/expiryQueue"/>
</queue>
-</configuration>
\ No newline at end of file
+</configuration>
Modified: branches/HORNETQ-515/examples/jms/queue-requestor/server0/hornetq-jms.xml
===================================================================
--- branches/HORNETQ-515/examples/jms/queue-requestor/server0/hornetq-jms.xml 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/examples/jms/queue-requestor/server0/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
@@ -2,7 +2,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<!--the connection factory used by the example-->
- <connection-factory name="ConnectionFactory">
+ <connection-factory name="ConnectionFactory" signature="queue">
<connectors>
<connector-ref connector-name="netty-connector"/>
</connectors>
@@ -16,4 +16,4 @@
<entry name="/queue/exampleQueue"/>
</queue>
-</configuration>
\ No newline at end of file
+</configuration>
Modified: branches/HORNETQ-515/examples/jms/symmetric-cluster/src/org/hornetq/jms/example/SymmetricClusterExample.java
===================================================================
--- branches/HORNETQ-515/examples/jms/symmetric-cluster/src/org/hornetq/jms/example/SymmetricClusterExample.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/examples/jms/symmetric-cluster/src/org/hornetq/jms/example/SymmetricClusterExample.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -23,6 +23,7 @@
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.common.example.HornetQExample;
+import org.hornetq.jms.server.impl.JMSFactoryType;
/**
* This example demonstrates a cluster of three nodes set up in a symmetric topology - i.e. each node
@@ -79,7 +80,7 @@
// connection factory directly we avoid having to worry about a JNDI look-up.
// In an app server environment you could use HA-JNDI to lookup from the clustered JNDI servers without
// having to know about a specific one.
- ConnectionFactory cf = HornetQJMSClient.createConnectionFactory("231.7.7.7", 9876);
+ ConnectionFactory cf = (ConnectionFactory)HornetQJMSClient.createConnectionFactory("231.7.7.7", 9876, JMSFactoryType.CF);
// We give a little while for each server to broadcast its whereabouts to the client
Thread.sleep(2000);
Modified: branches/HORNETQ-515/examples/jms/xa-heuristic/server0/hornetq-jms.xml
===================================================================
--- branches/HORNETQ-515/examples/jms/xa-heuristic/server0/hornetq-jms.xml 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/examples/jms/xa-heuristic/server0/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
@@ -3,6 +3,7 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<!--the connection factory used by the example-->
<connection-factory name="ConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty-connector"/>
</connectors>
@@ -16,4 +17,4 @@
<entry name="/queue/exampleQueue"/>
</queue>
-</configuration>
\ No newline at end of file
+</configuration>
Modified: branches/HORNETQ-515/examples/jms/xa-receive/server0/hornetq-jms.xml
===================================================================
--- branches/HORNETQ-515/examples/jms/xa-receive/server0/hornetq-jms.xml 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/examples/jms/xa-receive/server0/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
@@ -3,6 +3,7 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<!--the connection factory used by the example-->
<connection-factory name="ConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty-connector"/>
</connectors>
@@ -16,4 +17,4 @@
<entry name="/queue/exampleQueue"/>
</queue>
-</configuration>
\ No newline at end of file
+</configuration>
Modified: branches/HORNETQ-515/examples/jms/xa-send/server0/hornetq-jms.xml
===================================================================
--- branches/HORNETQ-515/examples/jms/xa-send/server0/hornetq-jms.xml 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/examples/jms/xa-send/server0/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
@@ -3,6 +3,7 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<!--the connection factory used by the example-->
<connection-factory name="ConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty-connector"/>
</connectors>
@@ -16,4 +17,4 @@
<entry name="/queue/exampleQueue"/>
</queue>
-</configuration>
\ No newline at end of file
+</configuration>
Modified: branches/HORNETQ-515/examples/jms/xa-with-jta/server0/hornetq-jms.xml
===================================================================
--- branches/HORNETQ-515/examples/jms/xa-with-jta/server0/hornetq-jms.xml 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/examples/jms/xa-with-jta/server0/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
@@ -3,6 +3,7 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<!--the connection factory used by the example-->
<connection-factory name="ConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty-connector"/>
</connectors>
@@ -16,4 +17,4 @@
<entry name="/queue/exampleQueue"/>
</queue>
-</configuration>
\ No newline at end of file
+</configuration>
Modified: branches/HORNETQ-515/examples/soak/tx-restarts/server0/hornetq-jms.xml
===================================================================
--- branches/HORNETQ-515/examples/soak/tx-restarts/server0/hornetq-jms.xml 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/examples/soak/tx-restarts/server0/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
@@ -3,6 +3,7 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<!--the connection factory used by the example-->
<connection-factory name="ConnectionFactory">
+ <xa>true</xa>
<connectors>
<connector-ref connector-name="netty-connector"/>
</connectors>
Modified: branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/integration/HornetqBootstrapListener.java
===================================================================
--- branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/integration/HornetqBootstrapListener.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/integration/HornetqBootstrapListener.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -14,7 +14,6 @@
{
private EmbeddedJMS jms;
- @Override
public void contextInitialized(ServletContextEvent contextEvent)
{
ServletContext context = contextEvent.getServletContext();
@@ -30,7 +29,6 @@
}
}
- @Override
public void contextDestroyed(ServletContextEvent servletContextEvent)
{
try
Modified: branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/integration/RestMessagingBootstrapListener.java
===================================================================
--- branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/integration/RestMessagingBootstrapListener.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/integration/RestMessagingBootstrapListener.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -16,7 +16,6 @@
{
MessageServiceManager manager;
- @Override
public void contextInitialized(ServletContextEvent contextEvent)
{
ServletContext context = contextEvent.getServletContext();
@@ -44,7 +43,6 @@
}
}
- @Override
public void contextDestroyed(ServletContextEvent servletContextEvent)
{
if (manager != null)
Modified: branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/integration/ServletContextBindingRegistry.java
===================================================================
--- branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/integration/ServletContextBindingRegistry.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/integration/ServletContextBindingRegistry.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -17,37 +17,31 @@
this.servletContext = servletContext;
}
- @Override
public Object lookup(String name)
{
return servletContext.getAttribute(name);
}
- @Override
public boolean bind(String name, Object obj)
{
servletContext.setAttribute(name, obj);
return true;
}
- @Override
public void unbind(String name)
{
servletContext.removeAttribute(name);
}
- @Override
public void close()
{
}
- @Override
public Object getContext()
{
return servletContext;
}
- @Override
public void setContext(Object o)
{
servletContext = (ServletContext)o;
Modified: branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java
===================================================================
--- branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -78,7 +78,6 @@
private Object timeoutLock = new Object();
- @Override
public void testTimeout(String target)
{
synchronized (timeoutLock)
Modified: branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/FilePushStore.java
===================================================================
--- branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/FilePushStore.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/FilePushStore.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -57,7 +57,6 @@
return list;
}
- @Override
public synchronized List<PushRegistration> getByDestination(String destination)
{
List<PushRegistration> list = new ArrayList<PushRegistration>();
@@ -71,7 +70,6 @@
return list;
}
- @Override
public synchronized void update(PushRegistration reg) throws Exception
{
if (reg.getLoadedFrom() == null) return;
@@ -86,7 +84,6 @@
marshaller.marshal(reg, (File) reg.getLoadedFrom());
}
- @Override
public synchronized void add(PushRegistration reg) throws Exception
{
map.put(reg.getId(), reg);
@@ -97,7 +94,6 @@
save(reg);
}
- @Override
public synchronized void remove(PushRegistration reg) throws Exception
{
map.remove(reg.getId());
@@ -106,7 +102,6 @@
fp.delete();
}
- @Override
public synchronized void removeAll() throws Exception
{
ArrayList<PushRegistration> copy = new ArrayList<PushRegistration>();
Modified: branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumer.java
===================================================================
--- branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumer.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumer.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -100,7 +100,6 @@
}
}
- @Override
public void onMessage(ClientMessage clientMessage)
{
if (strategy.push(clientMessage) == false)
Modified: branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/UriStrategy.java
===================================================================
--- branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/UriStrategy.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/UriStrategy.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -30,7 +30,6 @@
protected String method;
protected String contentType;
- @Override
public void setRegistration(PushRegistration reg)
{
this.registration = reg;
@@ -63,12 +62,10 @@
}
}
- @Override
public void stop()
{
}
- @Override
public boolean push(ClientMessage message)
{
String uri = createUri(message);
Modified: branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/FileTopicPushStore.java
===================================================================
--- branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/FileTopicPushStore.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/FileTopicPushStore.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -18,7 +18,6 @@
super(dirname);
}
- @Override
public synchronized List<PushTopicRegistration> getByTopic(String topic)
{
List<PushTopicRegistration> list = new ArrayList<PushTopicRegistration>();
Modified: branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java
===================================================================
--- branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -84,7 +84,6 @@
private Object timeoutLock = new Object();
- @Override
public void testTimeout(String target)
{
synchronized (timeoutLock)
Modified: branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/CustomHeaderLinkStrategy.java
===================================================================
--- branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/CustomHeaderLinkStrategy.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/CustomHeaderLinkStrategy.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -8,7 +8,6 @@
*/
public class CustomHeaderLinkStrategy implements LinkStrategy
{
- @Override
public void setLinkHeader(Response.ResponseBuilder builder, String title, String rel, String href, String type)
{
String headerName = null;
Modified: branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/TimeoutTask.java
===================================================================
--- branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/TimeoutTask.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/TimeoutTask.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -58,7 +58,6 @@
thread.start();
}
- @Override
public void run()
{
while (running)
Modified: branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/JMSTest.java
===================================================================
--- branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/JMSTest.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/JMSTest.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -2,6 +2,7 @@
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
import org.hornetq.rest.HttpHeaderProperty;
import org.hornetq.rest.Jms;
import org.hornetq.rest.queue.QueueDeployment;
@@ -39,7 +40,7 @@
@BeforeClass
public static void setup() throws Exception
{
- connectionFactory = new HornetQConnectionFactory(manager.getQueueManager().getSessionFactory());
+ connectionFactory = new HornetQJMSConnectionFactory(manager.getQueueManager().getSessionFactory());
}
@XmlRootElement
@@ -128,7 +129,6 @@
public static Order order;
public static CountDownLatch latch = new CountDownLatch(1);
- @Override
public void onMessage(Message message)
{
try
Modified: branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/RawRestartTest.java
===================================================================
--- branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/RawRestartTest.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/RawRestartTest.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -80,7 +80,6 @@
private static class MyListener implements MessageHandler
{
- @Override
public void onMessage(ClientMessage message)
{
int size = message.getBodyBuffer().readInt();
Modified: branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/TransformTest.java
===================================================================
--- branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/TransformTest.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/TransformTest.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -180,7 +180,6 @@
public static Order order;
public static CountDownLatch latch = new CountDownLatch(1);
- @Override
public void onMessage(ClientMessage clientMessage)
{
System.out.println("onMessage!");
Modified: branches/HORNETQ-515/src/config/common/schema/hornetq-jms.xsd
===================================================================
--- branches/HORNETQ-515/src/config/common/schema/hornetq-jms.xsd 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/src/config/common/schema/hornetq-jms.xsd 2010-10-10 12:15:41 UTC (rev 9762)
@@ -26,6 +26,7 @@
<xsd:element name="connection-factory">
<xsd:complexType>
<xsd:all>
+ <xsd:element name="xa" type="xsd:boolean" maxOccurs="1" minOccurs="0"></xsd:element>
<xsd:element name="discovery-group-ref" type="discovery-group-refType" maxOccurs="1" minOccurs="0"></xsd:element>
<xsd:element name="discovery-initial-wait-timeout" type="xsd:long" maxOccurs="1" minOccurs="0"></xsd:element>
@@ -135,6 +136,7 @@
</xsd:element>
</xsd:all>
<xsd:attribute name="name" type="xsd:string"></xsd:attribute>
+ <xsd:attribute name="signature" type="xsd:string"></xsd:attribute>
</xsd:complexType>
</xsd:element>
Modified: branches/HORNETQ-515/src/main/org/hornetq/api/core/client/HornetQClient.java
===================================================================
--- branches/HORNETQ-515/src/main/org/hornetq/api/core/client/HornetQClient.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/src/main/org/hornetq/api/core/client/HornetQClient.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -91,6 +91,8 @@
public static final boolean DEFAULT_CACHE_LARGE_MESSAGE_CLIENT = false;
public static final int DEFAULT_INITIAL_MESSAGE_PACKET_SIZE = 1500;
+
+ public static final boolean DEFAULT_XA = false;
/**
* Creates a ClientSessionFactory using all the defaults.
Modified: branches/HORNETQ-515/src/main/org/hornetq/api/jms/HornetQJMSClient.java
===================================================================
--- branches/HORNETQ-515/src/main/org/hornetq/api/jms/HornetQJMSClient.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/src/main/org/hornetq/api/jms/HornetQJMSClient.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -23,6 +23,13 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
+import org.hornetq.jms.client.HornetQQueueConnectionFactory;
+import org.hornetq.jms.client.HornetQTopicConnectionFactory;
+import org.hornetq.jms.client.HornetQXAConnectionFactory;
+import org.hornetq.jms.client.HornetQXAQueueConnectionFactory;
+import org.hornetq.jms.client.HornetQXATopicConnectionFactory;
+import org.hornetq.jms.server.impl.JMSFactoryType;
/**
* A utility class for creating HornetQ client-side JMS managed resources.
@@ -49,9 +56,9 @@
* @param sessionFactory The underlying ClientSessionFactory to use.
* @return The HornetQConnectionFactory.
*/
- public static HornetQConnectionFactory createConnectionFactory(final ClientSessionFactory sessionFactory)
+ public static HornetQJMSConnectionFactory createConnectionFactory(final ClientSessionFactory sessionFactory)
{
- return new HornetQConnectionFactory(sessionFactory);
+ return new HornetQJMSConnectionFactory(sessionFactory);
}
/**
@@ -59,11 +66,38 @@
*
* @param discoveryAddress The address to use for discovery.
* @param discoveryPort The port to use for discovery.
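+ * @param jmsFactoryType The type of connection factory to create (CF, QUEUE_CF, TOPIC_CF, XA_CF, QUEUE_XA_CF or TOPIC_XA_CF).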
* @return The HornetQConnectionFactory.
*/
- public static HornetQConnectionFactory createConnectionFactory(final String discoveryAddress, final int discoveryPort)
+ public static HornetQConnectionFactory createConnectionFactory(final String discoveryAddress, final int discoveryPort, JMSFactoryType jmsFactoryType)
{
- return new HornetQConnectionFactory(discoveryAddress, discoveryPort);
+ HornetQConnectionFactory factory = null;
+ if (jmsFactoryType.equals(JMSFactoryType.CF))
+ {
+ factory = new HornetQJMSConnectionFactory(discoveryAddress, discoveryPort);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.QUEUE_CF))
+ {
+ factory = new HornetQQueueConnectionFactory(discoveryAddress, discoveryPort);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.TOPIC_CF))
+ {
+ factory = new HornetQTopicConnectionFactory(discoveryAddress, discoveryPort);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.XA_CF))
+ {
+ factory = new HornetQXAConnectionFactory(discoveryAddress, discoveryPort);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.QUEUE_XA_CF))
+ {
+ factory = new HornetQXAQueueConnectionFactory(discoveryAddress, discoveryPort);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.TOPIC_XA_CF))
+ {
+ factory = new HornetQXATopicConnectionFactory(discoveryAddress, discoveryPort);
+ }
+
+ return factory;
}
/**
@@ -72,22 +106,75 @@
* @param staticConnectors The list of TransportConfiguration to use.
* @return The HornetQConnectionFactory.
*/
- public static HornetQConnectionFactory createConnectionFactory(final List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors)
+ public static HornetQConnectionFactory createConnectionFactory(final List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors,
+ final JMSFactoryType jmsFactoryType)
{
- return new HornetQConnectionFactory(staticConnectors);
+ HornetQConnectionFactory factory = null;
+ if (jmsFactoryType.equals(JMSFactoryType.CF))
+ {
+ factory = new HornetQJMSConnectionFactory(staticConnectors);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.QUEUE_CF))
+ {
+ factory = new HornetQQueueConnectionFactory(staticConnectors);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.TOPIC_CF))
+ {
+ factory = new HornetQTopicConnectionFactory(staticConnectors);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.XA_CF))
+ {
+ factory = new HornetQXAConnectionFactory(staticConnectors);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.QUEUE_XA_CF))
+ {
+ factory = new HornetQXAQueueConnectionFactory(staticConnectors);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.TOPIC_XA_CF))
+ {
+ factory = new HornetQXATopicConnectionFactory(staticConnectors);
+ }
+
+ return factory;
}
/**
* Creates a HornetQConnectionFactory using a single pair of live-backup TransportConfiguration.
*
- * @param connectorConfig The TransportConfiguration of the server to connect to.
- * @param backupConnectorConfig The TransportConfiguration of the backup server to connect to. can be null.
+ * @param connectorConfig The TransportConfiguration of the server to connect to.
* @return The HornetQConnectionFactory.
*/
public static HornetQConnectionFactory createConnectionFactory(final TransportConfiguration connectorConfig,
- final TransportConfiguration backupConnectorConfig)
+ final TransportConfiguration backupConnectorConfig,
+ final JMSFactoryType jmsFactoryType)
{
- return new HornetQConnectionFactory(connectorConfig, backupConnectorConfig);
+ HornetQConnectionFactory factory = null;
+ if (jmsFactoryType.equals(JMSFactoryType.CF))
+ {
+ factory = new HornetQJMSConnectionFactory(connectorConfig, backupConnectorConfig);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.QUEUE_CF))
+ {
+ factory = new HornetQQueueConnectionFactory(connectorConfig, backupConnectorConfig);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.TOPIC_CF))
+ {
+ factory = new HornetQTopicConnectionFactory(connectorConfig, backupConnectorConfig);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.XA_CF))
+ {
+ factory = new HornetQXAConnectionFactory(connectorConfig, backupConnectorConfig);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.QUEUE_XA_CF))
+ {
+ factory = new HornetQXAQueueConnectionFactory(connectorConfig, backupConnectorConfig);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.TOPIC_XA_CF))
+ {
+ factory = new HornetQXATopicConnectionFactory(connectorConfig, backupConnectorConfig);
+ }
+
+ return factory;
}
/**
@@ -96,11 +183,16 @@
* @param connectorConfig The TransportConfiguration of the server.
* @return The HornetQConnectionFactory.
*/
- public static HornetQConnectionFactory createConnectionFactory(final TransportConfiguration connectorConfig)
+ public static HornetQJMSConnectionFactory createConnectionFactory(final TransportConfiguration connectorConfig)
{
- return new HornetQConnectionFactory(connectorConfig);
+ return new HornetQJMSConnectionFactory(connectorConfig);
}
+ public static HornetQXAConnectionFactory createXAConnectionFactory(final TransportConfiguration connectorConfig)
+ {
+ return new HornetQXAConnectionFactory(connectorConfig);
+ }
+
/**
* Creates a client-side representation of a JMS Topic.
*
Modified: branches/HORNETQ-515/src/main/org/hornetq/jms/bridge/ConnectionFactoryFactory.java
===================================================================
--- branches/HORNETQ-515/src/main/org/hornetq/jms/bridge/ConnectionFactoryFactory.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/src/main/org/hornetq/jms/bridge/ConnectionFactoryFactory.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -13,8 +13,6 @@
package org.hornetq.jms.bridge;
-import javax.jms.ConnectionFactory;
-
/**
* A ConnectionFactoryFactory
*
@@ -26,5 +24,5 @@
*/
public interface ConnectionFactoryFactory
{
- ConnectionFactory createConnectionFactory() throws Exception;
+ Object createConnectionFactory() throws Exception;
}
Modified: branches/HORNETQ-515/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
===================================================================
--- branches/HORNETQ-515/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -969,7 +969,7 @@
{
Connection conn;
- ConnectionFactory cf = cff.createConnectionFactory();
+ Object cf = cff.createConnectionFactory();
if (qualityOfServiceMode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE && !(cf instanceof XAConnectionFactory))
{
@@ -992,7 +992,7 @@
{
JMSBridgeImpl.log.trace("Creating a non XA connection");
}
- conn = cf.createConnection();
+ conn = ((ConnectionFactory)cf).createConnection();
}
}
else
@@ -1011,7 +1011,7 @@
{
JMSBridgeImpl.log.trace("Creating a non XA connection");
}
- conn = cf.createConnection(username, password);
+ conn = ((ConnectionFactory)cf).createConnection(username, password);
}
}
Modified: branches/HORNETQ-515/src/main/org/hornetq/jms/bridge/impl/JNDIConnectionFactoryFactory.java
===================================================================
--- branches/HORNETQ-515/src/main/org/hornetq/jms/bridge/impl/JNDIConnectionFactoryFactory.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/src/main/org/hornetq/jms/bridge/impl/JNDIConnectionFactoryFactory.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -15,8 +15,6 @@
import java.util.Hashtable;
-import javax.jms.ConnectionFactory;
-
import org.hornetq.jms.bridge.ConnectionFactoryFactory;
@@ -36,9 +34,9 @@
super(jndiProperties, lookup);
}
- public ConnectionFactory createConnectionFactory() throws Exception
+ public Object createConnectionFactory() throws Exception
{
- return (ConnectionFactory)createObject();
+ return createObject();
}
}
Modified: branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -17,18 +17,12 @@
import java.util.List;
import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
import javax.jms.TopicConnection;
-import javax.jms.TopicConnectionFactory;
import javax.jms.XAConnection;
-import javax.jms.XAConnectionFactory;
import javax.jms.XAQueueConnection;
-import javax.jms.XAQueueConnectionFactory;
import javax.jms.XATopicConnection;
-import javax.jms.XATopicConnectionFactory;
import javax.naming.NamingException;
import javax.naming.Reference;
import javax.naming.Referenceable;
@@ -48,8 +42,7 @@
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @version <tt>$Revision$</tt> $Id$
*/
-public class HornetQConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory,
- XAConnectionFactory, XAQueueConnectionFactory, XATopicConnectionFactory, Serializable, Referenceable
+public class HornetQConnectionFactory implements Serializable, Referenceable
{
// Constants ------------------------------------------------------------------------------------
Added: branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQJMSConnectionFactory.java
===================================================================
--- branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQJMSConnectionFactory.java (rev 0)
+++ branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQJMSConnectionFactory.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.client;
+
+import java.util.List;
+
+import javax.jms.ConnectionFactory;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+
+
+/**
+ * A class that represents a ConnectionFactory.
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
+public class HornetQJMSConnectionFactory extends HornetQConnectionFactory implements ConnectionFactory
+{
+
+ private final static long serialVersionUID = -2810634789345348326L;
+
+ public HornetQJMSConnectionFactory(TransportConfiguration transportConfiguration)
+ {
+ super(transportConfiguration);
+ }
+
+ public HornetQJMSConnectionFactory(ClientSessionFactory sessionFactory)
+ {
+ super(sessionFactory);
+ }
+
+ public HornetQJMSConnectionFactory(String discoveryAddress, int discoveryPort)
+ {
+ super(discoveryAddress, discoveryPort);
+ }
+
+ public HornetQJMSConnectionFactory(List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs)
+ {
+ super(connectorConfigs);
+ }
+
+ public HornetQJMSConnectionFactory(TransportConfiguration connectorConfig,
+ TransportConfiguration backupConnectorConfig)
+ {
+ super(connectorConfig, backupConnectorConfig);
+ }
+}
Added: branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQQueueConnectionFactory.java
===================================================================
--- branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQQueueConnectionFactory.java (rev 0)
+++ branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQQueueConnectionFactory.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.client;
+
+import java.util.List;
+
+import javax.jms.QueueConnectionFactory;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+
+
+/**
+ * A class that represents a QueueConnectionFactory.
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class HornetQQueueConnectionFactory extends HornetQConnectionFactory implements QueueConnectionFactory
+{
+ private static final long serialVersionUID = 5312455021322463546L;
+
+ public HornetQQueueConnectionFactory(String discoveryAddress, int discoveryPort)
+ {
+ super(discoveryAddress, discoveryPort);
+ }
+
+ public HornetQQueueConnectionFactory(List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs)
+ {
+ super(connectorConfigs);
+ }
+
+ public HornetQQueueConnectionFactory(TransportConfiguration connectorConfig,
+ TransportConfiguration backupConnectorConfig)
+ {
+ super(connectorConfig, backupConnectorConfig);
+ }
+}
Added: branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQTopicConnectionFactory.java
===================================================================
--- branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQTopicConnectionFactory.java (rev 0)
+++ branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQTopicConnectionFactory.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.client;
+
+import java.util.List;
+
+import javax.jms.TopicConnectionFactory;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+
+
+/**
+ * A class that represents a TopicConnectionFactory.
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class HornetQTopicConnectionFactory extends HornetQConnectionFactory implements TopicConnectionFactory
+{
+ private static final long serialVersionUID = 7317051989866548455L;
+
+ public HornetQTopicConnectionFactory(String discoveryAddress, int discoveryPort)
+ {
+ super(discoveryAddress, discoveryPort);
+ }
+
+ public HornetQTopicConnectionFactory(List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs)
+ {
+ super(connectorConfigs);
+ }
+
+ public HornetQTopicConnectionFactory(TransportConfiguration connectorConfig,
+ TransportConfiguration backupConnectorConfig)
+ {
+ super(connectorConfig, backupConnectorConfig);
+ }
+}
Added: branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQXAConnectionFactory.java
===================================================================
--- branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQXAConnectionFactory.java (rev 0)
+++ branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQXAConnectionFactory.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.client;
+
+import java.util.List;
+
+import javax.jms.XAConnectionFactory;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+
+
+/**
+ * A {@link HornetQConnectionFactory} subclass that also implements the JMS
+ * {@link XAConnectionFactory} interface. It declares no state beyond serialVersionUID.
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
+public class HornetQXAConnectionFactory extends HornetQConnectionFactory implements XAConnectionFactory
+{
+ private static final long serialVersionUID = 743611571839154115L;
+ /** Creates a factory from a discovery address and port, both forwarded unchanged to the superclass. */
+ public HornetQXAConnectionFactory(String discoveryAddress, int discoveryPort)
+ {
+ super(discoveryAddress, discoveryPort);
+ }
+ /** Creates a factory from a list of connector configuration pairs, forwarded unchanged to the superclass. */
+ public HornetQXAConnectionFactory(List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs)
+ {
+ super(connectorConfigs);
+ }
+ /** Creates a factory from a connector configuration and a backup connector configuration, forwarded unchanged to the superclass. */
+ public HornetQXAConnectionFactory(TransportConfiguration connectorConfig,
+ TransportConfiguration backupConnectorConfig)
+ {
+ super(connectorConfig, backupConnectorConfig);
+ }
+ /** Creates a factory from a single connector configuration (no backup argument), forwarded unchanged to the superclass. */
+ public HornetQXAConnectionFactory(TransportConfiguration connectorConfig)
+ {
+ super(connectorConfig);
+ }
+}
Added: branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQXAQueueConnectionFactory.java
===================================================================
--- branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQXAQueueConnectionFactory.java (rev 0)
+++ branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQXAQueueConnectionFactory.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.client;
+
+import java.util.List;
+
+import javax.jms.XAQueueConnectionFactory;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+
+
+/**
+ * A {@link HornetQConnectionFactory} subclass that also implements the JMS
+ * {@link XAQueueConnectionFactory} interface. It declares no state beyond serialVersionUID.
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
+public class HornetQXAQueueConnectionFactory extends HornetQConnectionFactory implements XAQueueConnectionFactory
+{
+ private static final long serialVersionUID = 8612457847251087454L;
+ /** Creates a factory from a discovery address and port, both forwarded unchanged to the superclass. */
+ public HornetQXAQueueConnectionFactory(String discoveryAddress, int discoveryPort)
+ {
+ super(discoveryAddress, discoveryPort);
+ }
+ /** Creates a factory from a list of connector configuration pairs, forwarded unchanged to the superclass. */
+ public HornetQXAQueueConnectionFactory(List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs)
+ {
+ super(connectorConfigs);
+ }
+ /** Creates a factory from a connector configuration and a backup connector configuration, forwarded unchanged to the superclass. */
+ public HornetQXAQueueConnectionFactory(TransportConfiguration connectorConfig,
+ TransportConfiguration backupConnectorConfig)
+ {
+ super(connectorConfig, backupConnectorConfig);
+ }
+}
Added: branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQXATopicConnectionFactory.java
===================================================================
--- branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQXATopicConnectionFactory.java (rev 0)
+++ branches/HORNETQ-515/src/main/org/hornetq/jms/client/HornetQXATopicConnectionFactory.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.client;
+
+import java.util.List;
+
+import javax.jms.XATopicConnectionFactory;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+
+
+/**
+ * A {@link HornetQConnectionFactory} subclass that also implements the JMS
+ * {@link XATopicConnectionFactory} interface. It declares no state beyond serialVersionUID.
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
+public class HornetQXATopicConnectionFactory extends HornetQConnectionFactory implements XATopicConnectionFactory
+{
+ private static final long serialVersionUID = -7018290426884419693L;
+ /** Creates a factory from a discovery address and port, both forwarded unchanged to the superclass. */
+ public HornetQXATopicConnectionFactory(String discoveryAddress, int discoveryPort)
+ {
+ super(discoveryAddress, discoveryPort);
+ }
+ /** Creates a factory from a list of connector configuration pairs, forwarded unchanged to the superclass. */
+ public HornetQXATopicConnectionFactory(List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs)
+ {
+ super(connectorConfigs);
+ }
+ /** Creates a factory from a connector configuration and a backup connector configuration, forwarded unchanged to the superclass. */
+ public HornetQXATopicConnectionFactory(TransportConfiguration connectorConfig,
+ TransportConfiguration backupConnectorConfig)
+ {
+ super(connectorConfig, backupConnectorConfig);
+ }
+}
Modified: branches/HORNETQ-515/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
===================================================================
--- branches/HORNETQ-515/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -38,6 +38,7 @@
import org.hornetq.api.jms.management.TopicControl;
import org.hornetq.core.management.impl.MBeanInfoHelper;
import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.impl.JMSFactoryType;
/**
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
@@ -163,7 +164,7 @@
{
TransportConfiguration liveTC = new TransportConfiguration(liveTransportClassName, liveTransportParams);
- server.createConnectionFactory(name, liveTC, JMSServerControlImpl.convert(jndiBindings));
+ server.createConnectionFactory(name, liveTC, JMSFactoryType.CF, JMSServerControlImpl.convert(jndiBindings));
sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
}
@@ -173,6 +174,121 @@
}
}
+ public void createXAConnectionFactory(final String name,
+ final String liveTransportClassName,
+ final Map<String, Object> liveTransportParams,
+ final Object[] jndiBindings) throws Exception
+ {
+ checkStarted();
+ // Management operation: asks the JMS server for an XA_CF-typed connection factory over one live transport.
+ clearIO();
+
+ try
+ {
+ TransportConfiguration liveTC = new TransportConfiguration(liveTransportClassName, liveTransportParams);
+
+ server.createConnectionFactory(name, liveTC, JMSFactoryType.XA_CF, JMSServerControlImpl.convert(jndiBindings));
+ // Reached only if createConnectionFactory did not throw.
+ sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
+ }
+ finally
+ {
+ blockOnIO(); // runs on success and on failure; presumably pairs with clearIO() above - confirm
+ }
+ }
+
+ public void createQueueConnectionFactory(final String name,
+ final String liveTransportClassName,
+ final Map<String, Object> liveTransportParams,
+ final Object[] jndiBindings) throws Exception
+ {
+ checkStarted();
+ // Management operation: asks the JMS server for a QUEUE_CF-typed connection factory over one live transport.
+ clearIO();
+
+ try
+ {
+ TransportConfiguration liveTC = new TransportConfiguration(liveTransportClassName, liveTransportParams);
+
+ server.createConnectionFactory(name, liveTC, JMSFactoryType.QUEUE_CF, JMSServerControlImpl.convert(jndiBindings));
+ // Reached only if createConnectionFactory did not throw.
+ sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
+ }
+ finally
+ {
+ blockOnIO(); // runs on success and on failure; presumably pairs with clearIO() above - confirm
+ }
+ }
+
+ public void createTopicConnectionFactory(final String name,
+ final String liveTransportClassName,
+ final Map<String, Object> liveTransportParams,
+ final Object[] jndiBindings) throws Exception
+ {
+ checkStarted();
+ // Management operation: asks the JMS server for a TOPIC_CF-typed connection factory over one live transport.
+ clearIO();
+
+ try
+ {
+ TransportConfiguration liveTC = new TransportConfiguration(liveTransportClassName, liveTransportParams);
+
+ server.createConnectionFactory(name, liveTC, JMSFactoryType.TOPIC_CF, JMSServerControlImpl.convert(jndiBindings));
+ // Reached only if createConnectionFactory did not throw.
+ sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
+ }
+ finally
+ {
+ blockOnIO(); // runs on success and on failure; presumably pairs with clearIO() above - confirm
+ }
+ }
+
+ public void createXAQueueConnectionFactory(final String name,
+ final String liveTransportClassName,
+ final Map<String, Object> liveTransportParams,
+ final Object[] jndiBindings) throws Exception
+ {
+ checkStarted();
+ // Management operation: asks the JMS server for a QUEUE_XA_CF-typed connection factory over one live transport.
+ clearIO();
+
+ try
+ {
+ TransportConfiguration liveTC = new TransportConfiguration(liveTransportClassName, liveTransportParams);
+
+ server.createConnectionFactory(name, liveTC, JMSFactoryType.QUEUE_XA_CF, JMSServerControlImpl.convert(jndiBindings));
+ // Reached only if createConnectionFactory did not throw.
+ sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
+ }
+ finally
+ {
+ blockOnIO(); // runs on success and on failure; presumably pairs with clearIO() above - confirm
+ }
+ }
+
+ public void createXATopicConnectionFactory(final String name,
+ final String liveTransportClassName,
+ final Map<String, Object> liveTransportParams,
+ final Object[] jndiBindings) throws Exception
+ {
+ checkStarted();
+ // Management operation: asks the JMS server for a TOPIC_XA_CF-typed connection factory over one live transport.
+ clearIO();
+
+ try
+ {
+ TransportConfiguration liveTC = new TransportConfiguration(liveTransportClassName, liveTransportParams);
+
+ server.createConnectionFactory(name, liveTC, JMSFactoryType.TOPIC_XA_CF, JMSServerControlImpl.convert(jndiBindings));
+ // Reached only if createConnectionFactory did not throw.
+ sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
+ }
+ finally
+ {
+ blockOnIO(); // runs on success and on failure; presumably pairs with clearIO() above - confirm
+ }
+ }
+
public void createConnectionFactory(final String name,
final Object[] liveConnectorsTransportClassNames,
final Object[] liveConnectorTransportParams,
Modified: branches/HORNETQ-515/src/main/org/hornetq/jms/server/JMSServerManager.java
===================================================================
--- branches/HORNETQ-515/src/main/org/hornetq/jms/server/JMSServerManager.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/src/main/org/hornetq/jms/server/JMSServerManager.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -25,6 +25,7 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
+import org.hornetq.jms.server.impl.JMSFactoryType;
import org.hornetq.spi.core.naming.BindingRegistry;
/**
@@ -170,7 +171,7 @@
TransportConfiguration backupTC,
String ... bindings) throws Exception;
- void createConnectionFactory(String name, TransportConfiguration liveTC, String ... bindings) throws Exception;
+ void createConnectionFactory(String name, TransportConfiguration liveTC, JMSFactoryType cfType, String ... bindings) throws Exception;
void createConnectionFactory(String name,
String clientID,
@@ -222,6 +223,7 @@
boolean failoverOnInitialConnection,
boolean failoverOnServerShutdown,
String groupId,
+ JMSFactoryType factoryType,
String ... bindings) throws Exception;
void createConnectionFactory(String name,
Modified: branches/HORNETQ-515/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
===================================================================
--- branches/HORNETQ-515/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -19,6 +19,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.impl.JMSFactoryType;
/**
* A ConnectionFactoryConfiguration
@@ -202,4 +203,8 @@
String getGroupID();
void setGroupID(String groupID);
+
+ void setFactoryType(JMSFactoryType factType);
+
+ JMSFactoryType getFactoryType();
}
Modified: branches/HORNETQ-515/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
===================================================================
--- branches/HORNETQ-515/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
+import org.hornetq.jms.server.impl.JMSFactoryType;
import org.hornetq.utils.BufferHelper;
import org.hornetq.utils.DataConstants;
@@ -115,6 +116,8 @@
private String groupID = null;
+ private JMSFactoryType factoryType = JMSFactoryType.CF;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -671,6 +674,8 @@
failoverOnServerShutdown = buffer.readBoolean();
groupID = BufferHelper.readNullableSimpleStringAsString(buffer);
+
+ factoryType = JMSFactoryType.valueOf(buffer.readInt());
}
/* (non-Javadoc)
@@ -762,6 +767,8 @@
buffer.writeBoolean(failoverOnServerShutdown);
BufferHelper.writeAsNullableSimpleString(buffer, groupID);
+
+ buffer.writeInt(factoryType.intValue());
}
@@ -860,8 +867,20 @@
DataConstants.SIZE_BOOLEAN + // failoverOnServerShutdown
- BufferHelper.sizeOfNullableSimpleString(groupID);
+ BufferHelper.sizeOfNullableSimpleString(groupID) +
+
+ DataConstants.SIZE_INT; //factoryType
}
+ /** Stores the given JMSFactoryType in this configuration's factoryType field, replacing the initial JMSFactoryType.CF. */
+ public void setFactoryType(JMSFactoryType factoryType)
+ {
+ this.factoryType = factoryType;
+ }
+ /** Returns the JMSFactoryType currently held in the factoryType field (JMSFactoryType.CF unless it was set or decoded otherwise). */
+ public JMSFactoryType getFactoryType()
+ {
+ return factoryType;
+ }
// Public --------------------------------------------------------
Added: branches/HORNETQ-515/src/main/org/hornetq/jms/server/impl/JMSFactoryType.java
===================================================================
--- branches/HORNETQ-515/src/main/org/hornetq/jms/server/impl/JMSFactoryType.java (rev 0)
+++ branches/HORNETQ-515/src/main/org/hornetq/jms/server/impl/JMSFactoryType.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.server.impl;
+
+/**
+ * Identifies which flavour of JMS connection factory to create: generic, queue or topic,
+ * each in a plain and an XA variant. Every constant has a fixed int code (0-5), produced by
+ * {@link #intValue()} and read back by {@link #valueOf(int)}.
+ *
+ * @author howard
+ */
+public enum JMSFactoryType
+{
+ CF, QUEUE_CF, TOPIC_CF, XA_CF, QUEUE_XA_CF, TOPIC_XA_CF;
+ /** Returns this constant's int code: CF=0, QUEUE_CF=1, TOPIC_CF=2, XA_CF=3, QUEUE_XA_CF=4, TOPIC_XA_CF=5. */
+ public int intValue()
+ {
+ int val = 0;
+ switch (this)
+ {
+ case CF:
+ val = 0;
+ break;
+ case QUEUE_CF:
+ val = 1;
+ break;
+ case TOPIC_CF:
+ val = 2;
+ break;
+ case XA_CF:
+ val = 3;
+ break;
+ case QUEUE_XA_CF:
+ val = 4;
+ break;
+ case TOPIC_XA_CF:
+ val = 5;
+ break;
+ }
+ return val;
+ }
+ /** Inverse of {@link #intValue()} for codes 0-5; any other int yields XA_CF instead of raising an error. */
+ public static JMSFactoryType valueOf(int val)
+ {
+ JMSFactoryType type;
+ switch (val)
+ {
+ case 0:
+ type = CF;
+ break;
+ case 1:
+ type = QUEUE_CF;
+ break;
+ case 2:
+ type = TOPIC_CF;
+ break;
+ case 3:
+ type = XA_CF;
+ break;
+ case 4:
+ type = QUEUE_XA_CF;
+ break;
+ case 5:
+ type = TOPIC_XA_CF;
+ break;
+ default: // NOTE(review): unknown codes silently become XA_CF, while the configuration default is CF - confirm this is intended
+ type = XA_CF;
+ break;
+ }
+ return type;
+ }
+}
Modified: branches/HORNETQ-515/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java
===================================================================
--- branches/HORNETQ-515/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -205,7 +205,14 @@
Element e = (Element)node;
String name = node.getAttributes().getNamedItem(JMSServerConfigParserImpl.NAME_ATTR).getNodeValue();
+
+ String fact = e.getAttribute("signature");
+ boolean isXA = XMLConfigurationUtil.getBoolean(e,
+ "xa",
+ HornetQClient.DEFAULT_XA);
+ JMSFactoryType factType = resolveFactoryType(fact, isXA);
+
long clientFailureCheckPeriod = XMLConfigurationUtil.getLong(e,
"client-failure-check-period",
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
@@ -381,6 +388,7 @@
cfConfig.setConnectorNames(connectorNames);
}
+ cfConfig.setFactoryType(factType);
cfConfig.setClientID(clientID);
cfConfig.setClientFailureCheckPeriod(clientFailureCheckPeriod);
cfConfig.setConnectionTTL(connectionTTL);
@@ -413,6 +421,46 @@
return cfConfig;
}
+ private JMSFactoryType resolveFactoryType(String fact, boolean isXA) throws HornetQException
+ { // Maps a signature name ("generic", "queue" or "topic") plus the XA flag to one of the six JMSFactoryType constants.
+ if ("".equals(fact))
+ {
+ fact = "generic"; // an empty signature is treated as "generic"
+ }
+ if (isXA)
+ {
+ if ("generic".equals(fact))
+ {
+ return JMSFactoryType.XA_CF;
+ }
+ if ("queue".equals(fact))
+ {
+ return JMSFactoryType.QUEUE_XA_CF;
+ }
+ if ("topic".equals(fact))
+ {
+ return JMSFactoryType.TOPIC_XA_CF;
+ }
+ }
+ else
+ {
+ if ("generic".equals(fact))
+ {
+ return JMSFactoryType.CF;
+ }
+ if ("queue".equals(fact))
+ {
+ return JMSFactoryType.QUEUE_CF;
+ }
+ if ("topic".equals(fact))
+ {
+ return JMSFactoryType.TOPIC_CF;
+ }
+ }
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Invalid signature " + fact +
+ " at parseConnectionFactory"); // reached when fact is none of the three names (matching is exact and case-sensitive)
+ }
+
/**
* hook for integration layers
* @param topicName
Modified: branches/HORNETQ-515/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/HORNETQ-515/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -23,7 +23,6 @@
import javax.naming.Context;
import javax.naming.InitialContext;
-import javax.naming.NameNotFoundException;
import javax.naming.NamingException;
import org.hornetq.api.core.HornetQException;
@@ -754,6 +753,7 @@
final boolean failoverOnInitialConnection,
final boolean failoverOnServerShutdown,
final String groupId,
+ final JMSFactoryType factoryType,
String... jndiBindings) throws Exception
{
checkInitialised();
@@ -790,6 +790,7 @@
configuration.setFailoverOnInitialConnection(failoverOnInitialConnection);
configuration.setFailoverOnServerShutdown(failoverOnServerShutdown);
configuration.setGroupID(groupId);
+ configuration.setFactoryType(factoryType);
createConnectionFactory(true, configuration, jndiBindings);
}
}
@@ -969,13 +970,14 @@
final int reconnectAttempts,
final boolean failoverOnInitialConnection,
final boolean failoverOnServerShutdown,
- final String groupId) throws Exception
+ final String groupId,
+ final JMSFactoryType jmsFactoryType) throws Exception
{
checkInitialised();
HornetQConnectionFactory cf = connectionFactories.get(name);
if (cf == null)
{
- cf = (HornetQConnectionFactory)HornetQJMSClient.createConnectionFactory(discoveryAddress, discoveryPort);
+ cf = (HornetQConnectionFactory)HornetQJMSClient.createConnectionFactory(discoveryAddress, discoveryPort, jmsFactoryType);
cf.setClientID(clientID);
cf.setLocalBindAddress(localBindAddress);
cf.setDiscoveryRefreshTimeout(discoveryRefreshTimeout);
@@ -1043,13 +1045,14 @@
final int reconnectAttempts,
final boolean failoverOnInitialConnection,
final boolean failoverOnServerShutdown,
- final String groupId) throws Exception
+ final String groupId,
+ final JMSFactoryType jmsFactoryType) throws Exception
{
checkInitialised();
HornetQConnectionFactory cf = connectionFactories.get(name);
if (cf == null)
{
- cf = (HornetQConnectionFactory)HornetQJMSClient.createConnectionFactory(connectorConfigs);
+ cf = (HornetQConnectionFactory)HornetQJMSClient.createConnectionFactory(connectorConfigs, jmsFactoryType);
cf.setClientID(clientID);
cf.setClientFailureCheckPeriod(clientFailureCheckPeriod);
cf.setConnectionTTL(connectionTTL);
@@ -1213,7 +1216,8 @@
cfConfig.getReconnectAttempts(),
cfConfig.isFailoverOnInitialConnection(),
cfConfig.isFailoverOnServerShutdown(),
- cfConfig.getGroupID());
+ cfConfig.getGroupID(),
+ cfConfig.getFactoryType());
}
else
{
@@ -1247,7 +1251,8 @@
cfConfig.getReconnectAttempts(),
cfConfig.isFailoverOnInitialConnection(),
cfConfig.isFailoverOnServerShutdown(),
- cfConfig.getGroupID());
+ cfConfig.getGroupID(),
+ cfConfig.getFactoryType());
}
connectionFactories.put(cfConfig.getName(), cf);
@@ -1258,6 +1263,7 @@
public synchronized void createConnectionFactory(final String name,
final TransportConfiguration liveTC,
+ final JMSFactoryType cfType,
final String... jndiBindings) throws Exception
{
checkInitialised();
@@ -1265,6 +1271,7 @@
if (cf == null)
{
ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name, liveTC);
+ configuration.setFactoryType(cfType);
createConnectionFactory(true, configuration, jndiBindings);
}
}
Modified: branches/HORNETQ-515/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- branches/HORNETQ-515/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -35,6 +35,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.impl.JMSFactoryType;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.core.logging.Logger;
import org.hornetq.ra.inflow.HornetQActivation;
@@ -1389,13 +1390,13 @@
: new TransportConfiguration(backUpCOnnectorClassname,
backupConnectionParams);
- cf = HornetQJMSClient.createConnectionFactory(transportConf, backup);
+ cf = HornetQJMSClient.createConnectionFactory(transportConf, backup, JMSFactoryType.XA_CF);
}
else if (discoveryAddress != null)
{
Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort()
: getDiscoveryPort();
- cf = HornetQJMSClient.createConnectionFactory(discoveryAddress, discoveryPort);
+ cf = HornetQJMSClient.createConnectionFactory(discoveryAddress, discoveryPort, JMSFactoryType.XA_CF);
}
else
{
Modified: branches/HORNETQ-515/tests/jms-tests/config/hornetq-jms.xml
===================================================================
--- branches/HORNETQ-515/tests/jms-tests/config/hornetq-jms.xml 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/jms-tests/config/hornetq-jms.xml 2010-10-10 12:15:41 UTC (rev 9762)
@@ -15,4 +15,112 @@
</entries>
</connection-factory>
-</configuration>
\ No newline at end of file
+ <connection-factory name="JMSConnectionFactory1">
+ <xa>true</xa>
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/CF_XA_TRUE"/>
+ </entries>
+ </connection-factory>
+
+ <connection-factory name="JMSConnectionFactory2">
+ <xa>false</xa>
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/CF_XA_FALSE"/>
+ </entries>
+ </connection-factory>
+
+ <connection-factory name="JMSConnectionFactory3" signature="generic">
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/CF_GENERIC"/>
+ </entries>
+ </connection-factory>
+
+ <connection-factory name="JMSConnectionFactory4" signature="generic">
+ <xa>true</xa>
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/CF_GENERIC_XA_TRUE"/>
+ </entries>
+ </connection-factory>
+
+ <connection-factory name="JMSConnectionFactory5" signature="generic">
+ <xa>false</xa>
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/CF_GENERIC_XA_FALSE"/>
+ </entries>
+ </connection-factory>
+
+ <connection-factory name="JMSConnectionFactory6" signature="queue">
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/CF_QUEUE"/>
+ </entries>
+ </connection-factory>
+
+ <connection-factory name="JMSConnectionFactory7" signature="queue">
+ <xa>true</xa>
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/CF_QUEUE_XA_TRUE"/>
+ </entries>
+ </connection-factory>
+
+ <connection-factory name="JMSConnectionFactory8" signature="queue">
+ <xa>false</xa>
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/CF_QUEUE_XA_FALSE"/>
+ </entries>
+ </connection-factory>
+
+ <connection-factory name="JMSConnectionFactory9" signature="topic">
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/CF_TOPIC"/>
+ </entries>
+ </connection-factory>
+
+ <connection-factory name="JMSConnectionFactory10" signature="topic">
+ <xa>true</xa>
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/CF_TOPIC_XA_TRUE"/>
+ </entries>
+ </connection-factory>
+
+ <connection-factory name="JMSConnectionFactory11" signature="topic">
+ <xa>false</xa>
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/CF_TOPIC_XA_FALSE"/>
+ </entries>
+ </connection-factory>
+
+</configuration>
+
Modified: branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
===================================================================
--- branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -25,6 +25,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.impl.JMSFactoryType;
/**
* Safeguards for previously detected TCK failures.
@@ -95,6 +96,7 @@
HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION,
HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
null,
+ JMSFactoryType.CF,
"/StrictTCKConnectionFactory");
CTSMiscellaneousTest.cf = (HornetQConnectionFactory)getInitialContext().lookup("/StrictTCKConnectionFactory");
Modified: branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionClosedTest.java
===================================================================
--- branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionClosedTest.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionClosedTest.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -66,8 +66,8 @@
/** See TCK test: topicconntests.connNotStartedTopicTest */
public void testCannotReceiveMessageOnStoppedConnection() throws Exception
{
- TopicConnection conn1 = ((TopicConnectionFactory)JMSTestCase.cf).createTopicConnection();
- TopicConnection conn2 = ((TopicConnectionFactory)JMSTestCase.cf).createTopicConnection();
+ TopicConnection conn1 = ((TopicConnectionFactory)JMSTestCase.topicCf).createTopicConnection();
+ TopicConnection conn2 = ((TopicConnectionFactory)JMSTestCase.topicCf).createTopicConnection();
TopicSession sess1 = conn1.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TopicSession sess2 = conn2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Modified: branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionFactoryTest.java
===================================================================
--- branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionFactoryTest.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionFactoryTest.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -28,7 +28,11 @@
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XAQueueConnectionFactory;
+import javax.jms.XATopicConnectionFactory;
+import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.tests.util.ProxyAssertSupport;
/**
@@ -62,7 +66,7 @@
*/
public void testQueueConnectionFactory() throws Exception
{
- QueueConnectionFactory qcf = (QueueConnectionFactory)JMSTestCase.ic.lookup("/ConnectionFactory");
+ QueueConnectionFactory qcf = (QueueConnectionFactory)JMSTestCase.ic.lookup("/CF_QUEUE_XA_FALSE");
QueueConnection qc = qcf.createQueueConnection();
qc.close();
}
@@ -73,7 +77,7 @@
*/
public void testTopicConnectionFactory() throws Exception
{
- TopicConnectionFactory qcf = (TopicConnectionFactory)JMSTestCase.ic.lookup("/ConnectionFactory");
+ TopicConnectionFactory qcf = (TopicConnectionFactory)JMSTestCase.ic.lookup("/CF_TOPIC_XA_FALSE");
TopicConnection tc = qcf.createTopicConnection();
tc.close();
}
@@ -107,7 +111,7 @@
// the ConnectionFactories that ship with HornetQ do not have their clientID
// administratively configured.
- ConnectionFactory cf = (ConnectionFactory)JMSTestCase.ic.lookup("/ConnectionFactory");
+ ConnectionFactory cf = (ConnectionFactory)JMSTestCase.ic.lookup("/CF_XA_FALSE");
Connection c = cf.createConnection();
ProxyAssertSupport.assertNull(c.getClientID());
@@ -120,7 +124,7 @@
// the ConnectionFactories that ship with HornetQ do not have their clientID
// administratively configured.
- ConnectionFactory cf = (ConnectionFactory)JMSTestCase.ic.lookup("/ConnectionFactory");
+ ConnectionFactory cf = (ConnectionFactory)JMSTestCase.ic.lookup("/CF_XA_FALSE");
Connection c = cf.createConnection();
// set the client id immediately after the connection is created
@@ -320,7 +324,102 @@
}
}
+
+ public void testFactoryTypes() throws Exception
+ {
+ HornetQConnectionFactory factory = null;
+ // For each JNDI name: look the factory up, check the expected JMS interface, then check how many of the six JMS factory interfaces it implements (see getTypes).
+ factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/ConnectionFactory");
+
+ assertTrue(factory instanceof ConnectionFactory);
+ assertEquals(1, getTypes(factory));
+
+ factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_XA_TRUE");
+
+ assertTrue(factory instanceof XAConnectionFactory);
+ assertEquals(1, getTypes(factory));
+
+ factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_XA_FALSE");
+
+ assertTrue(factory instanceof ConnectionFactory);
+ assertEquals(1, getTypes(factory));
+
+ factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_GENERIC");
+
+ assertTrue(factory instanceof ConnectionFactory);
+ assertEquals(1, getTypes(factory));
+
+ factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_GENERIC_XA_TRUE");
+
+ assertTrue(factory instanceof XAConnectionFactory);
+ assertEquals(1, getTypes(factory));
+
+ factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_GENERIC_XA_FALSE");
+
+ assertTrue(factory instanceof ConnectionFactory);
+ assertEquals(1, getTypes(factory));
+
+ factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_QUEUE");
+
+ assertTrue(factory instanceof QueueConnectionFactory);
+ assertEquals(2, getTypes(factory)); // expected: QueueConnectionFactory plus its JMS parent ConnectionFactory
+
+ factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_QUEUE_XA_TRUE");
+
+ assertTrue(factory instanceof XAQueueConnectionFactory);
+ assertEquals(4, getTypes(factory)); // expected: XAQueueConnectionFactory plus XAConnectionFactory, QueueConnectionFactory and ConnectionFactory
+
+ factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_QUEUE_XA_FALSE");
+
+ assertTrue(factory instanceof QueueConnectionFactory);
+ assertEquals(2, getTypes(factory));
+
+ factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_TOPIC");
+
+ assertTrue(factory instanceof TopicConnectionFactory);
+ assertEquals(2, getTypes(factory)); // expected: TopicConnectionFactory plus its JMS parent ConnectionFactory
+
+ factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_TOPIC_XA_TRUE");
+
+ assertTrue(factory instanceof XATopicConnectionFactory);
+ assertEquals(4, getTypes(factory)); // expected: XATopicConnectionFactory plus XAConnectionFactory, TopicConnectionFactory and ConnectionFactory
+
+ factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_TOPIC_XA_FALSE");
+
+ assertTrue(factory instanceof TopicConnectionFactory);
+ assertEquals(2, getTypes(factory));
+ }
+ private int getTypes(HornetQConnectionFactory factory)
+ { // Counts how many of the six JMS factory interfaces checked below the given instance implements (0 to 6).
+ int num = 0;
+ if (factory instanceof ConnectionFactory)
+ {
+ num++;
+ }
+ if (factory instanceof XAConnectionFactory)
+ {
+ num++;
+ }
+ if (factory instanceof QueueConnectionFactory)
+ {
+ num++;
+ }
+ if (factory instanceof TopicConnectionFactory)
+ {
+ num++;
+ }
+ if (factory instanceof XAQueueConnectionFactory)
+ {
+ num++;
+ }
+ if (factory instanceof XATopicConnectionFactory)
+ {
+ num++;
+ }
+ return num; // inherited interfaces count too, so a sub-interface implementor scores more than 1
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java
===================================================================
--- branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -211,7 +211,7 @@
*/
public void testQueueConnection1() throws Exception
{
- QueueConnectionFactory qcf = JMSTestCase.cf;
+ QueueConnectionFactory qcf = JMSTestCase.queueCf;
QueueConnection qc = qcf.createQueueConnection();
@@ -225,7 +225,7 @@
*/
public void testQueueConnection2() throws Exception
{
- TopicConnectionFactory tcf = JMSTestCase.cf;
+ TopicConnectionFactory tcf = JMSTestCase.topicCf;
TopicConnection tc = tcf.createTopicConnection();
@@ -280,7 +280,7 @@
*/
public void testDurableSubscriberOnQueueConnection() throws Exception
{
- QueueConnection queueConnection = ((QueueConnectionFactory)JMSTestCase.cf).createQueueConnection();
+ QueueConnection queueConnection = ((QueueConnectionFactory)JMSTestCase.queueCf).createQueueConnection();
try
{
Modified: branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/HornetQServerTestCase.java
===================================================================
--- branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/HornetQServerTestCase.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/HornetQServerTestCase.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -318,12 +318,12 @@
public TopicConnectionFactory getTopicConnectionFactory() throws Exception
{
- return (TopicConnectionFactory)getInitialContext().lookup("/ConnectionFactory");
+ return (TopicConnectionFactory)getInitialContext().lookup("/CF_TOPIC");
}
public XAConnectionFactory getXAConnectionFactory() throws Exception
{
- return (XAConnectionFactory)getInitialContext().lookup("/ConnectionFactory");
+ return (XAConnectionFactory)getInitialContext().lookup("/CF_XA_TRUE");
}
public InitialContext getInitialContext(final int serverid) throws Exception
Modified: branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
===================================================================
--- branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -22,6 +22,10 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
+import org.hornetq.jms.client.HornetQQueueConnectionFactory;
+import org.hornetq.jms.client.HornetQTopicConnectionFactory;
+import org.hornetq.jms.server.impl.JMSFactoryType;
/**
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
@@ -32,8 +36,12 @@
public class JMSTestCase extends HornetQServerTestCase
{
- protected static HornetQConnectionFactory cf;
+ protected static HornetQJMSConnectionFactory cf;
+ protected static HornetQQueueConnectionFactory queueCf;
+
+ protected static HornetQTopicConnectionFactory topicCf;
+
protected static InitialContext ic;
protected static final String defaultConf = "all";
@@ -91,10 +99,81 @@
HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION,
HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
null,
+ JMSFactoryType.CF,
"/testsuitecf");
- JMSTestCase.cf = (HornetQConnectionFactory)getInitialContext().lookup("/testsuitecf");
+ getJmsServerManager().createConnectionFactory("testsuitecf_queue",
+ connectorConfigs,
+ null,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
+ HornetQClient.DEFAULT_CALL_TIMEOUT,
+ HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
+ HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
+ HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE,
+ HornetQClient.DEFAULT_PRODUCER_MAX_RATE,
+ true,
+ true,
+ true,
+ HornetQClient.DEFAULT_AUTO_GROUP,
+ HornetQClient.DEFAULT_PRE_ACKNOWLEDGE,
+ HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
+ HornetQClient.DEFAULT_ACK_BATCH_SIZE,
+ HornetQClient.DEFAULT_ACK_BATCH_SIZE,
+ HornetQClient.DEFAULT_USE_GLOBAL_POOLS,
+ HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+ HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE,
+ HornetQClient.DEFAULT_RETRY_INTERVAL,
+ HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
+ HornetQClient.DEFAULT_RECONNECT_ATTEMPTS,
+ HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION,
+ HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
+ null,
+ JMSFactoryType.QUEUE_CF,
+ "/testsuitecf_queue");
+ getJmsServerManager().createConnectionFactory("testsuitecf_topic",
+ connectorConfigs,
+ null,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
+ HornetQClient.DEFAULT_CALL_TIMEOUT,
+ HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
+ HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
+ HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE,
+ HornetQClient.DEFAULT_PRODUCER_MAX_RATE,
+ true,
+ true,
+ true,
+ HornetQClient.DEFAULT_AUTO_GROUP,
+ HornetQClient.DEFAULT_PRE_ACKNOWLEDGE,
+ HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
+ HornetQClient.DEFAULT_ACK_BATCH_SIZE,
+ HornetQClient.DEFAULT_ACK_BATCH_SIZE,
+ HornetQClient.DEFAULT_USE_GLOBAL_POOLS,
+ HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+ HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE,
+ HornetQClient.DEFAULT_RETRY_INTERVAL,
+ HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
+ HornetQClient.DEFAULT_RECONNECT_ATTEMPTS,
+ HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION,
+ HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
+ null,
+ JMSFactoryType.TOPIC_CF,
+ "/testsuitecf_topic");
+
+ JMSTestCase.cf = (HornetQJMSConnectionFactory)getInitialContext().lookup("/testsuitecf");
+ JMSTestCase.queueCf = (HornetQQueueConnectionFactory)getInitialContext().lookup("/testsuitecf_queue");
+ JMSTestCase.topicCf = (HornetQTopicConnectionFactory)getInitialContext().lookup("/testsuitecf_topic");
+
assertRemainingMessages(0);
}
Modified: branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/ReferenceableTest.java
===================================================================
--- branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/ReferenceableTest.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/ReferenceableTest.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -27,6 +27,7 @@
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
import org.hornetq.jms.client.HornetQQueue;
import org.hornetq.jms.client.HornetQTopic;
import org.hornetq.jms.referenceable.ConnectionFactoryObjectFactory;
@@ -88,7 +89,7 @@
ProxyAssertSupport.assertTrue(instance instanceof HornetQConnectionFactory);
- HornetQConnectionFactory cf2 = (HornetQConnectionFactory)instance;
+ HornetQJMSConnectionFactory cf2 = (HornetQJMSConnectionFactory)instance;
simpleSendReceive(cf2, HornetQServerTestCase.queue1);
}
Modified: branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/message/JMSXDeliveryCountTest.java
===================================================================
--- branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/message/JMSXDeliveryCountTest.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/message/JMSXDeliveryCountTest.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -499,7 +499,7 @@
producer.send(tm);
- xaConn = ((XAConnectionFactory)getConnectionFactory()).createXAConnection();
+ xaConn = ((XAConnectionFactory)getXAConnectionFactory()).createXAConnection();
XASession consumerSess = xaConn.createXASession();
MessageConsumer consumer = consumerSess.createConsumer(HornetQServerTestCase.queue1);
Modified: branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
===================================================================
--- branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -38,6 +38,7 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.integration.bootstrap.HornetQBootstrapServer;
import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.impl.JMSFactoryType;
import org.jboss.kernel.plugins.config.property.PropertyKernelConfig;
/**
@@ -321,6 +322,7 @@
HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION,
HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
null,
+ JMSFactoryType.CF,
jndiBindings);
}
Modified: branches/HORNETQ-515/tests/joram-tests/src/org/hornetq/jms/HornetQAdmin.java
===================================================================
--- branches/HORNETQ-515/tests/joram-tests/src/org/hornetq/jms/HornetQAdmin.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/joram-tests/src/org/hornetq/jms/HornetQAdmin.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -129,7 +129,19 @@
public void createQueueConnectionFactory(final String name)
{
- createConnectionFactory(name);
+ try
+ {
+ invokeSyncOperation(ResourceNames.JMS_SERVER,
+ "createQueueConnectionFactory",
+ name,
+ NettyConnectorFactory.class.getName(),
+ new HashMap<String, Object>(),
+ new String[] { name });
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException(e);
+ }
}
public void createTopic(final String name)
@@ -148,7 +160,19 @@
public void createTopicConnectionFactory(final String name)
{
- createConnectionFactory(name);
+ try
+ {
+ invokeSyncOperation(ResourceNames.JMS_SERVER,
+ "createTopicConnectionFactory",
+ name,
+ NettyConnectorFactory.class.getName(),
+ new HashMap<String, Object>(),
+ new String[] { name });
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException(e);
+ }
}
public void deleteConnectionFactory(final String name)
Modified: branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
===================================================================
--- branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -35,6 +35,7 @@
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.server.impl.JMSFactoryType;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.tests.unit.util.InVMContext;
import org.hornetq.tests.util.UnitTestCase;
@@ -159,6 +160,7 @@
HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION,
failoverOnServerShutdown,
null,
+ JMSFactoryType.CF,
"/cf");
}
Modified: branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java
===================================================================
--- branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -28,6 +28,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.impl.JMSFactoryType;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.Configuration;
@@ -172,7 +173,7 @@
public void testDiscoveryConstructor() throws Exception
{
- HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(groupAddress, groupPort);
+ HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(groupAddress, groupPort, JMSFactoryType.CF);
assertFactoryParams(cf,
null,
groupAddress,
@@ -219,7 +220,7 @@
backupTC);
staticConnectors.add(pair0);
- HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(staticConnectors);
+ HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(staticConnectors, JMSFactoryType.CF);
assertFactoryParams(cf,
staticConnectors,
null,
@@ -267,7 +268,7 @@
backupTC);
staticConnectors.add(pair0);
- HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(liveTC, backupTC);
+ HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(liveTC, backupTC, JMSFactoryType.CF);
assertFactoryParams(cf,
staticConnectors,
null,
Modified: branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java
===================================================================
--- branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -28,6 +28,7 @@
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
+import javax.jms.XAConnectionFactory;
import javax.transaction.TransactionManager;
import junit.framework.Assert;
@@ -50,8 +51,9 @@
import org.hornetq.jms.bridge.ConnectionFactoryFactory;
import org.hornetq.jms.bridge.DestinationFactory;
import org.hornetq.jms.bridge.QualityOfServiceMode;
-import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
import org.hornetq.jms.client.HornetQMessage;
+import org.hornetq.jms.client.HornetQXAConnectionFactory;
import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.tests.unit.util.InVMContext;
@@ -73,8 +75,12 @@
protected ConnectionFactoryFactory cff0, cff1;
+ protected ConnectionFactoryFactory cff0xa, cff1xa;
+
protected ConnectionFactory cf0, cf1;
+ protected XAConnectionFactory cf0xa, cf1xa;
+
protected DestinationFactory sourceQueueFactory, targetQueueFactory, localTargetQueueFactory, sourceTopicFactory;
protected Queue sourceQueue, targetQueue, localTargetQueue;
@@ -173,8 +179,12 @@
server0.stop();
cff0 = cff1 = null;
+
+ cff0xa = cff1xa = null;
cf0 = cf1 = null;
+
+ cf0xa = cf1xa = null;
sourceQueueFactory = targetQueueFactory = localTargetQueueFactory = sourceTopicFactory = null;
@@ -203,7 +213,7 @@
{
public ConnectionFactory createConnectionFactory() throws Exception
{
- HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ HornetQJMSConnectionFactory cf = (HornetQJMSConnectionFactory) HornetQJMSClient.createConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
// Note! We disable automatic reconnection on the session factory. The bridge needs to do the reconnection
cf.setReconnectAttempts(0);
@@ -216,14 +226,32 @@
};
- cf0 = cff0.createConnectionFactory();
+ cff0xa = new ConnectionFactoryFactory()
+ {
+ public Object createConnectionFactory() throws Exception
+ {
+ HornetQXAConnectionFactory cf = HornetQJMSClient.createXAConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ // Note! We disable automatic reconnection on the session factory. The bridge needs to do the reconnection
+ cf.setReconnectAttempts(0);
+ cf.setBlockOnNonDurableSend(true);
+ cf.setBlockOnDurableSend(true);
+ cf.setCacheLargeMessagesClient(true);
+
+ return cf;
+ }
+
+ };
+
+ cf0 = (ConnectionFactory)cff0.createConnectionFactory();
+ cf0xa = (XAConnectionFactory)cff0xa.createConnectionFactory();
+
cff1 = new ConnectionFactoryFactory()
{
public ConnectionFactory createConnectionFactory() throws Exception
{
- HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName(),
+ HornetQJMSConnectionFactory cf = (HornetQJMSConnectionFactory) HornetQJMSClient.createConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName(),
params1));
// Note! We disable automatic reconnection on the session factory. The bridge needs to do the reconnection
@@ -236,8 +264,27 @@
}
};
- cf1 = cff1.createConnectionFactory();
+ cff1xa = new ConnectionFactoryFactory()
+ {
+ public XAConnectionFactory createConnectionFactory() throws Exception
+ {
+ HornetQXAConnectionFactory cf = (HornetQXAConnectionFactory) HornetQJMSClient.createXAConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName(),
+ params1));
+
+ // Note! We disable automatic reconnection on the session factory. The bridge needs to do the reconnection
+ cf.setReconnectAttempts(0);
+ cf.setBlockOnNonDurableSend(true);
+ cf.setBlockOnDurableSend(true);
+ cf.setCacheLargeMessagesClient(true);
+
+ return cf;
+ }
+ };
+
+ cf1 = (ConnectionFactory)cff1.createConnectionFactory();
+ cf1xa = (XAConnectionFactory)cff1xa.createConnectionFactory();
+
sourceQueueFactory = new DestinationFactory()
{
public Destination createDestination() throws Exception
Modified: branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
===================================================================
--- branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -18,6 +18,7 @@
import junit.framework.Assert;
import org.hornetq.core.logging.Logger;
+import org.hornetq.jms.bridge.ConnectionFactoryFactory;
import org.hornetq.jms.bridge.QualityOfServiceMode;
import org.hornetq.jms.bridge.impl.JMSBridgeImpl;
@@ -196,10 +197,18 @@
{
JMSBridgeImpl bridge = null;
+ ConnectionFactoryFactory factInUse0 = cff0;
+ ConnectionFactoryFactory factInUse1 = cff1;
+ if (qosMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
+ {
+ factInUse0 = cff0xa;
+ factInUse1 = cff1xa;
+ }
+
try
{
- bridge = new JMSBridgeImpl(cff0,
- cff1,
+ bridge = new JMSBridgeImpl(factInUse0,
+ factInUse1,
sourceQueueFactory,
targetQueueFactory,
null,
@@ -290,8 +299,8 @@
try
{
- bridge = new JMSBridgeImpl(cff0,
- cff1,
+ bridge = new JMSBridgeImpl(cff0xa,
+ cff1xa,
sourceQueueFactory,
targetQueueFactory,
null,
Modified: branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java
===================================================================
--- branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -33,6 +33,7 @@
import org.hornetq.api.jms.HornetQJMSConstants;
import org.hornetq.core.logging.Logger;
+import org.hornetq.jms.bridge.ConnectionFactoryFactory;
import org.hornetq.jms.bridge.QualityOfServiceMode;
import org.hornetq.jms.bridge.impl.JMSBridgeImpl;
import org.hornetq.jms.client.HornetQMessage;
@@ -1444,10 +1445,18 @@
Thread t = null;
+ ConnectionFactoryFactory factInUse0 = cff0;
+ ConnectionFactoryFactory factInUse1 = cff1;
+ if (qosMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
+ {
+ factInUse0 = cff0xa;
+ factInUse1 = cff1xa;
+ }
+
try
{
- bridge = new JMSBridgeImpl(cff0,
- cff1,
+ bridge = new JMSBridgeImpl(factInUse0,
+ factInUse1,
sourceQueueFactory,
targetQueueFactory,
null,
@@ -1530,10 +1539,18 @@
Thread t = null;
+ ConnectionFactoryFactory factInUse0 = cff0;
+ ConnectionFactoryFactory factInUse1 = cff1;
+ if (qosMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
+ {
+ factInUse0 = cff0xa;
+ factInUse1 = cff1xa;
+ }
+
try
{
- bridge = new JMSBridgeImpl(cff0,
- cff1,
+ bridge = new JMSBridgeImpl(factInUse0,
+ factInUse1,
sourceQueueFactory,
targetQueueFactory,
null,
@@ -1617,10 +1634,16 @@
Thread t = null;
+ ConnectionFactoryFactory factInUse0 = cff0;
+ if (qosMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
+ {
+ factInUse0 = cff0xa;
+ }
+
try
{
- bridge = new JMSBridgeImpl(cff0,
- cff0,
+ bridge = new JMSBridgeImpl(factInUse0,
+ factInUse0,
sourceQueueFactory,
localTargetQueueFactory,
null,
@@ -1699,12 +1722,19 @@
{
JMSBridgeImpl bridge = null;
+ ConnectionFactoryFactory factInUse0 = cff0;
+ ConnectionFactoryFactory factInUse1 = cff1;
+ if (qosMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
+ {
+ factInUse0 = cff0xa;
+ factInUse1 = cff1xa;
+ }
try
{
final int NUM_MESSAGES = 10;
- bridge = new JMSBridgeImpl(cff0,
- cff1,
+ bridge = new JMSBridgeImpl(factInUse0,
+ factInUse1,
sourceQueueFactory,
targetQueueFactory,
null,
@@ -1770,12 +1800,18 @@
{
JMSBridgeImpl bridge = null;
+ ConnectionFactoryFactory factInUse0 = cff0;
+ if (qosMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
+ {
+ factInUse0 = cff0xa;
+ }
+
try
{
final int NUM_MESSAGES = 10;
- bridge = new JMSBridgeImpl(cff0,
- cff0,
+ bridge = new JMSBridgeImpl(factInUse0,
+ factInUse0,
sourceQueueFactory,
localTargetQueueFactory,
null,
@@ -1840,14 +1876,22 @@
{
JMSBridgeImpl bridge = null;
+ ConnectionFactoryFactory factInUse0 = cff0;
+ ConnectionFactoryFactory factInUse1 = cff1;
+ if (qosMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
+ {
+ factInUse0 = cff0xa;
+ factInUse1 = cff1xa;
+ }
+
try
{
final long MAX_BATCH_TIME = 3000;
final int MAX_BATCH_SIZE = 100000; // something big so it won't reach it
- bridge = new JMSBridgeImpl(cff0,
- cff1,
+ bridge = new JMSBridgeImpl(factInUse0,
+ factInUse1,
sourceQueueFactory,
targetQueueFactory,
null,
@@ -1894,14 +1938,20 @@
{
JMSBridgeImpl bridge = null;
+ ConnectionFactoryFactory factInUse0 = cff0;
+ if (qosMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
+ {
+ factInUse0 = cff0xa;
+ }
+
try
{
final long MAX_BATCH_TIME = 3000;
final int MAX_BATCH_SIZE = 100000; // something big so it won't reach it
- bridge = new JMSBridgeImpl(cff0,
- cff0,
+ bridge = new JMSBridgeImpl(factInUse0,
+ factInUse0,
sourceQueueFactory,
localTargetQueueFactory,
null,
Modified: branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/client/AutoGroupingTest.java
===================================================================
--- branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/client/AutoGroupingTest.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/client/AutoGroupingTest.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -19,7 +19,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
-import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
/**
* A AutoGroupingTest
@@ -34,7 +34,7 @@
@Override
protected ConnectionFactory getCF() throws Exception
{
- HornetQConnectionFactory cf = HornetQJMSClient.createConnectionFactory(new TransportConfiguration(NettyConnectorFactory.class.getName()));
+ HornetQJMSConnectionFactory cf = HornetQJMSClient.createConnectionFactory(new TransportConfiguration(NettyConnectorFactory.class.getName()));
cf.setAutoGroup(true);
Modified: branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/client/GroupIDTest.java
===================================================================
--- branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/client/GroupIDTest.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/client/GroupIDTest.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -19,7 +19,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
-import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
/**
* A GroupIDTest
@@ -34,7 +34,7 @@
@Override
protected ConnectionFactory getCF() throws Exception
{
- HornetQConnectionFactory cf = HornetQJMSClient.createConnectionFactory(new TransportConfiguration(NettyConnectorFactory.class.getName()));
+ HornetQJMSConnectionFactory cf = HornetQJMSClient.createConnectionFactory(new TransportConfiguration(NettyConnectorFactory.class.getName()));
cf.setGroupID("wibble");
Modified: branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
===================================================================
--- branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -27,6 +27,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.jms.server.impl.JMSFactoryType;
import org.hornetq.tests.util.JMSTestBase;
/**
@@ -229,6 +230,7 @@
HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION,
failoverOnServerShutdown,
null,
+ JMSFactoryType.CF,
jndiBindings);
}
Modified: branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java
===================================================================
--- branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -34,6 +34,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.jms.HornetQJMSConstants;
+import org.hornetq.jms.server.impl.JMSFactoryType;
import org.hornetq.tests.util.JMSTestBase;
import org.hornetq.tests.util.UnitTestCase;
@@ -328,6 +329,7 @@
HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION,
failoverOnServerShutdown,
null,
+ JMSFactoryType.CF,
jndiBindings);
}
Modified: branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
===================================================================
--- branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -34,6 +34,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.jms.client.HornetQSession;
+import org.hornetq.jms.server.impl.JMSFactoryType;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.util.JMSTestBase;
@@ -97,6 +98,7 @@
HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION,
false,
null,
+ JMSFactoryType.CF,
"/cffoo");
cf = (ConnectionFactory)context.lookup("/cffoo");
Modified: branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java
===================================================================
--- branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -27,6 +27,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.jms.server.impl.JMSFactoryType;
import org.hornetq.tests.util.JMSTestBase;
import org.hornetq.tests.util.RandomUtil;
@@ -264,6 +265,7 @@
HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION,
failoverOnServerShutdown,
null,
+ JMSFactoryType.CF,
jndiBindings);
}
Modified: branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
--- branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -47,6 +47,7 @@
import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.HornetQSession;
import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.impl.JMSFactoryType;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.unit.util.InVMContext;
@@ -159,7 +160,7 @@
{
HornetQConnectionFactory jbcf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"),
new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- backupParams));
+ backupParams), JMSFactoryType.CF);
jbcf.setBlockOnDurableSend(true);
jbcf.setBlockOnNonDurableSend(true);
Modified: branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
===================================================================
--- branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -30,6 +30,7 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.DivertConfiguration;
+import org.hornetq.jms.server.impl.JMSFactoryType;
import org.hornetq.tests.util.JMSTestBase;
/**
@@ -175,6 +176,7 @@
HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION,
failoverOnServerShutdown,
null,
+ JMSFactoryType.CF,
jndiBindings);
}
Modified: branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
===================================================================
--- branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -28,6 +28,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.tests.util.RandomUtil;
@@ -65,7 +66,7 @@
final long connectionTTL,
final long clientFailureCheckPeriod) throws JMSException
{
- HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(new TransportConfiguration(connectorFactory));
+ HornetQJMSConnectionFactory cf = (HornetQJMSConnectionFactory) HornetQJMSClient.createConnectionFactory(new TransportConfiguration(connectorFactory));
cf.setBlockOnNonDurableSend(true);
cf.setBlockOnDurableSend(true);
@@ -103,7 +104,7 @@
public static String[] sendMessages(final Destination destination, final int messagesToSend) throws Exception
{
- HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ HornetQJMSConnectionFactory cf = (HornetQJMSConnectionFactory) HornetQJMSClient.createConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
return JMSUtil.sendMessages(cf, destination, messagesToSend);
}
Modified: branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java
===================================================================
--- branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -47,7 +47,7 @@
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
-import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.jms.server.config.JMSConfiguration;
import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
@@ -161,7 +161,7 @@
protected ConnectionFactory createConnectionFactory()
{
- return new HornetQConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ return new HornetQJMSConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
}
protected Socket createSocket() throws IOException
Modified: branches/HORNETQ-515/tests/src/org/hornetq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java
===================================================================
--- branches/HORNETQ-515/tests/src/org/hornetq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/src/org/hornetq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -53,6 +53,7 @@
import org.hornetq.jms.bridge.QualityOfServiceMode;
import org.hornetq.jms.bridge.impl.JMSBridgeImpl;
import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.tests.unit.util.InVMContext;
@@ -157,7 +158,7 @@
private static ConnectionFactory createConnectionFactory()
{
- HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ HornetQJMSConnectionFactory cf = (HornetQJMSConnectionFactory) HornetQJMSClient.createConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
// Note! We disable automatic reconnection on the session factory. The bridge needs to do the reconnection
cf.setReconnectAttempts(0);
cf.setBlockOnNonDurableSend(true);
@@ -171,8 +172,10 @@
public void testStartWithRepeatedFailure() throws Exception
{
- HornetQConnectionFactory failingSourceCF = new HornetQConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()))
+ HornetQJMSConnectionFactory failingSourceCF = new HornetQJMSConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()))
{
+ private static final long serialVersionUID = -2142578705002528826L;
+
@Override
public Connection createConnection() throws JMSException
{
@@ -212,8 +215,9 @@
public void testStartWithFailureThenSuccess() throws Exception
{
- HornetQConnectionFactory failingSourceCF = new HornetQConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()))
+ HornetQJMSConnectionFactory failingSourceCF = new HornetQJMSConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()))
{
+ private static final long serialVersionUID = 1274250681150776714L;
boolean firstTime = true;
@Override
@@ -410,8 +414,10 @@
public void testExceptionOnSourceAndRetrySucceeds() throws Exception
{
final AtomicReference<Connection> sourceConn = new AtomicReference<Connection>();
- HornetQConnectionFactory failingSourceCF = new HornetQConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()))
+ HornetQJMSConnectionFactory failingSourceCF = new HornetQJMSConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()))
{
+ private static final long serialVersionUID = -6930787952179727779L;
+
@Override
public Connection createConnection() throws JMSException
{
@@ -460,8 +466,9 @@
public void testExceptionOnSourceAndRetryFails() throws Exception
{
final AtomicReference<Connection> sourceConn = new AtomicReference<Connection>();
- HornetQConnectionFactory failingSourceCF = new HornetQConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()))
+ HornetQJMSConnectionFactory failingSourceCF = new HornetQJMSConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()))
{
+ private static final long serialVersionUID = 4163579449500727852L;
boolean firstTime = true;
@Override
Modified: branches/HORNETQ-515/tests/src/org/hornetq/tests/util/JMSTestBase.java
===================================================================
--- branches/HORNETQ-515/tests/src/org/hornetq/tests/util/JMSTestBase.java 2010-10-10 00:51:36 UTC (rev 9761)
+++ branches/HORNETQ-515/tests/src/org/hornetq/tests/util/JMSTestBase.java 2010-10-10 12:15:41 UTC (rev 9762)
@@ -29,6 +29,7 @@
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.server.impl.JMSFactoryType;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.tests.unit.util.InVMContext;
@@ -213,6 +214,7 @@
HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION,
failoverOnServerShutdown,
null,
+ JMSFactoryType.CF,
jndiBindings);
}
13 years, 6 months
JBoss hornetq SVN: r9761 - branches.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-10-09 20:51:36 -0400 (Sat, 09 Oct 2010)
New Revision: 9761
Added:
branches/HORNETQ-515/
Log:
create a branch for HORNETQ-515
Copied: branches/HORNETQ-515 (from rev 9760, trunk)
13 years, 6 months
JBoss hornetq SVN: r9760 - in trunk: src/main/org/hornetq/core/protocol/core/impl and 9 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-10-08 07:49:58 -0400 (Fri, 08 Oct 2010)
New Revision: 9760
Added:
trunk/src/main/org/hornetq/spi/core/remoting/ReadyListener.java
Modified:
trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java
trunk/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java
trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/spi/core/protocol/SessionCallback.java
trunk/src/main/org/hornetq/spi/core/remoting/Connection.java
trunk/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java
trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-533
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-10-08 11:49:58 UTC (rev 9760)
@@ -296,9 +296,15 @@
{
handleConnectionFailure(connectionID, me);
}
+
+ public void connectionReadyForWrites(final Object connectionID, final boolean ready)
+ {
+ }
// ConnectionManager implementation ------------------------------------------------------------------
+
+
public ClientSession createSession(final String username,
final String password,
final boolean xa,
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java 2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java 2010-10-08 11:49:58 UTC (rev 9760)
@@ -23,6 +23,7 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.spi.core.protocol.ProtocolManager;
import org.hornetq.spi.core.protocol.SessionCallback;
+import org.hornetq.spi.core.remoting.ReadyListener;
/**
* A CoreSessionCallback
@@ -40,7 +41,7 @@
private ProtocolManager protocolManager;
private String name;
-
+
public CoreSessionCallback(String name, ProtocolManager protocolManager, Channel channel)
{
this.name = name;
@@ -90,4 +91,15 @@
{
protocolManager.removeHandler(name);
}
+
+ public void addReadyListener(final ReadyListener listener)
+ {
+ channel.getConnection().getTransportConnection().addReadyListener(listener);
+ }
+
+ public void removeReadyListener(final ReadyListener listener)
+ {
+ channel.getConnection().getTransportConnection().removeReadyListener(listener);
+ }
+
}
\ No newline at end of file
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java 2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java 2010-10-08 11:49:58 UTC (rev 9760)
@@ -139,6 +139,10 @@
*/
public synchronized StompFrame decode(final HornetQBuffer buffer) throws Exception
{
+ //log.info("got buff " + buffer.readableBytes());
+
+ long start = System.nanoTime();
+
int readable = buffer.readableBytes();
if (data + readable >= workingBuffer.length)
@@ -301,6 +305,8 @@
throwInvalid();
}
}
+
+ long commandTime = System.nanoTime() - start;
if (readingHeaders)
{
@@ -406,6 +412,8 @@
}
}
}
+
+ long headersTime = System.nanoTime() - start - commandTime;
// Now the body
@@ -447,6 +455,8 @@
}
}
}
+
+
if (content != null)
{
@@ -464,6 +474,12 @@
StompFrame ret = new StompFrame(command, headers, content);
init();
+
+ // log.info("decoded");
+
+ long bodyTime = System.nanoTime() - start - headersTime - commandTime;
+
+ // log.info("command: "+ commandTime + " headers: " + headersTime + " body: " + bodyTime);
return ret;
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-10-08 11:49:58 UTC (rev 9760)
@@ -142,11 +142,14 @@
public void handleBuffer(final RemotingConnection connection, final HornetQBuffer buffer)
{
+ long start = System.nanoTime();
StompConnection conn = (StompConnection)connection;
conn.setDataReceived();
StompDecoder decoder = conn.getDecoder();
+
+ // log.info("in handle");
do
{
@@ -165,7 +168,7 @@
if (request == null)
{
- return;
+ break;
}
try
@@ -253,6 +256,10 @@
server.getStorageManager().clearContext();
}
} while (decoder.hasBytes());
+
+ long end = System.nanoTime();
+
+ // log.info("handle took " + (end-start));
}
// Public --------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-10-08 11:49:58 UTC (rev 9760)
@@ -30,6 +30,7 @@
import org.hornetq.core.server.ServerSession;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.protocol.SessionCallback;
+import org.hornetq.spi.core.remoting.ReadyListener;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.UUIDGenerator;
@@ -156,7 +157,17 @@
public void closed()
{
}
+
+ public void addReadyListener(final ReadyListener listener)
+ {
+ connection.getTransportConnection().addReadyListener(listener);
+ }
+ public void removeReadyListener(final ReadyListener listener)
+ {
+ connection.getTransportConnection().removeReadyListener(listener);
+ }
+
public void acknowledge(String messageID) throws Exception
{
long id = Long.parseLong(messageID);
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2010-10-08 11:49:58 UTC (rev 9760)
@@ -250,6 +250,9 @@
listener.connectionException(connectionID, me);
}
+ public void connectionReadyForWrites(Object connectionID, boolean ready)
+ {
+ }
}
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2010-10-08 11:49:58 UTC (rev 9760)
@@ -22,6 +22,7 @@
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
+import org.hornetq.spi.core.remoting.ReadyListener;
import org.hornetq.utils.UUIDGenerator;
/**
@@ -32,6 +33,7 @@
*/
public class InVMConnection implements Connection
{
+
private static final Logger log = Logger.getLogger(InVMConnection.class);
private final BufferHandler handler;
@@ -159,5 +161,12 @@
{
return -1;
}
+
+ public void addReadyListener(ReadyListener listener)
+ {
+ }
+ public void removeReadyListener(ReadyListener listener)
+ {
+ }
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java 2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java 2010-10-08 11:49:58 UTC (rev 9760)
@@ -215,6 +215,12 @@
});
}
+ public void connectionReadyForWrites(Object connectionID, boolean ready)
+ {
+ }
+
+
+
}
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java 2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java 2010-10-08 11:49:58 UTC (rev 9760)
@@ -58,8 +58,14 @@
group.add(e.getChannel());
ctx.sendUpstream(e);
}
-
+
@Override
+ public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
+ {
+ listener.connectionReadyForWrites(e.getChannel().getId(), e.getChannel().isWritable());
+ }
+
+ @Override
public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
{
ChannelBuffer buffer = (ChannelBuffer)e.getMessage();
Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2010-10-08 11:49:58 UTC (rev 9760)
@@ -137,7 +137,7 @@
private final HttpKeepAliveRunnable httpKeepAliveRunnable;
- private final ConcurrentMap<Object, Connection> connections = new ConcurrentHashMap<Object, Connection>();
+ private final ConcurrentMap<Object, NettyConnection> connections = new ConcurrentHashMap<Object, NettyConnection>();
private final Executor threadPool;
@@ -654,7 +654,7 @@
{
public void connectionCreated(final Connection connection, final ProtocolType protocol)
{
- if (connections.putIfAbsent(connection.getID(), connection) != null)
+ if (connections.putIfAbsent(connection.getID(), (NettyConnection)connection) != null)
{
throw new IllegalArgumentException("Connection already exists with id " + connection.getID());
}
@@ -683,6 +683,16 @@
}.start();
}
+
+ public void connectionReadyForWrites(final Object connectionID, boolean ready)
+ {
+ NettyConnection conn = connections.get(connectionID);
+
+ if (conn != null)
+ {
+ conn.fireReady(ready);
+ }
+ }
}
private class BatchFlusher implements Runnable
Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2010-10-08 11:49:58 UTC (rev 9760)
@@ -13,6 +13,7 @@
package org.hornetq.core.remoting.impl.netty;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.hornetq.api.core.HornetQBuffer;
@@ -22,6 +23,8 @@
import org.hornetq.spi.core.protocol.ProtocolType;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
+import org.hornetq.spi.core.remoting.ReadyListener;
+import org.hornetq.utils.ConcurrentHashSet;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
@@ -57,6 +60,8 @@
private volatile HornetQBuffer batchBuffer;
private final AtomicBoolean writeLock = new AtomicBoolean(false);
+
+ private Set<ReadyListener> readyListeners = new ConcurrentHashSet<ReadyListener>();
// Static --------------------------------------------------------
@@ -241,6 +246,24 @@
{
return directDeliver;
}
+
+ public void addReadyListener(final ReadyListener listener)
+ {
+ readyListeners.add(listener);
+ }
+
+ public void removeReadyListener(final ReadyListener listener)
+ {
+ readyListeners.remove(listener);
+ }
+
+ public void fireReady(final boolean ready)
+ {
+ for (ReadyListener listener: readyListeners)
+ {
+ listener.readyForWriting(ready);
+ }
+ }
// Public --------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java 2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java 2010-10-08 11:49:58 UTC (rev 9760)
@@ -693,6 +693,12 @@
}
});
}
+
+ public void connectionReadyForWrites(Object connectionID, boolean ready)
+ {
+ }
+
+
}
private class BatchFlusher implements Runnable
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-10-08 11:49:58 UTC (rev 9760)
@@ -423,6 +423,10 @@
// Connections should only fail when TTL is exceeded
}
+
+ public void connectionReadyForWrites(final Object connectionID, final boolean ready)
+ {
+ }
public void addInterceptor(final Interceptor interceptor)
{
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-10-08 11:49:58 UTC (rev 9760)
@@ -17,6 +17,7 @@
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.HornetQBuffer;
@@ -42,6 +43,7 @@
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.spi.core.protocol.SessionCallback;
+import org.hornetq.spi.core.remoting.ReadyListener;
import org.hornetq.utils.Future;
import org.hornetq.utils.TypedProperties;
@@ -54,7 +56,7 @@
*
* @version <tt>$Revision: 3783 $</tt> $Id: ServerConsumerImpl.java 3783 2008-02-25 12:15:14Z timfox $
*/
-public class ServerConsumerImpl implements ServerConsumer
+public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
// Constants ------------------------------------------------------------------------------------
@@ -117,6 +119,12 @@
private final Binding binding;
private boolean transferring = false;
+
+ /* As well as consumer credit based flow control, we also tap into TCP flow control (assuming transport is using TCP)
+ * This is useful in the case where consumer-window-size = -1, but we don't want to OOM by sending messages ad infinitum to the Netty
+ * write queue when the TCP buffer is full, e.g. the client is slow or has died.
+ */
+ private AtomicBoolean writeReady = new AtomicBoolean(true);
// Constructors ---------------------------------------------------------------------------------
@@ -159,6 +167,8 @@
minLargeMessageSize = session.getMinLargeMessageSize();
this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
+
+ this.callback.addReadyListener(this);
if (browseOnly)
{
@@ -177,7 +187,7 @@
{
return id;
}
-
+
public HandleStatus handle(final MessageReference ref) throws Exception
{
if (availableCredits != null && availableCredits.get() <= 0)
@@ -185,6 +195,12 @@
return HandleStatus.BUSY;
}
+// TODO - https://jira.jboss.org/browse/HORNETQ-533
+// if (!writeReady.get())
+// {
+// return HandleStatus.BUSY;
+// }
+
synchronized (lock)
{
// If the consumer is stopped then we don't accept the message, it
@@ -264,6 +280,8 @@
public void close(final boolean failed) throws Exception
{
+ callback.removeReadyListener(this);
+
setStarted(false);
if (largeMessageDeliverer != null)
@@ -584,9 +602,21 @@
return ref;
}
+
+ public void readyForWriting(final boolean ready)
+ {
+ if (ready)
+ {
+ writeReady.set(true);
+
+ promptDelivery();
+ }
+ else
+ {
+ writeReady.set(false);
+ }
+ }
- // Public ---------------------------------------------------------------------------------------
-
/** To be used on tests only */
public AtomicInteger getAvailableCredits()
{
Modified: trunk/src/main/org/hornetq/spi/core/protocol/SessionCallback.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/protocol/SessionCallback.java 2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/spi/core/protocol/SessionCallback.java 2010-10-08 11:49:58 UTC (rev 9760)
@@ -15,6 +15,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.spi.core.remoting.ReadyListener;
/**
* A SessionCallback
@@ -34,4 +35,8 @@
int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse);
void closed();
+
+ void addReadyListener(ReadyListener listener);
+
+ void removeReadyListener(ReadyListener listener);
}
Modified: trunk/src/main/org/hornetq/spi/core/remoting/Connection.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/remoting/Connection.java 2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/spi/core/remoting/Connection.java 2010-10-08 11:49:58 UTC (rev 9760)
@@ -70,4 +70,8 @@
* Called periodically to flush any data in the batch buffer
*/
void checkFlushBatchBuffer();
+
+ void addReadyListener(ReadyListener listener);
+
+ void removeReadyListener(ReadyListener listener);
}
\ No newline at end of file
Modified: trunk/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java 2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java 2010-10-08 11:49:58 UTC (rev 9760)
@@ -44,4 +44,6 @@
* @param me the exception.
*/
void connectionException(Object connectionID, HornetQException me);
+
+ void connectionReadyForWrites(Object connectionID, boolean ready);
}
Added: trunk/src/main/org/hornetq/spi/core/remoting/ReadyListener.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/remoting/ReadyListener.java (rev 0)
+++ trunk/src/main/org/hornetq/spi/core/remoting/ReadyListener.java 2010-10-08 11:49:58 UTC (rev 9760)
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.spi.core.remoting;
+
+/**
+ * A ReadyListener
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public interface ReadyListener
+{
+ void readyForWriting(boolean ready);
+
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-10-08 11:49:58 UTC (rev 9760)
@@ -561,5 +561,11 @@
{
me.printStackTrace();
}
+
+ public void connectionReadyForWrites(Object connectionID, boolean ready)
+ {
+ }
+
+
}
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-10-08 11:49:58 UTC (rev 9760)
@@ -66,6 +66,12 @@
public void connectionCreated(final Connection connection, final ProtocolType protocol)
{
}
+
+ public void connectionReadyForWrites(Object connectionID, boolean ready)
+ {
+ }
+
+
};
Acceptor acceptor = factory.createAcceptor(params,
Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-10-08 11:49:58 UTC (rev 9760)
@@ -80,6 +80,10 @@
public void connectionCreated(final Connection connection, final ProtocolType protocol)
{
}
+
+ public void connectionReadyForWrites(Object connectionID, boolean ready)
+ {
+ }
};
NettyAcceptor acceptor = new NettyAcceptor(params,
handler,
Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java 2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java 2010-10-08 11:49:58 UTC (rev 9760)
@@ -234,6 +234,10 @@
{
}
+
+ public void connectionReadyForWrites(Object connectionID, boolean ready)
+ {
+ }
}
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2010-10-06 13:49:22 UTC (rev 9759)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2010-10-08 11:49:58 UTC (rev 9760)
@@ -69,6 +69,9 @@
public void connectionCreated(final Connection connection, final ProtocolType protocol)
{
}
+ public void connectionReadyForWrites(Object connectionID, boolean ready)
+ {
+ }
};
NettyConnector connector = new NettyConnector(params,
@@ -106,6 +109,10 @@
public void connectionCreated(final Connection connection, final ProtocolType protocol)
{
}
+
+ public void connectionReadyForWrites(Object connectionID, boolean ready)
+ {
+ }
};
try
13 years, 6 months
JBoss hornetq SVN: r9759 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/cluster.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-10-06 09:49:22 -0400 (Wed, 06 Oct 2010)
New Revision: 9759
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
Log:
fixed jms failover tests
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2010-10-06 11:24:44 UTC (rev 9758)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2010-10-06 13:49:22 UTC (rev 9759)
@@ -13,7 +13,9 @@
package org.hornetq.tests.integration.jms.cluster;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import javax.jms.BytesMessage;
@@ -36,6 +38,8 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.config.BackupConnectorConfiguration;
+import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
@@ -43,13 +47,16 @@
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.core.server.cluster.impl.FakeLockFile;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.HornetQSession;
import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.tests.integration.jms.server.management.JMSUtil;
import org.hornetq.tests.unit.util.InVMContext;
+import org.hornetq.tests.util.FakeLockHornetQServer;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
@@ -91,7 +98,12 @@
protected HornetQServer backupService;
protected Map<String, Object> backupParams = new HashMap<String, Object>();
+ private TransportConfiguration backuptc;
+ private TransportConfiguration livetc;
+ private TransportConfiguration liveAcceptortc;
+ private TransportConfiguration backupAcceptortc;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -102,26 +114,19 @@
{
liveJMSService.createQueue(true, "queue1", null, true, "/queue/queue1");
assertNotNull(ctx1.lookup("/queue/queue1"));
- liveJMSService.stop();
- Object obj = null;
+ HornetQConnectionFactory jbcf = HornetQJMSClient.createConnectionFactoryWithHA(livetc);
- try
- {
- obj = ctx1.lookup("/queue/queue1");
- }
- catch (NamingException expected)
- {
+ jbcf.setReconnectAttempts(-1);
- }
+ Connection conn = jbcf.createConnection();
- assertNull(obj);
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- backupJMSService.stop();
+ ClientSession coreSession = ((HornetQSession)sess).getCoreSession();
- backupConf.setBackup(false);
+ JMSUtil.crash(liveService, coreSession);
- backupJMSService.start();
assertNotNull(ctx2.lookup("/queue/queue1"));
}
@@ -131,36 +136,26 @@
{
liveJMSService.createTopic(true, "topic", "/topic/t1");
assertNotNull(ctx1.lookup("//topic/t1"));
- liveJMSService.stop();
- Object obj = null;
+ HornetQConnectionFactory jbcf = HornetQJMSClient.createConnectionFactoryWithHA(livetc);
- try
- {
- obj = ctx1.lookup("//topic/t1");
- }
- catch (NamingException expected)
- {
+ jbcf.setReconnectAttempts(-1);
- }
+ Connection conn = jbcf.createConnection();
- assertNull(obj);
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- backupJMSService.stop();
+ ClientSession coreSession = ((HornetQSession)sess).getCoreSession();
- backupConf.setBackup(false);
+ JMSUtil.crash(liveService, coreSession);
- backupJMSService.start();
-
assertNotNull(ctx2.lookup("/topic/t1"));
}
public void testAutomaticFailover() throws Exception
{
- HornetQConnectionFactory jbcf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactoryWithoutHA(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"),
- new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- backupParams));
-
+ HornetQConnectionFactory jbcf = HornetQJMSClient.createConnectionFactoryWithHA(livetc);
+ jbcf.setReconnectAttempts(-1);
jbcf.setBlockOnDurableSend(true);
jbcf.setBlockOnNonDurableSend(true);
@@ -184,8 +179,6 @@
ClientSession coreSession = ((HornetQSession)sess).getCoreSession();
- RemotingConnection coreConn = ((ClientSessionInternal)coreSession).getConnection();
-
SimpleString jmsQueueName = new SimpleString(HornetQDestination.JMS_QUEUE_ADDRESS_PREFIX + "myqueue");
coreSession.createQueue(jmsQueueName, jmsQueueName, null, true);
@@ -215,10 +208,8 @@
Thread.sleep(2000);
- HornetQException me = new HornetQException(HornetQException.NOT_CONNECTED);
+ JMSUtil.crash(liveService, ((HornetQSession) sess).getCoreSession());
- coreConn.fail(me);
-
for (int i = 0; i < numMessages; i++)
{
JMSFailoverTest.log.info("got message " + i);
@@ -236,22 +227,20 @@
conn.close();
- Assert.assertNotNull(listener.e);
-
- Assert.assertTrue(me == listener.e.getCause());
}
public void testManualFailover() throws Exception
{
- HornetQConnectionFactory jbcfLive = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactoryWithoutHA(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ HornetQConnectionFactory jbcfLive = HornetQJMSClient.createConnectionFactoryWithoutHA(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
jbcfLive.setBlockOnNonDurableSend(true);
jbcfLive.setBlockOnDurableSend(true);
- HornetQConnectionFactory jbcfBackup = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactoryWithoutHA(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
+ HornetQConnectionFactory jbcfBackup = HornetQJMSClient.createConnectionFactoryWithoutHA(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
backupParams));
jbcfBackup.setBlockOnNonDurableSend(true);
jbcfBackup.setBlockOnDurableSend(true);
+ jbcfBackup.setReconnectAttempts(-1);
Connection connLive = jbcfLive.createConnection();
@@ -284,16 +273,8 @@
// Note we block on P send to make sure all messages get to server before failover
- HornetQException me = new HornetQException(HornetQException.NOT_CONNECTED);
+ JMSUtil.crash(liveService, coreSessionLive);
- coreConnLive.fail(me);
-
- Assert.assertNotNull(listener.e);
-
- JMSException je = listener.e;
-
- Assert.assertEquals(me, je.getCause());
-
connLive.close();
// Now recreate on backup
@@ -330,7 +311,7 @@
protected void setUp() throws Exception
{
super.setUp();
-
+ FakeLockFile.clearLocks();
startServers();
}
@@ -339,7 +320,25 @@
*/
protected void startServers() throws Exception
{
+ backuptc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
+ backupParams);
+ livetc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory");
+
+ liveAcceptortc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory");
+
+
+ backupAcceptortc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory",
+ backupParams);
+
backupConf = new ConfigurationImpl();
+
+ backupConf.getAcceptorConfigurations().add(backupAcceptortc);
+ backupConf.getConnectorConfigurations().put(livetc.getName(), livetc);
+ backupConf.getConnectorConfigurations().put(backuptc.getName(), backuptc);
+ ArrayList<String> staticConnectors = new ArrayList<String>();
+ staticConnectors.add(livetc.getName());
+ backupConf.setBackupConnectorConfiguration(new BackupConnectorConfiguration(staticConnectors, backuptc.getName()) );
+
backupConf.setSecurityEnabled(false);
backupConf.setJournalType(getDefaultJournalType());
backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
@@ -352,7 +351,9 @@
backupConf.setJournalDirectory(getJournalDir());
backupConf.setPagingDirectory(getPageDir());
backupConf.setLargeMessagesDirectory(getLargeMessagesDir());
- backupService = HornetQServers.newHornetQServer(backupConf, true);
+ backupConf.setPersistenceEnabled(true);
+ backupConf.setClustered(true);
+ backupService = new FakeLockHornetQServer(backupConf);
backupJMSService = new JMSServerManagerImpl(backupService);
@@ -365,7 +366,11 @@
liveConf = new ConfigurationImpl();
liveConf.setSecurityEnabled(false);
liveConf.getAcceptorConfigurations()
- .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory"));
+ .add(liveAcceptortc);
+ List<String> pairs = null;
+ ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1", "jms", livetc.getName(), -1, false, false, 1, 1,
+ pairs);
+ liveConf.getClusterConfigurations().add(ccc0);
liveConf.setSharedStore(true);
liveConf.setJournalType(getDefaultJournalType());
liveConf.setBindingsDirectory(getBindingsDir());
@@ -373,8 +378,10 @@
liveConf.setJournalDirectory(getJournalDir());
liveConf.setPagingDirectory(getPageDir());
liveConf.setLargeMessagesDirectory(getLargeMessagesDir());
-
- liveService = HornetQServers.newHornetQServer(liveConf, true);
+ liveConf.getConnectorConfigurations().put(livetc.getName(), livetc);
+ liveConf.setPersistenceEnabled(true);
+ liveConf.setClustered(true);
+ liveService = new FakeLockHornetQServer(liveConf);
liveJMSService = new JMSServerManagerImpl(liveService);
@@ -382,6 +389,7 @@
liveJMSService.start();
+ JMSUtil.waitForServer(backupService);
}
@Override
13 years, 6 months
JBoss hornetq SVN: r9758 - in trunk: src/main/org/hornetq/core/protocol/core/impl and 5 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-10-06 07:24:44 -0400 (Wed, 06 Oct 2010)
New Revision: 9758
Added:
trunk/tests/src/org/hornetq/tests/integration/stomp/StompConnectionCleanupTest.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java
Modified:
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-526
Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-10-06 08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-10-06 11:24:44 UTC (rev 9758)
@@ -107,7 +107,7 @@
* @author <a href="mailto:andy.taylor@jboss.org>Andy Taylor</a>
* @author <a href="mailto:clebert.suconic@jboss.org>Clebert Suconic</a>
*/
-public class ServerSessionPacketHandler implements ChannelHandler, CloseListener, FailureListener
+public class ServerSessionPacketHandler implements ChannelHandler
{
private static final Logger log = Logger.getLogger(ServerSessionPacketHandler.class);
@@ -150,8 +150,6 @@
{
direct = false;
}
-
- addConnectionListeners();
}
public long getID()
@@ -159,22 +157,6 @@
return channel.getID();
}
- public void connectionFailed(final HornetQException exception)
- {
- log.warn("Client connection failed, clearing up resources for session " + session.getName());
-
- try
- {
- session.close(true);
- }
- catch (Exception e)
- {
- log.error("Failed to close session", e);
- }
-
- log.warn("Cleared up resources for session " + session.getName());
- }
-
public void close()
{
channel.flushConfirmations();
@@ -189,22 +171,6 @@
}
}
- public void connectionClosed()
- {
- }
-
- private void addConnectionListeners()
- {
- remotingConnection.addFailureListener(this);
- remotingConnection.addCloseListener(this);
- }
-
- private void removeConnectionListeners()
- {
- remotingConnection.removeFailureListener(this);
- remotingConnection.removeCloseListener(this);
- }
-
public Channel getChannel()
{
return channel;
@@ -423,7 +389,7 @@
{
requiresResponse = true;
session.close(false);
- removeConnectionListeners();
+ // removeConnectionListeners();
response = new NullResponseMessage();
flush = true;
closeChannel = true;
@@ -601,10 +567,10 @@
// might be executed
// before we have transferred the connection, leaving it in a started state
session.setTransferring(true);
-
- remotingConnection.removeFailureListener(this);
- remotingConnection.removeCloseListener(this);
-
+
+ List<CloseListener> closeListeners = remotingConnection.removeCloseListeners();
+ List<FailureListener> failureListeners = remotingConnection.removeFailureListeners();
+
// Note. We do not destroy the replicating connection here. In the case the live server has really crashed
// then the connection will get cleaned up anyway when the server ping timeout kicks in.
// In the case the live server is really still up, i.e. a split brain situation (or in tests), then closing
@@ -618,8 +584,8 @@
remotingConnection = newConnection;
- remotingConnection.addFailureListener(this);
- remotingConnection.addCloseListener(this);
+ remotingConnection.setCloseListeners(closeListeners);
+ remotingConnection.setFailureListeners(failureListeners);
int serverLastReceivedCommandID = channel.getLastConfirmedCommandID();
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java 2010-10-06 08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java 2010-10-06 11:24:44 UTC (rev 9758)
@@ -27,7 +27,7 @@
/**
* A CoreSessionCallback
*
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ * @author Tim Fox
*
*
*/
@@ -40,7 +40,7 @@
private ProtocolManager protocolManager;
private String name;
-
+
public CoreSessionCallback(String name, ProtocolManager protocolManager, Channel channel)
{
this.name = name;
@@ -54,8 +54,8 @@
channel.send(packet);
- int size = packet.getPacketSize();
-
+ int size = packet.getPacketSize();
+
return size;
}
@@ -67,15 +67,15 @@
return packet.getPacketSize();
}
-
+
public int sendMessage(ServerMessage message, long consumerID, int deliveryCount)
{
Packet packet = new SessionReceiveMessage(consumerID, message, deliveryCount);
channel.sendBatched(packet);
-
- int size = packet.getPacketSize();
+ int size = packet.getPacketSize();
+
return size;
}
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-10-06 08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-10-06 11:24:44 UTC (rev 9758)
@@ -225,6 +225,31 @@
return closeListeners.remove(listener);
}
+ public List<CloseListener> removeCloseListeners()
+ {
+ List<CloseListener> ret = new ArrayList<CloseListener>(closeListeners);
+
+ closeListeners.clear();
+
+ return ret;
+ }
+
+ public List<FailureListener> removeFailureListeners()
+ {
+ List<FailureListener> ret = new ArrayList<FailureListener>(failureListeners);
+
+ failureListeners.clear();
+
+ return ret;
+ }
+
+ public void setCloseListeners(List<CloseListener> listeners)
+ {
+ closeListeners.clear();
+
+ closeListeners.addAll(listeners);
+ }
+
public HornetQBuffer createBuffer(final int size)
{
return transportConnection.createBuffer(size);
@@ -471,6 +496,7 @@
channels.clear();
}
}
+
private void callFailureListeners(final HornetQException me)
{
final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-10-06 08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-10-06 11:24:44 UTC (rev 9758)
@@ -13,8 +13,10 @@
package org.hornetq.core.protocol.stomp;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
@@ -32,8 +34,9 @@
*
*
*/
-class StompConnection implements RemotingConnection
+public class StompConnection implements RemotingConnection
{
+
private static final Logger log = Logger.getLogger(StompConnection.class);
private final StompProtocolManager manager;
@@ -49,9 +52,17 @@
private boolean valid;
private boolean destroyed = false;
-
+
private StompDecoder decoder = new StompDecoder();
+
+ private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>();
+
+ private final List<CloseListener> closeListeners = new CopyOnWriteArrayList<CloseListener>();
+
+ private final Object failLock = new Object();
+ private volatile boolean dataReceived;
+
public StompDecoder getDecoder()
{
return decoder;
@@ -64,17 +75,90 @@
this.manager = manager;
}
- public void addCloseListener(CloseListener listener)
+ public void addFailureListener(final FailureListener listener)
{
+ if (listener == null)
+ {
+ throw new IllegalStateException("FailureListener cannot be null");
+ }
+
+ failureListeners.add(listener);
}
- public void addFailureListener(FailureListener listener)
+ public boolean removeFailureListener(final FailureListener listener)
{
+ if (listener == null)
+ {
+ throw new IllegalStateException("FailureListener cannot be null");
+ }
+
+ return failureListeners.remove(listener);
}
+ public void addCloseListener(final CloseListener listener)
+ {
+ if (listener == null)
+ {
+ throw new IllegalStateException("CloseListener cannot be null");
+ }
+
+ closeListeners.add(listener);
+ }
+
+ public boolean removeCloseListener(final CloseListener listener)
+ {
+ if (listener == null)
+ {
+ throw new IllegalStateException("CloseListener cannot be null");
+ }
+
+ return closeListeners.remove(listener);
+ }
+
+ public List<CloseListener> removeCloseListeners()
+ {
+ List<CloseListener> ret = new ArrayList<CloseListener>(closeListeners);
+
+ closeListeners.clear();
+
+ return ret;
+ }
+
+ public List<FailureListener> removeFailureListeners()
+ {
+ List<FailureListener> ret = new ArrayList<FailureListener>(failureListeners);
+
+ failureListeners.clear();
+
+ return ret;
+ }
+
+ public void setCloseListeners(List<CloseListener> listeners)
+ {
+ closeListeners.clear();
+
+ closeListeners.addAll(listeners);
+ }
+
+ public void setFailureListeners(final List<FailureListener> listeners)
+ {
+ failureListeners.clear();
+
+ failureListeners.addAll(listeners);
+ }
+
+ public void setDataReceived()
+ {
+ dataReceived = true;
+ }
+
public boolean checkDataReceived()
{
- return true;
+ boolean res = dataReceived;
+
+ dataReceived = false;
+
+ return res;
}
public HornetQBuffer createBuffer(int size)
@@ -84,13 +168,23 @@
public void destroy()
{
- if (destroyed)
+ synchronized (failLock)
{
- return;
+ if (destroyed)
+ {
+ return;
+ }
}
destroyed = true;
+ internalClose();
+
+ callClosingListeners();
+ }
+
+ private void internalClose()
+ {
transportConnection.close();
manager.cleanup(this);
@@ -100,8 +194,29 @@
{
}
- public void fail(HornetQException me)
+ public void fail(final HornetQException me)
{
+ synchronized (failLock)
+ {
+ if (destroyed)
+ {
+ return;
+ }
+
+ destroyed = true;
+ }
+
+ log.warn("Connection failure has been detected: " + me.getMessage() +
+ " [code=" +
+ me.getCode() +
+ "]");
+
+ // Then call the listeners
+ callFailureListeners(me);
+
+ callClosingListeners();
+
+ internalClose();
}
public void flush()
@@ -140,20 +255,6 @@
return destroyed;
}
- public boolean removeCloseListener(CloseListener listener)
- {
- return false;
- }
-
- public boolean removeFailureListener(FailureListener listener)
- {
- return false;
- }
-
- public void setFailureListeners(List<FailureListener> listeners)
- {
- }
-
public void bufferReceived(Object connectionID, HornetQBuffer buffer)
{
manager.handleBuffer(this, buffer);
@@ -188,7 +289,7 @@
{
return clientID;
}
-
+
public boolean isValid()
{
return valid;
@@ -198,4 +299,45 @@
{
this.valid = valid;
}
+
+ private void callFailureListeners(final HornetQException me)
+ {
+ final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);
+
+ for (final FailureListener listener : listenersClone)
+ {
+ try
+ {
+ listener.connectionFailed(me);
+ }
+ catch (final Throwable t)
+ {
+ // Failure of one listener to execute shouldn't prevent others
+ // from
+ // executing
+ log.error("Failed to execute failure listener", t);
+ }
+ }
+ }
+
+ private void callClosingListeners()
+ {
+ final List<CloseListener> listenersClone = new ArrayList<CloseListener>(closeListeners);
+
+ for (final CloseListener listener : listenersClone)
+ {
+ try
+ {
+ listener.connectionClosed();
+ }
+ catch (final Throwable t)
+ {
+ // Failure of one listener to execute shouldn't prevent others
+ // from
+ // executing
+ log.error("Failed to execute failure listener", t);
+ }
+ }
+ }
+
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-10-06 08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-10-06 11:24:44 UTC (rev 9758)
@@ -124,6 +124,7 @@
else
{
// Default to 1 minute - which is same as core protocol
+
return new ConnectionEntry(conn, System.currentTimeMillis(), 1 * 60 * 1000);
}
}
@@ -143,6 +144,8 @@
{
StompConnection conn = (StompConnection)connection;
+ conn.setDataReceived();
+
StompDecoder decoder = conn.getDecoder();
do
@@ -217,7 +220,6 @@
if (request.getHeaders().containsKey(Stomp.Headers.RECEIPT_REQUESTED))
{
- log.info("receipt requested");
if (response == null)
{
Map<String, Object> h = new HashMap<String, Object>();
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-10-06 08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-10-06 11:24:44 UTC (rev 9758)
@@ -33,13 +33,13 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
import org.hornetq.core.protocol.core.impl.CoreProtocolManagerFactory;
import org.hornetq.core.protocol.stomp.StompProtocolManagerFactory;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.impl.ServerSessionImpl;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.spi.core.protocol.ConnectionEntry;
import org.hornetq.spi.core.protocol.ProtocolManager;
@@ -133,7 +133,8 @@
// difference between Stomp and Stomp over Web Sockets is handled in NettyAcceptor.getPipeline()
this.protocolMap.put(ProtocolType.STOMP, new StompProtocolManagerFactory().createProtocolManager(server,
interceptors));
- this.protocolMap.put(ProtocolType.STOMP_WS, new StompProtocolManagerFactory().createProtocolManager(server, interceptors));
+ this.protocolMap.put(ProtocolType.STOMP_WS, new StompProtocolManagerFactory().createProtocolManager(server,
+ interceptors));
}
// RemotingService implementation -------------------------------
@@ -144,15 +145,14 @@
{
return;
}
-
- ClassLoader tccl =
- AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
+
+ ClassLoader tccl = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
+ {
+ public ClassLoader run()
{
- public ClassLoader run()
- {
- return Thread.currentThread().getContextClassLoader();
- }
- });
+ return Thread.currentThread().getContextClassLoader();
+ }
+ });
// The remoting service maintains it's own thread pool for handling remoting traffic
// If OIO each connection will have it's own thread
@@ -161,7 +161,8 @@
// to support many hundreds of connections, but the main thread pool must be kept small for better performance
ThreadFactory tFactory = new HornetQThreadFactory("HornetQ-remoting-threads" + System.identityHashCode(this),
- false, tccl);
+ false,
+ tccl);
threadPool = Executors.newCachedThreadPool(tFactory);
@@ -322,6 +323,8 @@
}
else
{
+ log.info("failed to remove connection");
+
return null;
}
}
@@ -388,7 +391,7 @@
for (FailureListener listener : failureListeners)
{
- if (listener instanceof ServerSessionPacketHandler)
+ if (listener instanceof ServerSessionImpl)
{
empty = false;
@@ -528,9 +531,12 @@
RemotingConnection conn = removeConnection(id);
HornetQException me = new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Did not receive ping from " + conn.getRemoteAddress() +
+ "Did not receive data from " + conn.getRemoteAddress() +
". It is likely the client has exited or crashed without " +
- "closing its connection, or the network between the server and client has failed. The connection will now be closed.");
+ "closing its connection, or the network between the server and client has failed. " +
+ "You also might have configured connection-ttl and client-failure-check-period incorrectly. " +
+ "Please check user manual for more information." +
+ " The connection will now be closed.");
conn.fail(me);
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-10-06 08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-10-06 11:24:44 UTC (rev 9758)
@@ -73,7 +73,7 @@
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
* @author <a href="mailto:andy.taylor@jboss.org>Andy Taylor</a>
*/
-public class ServerSessionImpl implements ServerSession, FailureListener
+public class ServerSessionImpl implements ServerSession , FailureListener
{
// Constants -----------------------------------------------------------------------------
Modified: trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java 2010-10-06 08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java 2010-10-06 11:24:44 UTC (rev 9758)
@@ -77,14 +77,22 @@
* @return true if removed
*/
boolean removeCloseListener(CloseListener listener);
-
+
+ List<CloseListener> removeCloseListeners();
+
+ void setCloseListeners(List<CloseListener> listeners);
+
+
/**
* return all the failure listeners
*
* @return the listeners
*/
List<FailureListener> getFailureListeners();
+
+ List<FailureListener> removeFailureListeners();
+
/**
* set the failure listeners.
* <p/>
Added: trunk/tests/src/org/hornetq/tests/integration/stomp/StompConnectionCleanupTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompConnectionCleanupTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompConnectionCleanupTest.java 2010-10-06 11:24:44 UTC (rev 9758)
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.stomp;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+
+import junit.framework.Assert;
+
+import org.hornetq.core.protocol.stomp.Stomp;
+import org.hornetq.jms.server.JMSServerManager;
+
+/**
+ * A StompConnectionCleanupTest
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class StompConnectionCleanupTest extends StompTestBase
+{
+ private static final long CONNECTION_TTL = 2000;
+
+ public void testConnectionCleanup() throws Exception
+ {
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ frame = receiveFrame(10000);
+
+ //We send and consumer a message to ensure a STOMP connection and server session is created
+
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+
+ // Now we wait until the connection is cleared on the server, which will happen some time after ttl, since no data
+ // is being sent
+
+ long start = System.currentTimeMillis();
+
+ while (true)
+ {
+ int connCount = server.getHornetQServer().getRemotingService().getConnections().size();
+
+ int sessionCount = server.getHornetQServer().getSessions().size();
+
+ // All connections and sessions should be timed out including STOMP + JMS connection
+
+ if (connCount == 0 && sessionCount == 0)
+ {
+ break;
+ }
+
+ Thread.sleep(10);
+
+ if (System.currentTimeMillis() - start > 10000)
+ {
+ fail("Timed out waiting for connection to be cleared up");
+ }
+ }
+ }
+
+ public void testConnectionNotCleanedUp() throws Exception
+ {
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ frame = receiveFrame(10000);
+
+ //We send and consumer a message to ensure a STOMP connection and server session is created
+
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ long time = CONNECTION_TTL * 3;
+
+ long start = System.currentTimeMillis();
+
+ //Send msgs for an amount of time > connection_ttl make sure connection is not closed
+ while (true)
+ {
+ //Send and receive a msg
+
+ frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+ sendFrame(frame);
+
+ Message msg = consumer.receive(1000);
+ assertNotNull(msg);
+
+ Thread.sleep(100);
+
+ if (System.currentTimeMillis() - start > time)
+ {
+ break;
+ }
+ }
+
+ }
+
+ @Override
+ protected JMSServerManager createServer() throws Exception
+ {
+ JMSServerManager s = super.createServer();
+
+ s.getHornetQServer().getConfiguration().setConnectionTTLOverride(CONNECTION_TTL);
+
+ return s;
+ }
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-10-06 08:18:15 UTC (rev 9757)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-10-06 11:24:44 UTC (rev 9758)
@@ -19,77 +19,29 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
import java.net.SocketTimeoutException;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
-import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
import javax.jms.TextMessage;
-import javax.jms.Topic;
import junit.framework.Assert;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.stomp.Stomp;
-import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
-import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
-import org.hornetq.core.remoting.impl.netty.TransportConstants;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.HornetQServers;
-import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.server.JMSServerManager;
-import org.hornetq.jms.server.config.JMSConfiguration;
-import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
-import org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl;
-import org.hornetq.jms.server.config.impl.TopicConfigurationImpl;
-import org.hornetq.jms.server.impl.JMSServerManagerImpl;
-import org.hornetq.spi.core.protocol.ProtocolType;
-import org.hornetq.tests.unit.util.InVMContext;
-import org.hornetq.tests.util.UnitTestCase;
-public class StompTest extends UnitTestCase
+public class StompTest extends StompTestBase
{
private static final transient Logger log = Logger.getLogger(StompTest.class);
- private int port = 61613;
-
- private Socket stompSocket;
-
- private ByteArrayOutputStream inputBuffer;
-
- private ConnectionFactory connectionFactory;
-
- private Connection connection;
-
- private Session session;
-
- private Queue queue;
-
- private Topic topic;
-
- private JMSServerManager server;
-
public void testSendManyMessages() throws Exception
{
MessageConsumer consumer = session.createConsumer(queue);
@@ -106,7 +58,7 @@
public void onMessage(Message arg0)
{
- //System.out.println("<<< " + (1000 - latch.getCount()));
+ // System.out.println("<<< " + (1000 - latch.getCount()));
latch.countDown();
}
});
@@ -115,7 +67,7 @@
for (int i = 1; i <= count; i++)
{
// Thread.sleep(1);
- //System.out.println(">>> " + i);
+ // System.out.println(">>> " + i);
sendFrame(frame);
}
@@ -191,14 +143,14 @@
TextMessage message = (TextMessage)consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals("Hello World", message.getText());
-
+
// Make sure that the timestamp is valid - should
// be very close to the current time.
long tnow = System.currentTimeMillis();
long tmsg = message.getJMSTimestamp();
Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
}
-
+
/*
* Some STOMP clients erroneously put a new line \n *after* the terminating NUL char at the end of the frame
* This means next frame read might have a \n a the beginning.
@@ -215,14 +167,20 @@
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("CONNECTED"));
- frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL + "\n";
+ frame = "SEND\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n\n" +
+ "Hello World" +
+ Stomp.NULL +
+ "\n";
sendFrame(frame);
TextMessage message = (TextMessage)consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals("Hello World", message.getText());
-
+
// Make sure that the timestamp is valid - should
// be very close to the current time.
long tnow = System.currentTimeMillis();
@@ -258,7 +216,7 @@
TextMessage message = (TextMessage)consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals("Hello World", message.getText());
-
+
// Make sure that the timestamp is valid - should
// be very close to the current time.
long tnow = System.currentTimeMillis();
@@ -1279,210 +1237,4 @@
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
sendFrame(frame);
}
-
- // Implementation methods
- // -------------------------------------------------------------------------
- protected void setUp() throws Exception
- {
- super.setUp();
-
- server = createServer();
- server.start();
- connectionFactory = createConnectionFactory();
-
- stompSocket = createSocket();
- inputBuffer = new ByteArrayOutputStream();
-
- connection = connectionFactory.createConnection();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- queue = session.createQueue(getQueueName());
- topic = session.createTopic(getTopicName());
- connection.start();
- }
-
- /**
- * @return
- * @throws Exception
- */
- private JMSServerManager createServer() throws Exception
- {
- Configuration config = new ConfigurationImpl();
- config.setSecurityEnabled(false);
- config.setPersistenceEnabled(false);
-
- Map<String, Object> params = new HashMap<String, Object>();
- params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP.toString());
- params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
- TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
- config.getAcceptorConfigurations().add(stompTransport);
- config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
- HornetQServer hornetQServer = HornetQServers.newHornetQServer(config);
-
- JMSConfiguration jmsConfig = new JMSConfigurationImpl();
- jmsConfig.getQueueConfigurations()
- .add(new JMSQueueConfigurationImpl(getQueueName(), null, false, getQueueName()));
- jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl(getTopicName(), getTopicName()));
- server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
- server.setContext(new InVMContext());
- return server;
- }
-
- protected void tearDown() throws Exception
- {
- connection.close();
- if (stompSocket != null)
- {
- stompSocket.close();
- }
- server.stop();
-
- super.tearDown();
- }
-
- protected void reconnect() throws Exception
- {
- reconnect(0);
- }
-
- protected void reconnect(long sleep) throws Exception
- {
- stompSocket.close();
-
- if (sleep > 0)
- {
- Thread.sleep(sleep);
- }
-
- stompSocket = createSocket();
- inputBuffer = new ByteArrayOutputStream();
- }
-
- protected ConnectionFactory createConnectionFactory()
- {
- return new HornetQConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
- }
-
- protected Socket createSocket() throws IOException
- {
- return new Socket("127.0.0.1", port);
- }
-
- protected String getQueueName()
- {
- return "test";
- }
-
- protected String getQueuePrefix()
- {
- return "jms.queue.";
- }
-
- protected String getTopicName()
- {
- return "testtopic";
- }
-
- protected String getTopicPrefix()
- {
- return "jms.topic.";
- }
-
- public void sendFrame(String data) throws Exception
- {
- byte[] bytes = data.getBytes("UTF-8");
- OutputStream outputStream = stompSocket.getOutputStream();
- for (int i = 0; i < bytes.length; i++)
- {
- outputStream.write(bytes[i]);
- }
- outputStream.flush();
- }
-
- public void sendFrame(byte[] data) throws Exception
- {
- OutputStream outputStream = stompSocket.getOutputStream();
- for (int i = 0; i < data.length; i++)
- {
- outputStream.write(data[i]);
- }
- outputStream.flush();
- }
-
- public String receiveFrame(long timeOut) throws Exception
- {
- stompSocket.setSoTimeout((int)timeOut);
- InputStream is = stompSocket.getInputStream();
- int c = 0;
- for (;;)
- {
- c = is.read();
- if (c < 0)
- {
- throw new IOException("socket closed.");
- }
- else if (c == 0)
- {
- c = is.read();
- if (c != '\n')
- {
- byte[] ba = inputBuffer.toByteArray();
- System.out.println(new String(ba, "UTF-8"));
- }
- Assert.assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
- byte[] ba = inputBuffer.toByteArray();
- inputBuffer.reset();
- return new String(ba, "UTF-8");
- }
- else
- {
- inputBuffer.write(c);
- }
- }
- }
-
- public void sendMessage(String msg) throws Exception
- {
- sendMessage(msg, queue);
- }
-
- public void sendMessage(String msg, Destination destination) throws Exception
- {
- MessageProducer producer = session.createProducer(destination);
- TextMessage message = session.createTextMessage(msg);
- producer.send(message);
- }
-
- public void sendMessage(byte[] data, Destination destination) throws Exception
- {
- sendMessage(data, "foo", "xyz", destination);
- }
-
- public void sendMessage(String msg, String propertyName, String propertyValue) throws Exception
- {
- sendMessage(msg.getBytes("UTF-8"), propertyName, propertyValue, queue);
- }
-
- public void sendMessage(byte[] data, String propertyName, String propertyValue, Destination destination) throws Exception
- {
- MessageProducer producer = session.createProducer(destination);
- BytesMessage message = session.createBytesMessage();
- message.setStringProperty(propertyName, propertyValue);
- message.writeBytes(data);
- producer.send(message);
- }
-
- protected void waitForReceipt() throws Exception
- {
- String frame = receiveFrame(50000);
- assertNotNull(frame);
- assertTrue(frame.indexOf("RECEIPT") > -1);
- }
-
- protected void waitForFrameToTakeEffect() throws InterruptedException
- {
- // bit of a dirty hack :)
- // another option would be to force some kind of receipt to be returned
- // from the frame
- Thread.sleep(2000);
- }
}
Added: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java 2010-10-06 11:24:44 UTC (rev 9758)
@@ -0,0 +1,290 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.hornetq.core.remoting.impl.netty.TransportConstants;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.config.JMSConfiguration;
+import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
+import org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl;
+import org.hornetq.jms.server.config.impl.TopicConfigurationImpl;
+import org.hornetq.jms.server.impl.JMSServerManagerImpl;
+import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.tests.unit.util.InVMContext;
+import org.hornetq.tests.util.UnitTestCase;
+
+public abstract class StompTestBase extends UnitTestCase
+{
+ private static final transient Logger log = Logger.getLogger(StompTestBase.class);
+
+ private int port = 61613;
+
+ private Socket stompSocket;
+
+ private ByteArrayOutputStream inputBuffer;
+
+ private ConnectionFactory connectionFactory;
+
+ private Connection connection;
+
+ protected Session session;
+
+ protected Queue queue;
+
+ protected Topic topic;
+
+ protected JMSServerManager server;
+
+
+
+ // Implementation methods
+ // -------------------------------------------------------------------------
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ server = createServer();
+ server.start();
+ connectionFactory = createConnectionFactory();
+
+ stompSocket = createSocket();
+ inputBuffer = new ByteArrayOutputStream();
+
+ connection = connectionFactory.createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ queue = session.createQueue(getQueueName());
+ topic = session.createTopic(getTopicName());
+ connection.start();
+ }
+
+ /**
+ * @return
+ * @throws Exception
+ */
+ protected JMSServerManager createServer() throws Exception
+ {
+ Configuration config = new ConfigurationImpl();
+ config.setSecurityEnabled(false);
+ config.setPersistenceEnabled(false);
+
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP.toString());
+ params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
+ TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
+ config.getAcceptorConfigurations().add(stompTransport);
+ config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+ HornetQServer hornetQServer = HornetQServers.newHornetQServer(config);
+
+ JMSConfiguration jmsConfig = new JMSConfigurationImpl();
+ jmsConfig.getQueueConfigurations()
+ .add(new JMSQueueConfigurationImpl(getQueueName(), null, false, getQueueName()));
+ jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl(getTopicName(), getTopicName()));
+ server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
+ server.setContext(new InVMContext());
+ return server;
+ }
+
+ protected void tearDown() throws Exception
+ {
+ connection.close();
+ if (stompSocket != null)
+ {
+ stompSocket.close();
+ }
+ server.stop();
+
+ super.tearDown();
+ }
+
+ protected void reconnect() throws Exception
+ {
+ reconnect(0);
+ }
+
+ protected void reconnect(long sleep) throws Exception
+ {
+ stompSocket.close();
+
+ if (sleep > 0)
+ {
+ Thread.sleep(sleep);
+ }
+
+ stompSocket = createSocket();
+ inputBuffer = new ByteArrayOutputStream();
+ }
+
+ protected ConnectionFactory createConnectionFactory()
+ {
+ return new HornetQConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ }
+
+ protected Socket createSocket() throws IOException
+ {
+ return new Socket("127.0.0.1", port);
+ }
+
+ protected String getQueueName()
+ {
+ return "test";
+ }
+
+ protected String getQueuePrefix()
+ {
+ return "jms.queue.";
+ }
+
+ protected String getTopicName()
+ {
+ return "testtopic";
+ }
+
+ protected String getTopicPrefix()
+ {
+ return "jms.topic.";
+ }
+
+ public void sendFrame(String data) throws Exception
+ {
+ byte[] bytes = data.getBytes("UTF-8");
+ OutputStream outputStream = stompSocket.getOutputStream();
+ for (int i = 0; i < bytes.length; i++)
+ {
+ outputStream.write(bytes[i]);
+ }
+ outputStream.flush();
+ }
+
+ public void sendFrame(byte[] data) throws Exception
+ {
+ OutputStream outputStream = stompSocket.getOutputStream();
+ for (int i = 0; i < data.length; i++)
+ {
+ outputStream.write(data[i]);
+ }
+ outputStream.flush();
+ }
+
+ public String receiveFrame(long timeOut) throws Exception
+ {
+ stompSocket.setSoTimeout((int)timeOut);
+ InputStream is = stompSocket.getInputStream();
+ int c = 0;
+ for (;;)
+ {
+ c = is.read();
+ if (c < 0)
+ {
+ throw new IOException("socket closed.");
+ }
+ else if (c == 0)
+ {
+ c = is.read();
+ if (c != '\n')
+ {
+ byte[] ba = inputBuffer.toByteArray();
+ System.out.println(new String(ba, "UTF-8"));
+ }
+ Assert.assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
+ byte[] ba = inputBuffer.toByteArray();
+ inputBuffer.reset();
+ return new String(ba, "UTF-8");
+ }
+ else
+ {
+ inputBuffer.write(c);
+ }
+ }
+ }
+
+ public void sendMessage(String msg) throws Exception
+ {
+ sendMessage(msg, queue);
+ }
+
+ public void sendMessage(String msg, Destination destination) throws Exception
+ {
+ MessageProducer producer = session.createProducer(destination);
+ TextMessage message = session.createTextMessage(msg);
+ producer.send(message);
+ }
+
+ public void sendMessage(byte[] data, Destination destination) throws Exception
+ {
+ sendMessage(data, "foo", "xyz", destination);
+ }
+
+ public void sendMessage(String msg, String propertyName, String propertyValue) throws Exception
+ {
+ sendMessage(msg.getBytes("UTF-8"), propertyName, propertyValue, queue);
+ }
+
+ public void sendMessage(byte[] data, String propertyName, String propertyValue, Destination destination) throws Exception
+ {
+ MessageProducer producer = session.createProducer(destination);
+ BytesMessage message = session.createBytesMessage();
+ message.setStringProperty(propertyName, propertyValue);
+ message.writeBytes(data);
+ producer.send(message);
+ }
+
+ protected void waitForReceipt() throws Exception
+ {
+ String frame = receiveFrame(50000);
+ assertNotNull(frame);
+ assertTrue(frame.indexOf("RECEIPT") > -1);
+ }
+
+ protected void waitForFrameToTakeEffect() throws InterruptedException
+ {
+ // bit of a dirty hack :)
+ // another option would be to force some kind of receipt to be returned
+ // from the frame
+ Thread.sleep(2000);
+ }
+}
13 years, 6 months
JBoss hornetq SVN: r9757 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-10-06 04:18:15 -0400 (Wed, 06 Oct 2010)
New Revision: 9757
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java
Log:
fixed cluster with backup test
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java 2010-10-06 07:23:16 UTC (rev 9756)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java 2010-10-06 08:18:15 UTC (rev 9757)
@@ -115,9 +115,9 @@
protected void setupServers() throws Exception
{
// The backups
- setupServer(0, isFileStorage(), isNetty(), true, -1, true);
- setupServer(1, isFileStorage(), isNetty(), true, -1, true);
- setupServer(2, isFileStorage(), isNetty(), true, -1, true);
+ setupServer(0, isFileStorage(), isNetty(), true, 3, true);
+ setupServer(1, isFileStorage(), isNetty(), true, 4, true);
+ setupServer(2, isFileStorage(), isNetty(), true, 5, true);
// The lives
setupServer(3, isFileStorage(), isNetty(), 0, true);
13 years, 6 months
JBoss hornetq SVN: r9756 - trunk/tests/src/org/hornetq/tests/integration/stomp.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-10-06 03:23:16 -0400 (Wed, 06 Oct 2010)
New Revision: 9756
Modified:
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
removed test that shouldn't be there
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-10-05 22:39:35 UTC (rev 9755)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-10-06 07:23:16 UTC (rev 9756)
@@ -121,36 +121,7 @@
assertTrue(latch.await(60, TimeUnit.SECONDS));
}
-
- public void testPerf() throws Exception
- {
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- int count = 100000;
-
- frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "ABCDJIMTEST<GRV>http://techcrunch.com/2010/09/23/thelikestream-digg-for-facebook-likes/<GRV>0" + Stomp.NULL;
-
- long start = System.currentTimeMillis();
-
- for (int i = 1; i <= count; i++)
- {
- sendFrame(frame);
-
- if (i % 1000 == 0)
- {
- log.info("Sent " + i);
- }
- }
-
- long end = System.currentTimeMillis();
-
- log.info("That took " + (end-start));
- }
-
public void testConnect() throws Exception
{
13 years, 6 months
JBoss hornetq SVN: r9755 - in branches/Branch_New_Paging: tests/src/org/hornetq/tests/integration/paging and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-05 18:39:35 -0400 (Tue, 05 Oct 2010)
New Revision: 9755
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
adding tx test around redelivery of the cursor
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-05 22:18:27 UTC (rev 9754)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-05 22:39:35 UTC (rev 9755)
@@ -129,6 +129,11 @@
{
store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
installTXCallback(tx, position);
+
+ // It needs to persist, otherwise the cursor will return to the first page position
+ tx.setContainsPersistent();
+
+
// tx.afterCommit()
}
@@ -179,6 +184,7 @@
{
Pair<PagePosition, ServerMessage> msgCheck = cursorProvider.getAfter(tmpPos);
// end of the hole, we can finish processing here
+ // It may be also that the next was just a next page, so we just ignore it
if (msgCheck == null || msgCheck.a.equals(pos))
{
break;
@@ -240,6 +246,7 @@
*/
private void installTXCallback(Transaction tx, PagePosition position)
{
+ //TODO: Play with rollbacks on the reference counts
}
// Inner classes -------------------------------------------------
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-05 22:18:27 UTC (rev 9754)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-05 22:39:35 UTC (rev 9755)
@@ -27,11 +27,14 @@
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
import org.hornetq.core.paging.impl.PagingStoreImpl;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
@@ -153,13 +156,19 @@
PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
System.out.println("Cursor: " + cursor);
- for (int i = 0 ; i < 500 ; i++)
+ for (int i = 0 ; i < 1000 ; i++)
{
Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
assertEquals(i, msg.b.getIntProperty("key").intValue());
- cursor.ack(msg.a);
+
+ if (i < 500)
+ {
+ cursor.ack(msg.a);
+ }
}
+ OperationContextImpl.getContext(null).waitCompletion();
+
server.stop();
server.start();
@@ -173,6 +182,8 @@
cursor.ack(msg.a);
}
+
+
}
@@ -225,11 +236,67 @@
}
+ public void testRestartWithHoleOnAckAndTransaction() throws Exception
+ {
+ final int NUM_MESSAGES = 1000;
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 10 * 1024);
+
+ System.out.println("Number of pages = " + numberOfPages);
+
+ PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+ System.out.println("cursorProvider = " + cursorProvider);
+
+ PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+
+ System.out.println("Cursor: " + cursor);
+
+ Transaction tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
+ for (int i = 0 ; i < 100 ; i++)
+ {
+ Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getIntProperty("key").intValue());
+ if (i < 10 || i > 20)
+ {
+ cursor.ackTx(tx, msg.a);
+ }
+ }
+
+ tx.commit();
+
+ server.stop();
+
+ server.start();
+
+ cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+
+ for (int i = 10; i <= 20; i++)
+ {
+ Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getIntProperty("key").intValue());
+ cursor.ack(msg.a);
+ }
+
+ for (int i = 100; i < NUM_MESSAGES; i++)
+ {
+ Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getIntProperty("key").intValue());
+ cursor.ack(msg.a);
+ }
+
+ }
+
+
public void testRollbackScenarios() throws Exception
{
}
+ public void testPrepareScenarios() throws Exception
+ {
+
+ }
+
public void testRedeliveryScenarios() throws Exception
{
@@ -297,7 +364,7 @@
Configuration config = createDefaultConfig();
- config.setJournalSyncNonTransactional(false);
+ config.setJournalSyncNonTransactional(true);
server = createServer(true,
config,
13 years, 6 months
JBoss hornetq SVN: r9754 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor and 4 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-05 18:18:27 -0400 (Tue, 05 Oct 2010)
New Revision: 9754
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
First implementation of the redelivery and ack with reload of the cursors
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingManager.java 2010-10-05 17:33:33 UTC (rev 9753)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingManager.java 2010-10-05 22:18:27 UTC (rev 9754)
@@ -82,5 +82,5 @@
void deletePageStore(SimpleString storeName) throws Exception;
- void processReload();
+ void processReload() throws Exception;
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-05 17:33:33 UTC (rev 9753)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-05 22:18:27 UTC (rev 9754)
@@ -64,7 +64,7 @@
PageCursorProvider getCursorProvier();
- void processReload();
+ void processReload() throws Exception;
/**
* @return false if a thread was already started, or if not in page mode
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-05 17:33:33 UTC (rev 9753)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-05 22:18:27 UTC (rev 9754)
@@ -48,7 +48,7 @@
*/
void reloadPreparedACK(Transaction tx, PagePosition position);
- void processReload();
+ void processReload() throws Exception;
/**
* To be used on redeliveries
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-05 17:33:33 UTC (rev 9753)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-05 22:18:27 UTC (rev 9754)
@@ -57,9 +57,13 @@
// PageCursor recoverCursor(PagePosition position);
Pair<PagePosition, ServerMessage> getAfter(PagePosition pos) throws Exception;
+
+ ServerMessage getMessage(PagePosition pos) throws Exception;
- void processReload();
+ void processReload() throws Exception;
+ void stop();
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java 2010-10-05 17:33:33 UTC (rev 9753)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java 2010-10-05 22:18:27 UTC (rev 9754)
@@ -37,5 +37,8 @@
PagePosition nextMessage();
PagePosition nextPage();
+
+ /** This will just test if the current position is immediately after the parameter position */
+ boolean isRightAfter(PagePosition previous);
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-05 17:33:33 UTC (rev 9753)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-05 22:18:27 UTC (rev 9754)
@@ -14,8 +14,10 @@
package org.hornetq.core.paging.cursor.impl;
import java.util.Collections;
+import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
import org.hornetq.api.core.Pair;
import org.hornetq.core.paging.PagingStore;
@@ -23,6 +25,7 @@
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.Transaction;
@@ -49,9 +52,12 @@
private final PageCursorProvider cursorProvider;
private volatile PagePosition lastPosition;
-
+
private List<PagePosition> recoveredACK;
+ // We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
+ private final ConcurrentLinkedQueue<PagePosition> redeliveries = new ConcurrentLinkedQueue<PagePosition>();
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -74,11 +80,23 @@
*/
public synchronized Pair<PagePosition, ServerMessage> moveNext() throws Exception
{
+ PagePosition redeliveryPos = null;
+
+ // Redeliveries will take precedence
+ if ((redeliveryPos = redeliveries.poll()) != null)
+ {
+ return new Pair<PagePosition, ServerMessage>(redeliveryPos, cursorProvider.getMessage(redeliveryPos));
+ }
+
if (lastPosition == null)
{
- lastPosition = recoverLastPosition();
+ // it will start at the first available page
+ long firstPage = pageStore.getFirstPage();
+ lastPosition = new PagePositionImpl(firstPage, -1);
}
+ boolean match = false;
+
Pair<PagePosition, ServerMessage> message = null;
do
{
@@ -87,8 +105,14 @@
{
lastPosition = message.a;
}
+ match = match(message.b);
+
+ if (!match)
+ {
+ ignored(message.a);
+ }
}
- while (message != null && !match(message.b));
+ while (message != null && !match);
return message;
}
@@ -111,12 +135,10 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
*/
- public void redeliver(final PagePosition position)
+ public synchronized void redeliver(final PagePosition position)
{
- // TODO Auto-generated method stub
-
+ this.redeliveries.add(position);
}
-
/**
* Theres no need to synchronize this method as it's only called from journal load on startup
@@ -136,11 +158,49 @@
installTXCallback(tx, position);
}
- public void processReload()
+ public void processReload() throws Exception
{
if (this.recoveredACK != null)
{
+ System.out.println("********** processing reload!!!!!!!");
Collections.sort(recoveredACK);
+ // Walk the recovered acks in sorted order; a matching message found between two consecutive acks was never acked and is queued for redelivery.
+ PagePosition previousPos = null;
+ for (PagePosition pos : recoveredACK)
+ {
+ lastPosition = pos; // after the loop the cursor sits on the last ack of the sorted list
+ if (previousPos != null)
+ {
+ if (!pos.isRightAfter(previousPos)) // only scan when this ack does not directly follow the previous one
+ {
+ PagePosition tmpPos = previousPos;
+ // step through the positions between previousPos and pos, looking for un-acked messages
+ while (true)
+ {
+ Pair<PagePosition, ServerMessage> msgCheck = cursorProvider.getAfter(tmpPos);
+ // stop when there are no more messages or the next acked position is reached
+ if (msgCheck == null || msgCheck.a.equals(pos))
+ {
+ break;
+ }
+ else
+ {
+ if (match(msgCheck.b)) // messages that do not match this cursor are not redelivered
+ {
+ redeliver(msgCheck.a);
+ }
+ }
+ tmpPos = msgCheck.a;
+ }
+ }
+ }
+
+ previousPos = pos;
+ System.out.println("pos: " + pos);
+ }
+ // the recovered list has been fully processed; release it
+ recoveredACK.clear();
+ recoveredACK = null;
}
}
@@ -153,11 +213,14 @@
// To be used with expressions
return true;
}
-
-
// Private -------------------------------------------------------
-
+
+ private void ignored(final PagePosition message)
+ {
+ // TODO: Update reference counts
+ }
+
/**
* @param committedACK
*/
@@ -167,27 +230,18 @@
{
recoveredACK = new LinkedList<PagePosition>();
}
-
+
recoveredACK.add(committedACK);
}
-
- private PagePosition recoverLastPosition()
- {
- long firstPage = pageStore.getFirstPage();
- return new PagePositionImpl(firstPage, -1);
- }
-
-
/**
* @param tx
* @param position
*/
private void installTXCallback(Transaction tx, PagePosition position)
{
- }
+ }
-
// Inner classes -------------------------------------------------
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-05 17:33:33 UTC (rev 9753)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-05 22:18:27 UTC (rev 9754)
@@ -78,7 +78,12 @@
PageCursor activeCursor = activeCursors.get(cursorID);
if (activeCursor == null)
{
- activeCursor = activeCursors.putIfAbsent(cursorID, new PageCursorImpl(this, pagingStore, storageManager, cursorID));
+ activeCursor = new PageCursorImpl(this, pagingStore, storageManager, cursorID);
+ PageCursor previousValue = activeCursors.putIfAbsent(cursorID, activeCursor);
+ if (previousValue != null)
+ {
+ activeCursor = previousValue;
+ }
}
return activeCursor;
@@ -118,6 +123,19 @@
return new Pair<PagePosition, ServerMessage>(retPos, cache.getMessage(retPos.getMessageNr()));
}
+
+ public ServerMessage getMessage(final PagePosition pos) throws Exception
+ {
+ PageCache cache = getPageCache(pos.getPageNr());
+
+ if (pos.getMessageNr() >= cache.getNumberOfMessages())
+ {
+ // sanity check, this should never happen unless there's a bug
+ throw new IllegalStateException("Invalid messageNumber passed = " + pos);
+ }
+
+ return cache.getMessage(pos.getMessageNr());
+ }
public PageCache getPageCache(final long pageId) throws Exception
{
@@ -181,13 +199,18 @@
return softCache.size();
}
- public void processReload()
+ public void processReload() throws Exception
{
for (PageCursor cursor : this.activeCursors.values())
{
cursor.processReload();
}
}
+
+ public void stop()
+ {
+ activeCursors.clear();
+ }
// Package protected ---------------------------------------------
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java 2010-10-05 17:33:33 UTC (rev 9753)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java 2010-10-05 22:18:27 UTC (rev 9754)
@@ -82,6 +82,11 @@
{
return messageNr;
}
+
+ public boolean isRightAfter(final PagePosition previous)
+ {
+ return this.pageNr == previous.getPageNr() && this.messageNr == previous.getMessageNr() + 1;
+ }
/* (non-Javadoc)
* @see java.lang.Comparable#compareTo(java.lang.Object)
@@ -158,4 +163,12 @@
return true;
}
+ @Override
+ public String toString()
+ {
+ return "PagePositionImpl [pageNr=" + pageNr + ", messageNr=" + messageNr + ", recordID=" + recordID + "]";
+ }
+
+
+
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2010-10-05 17:33:33 UTC (rev 9753)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2010-10-05 22:18:27 UTC (rev 9754)
@@ -229,7 +229,7 @@
}
}
- public void processReload()
+ public void processReload() throws Exception
{
for (PagingStore store: stores.values())
{
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-05 17:33:33 UTC (rev 9753)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-05 22:18:27 UTC (rev 9754)
@@ -317,6 +317,13 @@
public boolean startDepaging()
{
+
+ // Disabled for now
+
+ return false;
+
+
+ /*
if (!running)
{
return false;
@@ -353,11 +360,11 @@
finally
{
currentPageLock.readLock().unlock();
- }
+ } */
}
- public void processReload()
+ public void processReload() throws Exception
{
cursorProvider.processReload();
}
@@ -396,6 +403,8 @@
currentPage.close();
currentPage = null;
}
+
+ cursorProvider.stop();
}
}
@@ -1218,7 +1227,7 @@
// Inner classes -------------------------------------------------
- private class DepageRunnable implements Runnable
+/* private class DepageRunnable implements Runnable
{
private final Executor followingExecutor;
@@ -1252,5 +1261,5 @@
PagingStoreImpl.log.error(e, e);
}
}
- }
+ } */
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-05 17:33:33 UTC (rev 9753)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-05 22:18:27 UTC (rev 9754)
@@ -1114,7 +1114,7 @@
deploymentManager.start();
}
- pagingManager.reloadStores();
+ pagingManager.processReload();
pagingManager.resumeDepages();
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-05 17:33:33 UTC (rev 9753)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-05 22:18:27 UTC (rev 9754)
@@ -23,10 +23,12 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PageCursor;
+import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
import org.hornetq.core.paging.impl.PagingStoreImpl;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -50,6 +52,8 @@
private SimpleString ADDRESS = new SimpleString("test-add");
private HornetQServer server;
+
+ private Queue queue;
private static final int PAGE_MAX = -1;
@@ -135,6 +139,92 @@
}
+ public void testRestart() throws Exception
+ {
+ final int NUM_MESSAGES = 1000;
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 100 * 1024);
+
+ System.out.println("Number of pages = " + numberOfPages);
+
+ PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+ System.out.println("cursorProvider = " + cursorProvider);
+
+ PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+
+ System.out.println("Cursor: " + cursor);
+ for (int i = 0 ; i < 500 ; i++)
+ {
+ Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getIntProperty("key").intValue());
+ cursor.ack(msg.a);
+ }
+
+ server.stop();
+
+ server.start();
+
+ cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+
+ for (int i = 500; i < NUM_MESSAGES; i++)
+ {
+ Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getIntProperty("key").intValue());
+ cursor.ack(msg.a);
+ }
+
+ }
+
+
+ public void testRestartWithHoleOnAck() throws Exception
+ {
+
+ final int NUM_MESSAGES = 1000;
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 10 * 1024);
+
+ System.out.println("Number of pages = " + numberOfPages);
+
+ PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+ System.out.println("cursorProvider = " + cursorProvider);
+
+ PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+
+ System.out.println("Cursor: " + cursor);
+ for (int i = 0 ; i < 100 ; i++)
+ {
+ Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getIntProperty("key").intValue());
+ if (i < 10 || i > 20)
+ {
+ cursor.ack(msg.a);
+ }
+ }
+
+ server.stop();
+
+ server.start();
+
+ cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+
+ for (int i = 10; i <= 20; i++)
+ {
+ Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getIntProperty("key").intValue());
+ cursor.ack(msg.a);
+ }
+
+
+ for (int i = 100; i < NUM_MESSAGES; i++)
+ {
+ Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getIntProperty("key").intValue());
+ cursor.ack(msg.a);
+ }
+
+ }
+
+
public void testRollbackScenarios() throws Exception
{
@@ -216,8 +306,10 @@
new HashMap<String, AddressSettings>());
server.start();
+
+ queue = server.createQueue(ADDRESS, ADDRESS, null, true, false);
- createQueue(ADDRESS.toString(), ADDRESS.toString());
+ //createQueue(ADDRESS.toString(), ADDRESS.toString());
}
protected void tearDown() throws Exception
13 years, 6 months