JBoss hornetq SVN: r9824 - in trunk: examples/common and 6 other directories.
by do-not-reply@jboss.org
Author: igarashitm
Date: 2010-10-30 09:58:39 -0400 (Sat, 30 Oct 2010)
New Revision: 9824
Modified:
trunk/build-hornetq.xml
trunk/examples/common/build.xml
trunk/examples/core/twitter-connector/build.xml
trunk/examples/core/twitter-connector/server0/hornetq-configuration.xml
trunk/examples/core/twitter-connector/src/org/hornetq/core/example/TwitterConnectorExample.java
trunk/pom.xml
trunk/src/main/org/hornetq/integration/twitter/TwitterConstants.java
trunk/src/main/org/hornetq/integration/twitter/impl/IncomingTweetsHandler.java
trunk/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java
trunk/tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-549
* change authentication method from basic auth to OAuth (see the sketch below)
* update twitter4j from 2.1.2 to 2.1.6 (reply-to now works correctly)
* the example's message can now be overridden via the env.TWITTER_EXAMPLE_ALTERNATIVE_MESSAGE property
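For reference, a minimal sketch of the OAuth-authorized twitter4j setup this commit switches to (twitter4j 2.1.6, per the pom.xml change below). The class and method names here are illustrative only and not part of the commit; the actual connector code is in the IncomingTweetsHandler/OutgoingTweetsHandler diffs further down.

import twitter4j.Twitter;
import twitter4j.TwitterException;
import twitter4j.TwitterFactory;
import twitter4j.http.AccessToken;

public class OAuthTwitterClient
{
   // Builds an OAuth-authorized Twitter instance from the four credentials that
   // replace twitter.username/twitter.password in this change.
   public static Twitter connect(String consumerKey,
                                 String consumerSecret,
                                 String accessToken,
                                 String accessTokenSecret) throws TwitterException
   {
      TwitterFactory tf = new TwitterFactory();
      Twitter twitter = tf.getOAuthAuthorizedInstance(consumerKey,
                                                      consumerSecret,
                                                      new AccessToken(accessToken, accessTokenSecret));
      twitter.verifyCredentials(); // fails fast if any of the four values is invalid
      return twitter;
   }
}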
Modified: trunk/build-hornetq.xml
===================================================================
--- trunk/build-hornetq.xml 2010-10-30 07:36:27 UTC (rev 9823)
+++ trunk/build-hornetq.xml 2010-10-30 13:58:39 UTC (rev 9824)
@@ -1710,8 +1710,10 @@
timeout="${junit.timeout}">
<sysproperty key="user.home" value="${user.home}"/>
<sysproperty key="java.io.tmpdir" value="${java.io.tmpdir}"/>
- <sysproperty key="twitter.username" value="${twitter.username}"/>
- <sysproperty key="twitter.password" value="${twitter.password}"/>
+ <sysproperty key="twitter.consumerKey" value="${twitter.consumerKey}"/>
+ <sysproperty key="twitter.consumerSecret" value="${twitter.consumerSecret}"/>
+ <sysproperty key="twitter.accessToken" value="${twitter.accessToken}"/>
+ <sysproperty key="twitter.accessTokenSecret" value="${twitter.accessTokenSecret}"/>
<jvmarg value="-Djava.library.path=native/bin"/>
<jvmarg value="-Dmodule.output=./"/>
<jvmarg value="-Djava.util.logging.config.file=src/config/trunk/non-clustered/logging.properties"/>
Modified: trunk/examples/common/build.xml
===================================================================
--- trunk/examples/common/build.xml 2010-10-30 07:36:27 UTC (rev 9823)
+++ trunk/examples/common/build.xml 2010-10-30 13:58:39 UTC (rev 9824)
@@ -140,6 +140,7 @@
<!--<echo>client classpath = ${clientClasspath}</echo>-->
<property file="${imported.basedir}/config/server.properties"/>
<java classname="${example.classname}" fork="true" resultproperty="example-result">
+ <jvmarg line="${client.args}"/>
<jvmarg value="-Dhornetq.example.server.classpath=${serverclasspath}"/>
<jvmarg value="-Dhornetq.example.server.args=${server.args}"/>
<jvmarg value="-Dhornetq.example.logserveroutput=${hornetq.example.logserveroutput}"/>
Modified: trunk/examples/core/twitter-connector/build.xml
===================================================================
--- trunk/examples/core/twitter-connector/build.xml 2010-10-30 07:36:27 UTC (rev 9823)
+++ trunk/examples/core/twitter-connector/build.xml 2010-10-30 13:58:39 UTC (rev 9824)
@@ -19,10 +19,13 @@
<import file="../../common/build.xml"/>
<property environment='env'/>
- <target name="check" unless="env.TWITTER_USERNAME">
+ <target name="check" unless="env.TWITTER_CONSUMER_KEY">
<echo>**************************************************************************</echo>
<echo>* Please set the twitter account: *</echo>
- <echo>* ./build.sh -Denv.TWITTER_USERNAME=user -Denv.TWITTER_PASSWORD=password *</echo>
+ <echo>* ./build.sh -Denv.TWITTER_CONSUMER_KEY=consumerKey \ *</echo>
+ <echo>* -Denv.TWITTER_CONSUMER_SECRET=consumerSecret \ *</echo>
+ <echo>* -Denv.TWITTER_ACCESS_TOKEN=accessToken \ *</echo>
+ <echo>* -Denv.TWITTER_ACCESS_TOKEN_SECRET=accessTokenSecret *</echo>
<echo>**************************************************************************</echo>
<fail message="run example failed"/>
</target>
@@ -30,10 +33,11 @@
<target name="run" depends="check">
<antcall target="runExample">
<param name="example.classname" value="org.hornetq.core.example.TwitterConnectorExample"/>
+ <param name="client.args" value="-Dtwitter.example.alternativeMessage=${env.TWITTER_EXAMPLE_ALTERNATIVE_MESSAGE}"/>
<!-- HTTP proxy settings
<param name="server.args" value="-Dtwitter4j.http.proxyHost=your.proxy.server -Dtwitter4j.http.proxyPort=your.proxy.port"/>
-->
- <param name="server.args" value="-Dtwitter.username=${env.TWITTER_USERNAME} -Dtwitter.password=${env.TWITTER_PASSWORD}"/>
+ <param name="server.args" value="-Dtwitter.consumerKey=${env.TWITTER_CONSUMER_KEY} -Dtwitter.consumerSecret=${env.TWITTER_CONSUMER_SECRET} -Dtwitter.accessToken=${env.TWITTER_ACCESS_TOKEN} -Dtwitter.accessTokenSecret=${env.TWITTER_ACCESS_TOKEN_SECRET}"/>
</antcall>
</target>
Modified: trunk/examples/core/twitter-connector/server0/hornetq-configuration.xml
===================================================================
--- trunk/examples/core/twitter-connector/server0/hornetq-configuration.xml 2010-10-30 07:36:27 UTC (rev 9823)
+++ trunk/examples/core/twitter-connector/server0/hornetq-configuration.xml 2010-10-30 13:58:39 UTC (rev 9824)
@@ -13,7 +13,7 @@
<!-- Acceptors -->
<acceptors>
<acceptor name="netty-acceptor">
- <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
+ <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
</acceptor>
</acceptors>
@@ -44,15 +44,19 @@
<connector-service name="my-incoming-tweets">
<factory-class>org.hornetq.integration.twitter.TwitterIncomingConnectorServiceFactory</factory-class>
<param key="queue" value="queue.incomingQueue"/>
- <param key="username" value="${twitter.username}"/>
- <param key="password" value="${twitter.password}"/>
+ <param key="consumerKey" value="${twitter.consumerKey}"/>
+ <param key="consumerSecret" value="${twitter.consumerSecret}"/>
+ <param key="accessToken" value="${twitter.accessToken}"/>
+ <param key="accessTokenSecret" value="${twitter.accessTokenSecret}"/>
<param key="interval" value="60"/>
</connector-service>
<connector-service name="my-outgoing-tweets">
<factory-class>org.hornetq.integration.twitter.TwitterOutgoingConnectorServiceFactory</factory-class>
<param key="queue" value="queue.outgoingQueue"/>
- <param key="username" value="${twitter.username}"/>
- <param key="password" value="${twitter.password}"/>
+ <param key="consumerKey" value="${twitter.consumerKey}"/>
+ <param key="consumerSecret" value="${twitter.consumerSecret}"/>
+ <param key="accessToken" value="${twitter.accessToken}"/>
+ <param key="accessTokenSecret" value="${twitter.accessTokenSecret}"/>
</connector-service>
</connector-services>
Modified: trunk/examples/core/twitter-connector/src/org/hornetq/core/example/TwitterConnectorExample.java
===================================================================
--- trunk/examples/core/twitter-connector/src/org/hornetq/core/example/TwitterConnectorExample.java 2010-10-30 07:36:27 UTC (rev 9823)
+++ trunk/examples/core/twitter-connector/src/org/hornetq/core/example/TwitterConnectorExample.java 2010-10-30 13:58:39 UTC (rev 9824)
@@ -44,6 +44,11 @@
ClientSession session = null;
try
{
+ String alternativeMessage = System.getProperty("twitter.example.alternativeMessage");
+ if(alternativeMessage == null || alternativeMessage.trim().equals("")) {
+ alternativeMessage = "### Hello, HornetQ fans!! We are now experiencing so fast, so reliable and so exciting messaging never seen before ;-) ###";
+ }
+ String testMessage = System.currentTimeMillis() + ": " + alternativeMessage;
+
// Step 1. Create a ClientSessionFactory.
csf = HornetQClient.createClientSessionFactory (new TransportConfiguration(NettyConnectorFactory.class.getName()));
@@ -58,7 +63,6 @@
// Step 5. Create a core message.
ClientMessage cm = session.createMessage(org.hornetq.api.core.Message.TEXT_TYPE,true);
- String testMessage = System.currentTimeMillis() + ": ### Hello, HornetQ fans!! We are now experiencing so fast, so reliable and so exciting messaging never seen before ;-) ###";
cm.getBodyBuffer().writeString(testMessage);
// Step 6. Send a message to queue.outgoingQueue.
@@ -76,13 +80,21 @@
ClientMessage received = cc.receive(70 * 1000);
received.acknowledge();
String receivedText = received.getBodyBuffer().readString();
- System.out.println("#### Received a message from " + INCOMING_QUEUE + ": " + receivedText);
- if(!receivedText.equals(testMessage))
+ while(!receivedText.equals(testMessage))
{
- return false;
+ // ignoring other tweets
+ received = cc.receiveImmediate();
+ if(received == null) {
+ // no other tweets. test message has gone...
+ return false;
+ }
+
+ received.acknowledge();
+ receivedText = received.getBodyBuffer().readString();
}
-
+
+ System.out.println("#### Received a message from " + INCOMING_QUEUE + ": " + receivedText);
return true;
}
finally
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2010-10-30 07:36:27 UTC (rev 9823)
+++ trunk/pom.xml 2010-10-30 13:58:39 UTC (rev 9824)
@@ -253,7 +253,7 @@
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
- <version>2.1.2</version>
+ <version>2.1.6</version>
</dependency>
<!-- needed to compile the tests-->
<dependency>
Modified: trunk/src/main/org/hornetq/integration/twitter/TwitterConstants.java
===================================================================
--- trunk/src/main/org/hornetq/integration/twitter/TwitterConstants.java 2010-10-30 07:36:27 UTC (rev 9823)
+++ trunk/src/main/org/hornetq/integration/twitter/TwitterConstants.java 2010-10-30 13:58:39 UTC (rev 9824)
@@ -50,32 +50,42 @@
public static final Set<String> ALLOWABLE_OUTGOING_CONNECTOR_KEYS;
public static final Set<String> REQUIRED_OUTGOING_CONNECTOR_KEYS;
- public static final String USER_NAME = "username";
- public static final String PASSWORD = "password";
+ public static final String CONSUMER_KEY = "consumerKey";
+ public static final String CONSUMER_SECRET = "consumerSecret";
+ public static final String ACCESS_TOKEN ="accessToken";
+ public static final String ACCESS_TOKEN_SECRET = "accessTokenSecret";
public static final String QUEUE_NAME = "queue";
public static final String INCOMING_INTERVAL = "interval";
static
{
ALLOWABLE_INCOMING_CONNECTOR_KEYS = new HashSet<String>();
- ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(USER_NAME);
- ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(PASSWORD);
+ ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(CONSUMER_KEY);
+ ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(CONSUMER_SECRET);
+ ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(ACCESS_TOKEN);
+ ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(ACCESS_TOKEN_SECRET);
ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(QUEUE_NAME);
ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(INCOMING_INTERVAL);
REQUIRED_INCOMING_CONNECTOR_KEYS = new HashSet<String>();
- REQUIRED_INCOMING_CONNECTOR_KEYS.add(USER_NAME);
- REQUIRED_INCOMING_CONNECTOR_KEYS.add(PASSWORD);
+ REQUIRED_INCOMING_CONNECTOR_KEYS.add(CONSUMER_KEY);
+ REQUIRED_INCOMING_CONNECTOR_KEYS.add(CONSUMER_SECRET);
+ REQUIRED_INCOMING_CONNECTOR_KEYS.add(ACCESS_TOKEN);
+ REQUIRED_INCOMING_CONNECTOR_KEYS.add(ACCESS_TOKEN_SECRET);
REQUIRED_INCOMING_CONNECTOR_KEYS.add(QUEUE_NAME);
ALLOWABLE_OUTGOING_CONNECTOR_KEYS = new HashSet<String>();
- ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(USER_NAME);
- ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(PASSWORD);
+ ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(CONSUMER_KEY);
+ ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(CONSUMER_SECRET);
+ ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(ACCESS_TOKEN);
+ ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(ACCESS_TOKEN_SECRET);
ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(QUEUE_NAME);
REQUIRED_OUTGOING_CONNECTOR_KEYS = new HashSet<String>();
- REQUIRED_OUTGOING_CONNECTOR_KEYS.add(USER_NAME);
- REQUIRED_OUTGOING_CONNECTOR_KEYS.add(PASSWORD);
+ REQUIRED_OUTGOING_CONNECTOR_KEYS.add(CONSUMER_KEY);
+ REQUIRED_OUTGOING_CONNECTOR_KEYS.add(CONSUMER_SECRET);
+ REQUIRED_OUTGOING_CONNECTOR_KEYS.add(ACCESS_TOKEN);
+ REQUIRED_OUTGOING_CONNECTOR_KEYS.add(ACCESS_TOKEN_SECRET);
REQUIRED_OUTGOING_CONNECTOR_KEYS.add(QUEUE_NAME);
}
}
Modified: trunk/src/main/org/hornetq/integration/twitter/impl/IncomingTweetsHandler.java
===================================================================
--- trunk/src/main/org/hornetq/integration/twitter/impl/IncomingTweetsHandler.java 2010-10-30 07:36:27 UTC (rev 9823)
+++ trunk/src/main/org/hornetq/integration/twitter/impl/IncomingTweetsHandler.java 2010-10-30 13:58:39 UTC (rev 9824)
@@ -23,7 +23,7 @@
import org.hornetq.integration.twitter.TwitterConstants;
import org.hornetq.utils.ConfigurationHelper;
import twitter4j.*;
-
+import twitter4j.http.AccessToken;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -39,10 +39,14 @@
private final String connectorName;
- private final String userName;
+ private final String consumerKey;
- private final String password;
+ private final String consumerSecret;
+ private final String accessToken;
+
+ private final String accessTokenSecret;
+
private final String queueName;
private final int intervalSeconds;
@@ -59,17 +63,19 @@
private final ScheduledExecutorService scheduledPool;
- private ScheduledFuture scheduledFuture;
+ private ScheduledFuture<?> scheduledFuture;
- public IncomingTweetsHandler(final String connectorName,
+ public IncomingTweetsHandler(final String connectorName,
final Map<String, Object> configuration,
final StorageManager storageManager,
final PostOffice postOffice,
final ScheduledExecutorService scheduledThreadPool)
{
this.connectorName = connectorName;
- this.userName = ConfigurationHelper.getStringProperty(TwitterConstants.USER_NAME, null, configuration);
- this.password = ConfigurationHelper.getStringProperty(TwitterConstants.PASSWORD, null, configuration);
+ this.consumerKey = ConfigurationHelper.getStringProperty(TwitterConstants.CONSUMER_KEY, null, configuration);
+ this.consumerSecret = ConfigurationHelper.getStringProperty(TwitterConstants.CONSUMER_SECRET, null, configuration);
+ this.accessToken = ConfigurationHelper.getStringProperty(TwitterConstants.ACCESS_TOKEN, null, configuration);
+ this.accessTokenSecret = ConfigurationHelper.getStringProperty(TwitterConstants.ACCESS_TOKEN_SECRET, null, configuration);
this.queueName = ConfigurationHelper.getStringProperty(TwitterConstants.QUEUE_NAME, null, configuration);
Integer intervalSeconds = ConfigurationHelper.getIntProperty(TwitterConstants.INCOMING_INTERVAL, 0, configuration);
if (intervalSeconds > 0)
@@ -95,9 +101,12 @@
paging = new Paging();
TwitterFactory tf = new TwitterFactory();
- this.twitter = tf.getInstance(userName, password);
+ this.twitter = tf.getOAuthAuthorizedInstance(this.consumerKey,
+ this.consumerSecret,
+ new AccessToken(this.accessToken,
+ this.accessTokenSecret));
this.twitter.verifyCredentials();
-
+
// getting latest ID
this.paging.setCount(TwitterConstants.FIRST_ATTEMPT_PAGE_SIZE);
ResponseList<Status> res = this.twitter.getHomeTimeline(paging);
Modified: trunk/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java
===================================================================
--- trunk/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java 2010-10-30 07:36:27 UTC (rev 9823)
+++ trunk/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java 2010-10-30 13:58:39 UTC (rev 9824)
@@ -21,6 +21,7 @@
import org.hornetq.integration.twitter.TwitterConstants;
import org.hornetq.utils.ConfigurationHelper;
import twitter4j.*;
+import twitter4j.http.AccessToken;
import java.util.Map;
@@ -34,10 +35,14 @@
private final String connectorName;
- private final String userName;
+ private final String consumerKey;
- private final String password;
+ private final String consumerSecret;
+ private final String accessToken;
+
+ private final String accessTokenSecret;
+
private final String queueName;
private final PostOffice postOffice;
@@ -50,13 +55,15 @@
private boolean isStarted = false;
- public OutgoingTweetsHandler(final String connectorName,
+ public OutgoingTweetsHandler(final String connectorName,
final Map<String, Object> configuration,
final PostOffice postOffice)
{
this.connectorName = connectorName;
- this.userName = ConfigurationHelper.getStringProperty(TwitterConstants.USER_NAME, null, configuration);
- this.password = ConfigurationHelper.getStringProperty(TwitterConstants.PASSWORD, null, configuration);
+ this.consumerKey = ConfigurationHelper.getStringProperty(TwitterConstants.CONSUMER_KEY, null, configuration);
+ this.consumerSecret = ConfigurationHelper.getStringProperty(TwitterConstants.CONSUMER_SECRET, null, configuration);
+ this.accessToken = ConfigurationHelper.getStringProperty(TwitterConstants.ACCESS_TOKEN, null, configuration);
+ this.accessTokenSecret = ConfigurationHelper.getStringProperty(TwitterConstants.ACCESS_TOKEN_SECRET, null, configuration);
this.queueName = ConfigurationHelper.getStringProperty(TwitterConstants.QUEUE_NAME, null, configuration);
this.postOffice = postOffice;
}
@@ -91,8 +98,12 @@
this.queue = (Queue)b.getBindable();
TwitterFactory tf = new TwitterFactory();
- this.twitter = tf.getInstance(userName, password);
+ this.twitter = tf.getOAuthAuthorizedInstance(this.consumerKey,
+ this.consumerSecret,
+ new AccessToken(this.accessToken,
+ this.accessTokenSecret));
this.twitter.verifyCredentials();
+
// TODO make filter-string configurable
// this.filter = FilterImpl.createFilter(filterString);
this.filter = null;
Modified: trunk/tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java 2010-10-30 07:36:27 UTC (rev 9823)
+++ trunk/tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java 2010-10-30 13:58:39 UTC (rev 9824)
@@ -38,6 +38,7 @@
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
import twitter4j.*;
+import twitter4j.http.AccessToken;
/**
* A TwitterTest
@@ -50,19 +51,23 @@
{
private static final Logger log = Logger.getLogger(TwitterTest.class);
private static final String KEY_CONNECTOR_NAME = "connector.name";
- private static final String KEY_USERNAME = "username";
- private static final String KEY_PASSWORD = "password";
+ private static final String KEY_CONSUMER_KEY = "consumerKey";
+ private static final String KEY_CONSUMER_SECRET = "consumerSecret";
+ private static final String KEY_ACCESS_TOKEN = "accessToken";
+ private static final String KEY_ACCESS_TOKEN_SECRET = "accessTokenSecret";
private static final String KEY_QUEUE_NAME = "queue.name";
- private static final String TWITTER_USERNAME = System.getProperty("twitter.username");
- private static final String TWITTER_PASSWORD = System.getProperty("twitter.password");
-
+ private static final String TWITTER_CONSUMER_KEY = System.getProperty("twitter.consumerKey");
+ private static final String TWITTER_CONSUMER_SECRET = System.getProperty("twitter.consumerSecret");
+ private static final String TWITTER_ACCESS_TOKEN = System.getProperty("twitter.accessToken");
+ private static final String TWITTER_ACCESS_TOKEN_SECRET = System.getProperty("twitter.accessTokenSecret");
+
@Override
protected void setUp() throws Exception
{
- if(TWITTER_USERNAME == null || TWITTER_PASSWORD == null)
+ if(TWITTER_CONSUMER_KEY == null || TWITTER_CONSUMER_SECRET == null || TWITTER_ACCESS_TOKEN == null || TWITTER_ACCESS_TOKEN_SECRET == null)
{
- throw new Exception("* * * Please set twitter.username and twitter.password in system property * * *");
+ throw new Exception("* * * Please set the twitter.consumerKey, twitter.consumerSecret, twitter.accessToken and twitter.accessTokenSecret system properties * * *");
}
super.setUp();
}
@@ -101,8 +106,10 @@
public void testIncomingWithInvalidCredentials() throws Exception
{
HashMap<String,String> params = new HashMap<String,String>();
- params.put(KEY_USERNAME, "invalidUsername");
- params.put(KEY_PASSWORD, "invalidPassword");
+ params.put(KEY_CONSUMER_KEY, "invalidConsumerKey");
+ params.put(KEY_CONSUMER_SECRET, "invalidConsumerSecret");
+ params.put(KEY_ACCESS_TOKEN, "invalidAccessToken");
+ params.put(KEY_ACCESS_TOKEN_SECRET, "invalidAccessTokenSecret");
internalTestIncomingFailedToInitialize(params);
}
@@ -139,18 +146,14 @@
public void testOutgoingWithInvalidCredentials() throws Exception
{
HashMap<String,String> params = new HashMap<String,String>();
- params.put(KEY_USERNAME, "invalidUsername");
- params.put(KEY_PASSWORD, "invalidPassword");
+ params.put(KEY_CONSUMER_KEY, "invalidConsumerKey");
+ params.put(KEY_CONSUMER_SECRET, "invalidConsumerSecret");
+ params.put(KEY_ACCESS_TOKEN, "invalidAccessToken");
+ params.put(KEY_ACCESS_TOKEN_SECRET, "invalidAccessTokenSecret");
internalTestOutgoingFailedToInitialize(params);
}
- /**
- * This will fail until TFJ-347 is fixed.
- * http://twitter4j.org/jira/browse/TFJ-347
- *
- * @throws Exception
- */
- public void _testOutgoingWithInReplyTo() throws Exception
+ public void testOutgoingWithInReplyTo() throws Exception
{
internalTestOutgoingWithInReplyTo();
}
@@ -161,7 +164,10 @@
ClientSession session = null;
String queue = "TwitterTestQueue";
int interval = 5;
- Twitter twitter = new TwitterFactory().getInstance(TWITTER_USERNAME,TWITTER_PASSWORD);
+ Twitter twitter = new TwitterFactory().getOAuthAuthorizedInstance(TWITTER_CONSUMER_KEY,
+ TWITTER_CONSUMER_SECRET,
+ new AccessToken(TWITTER_ACCESS_TOKEN,
+ TWITTER_ACCESS_TOKEN_SECRET));
String testMessage = "TwitterTest/incoming: " + System.currentTimeMillis();
log.debug("test incoming: " + testMessage);
@@ -171,8 +177,10 @@
HashMap<String, Object> config = new HashMap<String, Object>();
config.put(TwitterConstants.INCOMING_INTERVAL, interval);
config.put(TwitterConstants.QUEUE_NAME, queue);
- config.put(TwitterConstants.USER_NAME, TWITTER_USERNAME);
- config.put(TwitterConstants.PASSWORD, TWITTER_PASSWORD);
+ config.put(TwitterConstants.CONSUMER_KEY, TWITTER_CONSUMER_KEY);
+ config.put(TwitterConstants.CONSUMER_SECRET, TWITTER_CONSUMER_SECRET);
+ config.put(TwitterConstants.ACCESS_TOKEN, TWITTER_ACCESS_TOKEN);
+ config.put(TwitterConstants.ACCESS_TOKEN_SECRET, TWITTER_ACCESS_TOKEN_SECRET);
ConnectorServiceConfiguration inconf =
new ConnectorServiceConfiguration(
TwitterIncomingConnectorServiceFactory.class.getName(),
@@ -244,22 +252,32 @@
HornetQServer server0 = null;
String connectorName = "test-incoming-connector";
String queue = "TwitterTestQueue";
- String userName = "invalidUsername";
- String password = "invalidPassword";
+ String consumerKey = "invalidConsumerKey";
+ String consumerSecret = "invalidConsumerSecret";
+ String accessToken = "invalidAccessToken";
+ String accessTokenSecret = "invalidAccessTokenSecret";
int interval = 5;
if(params.containsKey(KEY_CONNECTOR_NAME))
{
connectorName = params.get(KEY_CONNECTOR_NAME);
}
- if(params.containsKey(KEY_USERNAME))
+ if(params.containsKey(KEY_CONSUMER_KEY))
{
- userName = params.get(KEY_USERNAME);
+ consumerKey = params.get(KEY_CONSUMER_KEY);
}
- if(params.containsKey(KEY_PASSWORD))
+ if(params.containsKey(KEY_CONSUMER_SECRET))
{
- password = params.get(KEY_PASSWORD);
+ consumerSecret = params.get(KEY_CONSUMER_SECRET);
}
+ if(params.containsKey(KEY_ACCESS_TOKEN))
+ {
+ accessToken = params.get(KEY_ACCESS_TOKEN);
+ }
+ if(params.containsKey(KEY_ACCESS_TOKEN_SECRET))
+ {
+ accessTokenSecret = params.get(KEY_ACCESS_TOKEN_SECRET);
+ }
if(params.containsKey(KEY_QUEUE_NAME))
{
queue = params.get(KEY_QUEUE_NAME);
@@ -271,8 +289,10 @@
HashMap<String, Object> config = new HashMap<String, Object>();
config.put(TwitterConstants.INCOMING_INTERVAL, interval);
config.put(TwitterConstants.QUEUE_NAME, queue);
- config.put(TwitterConstants.USER_NAME, userName);
- config.put(TwitterConstants.PASSWORD, password);
+ config.put(TwitterConstants.CONSUMER_KEY, consumerKey);
+ config.put(TwitterConstants.CONSUMER_SECRET, consumerSecret);
+ config.put(TwitterConstants.ACCESS_TOKEN, accessToken);
+ config.put(TwitterConstants.ACCESS_TOKEN_SECRET, accessTokenSecret);
ConnectorServiceConfiguration inconf =
new ConnectorServiceConfiguration(TwitterIncomingConnectorServiceFactory.class.getName(),
config,
@@ -306,7 +326,10 @@
HornetQServer server0 = null;
ClientSession session = null;
String queue = "TwitterTestQueue";
- Twitter twitter = new TwitterFactory().getInstance(TWITTER_USERNAME,TWITTER_PASSWORD);
+ Twitter twitter = new TwitterFactory().getOAuthAuthorizedInstance(TWITTER_CONSUMER_KEY,
+ TWITTER_CONSUMER_SECRET,
+ new AccessToken(TWITTER_ACCESS_TOKEN,
+ TWITTER_ACCESS_TOKEN_SECRET));
String testMessage = "TwitterTest/outgoing: " + System.currentTimeMillis();
log.debug("test outgoing: " + testMessage);
@@ -315,8 +338,10 @@
Configuration configuration = createDefaultConfig(false);
HashMap<String, Object> config = new HashMap<String, Object>();
config.put(TwitterConstants.QUEUE_NAME, queue);
- config.put(TwitterConstants.USER_NAME, TWITTER_USERNAME);
- config.put(TwitterConstants.PASSWORD, TWITTER_PASSWORD);
+ config.put(TwitterConstants.CONSUMER_KEY, TWITTER_CONSUMER_KEY);
+ config.put(TwitterConstants.CONSUMER_SECRET, TWITTER_CONSUMER_SECRET);
+ config.put(TwitterConstants.ACCESS_TOKEN, TWITTER_ACCESS_TOKEN);
+ config.put(TwitterConstants.ACCESS_TOKEN_SECRET, TWITTER_ACCESS_TOKEN_SECRET);
ConnectorServiceConfiguration outconf =
new ConnectorServiceConfiguration(TwitterOutgoingConnectorServiceFactory.class.getName(),
config,
@@ -388,25 +413,35 @@
protected void internalTestOutgoingFailedToInitialize(HashMap<String,String> params) throws Exception
{
HornetQServer server0 = null;
- String connectorName = "test-outgoing-connector";
+ String connectorName = "test-outgoing-connector";
String queue = "TwitterTestQueue";
- String userName = TWITTER_USERNAME;
- String password = TWITTER_PASSWORD;
+ String consumerKey = TWITTER_CONSUMER_KEY;
+ String consumerSecret = TWITTER_CONSUMER_SECRET;
+ String accessToken = TWITTER_ACCESS_TOKEN;
+ String accessTokenSecret = TWITTER_ACCESS_TOKEN_SECRET;
if(params.containsKey(KEY_CONNECTOR_NAME))
{
connectorName = params.get(KEY_CONNECTOR_NAME);
}
- if(params.containsKey(KEY_USERNAME))
+ if (params.containsKey(KEY_CONSUMER_KEY))
{
- userName = params.get(KEY_USERNAME);
+ consumerKey = params.get(KEY_CONSUMER_KEY);
}
- if(params.containsKey(KEY_PASSWORD))
+ if (params.containsKey(KEY_CONSUMER_SECRET))
{
- password = params.get(KEY_PASSWORD);
+ consumerSecret = params.get(KEY_CONSUMER_SECRET);
}
- if(params.containsKey(KEY_QUEUE_NAME))
+ if (params.containsKey(KEY_ACCESS_TOKEN))
{
+ accessToken = params.get(KEY_ACCESS_TOKEN);
+ }
+ if (params.containsKey(KEY_ACCESS_TOKEN_SECRET))
+ {
+ accessTokenSecret = params.get(KEY_ACCESS_TOKEN_SECRET);
+ }
+ if (params.containsKey(KEY_QUEUE_NAME))
+ {
queue = params.get(KEY_QUEUE_NAME);
}
@@ -415,12 +450,14 @@
Configuration configuration = createDefaultConfig(false);
HashMap<String, Object> config = new HashMap<String, Object>();
config.put(TwitterConstants.QUEUE_NAME, queue);
- config.put(TwitterConstants.USER_NAME, userName);
- config.put(TwitterConstants.PASSWORD, password);
+ config.put(TwitterConstants.CONSUMER_KEY, consumerKey);
+ config.put(TwitterConstants.CONSUMER_SECRET, consumerSecret);
+ config.put(TwitterConstants.ACCESS_TOKEN, accessToken);
+ config.put(TwitterConstants.ACCESS_TOKEN_SECRET, accessTokenSecret);
ConnectorServiceConfiguration outconf =
new ConnectorServiceConfiguration(TwitterOutgoingConnectorServiceFactory.class.getName(),
config,
- "test-outgoing-connector");
+ connectorName);
configuration.getConnectorServiceConfigurations().add(outconf);
CoreQueueConfiguration qc = new CoreQueueConfiguration(queue, queue, null, false);
configuration.getQueueConfigurations().add(qc);
@@ -446,16 +483,21 @@
HornetQServer server0 = null;
ClientSession session = null;
String queue = "TwitterTestQueue";
- Twitter twitter = new TwitterFactory().getInstance(TWITTER_USERNAME,TWITTER_PASSWORD);
+ Twitter twitter = new TwitterFactory().getOAuthAuthorizedInstance(TWITTER_CONSUMER_KEY,
+ TWITTER_CONSUMER_SECRET,
+ new AccessToken(TWITTER_ACCESS_TOKEN,
+ TWITTER_ACCESS_TOKEN_SECRET));
String testMessage = "TwitterTest/outgoing with in_reply_to: " + System.currentTimeMillis();
- String replyMessage = "@" + TWITTER_USERNAME + " TwitterTest/outgoing reply: " + System.currentTimeMillis();
+ String replyMessage = "@" + twitter.getScreenName() + " TwitterTest/outgoing reply: " + System.currentTimeMillis();
try
{
Configuration configuration = createDefaultConfig(false);
HashMap<String, Object> config = new HashMap<String, Object>();
config.put(TwitterConstants.QUEUE_NAME, queue);
- config.put(TwitterConstants.USER_NAME, TWITTER_USERNAME);
- config.put(TwitterConstants.PASSWORD, TWITTER_PASSWORD);
+ config.put(TwitterConstants.CONSUMER_KEY, TWITTER_CONSUMER_KEY);
+ config.put(TwitterConstants.CONSUMER_SECRET, TWITTER_CONSUMER_SECRET);
+ config.put(TwitterConstants.ACCESS_TOKEN, TWITTER_ACCESS_TOKEN);
+ config.put(TwitterConstants.ACCESS_TOKEN_SECRET, TWITTER_ACCESS_TOKEN_SECRET);
ConnectorServiceConfiguration outconf =
new ConnectorServiceConfiguration(TwitterOutgoingConnectorServiceFactory.class.getName(),
config,
JBoss hornetq SVN: r9823 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-10-30 03:36:27 -0400 (Sat, 30 Oct 2010)
New Revision: 9823
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
fixed failover test config
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-10-30 00:06:04 UTC (rev 9822)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-10-30 07:36:27 UTC (rev 9823)
@@ -137,6 +137,9 @@
backupConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
ArrayList<String> staticConnectors = new ArrayList<String>();
staticConnectors.add(liveConnector.getName());
+ ClusterConnectionConfiguration cccLive = new ClusterConnectionConfiguration("cluster1", "jms", backupConnector.getName(), -1, false, false, 1, 1,
+ staticConnectors);
+ backupConfig.getClusterConfigurations().add(cccLive);
backupServer = createBackupServer();
// FIXME
@@ -254,7 +257,8 @@
sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
- assertTrue(countDownLatch.await(5, TimeUnit.SECONDS));
+ boolean ok = countDownLatch.await(5, TimeUnit.SECONDS);
+ assertTrue(ok);
return sf;
}
JBoss hornetq SVN: r9822 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-29 20:06:04 -0400 (Fri, 29 Oct 2010)
New Revision: 9822
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Changes
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-29 23:28:22 UTC (rev 9821)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-30 00:06:04 UTC (rev 9822)
@@ -16,6 +16,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.transaction.Transaction;
+import org.hornetq.utils.LinkedListIterator;
/**
* A PageCursor
@@ -37,6 +38,8 @@
/** It will be 0 if non persistent cursor */
public long getId();
+ public LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator();
+
// To be called when the cursor is closed for good. Most likely when the queue is deleted
void close() throws Exception;
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-29 23:28:22 UTC (rev 9821)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-30 00:06:04 UTC (rev 9822)
@@ -152,7 +152,7 @@
}
- class LocalIterator implements LinkedListIterator<Pair<PagePosition, PagedMessage>>
+ class CursorIterator implements LinkedListIterator<Pair<PagePosition, PagedMessage>>
{
PagePosition position = getLastPosition();
@@ -191,7 +191,14 @@
{
Pair<PagePosition, PagedMessage> nextPos = moveNext(position);
lastOperation = position;
- position = nextPos.a;
+ if (nextPos == null)
+ {
+ position = null;
+ }
+ else
+ {
+ position = nextPos.a;
+ }
return nextPos;
}
catch (Exception e)
@@ -219,8 +226,20 @@
{
}
}
+
+
/* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursor#iterator()
+ */
+ public LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator()
+ {
+ return new CursorIterator();
+ }
+
+
+
+ /* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
*/
public synchronized Pair<PagePosition, PagedMessage> moveNext(PagePosition posision) throws Exception
@@ -923,5 +942,4 @@
}
}
-
}
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-29 23:28:22 UTC (rev 9821)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-30 00:06:04 UTC (rev 9822)
@@ -46,6 +46,7 @@
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.LinkedListIterator;
/**
* A PageCacheTest
@@ -118,14 +119,24 @@
Pair<PagePosition, PagedMessage> msg;
+ LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
int key = 0;
- while ((msg = cursor.moveNext()) != null)
+ while ((msg = iterator.next()) != null)
{
assertEquals(key++, msg.b.getMessage().getIntProperty("key").intValue());
cursor.ack(msg.a);
}
assertEquals(NUM_MESSAGES, key);
+
+ server.getStorageManager().waitOnOperations();
+
+ waitCleanup();
+
+ assertFalse(lookupPageStore(ADDRESS).isPaging());
+
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
forceGC();
assertTrue(lookupCursorProvider().getCacheSize() < numberOfPages);
@@ -194,8 +205,13 @@
Pair<PagePosition, PagedMessage> msg;
+ LinkedListIterator<Pair<PagePosition, PagedMessage>> iteratorEven = cursorEven.iterator();
+
+ LinkedListIterator<Pair<PagePosition, PagedMessage>> iteratorOdd = cursorOdd.iterator();
+
+
int key = 0;
- while ((msg = cursorEven.moveNext()) != null)
+ while ((msg = iteratorEven.next()) != null)
{
assertEquals(key, msg.b.getMessage().getIntProperty("key").intValue());
assertTrue(msg.b.getMessage().getBooleanProperty("even").booleanValue());
@@ -205,7 +221,7 @@
assertEquals(NUM_MESSAGES, key);
key = 1;
- while ((msg = cursorOdd.moveNext()) != null)
+ while ((msg = iteratorOdd.next()) != null)
{
assertEquals(key, msg.b.getMessage().getIntProperty("key").intValue());
assertFalse(msg.b.getMessage().getBooleanProperty("even").booleanValue());
@@ -271,11 +287,13 @@
System.out.println("Cursor: " + cursor);
cursorProvider.printDebug();
+
+ LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
for (int i = 0; i < 1000; i++)
{
System.out.println("Reading Msg : " + i);
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = iterator.next();
assertNotNull(msg);
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
@@ -305,10 +323,12 @@
.getCursorProvier()
.getPersistentCursor(queue.getID());
+ iterator = cursor.iterator();
+
for (int i = firstPageSize; i < NUM_MESSAGES; i++)
{
System.out.println("Received " + i);
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = iterator.next();
assertNotNull(msg);
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
@@ -356,9 +376,10 @@
.createPersistentCursor(queue.getID(), null);
System.out.println("Cursor: " + cursor);
+ LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
for (int i = 0; i < 100; i++)
{
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = iterator.next();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
if (i < 10 || i > 20)
{
@@ -376,17 +397,18 @@
.getPageStore(ADDRESS)
.getCursorProvier()
.getPersistentCursor(queue.getID());
+ iterator = cursor.iterator();
for (int i = 10; i <= 20; i++)
{
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = iterator.next();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
cursor.ack(msg.a);
}
for (int i = 100; i < NUM_MESSAGES; i++)
{
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = iterator.next();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
cursor.ack(msg.a);
}
@@ -422,9 +444,12 @@
System.out.println("Cursor: " + cursor);
Transaction tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
+
+ LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
+
for (int i = 0; i < 100; i++)
{
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = iterator.next();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
if (i < 10 || i > 20)
{
@@ -446,17 +471,18 @@
.getPersistentCursor(queue.getID());
tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
+ iterator = cursor.iterator();
for (int i = 10; i <= 20; i++)
{
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = iterator.next();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
cursor.ackTx(tx, msg.a);
}
for (int i = 100; i < NUM_MESSAGES; i++)
{
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = iterator.next();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
cursor.ackTx(tx, msg.a);
}
@@ -495,6 +521,8 @@
System.out.println("Cursor: " + cursor);
+ LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
if (i % 100 == 0)
@@ -509,13 +537,13 @@
Assert.assertTrue(pageStore.page(msg));
- Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> readMessage = iterator.next();
assertNotNull(readMessage);
assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
- assertNull(cursor.moveNext());
+ assertNull(iterator.next());
}
server.stop();
@@ -530,6 +558,7 @@
.getPageStore(ADDRESS)
.getCursorProvier()
.getPersistentCursor(queue.getID());
+ iterator = cursor.iterator();
for (int i = 0; i < NUM_MESSAGES * 2; i++)
{
@@ -549,7 +578,7 @@
Assert.assertTrue(pageStore.page(msg));
}
- Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> readMessage = iterator.next();
assertNotNull(readMessage);
@@ -568,6 +597,7 @@
.getPageStore(ADDRESS)
.getCursorProvier()
.getPersistentCursor(queue.getID());
+ iterator = cursor.iterator();
for (int i = 0; i < NUM_MESSAGES * 3; i++)
{
@@ -587,7 +617,7 @@
Assert.assertTrue(pageStore.page(msg));
}
- Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> readMessage = iterator.next();
assertNotNull(readMessage);
@@ -596,7 +626,7 @@
assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
}
- Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> readMessage = iterator.next();
assertEquals(NUM_MESSAGES * 3, readMessage.b.getMessage().getIntProperty("key").intValue());
@@ -658,6 +688,7 @@
.getPageStore(ADDRESS)
.getCursorProvier()
.createPersistentCursor(queue.getID(), null);
+ LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
System.out.println("Cursor: " + cursor);
@@ -686,13 +717,13 @@
// First consume what's already there without any tx as nothing was committed
for (int i = 300; i < 400; i++)
{
- Pair<PagePosition, PagedMessage> pos = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> pos = iterator.next();
assertNotNull("Null at position " + i, pos);
assertEquals(i, pos.b.getMessage().getIntProperty("key").intValue());
cursor.ack(pos.a);
}
- assertNull(cursor.moveNext());
+ assertNull(iterator.next());
cursor.printDebug();
pgtxRollback.rollback();
@@ -703,13 +734,13 @@
// Second:after pgtxCommit was done
for (int i = 200; i < 300; i++)
{
- Pair<PagePosition, PagedMessage> pos = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> pos = iterator.next();
assertNotNull(pos);
assertEquals(i, pos.b.getMessage().getIntProperty("key").intValue());
cursor.ack(pos.a);
}
- assertNull(cursor.moveNext());
+ assertNull(iterator.next());
server.stop();
createServer();
@@ -733,9 +764,11 @@
PageCursorImpl cursor2 = (PageCursorImpl)cursorProvider.createNonPersistentCursor(null);
Pair<PagePosition, PagedMessage> msg;
+ LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
+ LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator2 = cursor2.iterator();
int key = 0;
- while ((msg = cursor.moveNext()) != null)
+ while ((msg = iterator.next()) != null)
{
assertEquals(key++, msg.b.getMessage().getIntProperty("key").intValue());
cursor.ack(msg.a);
@@ -748,7 +781,7 @@
for (int i = 0; i < 10; i++)
{
- msg = cursor2.moveNext();
+ msg = iterator2.next();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
}
@@ -791,9 +824,10 @@
msg = null;
cache = null;
+ LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
Pair<PagePosition, PagedMessage> msgCursor = null;
- while ((msgCursor = cursor.moveNext()) != null)
+ while ((msgCursor = iterator.next()) != null)
{
assertEquals(key++, msgCursor.b.getMessage().getIntProperty("key").intValue());
cursor.ack(msgCursor.a);
@@ -840,8 +874,10 @@
cache = null;
+ LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
+
Pair<PagePosition, PagedMessage> msgCursor = null;
- while ((msgCursor = cursor.moveNext()) != null)
+ while ((msgCursor = iterator.next()) != null)
{
assertEquals(key++, msgCursor.b.getMessage().getIntProperty("key").intValue());
}
@@ -856,7 +892,8 @@
cursorProvider = lookupCursorProvider();
cursor = cursorProvider.getPersistentCursor(queue.getID());
key = initialKey;
- while ((msgCursor = cursor.moveNext()) != null)
+ iterator = cursor.iterator();
+ while ((msgCursor = iterator.next()) != null)
{
assertEquals(key++, msgCursor.b.getMessage().getIntProperty("key").intValue());
cursor.ack(msgCursor.a);
JBoss hornetq SVN: r9821 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-29 19:28:22 -0400 (Fri, 29 Oct 2010)
New Revision: 9821
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java
Log:
changes
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-10-29 23:25:39 UTC (rev 9820)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-10-29 23:28:22 UTC (rev 9821)
@@ -321,7 +321,15 @@
public ClientMessage receive(final long timeout) throws HornetQException
{
- return receive(timeout, false);
+ if (isBrowseOnly())
+ {
+ log.warn("receive timeout is not effective on browsing, ignoring timeout");
+ return receive(0, true);
+ }
+ else
+ {
+ return receive(timeout, false);
+ }
}
public ClientMessage receive() throws HornetQException
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-29 23:25:39 UTC (rev 9820)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-29 23:28:22 UTC (rev 9821)
@@ -47,8 +47,6 @@
void disableAutoCleanup();
void enableAutoCleanup();
-
- Pair<PagePosition, PagedMessage> moveNext() throws Exception;
void ack(PagePosition position) throws Exception;
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-29 23:25:39 UTC (rev 9820)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-29 23:28:22 UTC (rev 9821)
@@ -17,6 +17,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
@@ -44,6 +45,8 @@
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.Future;
+import org.hornetq.utils.LinkedListImpl;
+import org.hornetq.utils.LinkedListIterator;
/**
* A PageCursorImpl
@@ -91,7 +94,7 @@
private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
// We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
- private final ConcurrentLinkedQueue<PagePosition> redeliveries = new ConcurrentLinkedQueue<PagePosition>();
+ private final org.hornetq.utils.LinkedList<PagePosition> redeliveries = new LinkedListImpl<PagePosition>();
// Static --------------------------------------------------------
@@ -147,11 +150,80 @@
ack(position);
}
+
+
+ class LocalIterator implements LinkedListIterator<Pair<PagePosition, PagedMessage>>
+ {
+ PagePosition position = getLastPosition();
+
+ PagePosition lastOperation = null;
+
+ LinkedListIterator<PagePosition> redeliveryIterator = redeliveries.iterator();
+ boolean isredelivery = false;
+
+
+ public void repeat()
+ {
+ if (isredelivery)
+ {
+ redeliveryIterator.repeat();
+ }
+ else
+ {
+ if (lastOperation == null)
+ {
+ position = getLastPosition();
+ }
+ else
+ {
+ position = lastOperation;
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see java.util.Iterator#next()
+ */
+ public Pair<PagePosition, PagedMessage> next()
+ {
+ try
+ {
+ Pair<PagePosition, PagedMessage> nextPos = moveNext(position);
+ lastOperation = position;
+ position = nextPos.a;
+ return nextPos;
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ public boolean hasNext()
+ {
+ return true;
+ }
+
+ /* (non-Javadoc)
+ * @see java.util.Iterator#remove()
+ */
+ public void remove()
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.utils.LinkedListIterator#close()
+ */
+ public void close()
+ {
+ }
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
*/
- public synchronized Pair<PagePosition, PagedMessage> moveNext() throws Exception
+ public synchronized Pair<PagePosition, PagedMessage> moveNext(PagePosition posision) throws Exception
{
PagePosition redeliveryPos = null;
@@ -161,20 +233,13 @@
return new Pair<PagePosition, PagedMessage>(redeliveryPos, cursorProvider.getMessage(redeliveryPos));
}
- if (lastPosition == null)
- {
- // it will start at the first available page
- long firstPage = pageStore.getFirstPage();
- lastPosition = new PagePositionImpl(firstPage, -1);
- }
-
boolean match = false;
Pair<PagePosition, PagedMessage> message = null;
do
{
- message = cursorProvider.getNext(this, lastPosition);
+ message = cursorProvider.getNext(this, posision);
if (message != null)
{
@@ -194,6 +259,21 @@
return message;
}
+ /**
+ *
+ */
+ private PagePosition getLastPosition()
+ {
+ if (lastPosition == null)
+ {
+ // it will start at the first available page
+ long firstPage = pageStore.getFirstPage();
+ lastPosition = new PagePositionImpl(firstPage, -1);
+ }
+
+ return lastPosition;
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#confirm(org.hornetq.core.paging.cursor.PagePosition)
*/
@@ -251,7 +331,7 @@
*/
public synchronized void redeliver(final PagePosition position)
{
- redeliveries.add(position);
+ redeliveries.addTail(position);
}
/**
Modified: branches/Branch_New_Paging/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java
===================================================================
--- branches/Branch_New_Paging/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java 2010-10-29 23:25:39 UTC (rev 9820)
+++ branches/Branch_New_Paging/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java 2010-10-29 23:28:22 UTC (rev 9821)
@@ -26,6 +26,12 @@
import javax.jms.Session;
import javax.jms.TextMessage;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.MessageHandler;
+import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.tests.util.ProxyAssertSupport;
/**
@@ -109,8 +115,8 @@
}
}
}
-
- public void testBrowse() throws Exception
+
+ public void testBrowse2() throws Exception
{
Connection conn = null;
@@ -121,59 +127,75 @@
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(HornetQServerTestCase.queue1);
+
+ HornetQConnectionFactory cf = (HornetQConnectionFactory) getConnectionFactory();
- final int numMessages = 10;
+ ClientSession coreSession = cf.getCoreFactory().createSession(true, true);
- for (int i = 0; i < numMessages; i++)
+ coreSession.start();
+
+ ClientConsumer browser = coreSession.createConsumer("jms.queue.Queue1", true);
+
+ conn.start();
+
+ Message m = session.createMessage();
+ m.setIntProperty("cnt", 0);
+ producer.send(m);
+
+
+ assertNotNull(browser.receive(5000));
+
+ Thread.sleep(5000);
+
+ coreSession.close();
+
+
+ System.out.println("Draining destination...");
+ drainDestination(getConnectionFactory(), queue1);
+
+ }
+ finally
+ {
+ if (conn != null)
{
- Message m = session.createMessage();
- m.setIntProperty("cnt", i);
- producer.send(m);
+ conn.close();
}
+ }
+ }
+ public void testBrowse() throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = getConnectionFactory().createConnection();
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = session.createProducer(HornetQServerTestCase.queue1);
+
QueueBrowser browser = session.createBrowser(HornetQServerTestCase.queue1);
ProxyAssertSupport.assertEquals(browser.getQueue(), HornetQServerTestCase.queue1);
ProxyAssertSupport.assertNull(browser.getMessageSelector());
- Enumeration en = browser.getEnumeration();
+ Enumeration<Message> en = (Enumeration<Message>)browser.getEnumeration();
- int count = 0;
- while (en.hasMoreElements())
- {
- en.nextElement();
- count++;
- }
-
- ProxyAssertSupport.assertEquals(numMessages, count);
-
- MessageConsumer mc = session.createConsumer(HornetQServerTestCase.queue1);
-
conn.start();
- for (int i = 0; i < numMessages; i++)
- {
- Message m = mc.receive();
- ProxyAssertSupport.assertNotNull(m);
- }
-
- browser = session.createBrowser(HornetQServerTestCase.queue1);
- en = browser.getEnumeration();
-
- log.info("browsing");
-
- count = 0;
- while (en.hasMoreElements())
- {
- Message mess = (Message)en.nextElement();
- log.info("message:" + mess);
- count++;
- }
-
- log.trace("Received " + count + " messages");
-
- ProxyAssertSupport.assertEquals(0, count);
+ Message m = session.createMessage();
+ m.setIntProperty("cnt", 0);
+ producer.send(m);
+ Message m2 = en.nextElement();
+
+ assertNotNull(m2);
+
+
+ System.out.println("Draining destination...");
+ drainDestination(getConnectionFactory(), queue1);
+
}
finally
{
@@ -204,19 +226,6 @@
m.setIntProperty("test_counter", i + 1);
producer.send(m);
}
-
- QueueBrowser browser = session.createBrowser(HornetQServerTestCase.queue1, "test_counter > 30");
-
- Enumeration en = browser.getEnumeration();
- int count = 0;
- while (en.hasMoreElements())
- {
- Message m = (Message)en.nextElement();
- int testCounter = m.getIntProperty("test_counter");
- ProxyAssertSupport.assertTrue(testCounter > 30);
- count++;
- }
- ProxyAssertSupport.assertEquals(70, count);
}
finally
{
JBoss hornetq SVN: r9819 - branches/Branch_New_Paging/hornetq-rest/docbook.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-29 19:21:30 -0400 (Fri, 29 Oct 2010)
New Revision: 9819
Modified:
branches/Branch_New_Paging/hornetq-rest/docbook/
Log:
svn:ignore
Property changes on: branches/Branch_New_Paging/hornetq-rest/docbook
___________________________________________________________________
Name: svn:ignore
+ target
JBoss hornetq SVN: r9818 - in branches/2_2_0_HA_Improvements: src/main/org/hornetq/core/server/cluster/impl and 7 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-10-29 06:13:41 -0400 (Fri, 29 Oct 2010)
New Revision: 9818
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ConsumerCloseTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ExpiryAddressTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/FailureDeadlockTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/IncompatibleVersionTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/NewDeadLetterAddressTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ServerLocatorConnectTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TemporaryQueueClusterTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
fixed threading issue and tests
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -141,7 +141,13 @@
private volatile boolean stopPingingAfterOne;
private volatile boolean closed;
-
+
+ public final Exception e = new Exception();
+
+ private final Object waitLock = new Object();
+
+ public static List<ClientSessionFactoryImpl> factories = new ArrayList<ClientSessionFactoryImpl>();
+
// Static
// ---------------------------------------------------------------------------------------
@@ -161,6 +167,9 @@
final ScheduledExecutorService scheduledThreadPool,
final List<Interceptor> interceptors) throws HornetQException
{
+
+ e.fillInStackTrace();
+
this.serverLocator = serverLocator;
this.connectorConfig = connectorConfig;
@@ -390,6 +399,10 @@
public void causeExit()
{
exitLoop = true;
+ synchronized (waitLock)
+ {
+ waitLock.notify();
+ }
}
public void close()
@@ -867,59 +880,69 @@
long interval = retryInterval;
int count = 0;
-
- while (true)
+ factories.add(this);
+ try
{
- if (exitLoop)
+ synchronized (waitLock)
{
- return;
- }
+ while (true)
+ {
+ if (exitLoop)
+ {
+ return;
+ }
- getConnection();
+ getConnection();
- if (connection == null)
- {
- // Failed to get connection
+ if (connection == null)
+ {
+ // Failed to get connection
- if (reconnectAttempts != 0)
- {
- count++;
+ if (reconnectAttempts != 0)
+ {
+ count++;
- if (reconnectAttempts != -1 && count == reconnectAttempts)
- {
- log.warn("Tried " + reconnectAttempts + " times to connect. Now giving up.");
+ if (reconnectAttempts != -1 && count == reconnectAttempts)
+ {
+ log.warn("Tried " + reconnectAttempts + " times to connect. Now giving up.");
- return;
- }
+ return;
+ }
- try
- {
- Thread.sleep(interval);
- }
- catch (InterruptedException ignore)
- {
- }
+ try
+ {
+ waitLock.wait(interval);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
- // Exponential back-off
- long newInterval = (long)(interval * retryIntervalMultiplier);
+ // Exponential back-off
+ long newInterval = (long)(interval * retryIntervalMultiplier);
- if (newInterval > maxRetryInterval)
- {
- newInterval = maxRetryInterval;
+ if (newInterval > maxRetryInterval)
+ {
+ newInterval = maxRetryInterval;
+ }
+
+ interval = newInterval;
+ }
+ else
+ {
+ return;
+ }
+ }
+ else
+ {
+ return;
+ }
}
-
- interval = newInterval;
- }
- else
- {
- return;
- }
}
- else
- {
- return;
- }
}
+ finally
+ {
+ factories.remove(this);
+ }
}
private void cancelScheduledTasks()
@@ -1084,6 +1107,21 @@
return connection;
}
+ public void finalize() throws Throwable
+ {
+ if (!closed)
+ {
+ log.warn("I'm closing a core ClientSessionFactory you left open. Please make sure you close all ClientSessionFactories explicitly " + "before letting them go out of scope! " +
+ System.identityHashCode(this));
+
+ log.warn("The ClientSessionFactory you didn't close was created here:", e);
+
+ close();
+ }
+
+ super.finalize();
+ }
+
private ConnectorFactory instantiateConnectorFactory(final String connectorFactoryClassName)
{
ClassLoader loader = Thread.currentThread().getContextClassLoader();
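The central threading change in this file replaces Thread.sleep() in the reconnect loop with waitLock.wait(interval), so causeExit() can wake a blocked factory immediately through notify() rather than letting it sleep out the remaining retry interval. A minimal standalone sketch of that interruptible exponential back-off idiom follows (hypothetical RetryLoop class, not the HornetQ sources):
public class RetryLoop
{
   private final Object waitLock = new Object();

   private volatile boolean exitLoop;

   // Wakes the loop immediately instead of letting it sleep out the full interval.
   public void causeExit()
   {
      exitLoop = true;
      synchronized (waitLock)
      {
         waitLock.notifyAll();
      }
   }

   public void connectLoop(long retryInterval, double multiplier, long maxRetryInterval)
   {
      long interval = retryInterval;
      synchronized (waitLock)
      {
         while (!exitLoop && !tryConnect())
         {
            try
            {
               waitLock.wait(interval); // interruptible by causeExit(), unlike Thread.sleep()
            }
            catch (InterruptedException ignore)
            {
            }
            // exponential back-off, capped at maxRetryInterval
            interval = Math.min((long)(interval * multiplier), maxRetryInterval);
         }
      }
   }

   private boolean tryConnect()
   {
      return false; // placeholder: a real loop would attempt the connection here
   }
}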
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -159,6 +159,7 @@
private TransportConfiguration clusterTransportConfiguration;
private boolean backup;
+ private final Exception e = new Exception();
private static synchronized ExecutorService getGlobalThreadPool()
{
@@ -303,6 +304,8 @@
final int discoveryPort,
final TransportConfiguration[] transportConfigs)
{
+ e.fillInStackTrace();
+
this.ha = useHA;
this.discoveryAddress = discoveryAddress;
@@ -1310,11 +1313,25 @@
}
}
+ public void finalize() throws Throwable
+ {
+ if (!closed)
+ {
+ log.warn("I'm closing a core ServerLocator you left open. Please make sure you close all ServerLocators explicitly " + "before letting them go out of scope! " +
+ System.identityHashCode(this));
+ log.warn("The ServerLocator you didn't close was created here:", e);
+
+ close();
+ }
+
+ super.finalize();
+ }
+
class Connector implements Callable<ClientSessionFactory>
{
private TransportConfiguration initialConnector;
- private ClientSessionFactoryInternal factory;
+ private volatile ClientSessionFactoryInternal factory;
private boolean isConnected = false;
private boolean interrupted = false;
private Exception e;
@@ -1327,6 +1344,10 @@
public ClientSessionFactory call() throws HornetQException
{
factory = getFactory();
+ if(factory == null)
+ {
+ return null;
+ }
try
{
factory.connect(reconnectAttempts, failoverOnInitialConnection);
@@ -1338,6 +1359,11 @@
this.e = e;
throw e;
}
+ if(factory != null)
+ {
+ factory.close();
+ factory = null;
+ }
return null;
}
isConnected = true;
@@ -1356,29 +1382,24 @@
return isConnected;
}
- public void disconnect()
+ public synchronized void disconnect()
{
interrupted = true;
- try
+
+ if (factory != null)
{
- ClientSessionFactoryInternal factory = getFactory();
- if (factory != null)
- {
- factory.causeExit();
- }
- else
- {
- System.out.println("ServerLocatorImpl$StaticConnector$Connector.disconnect");
- }
+ factory.causeExit();
+ factory.close();
+ factory = null;
}
- catch (HornetQException e1)
- {
- log.debug("exception closing factory");
- }
}
private synchronized ClientSessionFactoryInternal getFactory() throws HornetQException
{
+ if(interrupted)
+ {
+ return null;
+ }
if (factory == null)
{
try
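Both ServerLocatorImpl and ClientSessionFactoryImpl now allocate an Exception at construction purely to capture the creation stack trace, and log it from finalize() when an instance is garbage-collected without ever having been closed. A standalone sketch of that leak-detection idiom (hypothetical TrackedResource class, not the HornetQ code itself):
public class TrackedResource
{
   // Its stack trace records where the resource was created.
   private final Exception allocationSite = new Exception("allocation site");

   private volatile boolean closed;

   public void close()
   {
      closed = true;
   }

   @Override
   protected void finalize() throws Throwable
   {
      try
      {
         if (!closed)
         {
            System.err.println("Resource " + System.identityHashCode(this) + " was never closed; it was created here:");
            allocationSite.printStackTrace();
            close();
         }
      }
      finally
      {
         super.finalize();
      }
   }
}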
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -214,6 +214,13 @@
bridges.clear();
+ if(backupSessionFactory != null)
+ {
+ backupSessionFactory.close();
+ backupSessionFactory.getServerLocator().close();
+ backupSessionFactory = null;
+ }
+
started = false;
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -115,6 +115,8 @@
session.close();
sf.close();
+
+ locator.close();
}
public void testSimpleDuplicateDetectionWithString() throws Exception
@@ -183,6 +185,8 @@
session.close();
sf.close();
+
+ locator.close();
}
public void testCacheSize() throws Exception
@@ -340,6 +344,8 @@
session.close();
sf.close();
+
+ locator.close();
}
public void testTransactedDuplicateDetection1() throws Exception
@@ -390,6 +396,8 @@
session.close();
sf.close();
+
+ locator.close();
}
public void testTransactedDuplicateDetection2() throws Exception
@@ -434,6 +442,8 @@
session.close();
sf.close();
+
+ locator.close();
}
public void testTransactedDuplicateDetection3() throws Exception
@@ -490,6 +500,8 @@
session.close();
sf.close();
+
+ locator.close();
}
/*
@@ -553,6 +565,8 @@
session.close();
sf.close();
+
+ locator.close();
}
public void testXADuplicateDetection1() throws Exception
@@ -630,6 +644,8 @@
session.close();
sf.close();
+
+ locator.close();
}
public void testXADuplicateDetection2() throws Exception
@@ -709,6 +725,8 @@
session.close();
sf.close();
+
+ locator.close();
}
public void testXADuplicateDetection3() throws Exception
@@ -787,6 +805,8 @@
session.close();
sf.close();
+
+ locator.close();
}
public void testXADuplicateDetection4() throws Exception
@@ -867,6 +887,8 @@
session.close();
sf.close();
+
+ locator.close();
}
private ClientMessage createMessage(final ClientSession session, final int i)
@@ -958,6 +980,8 @@
sf.close();
+ locator.close();
+
messagingService2.stop();
}
@@ -1037,6 +1061,8 @@
sf.close();
+ locator.close();
+
messagingService2.stop();
}
@@ -1128,6 +1154,8 @@
sf.close();
+ locator.close();
+
messagingService2.stop();
}
@@ -1229,6 +1257,8 @@
sf.close();
+ locator.close();
+
messagingService2.stop();
}
@@ -1314,6 +1344,8 @@
sf.close();
+ locator.close();
+
messagingService2.stop();
}
@@ -1403,6 +1435,8 @@
sf.close();
+ locator.close();
+
messagingService2.stop();
}
@@ -1494,6 +1528,8 @@
sf.close();
+ locator.close();
+
messagingService2.stop();
}
@@ -1597,6 +1633,8 @@
sf.close();
+ locator.close();
+
messagingService2.stop();
}
@@ -1696,6 +1734,8 @@
sf.close();
+ locator.close();
+
messagingService2.stop();
}
@@ -1797,6 +1837,8 @@
sf.close();
+ locator.close();
+
messagingService2.stop();
}
@@ -1897,6 +1939,8 @@
sf.close();
+ locator.close();
+
messagingService2.stop();
}
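Every test in this file now closes its ServerLocator in addition to the session and session factory. A hedged sketch of the intended cleanup order, using only API calls that already appear in these diffs (error handling trimmed):
// Sketch only: assumes an embedded server is already running, as in these tests.
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(
   new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
ClientSessionFactory sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, true, false);
try
{
   // ... exercise duplicate detection here ...
}
finally
{
   session.close(); // innermost resource first
   sf.close();
   locator.close(); // the step these tests were previously missing
}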
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ConsumerCloseTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ConsumerCloseTest.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ConsumerCloseTest.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -48,6 +48,7 @@
private SimpleString queue;
private SimpleString address;
+ private ServerLocator locator;
// Static --------------------------------------------------------
@@ -156,7 +157,7 @@
address = RandomUtil.randomSimpleString();
queue = RandomUtil.randomSimpleString();
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
+ locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
sf = locator.createSessionFactory();
@@ -176,6 +177,8 @@
sf.close();
+ locator.close();
+
server.stop();
session = null;
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -119,6 +119,8 @@
session.close();
+ locator.close();
+
server.stop();
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ExpiryAddressTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ExpiryAddressTest.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ExpiryAddressTest.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -38,6 +38,7 @@
private HornetQServer server;
private ClientSession clientSession;
+ private ServerLocator locator;
public void testBasicSend() throws Exception
{
@@ -194,6 +195,8 @@
sendSession.close();
+ locator.close();
+
}
public void testExpireWithDefaultAddressSettings() throws Exception
@@ -310,7 +313,7 @@
// start the server
server.start();
// then we create a client as normal
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
+ locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
locator.setBlockOnAcknowledge(true);
ClientSessionFactory sessionFactory = locator.createSessionFactory();
@@ -344,6 +347,7 @@
//
}
}
+ locator.close();
server = null;
clientSession = null;
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/FailureDeadlockTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/FailureDeadlockTest.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/FailureDeadlockTest.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -95,6 +95,10 @@
}
+ cf1.close();
+
+ cf2.close();
+
server = null;
jmsServer = null;
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/IncompatibleVersionTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/IncompatibleVersionTest.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/IncompatibleVersionTest.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -52,6 +52,7 @@
private HornetQServer server;
private CoreRemotingConnection connection;
+ private ServerLocator locator;
// Static --------------------------------------------------------
@@ -66,7 +67,7 @@
server.getConfiguration().setConnectionTTLOverride(500);
server.start();
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
+ locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
ClientSessionFactory csf = locator.createSessionFactory();
connection = csf.getConnection();
@@ -77,6 +78,8 @@
{
connection.destroy();
+ locator.close();
+
server.stop();
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -49,6 +49,7 @@
private ClientSession session;
private ClientSessionFactory sf;
+ private ServerLocator locator;
// Static --------------------------------------------------------
@@ -274,7 +275,7 @@
config.setSecurityEnabled(false);
server = HornetQServers.newHornetQServer(config, false);
server.start();
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
+ locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
sf = locator.createSessionFactory();
@@ -288,6 +289,8 @@
session.close();
+ locator.close();
+
server.stop();
sf = null;
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/NewDeadLetterAddressTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/NewDeadLetterAddressTest.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/NewDeadLetterAddressTest.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -37,6 +37,7 @@
private HornetQServer server;
private ClientSession clientSession;
+ private ServerLocator locator;
public void testSendToDLAWhenNoRoute() throws Exception
{
@@ -71,7 +72,7 @@
// start the server
server.start();
// then we create a client as normal
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
+ locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
ClientSessionFactory sessionFactory = locator.createSessionFactory();
clientSession = sessionFactory.createSession(false, true, false);
}
@@ -90,6 +91,7 @@
//
}
}
+ locator.close();
if (server != null && server.isStarted())
{
try
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ServerLocatorConnectTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ServerLocatorConnectTest.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ServerLocatorConnectTest.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -58,6 +58,7 @@
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(createTransportConfiguration(isNetty(), false, generateParams(0, isNetty())));
ClientSessionFactory csf = locator.createSessionFactory();
csf.close();
+ locator.close();
}
public void testSingleConnectorSingleServerConnect() throws Exception
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -187,6 +187,8 @@
session0.close();
session2.close();
+ locator.close();
+
server0.stop();
server1.stop();
service2.stop();
@@ -315,6 +317,8 @@
session0.close();
session2.close();
+ locator.close();
+
server0.stop();
server1.stop();
service2.stop();
@@ -435,6 +439,8 @@
session0.close();
session1.close();
+ locator.close();
+
server0.stop();
server1.stop();
@@ -572,6 +578,8 @@
session0.close();
session1.close();
+ locator.close();
+
server0.stop();
server1.stop();
@@ -705,6 +713,8 @@
session0.close();
session1.close();
+ locator.close();
+
server0.stop();
server1.stop();
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -41,6 +41,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1);
super.tearDown();
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -28,6 +28,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.*;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.*;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
@@ -97,6 +98,8 @@
nodeManagers[i] = new InVMNodeManager();
}
+ locators = new ServerLocator[ClusterTestBase.MAX_SERVERS];
+
}
@Override
@@ -151,6 +154,8 @@
protected NodeManager[] nodeManagers;
+ protected ServerLocator[] locators;
+
protected ClientSessionFactory[] sfs;
protected ClientConsumer getConsumer(final int node)
@@ -431,6 +436,21 @@
}
}
+ protected void closeAllServerLocatorsFactories() throws Exception
+ {
+ for (int i = 0; i < locators.length; i++)
+ {
+ ServerLocator sf = locators[i];
+
+ if (sf != null)
+ {
+ sf.close();
+
+ locators[i] = null;
+ }
+ }
+ }
+
protected void closeSessionFactory(final int node)
{
ClientSessionFactory sf = sfs[node];
@@ -1134,11 +1154,11 @@
serverTotc = new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY, params);
}
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(serverTotc);
+ locators[node] = HornetQClient.createServerLocatorWithoutHA(serverTotc);
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ locators[node].setBlockOnNonDurableSend(true);
+ locators[node].setBlockOnDurableSend(true);
+ ClientSessionFactory sf = locators[node].createSessionFactory();
sfs[node] = sf;
}
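closeAllServerLocatorsFactories() walks the per-node locators array, closes each entry and nulls the slot so a repeated call is a no-op. A small generic sketch of that per-node tracking, written against java.io.Closeable purely to keep it self-contained (hypothetical NodeResources class):
import java.io.Closeable;
import java.io.IOException;

public class NodeResources
{
   private static final int MAX_SERVERS = 10;

   private final Closeable[] perNode = new Closeable[MAX_SERVERS];

   public void register(int node, Closeable resource)
   {
      perNode[node] = resource;
   }

   public void closeAll() throws IOException
   {
      for (int i = 0; i < perNode.length; i++)
      {
         if (perNode[i] != null)
         {
            perNode[i].close();

            perNode[i] = null; // so a second closeAll() call is a no-op
         }
      }
   }
}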
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -132,6 +132,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2, 3, 4, 5);
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -19,8 +19,10 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
@@ -88,6 +90,9 @@
closeAllSessionFactories();
+
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
@@ -193,6 +198,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
@@ -252,6 +259,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
@@ -314,6 +323,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
@@ -374,6 +385,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
@@ -436,6 +449,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
@@ -493,6 +508,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
@@ -573,6 +590,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
@@ -675,6 +694,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
@@ -783,6 +804,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
@@ -886,6 +909,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
@@ -950,6 +975,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
@@ -1016,10 +1043,28 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
+ @Override
+ protected void tearDown() throws Exception
+ {
+ if(locators != null)
+ {
+ for (ServerLocator locator : locators)
+ {
+ if(locator != null)
+ {
+ System.out.println("ClusteredGroupingTest.tearDown");
+ }
+ }
+ }
+ super.tearDown(); //To change body of overridden methods use File | Settings | File Templates.
+ }
+
public boolean isNetty()
{
return true;
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -159,6 +159,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2, 3, 4);
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -664,6 +664,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
clearServer(0, 1, 2);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -53,6 +53,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2, 3, 4);
super.tearDown();
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -51,6 +51,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1);
super.tearDown();
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -110,6 +110,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2, 3, 4);
startServers();
@@ -1771,6 +1773,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2, 3, 4);
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -88,6 +88,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers();
startServers();
@@ -131,6 +133,8 @@
closeAllConsumers();
closeAllSessionFactories();
+
+ closeAllServerLocatorsFactories();
}
catch (Throwable e)
{
@@ -257,6 +261,8 @@
closeAllConsumers();
closeAllSessionFactories();
+
+ closeAllServerLocatorsFactories();
}
//@Override
@@ -459,6 +465,8 @@
closeAllConsumers();
closeAllSessionFactories();
+
+ closeAllServerLocatorsFactories();
}
@Override
@@ -597,6 +605,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(5, 6, 7, 8, 9, 0, 1, 2, 3, 4);
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TemporaryQueueClusterTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TemporaryQueueClusterTest.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TemporaryQueueClusterTest.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -131,6 +131,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1);
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -56,6 +56,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1);
super.tearDown();
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -310,6 +310,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2, 3, 4, 5);
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -122,6 +122,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
@@ -225,6 +227,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -98,6 +98,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1);
}
}
@@ -171,6 +173,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1);
}
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -15,7 +15,9 @@
import java.io.File;
import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import javax.management.MBeanServer;
@@ -70,6 +72,33 @@
protected static final String NETTY_CONNECTOR_FACTORY = NettyConnectorFactory.class.getCanonicalName();
+ private List<ServerLocator> locators = new ArrayList<ServerLocator>();
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ for (ServerLocator locator : locators)
+ {
+ try
+ {
+ locator.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ locators.clear();
+ if(!ClientSessionFactoryImpl.factories.isEmpty())
+ {
+ for (ClientSessionFactoryImpl factory : ClientSessionFactoryImpl.factories)
+ {
+ //factory.e.printStackTrace();
+ }
+ }
+ super.tearDown(); //To change body of overridden methods use File | Settings | File Templates.
+ }
+
protected static Map<String, Object> generateParams(final int node, final boolean netty)
{
Map<String, Object> params = new HashMap<String, Object>();
@@ -421,12 +450,16 @@
protected ServerLocator createInVMNonHALocator()
{
- return HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+ ServerLocator locatorWithoutHA = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+ locators.add(locatorWithoutHA);
+ return locatorWithoutHA;
}
protected ServerLocator createNettyNonHALocator()
{
- return HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NETTY_CONNECTOR_FACTORY));
+ ServerLocator serverLocatorWithoutHA = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NETTY_CONNECTOR_FACTORY));
+ locators.add(serverLocatorWithoutHA);
+ return serverLocatorWithoutHA;
}
protected ClientSessionFactoryImpl createFactory(final String connectorClass) throws Exception
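With this change, every locator handed out by createInVMNonHALocator()/createNettyNonHALocator() is remembered in a list and closed from tearDown(), so individual tests can no longer leak locators they forget to close. A simplified standalone sketch of that base-class bookkeeping in JUnit 3 style (hypothetical names, not the HornetQ test base):
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

import junit.framework.TestCase;

public abstract class ResourceTrackingTestBase extends TestCase
{
   private final List<Closeable> tracked = new ArrayList<Closeable>();

   // Factory helpers register whatever they create so tearDown can clean up.
   protected <T extends Closeable> T track(T resource)
   {
      tracked.add(resource);
      return resource;
   }

   @Override
   protected void tearDown() throws Exception
   {
      for (Closeable resource : tracked)
      {
         try
         {
            resource.close();
         }
         catch (Exception e)
         {
            e.printStackTrace(); // best effort: keep closing the rest
         }
      }
      tracked.clear();

      super.tearDown();
   }
}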
JBoss hornetq SVN: r9817 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor and 4 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic@jboss.com
Date: 2010-10-27 17:50:48 -0400 (Wed, 27 Oct 2010)
New Revision: 9817
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
cleanup page support
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-27 21:50:48 UTC (rev 9817)
@@ -83,13 +83,22 @@
Page depage() throws Exception;
+ void forceAnotherPage() throws Exception;
+ Page getCurrentPage();
+
+
/**
* @return false if a thread was already started, or if not in page mode
* @throws Exception
*/
boolean startDepaging();
+
+ /** @return true if paging was started, or false if paging was already started before this call */
+ boolean startPaging() throws Exception;
+ void stopPaging() throws Exception;
+
void addSize(int size);
void executeRunnableWhenMemoryAvailable(Runnable runnable);
@@ -103,4 +112,8 @@
*
*/
void unlock();
+
+ /** This is used mostly by tests.
+ * We will wait any pending runnable to finish its execution */
+ void flushExecutors();
}
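These additions pull the paging lifecycle (startPaging/stopPaging, forceAnotherPage, getCurrentPage, flushExecutors) up from the test-support subinterface onto PagingStore itself (see the TestSupportPageStore diff further down). A hedged fragment showing how a caller might drive that lifecycle, using only methods that appear in this commit; the store reference is assumed to come from a helper such as the lookupPageStore(ADDRESS) used in PageCursorTest, and checked exceptions are omitted:
// Assumes "store" is the PagingStore for the address under test.
store.startPaging();       // returns false if the address was already in page mode

store.forceAnotherPage();  // close the current page file and open a fresh one

store.flushExecutors();    // drain pending runnables before asserting on page counts

if (store.getNumberOfPages() == 1 && store.getCurrentPage().getNumberOfMessages() == 0)
{
   store.stopPaging();     // only a single empty page left: leave page mode
}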
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-27 21:50:48 UTC (rev 9817)
@@ -40,6 +40,14 @@
// To be called when the cursor is closed for good. Most likely when the queue is deleted
void close() throws Exception;
+ void scheduleCleanupCheck();
+
+ void cleanupEntries() throws Exception;
+
+ void disableAutoCleanup();
+
+ void enableAutoCleanup();
+
Pair<PagePosition, PagedMessage> moveNext() throws Exception;
void ack(PagePosition position) throws Exception;
@@ -85,4 +93,6 @@
* @return
*/
boolean isComplete(long minPage);
+
+ void flushExecutors();
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-27 21:50:48 UTC (rev 9817)
@@ -66,8 +66,13 @@
void processReload() throws Exception;
void stop();
+
+ void flushExecutors();
void scheduleCleanup();
+
+ // Perform the cleanup at the caller's thread (for startup and recovery)
+ void cleanup();
/**
* @param pageCursorImpl
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-27 21:50:48 UTC (rev 9817)
@@ -68,10 +68,12 @@
System.out.println(message);
}
+ private volatile boolean autoCleanup = true;
+
private final StorageManager store;
private final long cursorId;
-
+
private final Filter filter;
private final PagingStore pageStore;
@@ -112,6 +114,16 @@
// Public --------------------------------------------------------
+ public void disableAutoCleanup()
+ {
+ autoCleanup = false;
+ }
+
+ public void enableAutoCleanup()
+ {
+ autoCleanup = true;
+ }
+
public PageCursorProvider getProvider()
{
return cursorProvider;
@@ -438,7 +450,7 @@
}
}
- public void stop()
+ public void flushExecutors()
{
Future future = new Future();
executor.execute(future);
@@ -448,6 +460,11 @@
}
}
+ public void stop()
+ {
+ flushExecutors();
+ }
+
public void printDebug()
{
printDebug(toString());
@@ -507,7 +524,10 @@
if (lastAckedPosition != null && lastAckedPosition.getPageNr() != pos.getPageNr())
{
// there's a different page being acked, we will do the check right away
- scheduleCleanupCheck();
+ if (autoCleanup)
+ {
+ scheduleCleanupCheck();
+ }
}
lastAckedPosition = pos;
}
@@ -547,32 +567,38 @@
*/
private void onPageDone(final PageCursorInfo info)
{
- scheduleCleanupCheck();
+ if (autoCleanup)
+ {
+ scheduleCleanupCheck();
+ }
}
- private void scheduleCleanupCheck()
+ public void scheduleCleanupCheck()
{
- executor.execute(new Runnable()
+ if (autoCleanup)
{
+ executor.execute(new Runnable()
+ {
- public void run()
- {
- try
+ public void run()
{
- cleanupPages();
+ try
+ {
+ cleanupEntries();
+ }
+ catch (Exception e)
+ {
+ PageCursorImpl.log.warn("Error on cleaning up cursor pages", e);
+ }
}
- catch (Exception e)
- {
- PageCursorImpl.log.warn("Error on cleaning up cursor pages", e);
- }
- }
- });
+ });
+ }
}
/**
* It will cleanup all the records for completed pages
* */
- private void cleanupPages() throws Exception
+ public void cleanupEntries() throws Exception
{
Transaction tx = new TransactionImpl(store);
@@ -687,7 +713,11 @@
@Override
public String toString()
{
- return "PageCursorInfo::PageID=" + pageId + " numberOfMessage = " + numberOfMessages + ", confirmed = " + confirmed;
+ return "PageCursorInfo::PageID=" + pageId +
+ " numberOfMessage = " +
+ numberOfMessages +
+ ", confirmed = " +
+ confirmed;
}
public PageCursorInfo(final long pageId, final int numberOfMessages, final PageCache cache)
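flushExecutors() drains the cursor's executor by submitting a marker task (org.hornetq.utils.Future) and waiting on it; stop() now simply delegates to that. The same drain idiom written with plain java.util.concurrent, as a standalone sketch (only meaningful for an executor that runs tasks one at a time in submission order):
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ExecutorDrain
{
   // Waits until every task submitted before this call has completed.
   public static void flush(ExecutorService executor, long timeout, TimeUnit unit) throws Exception
   {
      Future<?> marker = executor.submit(new Runnable()
      {
         public void run()
         {
            // no-op: its completion means everything queued ahead of it has run
         }
      });

      try
      {
         marker.get(timeout, unit);
      }
      catch (TimeoutException e)
      {
         System.err.println("Timed out waiting for pending tasks to finish");
      }
   }

   public static void main(String[] args) throws Exception
   {
      ExecutorService executor = Executors.newSingleThreadExecutor();
      executor.execute(new Runnable()
      {
         public void run()
         {
            // pending work
         }
      });
      flush(executor, 10, TimeUnit.SECONDS);
      executor.shutdown();
   }
}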
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-27 21:50:48 UTC (rev 9817)
@@ -264,6 +264,9 @@
{
cursor.processReload();
}
+
+ cleanup();
+
}
public void stop()
@@ -289,6 +292,29 @@
}
+ public void flushExecutors()
+ {
+ for (PageCursor cursor : activeCursors.values())
+ {
+ cursor.flushExecutors();
+ }
+
+ for (PageCursor cursor : nonPersistentCursors)
+ {
+ cursor.flushExecutors();
+ }
+
+ Future future = new Future();
+
+ executor.execute(future);
+
+ while (!future.await(10000))
+ {
+ log.warn("Waiting cursor provider " + this + " to finish executors");
+ }
+
+ }
+
public void close(PageCursor cursor)
{
if (cursor.getId() != 0)
@@ -318,7 +344,7 @@
});
}
- private void cleanup()
+ public void cleanup()
{
ArrayList<Page> depagedPages = new ArrayList<Page>();
@@ -328,17 +354,22 @@
{
try
{
+ if (!pagingStore.isStarted())
+ {
+ return;
+ }
+
ArrayList<PageCursor> cursorList = new ArrayList<PageCursor>();
cursorList.addAll(activeCursors.values());
cursorList.addAll(nonPersistentCursors);
long minPage = checkMinPage(cursorList);
-
- if (minPage == pagingStore.getCurrentWritingPage())
+
+ if (minPage == pagingStore.getCurrentWritingPage() && pagingStore.getCurrentPage().getNumberOfMessages() > 0)
{
boolean complete = true;
-
- for (PageCursor cursor: cursorList)
+
+ for (PageCursor cursor : cursorList)
{
if (!cursor.isComplete(minPage))
{
@@ -346,19 +377,59 @@
break;
}
}
-
+
if (complete)
{
- System.out.println("Depaging complete now. We can leave page state at this point!");
- // move every cursor away from the main page, clearing every cursor's old pages while only keeping a bookmark for the next page case it happens again
+
+ System.out.println("Disabling depage!");
+ pagingStore.forceAnotherPage();
+
+ Page currentPage = pagingStore.getCurrentPage();
+
+ try
+ {
+ // First step: Move every cursor to the next bookmarked page (that was just created)
+ for (PageCursor cursor : cursorList)
+ {
+ cursor.ack(new PagePositionImpl(currentPage.getPageId(), -1));
+ }
+
+ storageManager.waitOnOperations();
+ }
+ finally
+ {
+ for (PageCursor cursor : cursorList)
+ {
+ cursor.enableAutoCleanup();
+ }
+ }
+
+ pagingStore.stopPaging();
+
+ // This has to be called after we stopped paging
+ for (PageCursor cursor : cursorList)
+ {
+ cursor.scheduleCleanupCheck();
+ }
+
}
}
for (long i = pagingStore.getFirstPage(); i < minPage; i++)
{
Page page = pagingStore.depage();
+ if (page == null)
+ {
+ break;
+ }
depagedPages.add(page);
}
+
+ if (pagingStore.getNumberOfPages() == 0 || pagingStore.getNumberOfPages() == 1 &&
+ pagingStore.getCurrentPage().getNumberOfMessages() == 0)
+ {
+ pagingStore.stopPaging();
+ }
}
catch (Exception ex)
{
@@ -412,11 +483,6 @@
*/
private long checkMinPage(List<PageCursor> cursorList)
{
- if (cursorList.size() == 0)
- {
- return 0l;
- }
-
long minPage = Long.MAX_VALUE;
for (PageCursor cursor : cursorList)
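The reworked cleanup pass is driven by the smallest page any cursor still needs: checkMinPage() now always scans the cursor list (the empty-list early return is gone), pages below that minimum are depaged, and paging stops once only a single empty page remains. A tiny standalone sketch of the min-page computation (hypothetical Cursor interface and method name):
import java.util.Arrays;
import java.util.List;

public class MinPage
{
   interface Cursor
   {
      long getFirstPageNeeded();
   }

   // Smallest page still referenced by any cursor; pages below it are safe to remove.
   static long checkMinPage(List<? extends Cursor> cursors)
   {
      long minPage = Long.MAX_VALUE;
      for (Cursor cursor : cursors)
      {
         minPage = Math.min(minPage, cursor.getFirstPageNeeded());
      }
      return minPage;
   }

   public static void main(String[] args)
   {
      List<Cursor> cursors = Arrays.<Cursor>asList(
         new Cursor() { public long getFirstPageNeeded() { return 7; } },
         new Cursor() { public long getFirstPageNeeded() { return 3; } });
      System.out.println(checkMinPage(cursors)); // prints 3
   }
}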
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-27 21:50:48 UTC (rev 9817)
@@ -113,18 +113,15 @@
private volatile int currentPageId;
private volatile Page currentPage;
+
+ private volatile boolean paging = false;
- private final ReentrantLock writeLock = new ReentrantLock();
-
/** duplicate cache used at this address */
private final DuplicateIDCache duplicateCache;
-
+
private final PageCursorProvider cursorProvider;
- /**
- * We need to perform checks on currentPage with minimal locking
- * */
- private final ReadWriteLock currentPageLock = new ReentrantReadWriteLock();
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
private volatile boolean running = false;
@@ -193,7 +190,7 @@
this.storeFactory = storeFactory;
this.syncNonTransactional = syncNonTransactional;
-
+
this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager, executorFactory);
// Post office could be null on the backup node
@@ -209,34 +206,34 @@
}
// Public --------------------------------------------------------
-
+
public String toString()
{
return "PagingStoreImpl(" + this.address + ")";
}
// PagingStore implementation ------------------------------------
-
+
public void lock()
{
- writeLock.lock();
+ lock.writeLock().lock();
}
-
+
public void unlock()
{
- writeLock.unlock();
+ lock.writeLock().unlock();
}
-
+
public PageCursorProvider getCursorProvier()
{
return cursorProvider;
}
-
+
public long getFirstPage()
{
return firstPageId;
}
-
+
public long getTopPage()
{
return currentPageId;
@@ -269,7 +266,7 @@
public boolean isPaging()
{
- currentPageLock.readLock().lock();
+ lock.readLock().lock();
try
{
@@ -283,12 +280,12 @@
}
else
{
- return currentPage != null;
+ return paging;
}
}
finally
{
- currentPageLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
@@ -296,7 +293,7 @@
{
return numberOfPages;
}
-
+
public int getCurrentWritingPage()
{
return currentPageId;
@@ -322,7 +319,7 @@
public void sync() throws Exception
{
- currentPageLock.readLock().lock();
+ lock.readLock().lock();
try
{
@@ -333,18 +330,17 @@
}
finally
{
- currentPageLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
public boolean startDepaging()
{
-
+
// Disabled for now
-
+
return false;
-
-
+
/*
if (!running)
{
@@ -384,7 +380,6 @@
currentPageLock.readLock().unlock();
} */
}
-
public void processReload() throws Exception
{
@@ -398,7 +393,7 @@
// HornetQComponent implementation
- public synchronized boolean isStarted()
+ public boolean isStarted()
{
return running;
}
@@ -407,21 +402,13 @@
{
if (running)
{
-
+
cursorProvider.stop();
running = false;
- Future future = new Future();
+ flushExecutors();
- executor.execute(future);
-
- if (!future.await(60000))
- {
- PagingStoreImpl.log.warn("Timed out on waiting PagingStore " + address + " to shutdown");
- }
-
-
if (currentPage != null)
{
currentPage.close();
@@ -429,10 +416,24 @@
}
}
}
+
+ public void flushExecutors()
+ {
+ cursorProvider.flushExecutors();
+
+ Future future = new Future();
+ executor.execute(future);
+
+ if (!future.await(60000))
+ {
+ PagingStoreImpl.log.warn("Timed out on waiting PagingStore " + address + " to shutdown");
+ }
+ }
+
public void start() throws Exception
{
- writeLock.lock();
+ lock.writeLock().lock();
try
{
@@ -448,80 +449,78 @@
}
else
{
- currentPageLock.writeLock().lock();
+ running = true;
+ firstPageId = Integer.MAX_VALUE;
- try
+ // There are no files yet on this Storage. We will just return it empty
+ if (fileFactory != null)
{
- running = true;
- firstPageId = Integer.MAX_VALUE;
- // There are no files yet on this Storage. We will just return it empty
- if (fileFactory != null)
- {
+ currentPageId = 0;
+ currentPage = null;
- currentPageId = 0;
- currentPage = null;
+ List<String> files = fileFactory.listFiles("page");
- List<String> files = fileFactory.listFiles("page");
+ numberOfPages = files.size();
- numberOfPages = files.size();
+ for (String fileName : files)
+ {
+ final int fileId = PagingStoreImpl.getPageIdFromFileName(fileName);
- for (String fileName : files)
+ if (fileId > currentPageId)
{
- final int fileId = PagingStoreImpl.getPageIdFromFileName(fileName);
-
- if (fileId > currentPageId)
- {
- currentPageId = fileId;
- }
-
- if (fileId < firstPageId)
- {
- firstPageId = fileId;
- }
+ currentPageId = fileId;
}
-
- if (currentPageId != 0)
+
+ if (fileId < firstPageId)
{
- currentPage = createPage(currentPageId);
- currentPage.open();
-
- List<PagedMessage> messages = currentPage.read();
-
- LivePageCache pageCache = new LivePageCacheImpl(currentPage);
-
- for (PagedMessage msg : messages)
- {
- msg.initMessage(storageManager);
- pageCache.addLiveMessage(msg);
- }
-
- currentPage.setLiveCache(pageCache);
-
- currentPageSize.set(currentPage.getSize());
-
- cursorProvider.addPageCache(pageCache);
+ firstPageId = fileId;
}
-
- if (currentPage != null)
+ }
+
+ if (currentPageId != 0)
+ {
+ currentPage = createPage(currentPageId);
+ currentPage.open();
+
+ List<PagedMessage> messages = currentPage.read();
+
+ LivePageCache pageCache = new LivePageCacheImpl(currentPage);
+
+ for (PagedMessage msg : messages)
{
-
- startPaging();
+ msg.initMessage(storageManager);
+ pageCache.addLiveMessage(msg);
}
+
+ currentPage.setLiveCache(pageCache);
+
+ currentPageSize.set(currentPage.getSize());
+
+ cursorProvider.addPageCache(pageCache);
}
+
+ // We will not mark it for paging if there's only a single empty file
+ if (currentPage != null && !(numberOfPages == 1 && currentPage.getSize() == 0))
+ {
+ startPaging();
+ }
}
- finally
- {
- currentPageLock.writeLock().unlock();
- }
}
}
finally
{
- writeLock.unlock();
+ lock.writeLock().unlock();
}
}
+
+ public void stopPaging()
+ {
+ lock.writeLock().lock();
+ paging = false;
+ lock.writeLock().unlock();
+ }
public boolean startPaging()
{
@@ -530,28 +529,30 @@
return false;
}
- // First check without any global locks.
- // (Faster)
- currentPageLock.readLock().lock();
+ lock.readLock().lock();
try
{
- // Already paging, nothing to be done
- if (currentPage != null)
+ if (paging)
{
return false;
}
}
finally
{
- currentPageLock.readLock().unlock();
+ lock.readLock().unlock();
}
// if the first check failed, we do it again under a global currentPageLock
// (writeLock) this time
- writeLock.lock();
+ lock.writeLock().lock();
try
{
+ if (paging)
+ {
+ return false;
+ }
+
if (currentPage == null)
{
try
@@ -565,17 +566,15 @@
PagingStoreImpl.log.warn("IO Error, impossible to start paging", e);
return false;
}
-
- return true;
}
- else
- {
- return false;
- }
+
+ paging = true;
+
+ return true;
}
finally
{
- writeLock.unlock();
+ lock.writeLock().unlock();
}
}
@@ -594,22 +593,19 @@
}
SequentialFile file = fileFactory.createSequentialFile(fileName, 1000);
-
+
Page page = new PageImpl(storeName, storageManager, fileFactory, file, pageNumber);
- // To create the file
+ // To create the file
file.open();
file.position(0);
file.close();
-
return page;
}
- // TestSupportPageStore ------------------------------------------
-
public void forceAnotherPage() throws Exception
{
openNewPage();
@@ -625,9 +621,7 @@
* */
public Page depage() throws Exception
{
- writeLock.lock();
-
- currentPageLock.writeLock().lock(); // Make sure no checks are done on currentPage while we are depaging
+ lock.writeLock().lock(); // Make sure no checks are done on currentPage while we are depaging
try
{
if (!running)
@@ -689,8 +683,7 @@
}
finally
{
- currentPageLock.writeLock().unlock();
- writeLock.unlock();
+ lock.writeLock().unlock();
}
}
@@ -922,26 +915,26 @@
}
// We need to ensure a read lock, as depage could change the paging state
- currentPageLock.readLock().lock();
+ lock.readLock().lock();
try
{
// First check done concurrently, to avoid synchronization and increase throughput
- if (currentPage == null)
+ if (!paging)
{
return false;
}
}
finally
{
- currentPageLock.readLock().unlock();
+ lock.readLock().unlock();
}
- writeLock.lock();
+ lock.writeLock().lock();
try
{
- if (currentPage == null)
+ if (!paging)
{
return false;
}
@@ -971,42 +964,17 @@
if (currentPageSize.addAndGet(bytesToWrite) > pageSize && currentPage.getNumberOfMessages() > 0)
{
// Make sure nothing is currently validating or using currentPage
- currentPageLock.writeLock().lock();
- try
- {
- openNewPage();
-
- // openNewPage will set currentPageSize to zero, we need to set it again
- currentPageSize.addAndGet(bytesToWrite);
- }
- finally
- {
- currentPageLock.writeLock().unlock();
- }
+ openNewPage();
}
- currentPageLock.readLock().lock();
-
- try
- {
- currentPage.write(pagedMessage);
-
- if (sync)
- {
- currentPage.sync();
- }
- }
- finally
- {
- currentPageLock.readLock().unlock();
- }
+ currentPage.write(pagedMessage);
}
return true;
}
finally
{
- writeLock.unlock();
+ lock.writeLock().unlock();
}
}
@@ -1177,51 +1145,11 @@
return duplicateIdForPage;
}
- /**
- * @return
- */
- private boolean isAddressFull(final long nextPageSize)
- {
- return maxSize > 0 && getAddressSize() + nextPageSize > maxSize;
- }
+
- /**
- * startDepaging and clearDepage needs to be atomic.
- * We can't use writeLock to this operation as writeLock would still be used by another thread, and still being a valid usage
- * @return true if the depage status was cleared
- */
- private synchronized boolean clearDepage()
- {
- final boolean addressFull = isAddressFull(getPageSizeBytes());
-
- if (PagingStoreImpl.isTrace)
- {
- PagingStoreImpl.trace("Clear Depage on Address = " + getStoreName() +
- " addressSize = " +
- getAddressSize() +
- " addressMax " +
- maxSize +
- " isPaging = " +
- isPaging() +
- " addressFull = " +
- addressFull);
- }
-
- // It should stop the executor when the address is full or when there is nothing else to be depaged
- if (addressFull || !isPaging())
- {
- depaging.set(false);
- return true;
- }
- else
- {
- return false;
- }
- }
-
private void openNewPage() throws Exception
{
- currentPageLock.writeLock().lock();
+ lock.writeLock().lock();
try
{
@@ -1240,20 +1168,20 @@
}
currentPage = createPage(currentPageId);
-
+
LivePageCache pageCache = new LivePageCacheImpl(currentPage);
-
+
currentPage.setLiveCache(pageCache);
cursorProvider.addPageCache(pageCache);
-
+
currentPageSize.set(0);
currentPage.open();
}
finally
{
- currentPageLock.writeLock().unlock();
+ lock.writeLock().unlock();
}
}
@@ -1282,39 +1210,39 @@
// Inner classes -------------------------------------------------
-/* private class DepageRunnable implements Runnable
- {
- private final Executor followingExecutor;
-
- public DepageRunnable(final Executor followingExecutor)
+ /* private class DepageRunnable implements Runnable
{
- this.followingExecutor = followingExecutor;
- }
+ private final Executor followingExecutor;
- public void run()
- {
- try
+ public DepageRunnable(final Executor followingExecutor)
{
- if (running)
+ this.followingExecutor = followingExecutor;
+ }
+
+ public void run()
+ {
+ try
{
- if (!isAddressFull(getPageSizeBytes()))
+ if (running)
{
- readPage();
- }
+ if (!isAddressFull(getPageSizeBytes()))
+ {
+ readPage();
+ }
- // Note: clearDepage is an atomic operation, it needs to be done even if readPage was not executed
- // however clearDepage shouldn't be executed if the page-store is being stopped, as stop will be holding
- // the lock and this would dead lock
- if (running && !clearDepage())
- {
- followingExecutor.execute(this);
+ // Note: clearDepage is an atomic operation, it needs to be done even if readPage was not executed
+ // however clearDepage shouldn't be executed if the page-store is being stopped, as stop will be holding
+ // the lock and this would dead lock
+ if (running && !clearDepage())
+ {
+ followingExecutor.execute(this);
+ }
}
}
+ catch (Throwable e)
+ {
+ PagingStoreImpl.log.error(e, e);
+ }
}
- catch (Throwable e)
- {
- PagingStoreImpl.log.error(e, e);
- }
- }
- } */
+ } */
}
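
For readers following the locking change above: the commit collapses the old currentPageLock/writeLock pair into a single read-write lock plus a paging flag. A minimal standalone sketch of that check-under-read-lock, recheck-under-write-lock pattern (class, field and method names here are illustrative, not HornetQ API):

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class PagingGate
{
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

   private boolean paging = false;

   public void startPaging()
   {
      lock.writeLock().lock();
      try
      {
         paging = true;
      }
      finally
      {
         lock.writeLock().unlock();
      }
   }

   public boolean page(final Runnable writeMessage)
   {
      // first check done concurrently under the read lock, to avoid serializing every producer
      lock.readLock().lock();
      try
      {
         if (!paging)
         {
            return false;
         }
      }
      finally
      {
         lock.readLock().unlock();
      }

      lock.writeLock().lock();
      try
      {
         // re-check: another thread may have stopped paging between the two locks
         if (!paging)
         {
            return false;
         }

         writeMessage.run();

         return true;
      }
      finally
      {
         lock.writeLock().unlock();
      }
   }
}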
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java 2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java 2010-10-27 21:50:48 UTC (rev 9817)
@@ -13,7 +13,6 @@
package org.hornetq.core.paging.impl;
-import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagingStore;
/**
@@ -23,10 +22,4 @@
*/
public interface TestSupportPageStore extends PagingStore
{
- void forceAnotherPage() throws Exception;
-
- /** @return true if paging was started, or false if paging was already started before this call */
- boolean startPaging() throws Exception;
-
- Page getCurrentPage();
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2010-10-27 21:50:48 UTC (rev 9817)
@@ -280,8 +280,12 @@
if (timeout == 0)
{
waitCallback.waitCompletion();
+ return true;
}
- return waitCallback.waitCompletion(timeout);
+ else
+ {
+ return waitCallback.waitCompletion(timeout);
+ }
}
}
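
The OperationContextImpl change above settles on the convention that a timeout of 0 means "wait without a deadline" and therefore always reports success, while any other value delegates to the timed wait. A self-contained sketch of the same convention, using a CountDownLatch as a stand-in for the real callback:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CompletionWaiter
{
   private final CountDownLatch done = new CountDownLatch(1);

   public void complete()
   {
      done.countDown();
   }

   public boolean waitCompletion(final long timeout) throws InterruptedException
   {
      if (timeout == 0)
      {
         // no deadline: block until complete() is called, then report success
         done.await();
         return true;
      }
      else
      {
         // deadline: report whether completion happened within the timeout
         return done.await(timeout, TimeUnit.MILLISECONDS);
      }
   }
}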
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-27 13:43:09 UTC (rev 9816)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-27 21:50:48 UTC (rev 9817)
@@ -132,6 +132,7 @@
server.stop();
createServer();
+ waitCleanup();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
@@ -219,6 +220,7 @@
server.stop();
createServer();
+ waitCleanup();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
@@ -283,7 +285,13 @@
}
}
cursorProvider.printDebug();
+
+ server.getStorageManager().waitOnOperations();
+ lookupPageStore(ADDRESS).flushExecutors();
+
+
+
// needs to clear the context since we are using the same thread over two distinct servers
// otherwise we will get the old executor on the factory
OperationContextImpl.clearContext();
@@ -312,9 +320,15 @@
OperationContextImpl.getContext(null).waitCompletion();
((PageCursorImpl)cursor).printDebug();
+
+ lookupPageStore(ADDRESS).flushExecutors();
+
+ assertFalse(lookupPageStore(ADDRESS).isPaging());
server.stop();
createServer();
+ assertFalse(lookupPageStore(ADDRESS).isPaging());
+ waitCleanup();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
@@ -379,6 +393,7 @@
server.stop();
createServer();
+ waitCleanup();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
@@ -450,6 +465,7 @@
server.stop();
createServer();
+ waitCleanup();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
@@ -558,13 +574,13 @@
if (i % 100 == 0)
System.out.println("Paged " + i);
- if (i >= NUM_MESSAGES * 2)
+ if (i >= NUM_MESSAGES * 2 - 1)
{
HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
- msg.putIntProperty("key", i);
+ msg.putIntProperty("key", i + 1);
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
@@ -579,13 +595,47 @@
assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
}
+
+ Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
+
+ assertEquals(NUM_MESSAGES * 3, readMessage.b.getMessage().getIntProperty("key").intValue());
+
+ cursor.ack(readMessage.a);
+
+ server.getStorageManager().waitOnOperations();
+ pageStore.flushExecutors();
+
+ assertFalse(pageStore.isPaging());
+
server.stop();
createServer();
- assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+ assertFalse(pageStore.isPaging());
+ waitCleanup();
+
+ assertFalse(lookupPageStore(ADDRESS).isPaging());
+
}
+ /**
+ * @throws Exception
+ * @throws InterruptedException
+ */
+ private void waitCleanup() throws Exception, InterruptedException
+ {
+ // The cleanup is done asynchronously, so we need to wait some time
+ long timeout = System.currentTimeMillis() + 10000;
+
+ while (System.currentTimeMillis() < timeout && lookupPageStore(ADDRESS).getNumberOfPages() != 1)
+ {
+ Thread.sleep(100);
+ }
+
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+ }
+
public void testPrepareScenarios() throws Exception
{
PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
@@ -663,6 +713,7 @@
server.stop();
createServer();
+ waitCleanup();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
@@ -707,6 +758,7 @@
server.stop();
createServer();
+ waitCleanup();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
@@ -754,6 +806,7 @@
server.stop();
createServer();
+ waitCleanup();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
@@ -815,6 +868,7 @@
server.stop();
createServer();
+ waitCleanup();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
JBoss hornetq SVN: r9816 - in branches/2_2_0_HA_Improvements/src/main/org/hornetq: core/cluster/impl and 10 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-10-27 09:43:09 -0400 (Wed, 27 Oct 2010)
New Revision: 9816
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/RemotingService.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/NodeManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/InVMNodeManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
Log:
added support for restarting the backup server on failback
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -1159,7 +1159,7 @@
// cause reconnect loop
public void run()
{
- conn.fail(new HornetQException(HornetQException.DISCONNECTED,
+ conn.fail(new HornetQException(msg.isFailoverOnServerShutdown()?HornetQException.NOT_CONNECTED:HornetQException.DISCONNECTED,
"The connection was disconnected because of server shutdown"));
if (msg.getNodeID() != null)
{
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -165,6 +165,11 @@
started = false;
}
+ synchronized (waitLock)
+ {
+ waitLock.notify();
+ }
+
try
{
thread.join();
@@ -173,6 +178,7 @@
{
}
+
socket.close();
socket = null;
@@ -222,7 +228,7 @@
long toWait = timeout;
- while (!received && (toWait > 0 || timeout == 0))
+ while (started && !received && (toWait > 0 || timeout == 0))
{
try
{
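
The DiscoveryGroupImpl change above makes waiters re-check the started flag on every wake-up and has stop() notify the wait lock, so shutdown never leaves a caller blocked in waitForBroadcast. A simplified, hypothetical sketch of that wait loop (a timeout of 0 waits until notified):

public class BroadcastWaiter
{
   private final Object waitLock = new Object();

   private boolean started = true;

   private boolean received = false;

   public boolean waitForBroadcast(final long timeout) throws InterruptedException
   {
      synchronized (waitLock)
      {
         long start = System.currentTimeMillis();
         long toWait = timeout;

         while (started && !received && (toWait > 0 || timeout == 0))
         {
            // 0 means wait until broadcastReceived() or stop() notifies us
            waitLock.wait(timeout == 0 ? 0 : toWait);

            if (timeout != 0)
            {
               long now = System.currentTimeMillis();
               toWait -= now - start;
               start = now;
            }
         }

         return received;
      }
   }

   public void broadcastReceived()
   {
      synchronized (waitLock)
      {
         received = true;
         waitLock.notifyAll();
      }
   }

   public void stop()
   {
      synchronized (waitLock)
      {
         started = false;
         // wake any waiter so shutdown does not hang
         waitLock.notifyAll();
      }
   }
}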
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -288,7 +288,7 @@
callClosingListeners();
}
- public void disconnect()
+ public void disconnect(boolean failoverOnServerShutdown)
{
Channel channel0 = getChannel(0, -1);
@@ -307,7 +307,7 @@
channel.flushConfirmations();
}
- Packet disconnect = new DisconnectMessage(nodeID);
+ Packet disconnect = new DisconnectMessage(nodeID, failoverOnServerShutdown);
channel0.sendAndFlush(disconnect);
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -31,16 +31,20 @@
// Attributes ----------------------------------------------------
private SimpleString nodeID;
+
+ private boolean failoverOnServerShutdown;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public DisconnectMessage(final SimpleString nodeID)
+ public DisconnectMessage(final SimpleString nodeID, boolean failoverOnServerShutdown)
{
super(PacketImpl.DISCONNECT);
this.nodeID = nodeID;
+
+ this.failoverOnServerShutdown = failoverOnServerShutdown;
}
public DisconnectMessage()
@@ -55,16 +59,24 @@
return nodeID;
}
+ public boolean isFailoverOnServerShutdown()
+ {
+ return failoverOnServerShutdown;
+ }
+
@Override
public void encodeRest(final HornetQBuffer buffer)
{
buffer.writeNullableSimpleString(nodeID);
+
+ buffer.writeBoolean(failoverOnServerShutdown);
}
@Override
public void decodeRest(final HornetQBuffer buffer)
{
nodeID = buffer.readNullableSimpleString();
+ failoverOnServerShutdown = buffer.readBoolean();
}
@Override
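
The DisconnectMessage change above appends the new boolean after the existing fields and mirrors it in encodeRest/decodeRest. An illustrative sketch of keeping the two sides symmetric, with java.nio.ByteBuffer standing in for HornetQBuffer and the class name purely hypothetical:

import java.nio.ByteBuffer;

public class DisconnectPayload
{
   private final String nodeID;

   private final boolean failoverOnServerShutdown;

   public DisconnectPayload(final String nodeID, final boolean failoverOnServerShutdown)
   {
      this.nodeID = nodeID;
      this.failoverOnServerShutdown = failoverOnServerShutdown;
   }

   public void encode(final ByteBuffer buffer)
   {
      byte[] id = nodeID.getBytes();
      buffer.putInt(id.length);
      buffer.put(id);
      // the new boolean is written after the existing fields, exactly mirroring decode()
      buffer.put(failoverOnServerShutdown ? (byte)1 : (byte)0);
   }

   public static DisconnectPayload decode(final ByteBuffer buffer)
   {
      byte[] id = new byte[buffer.getInt()];
      buffer.get(id);
      boolean failover = buffer.get() == 1;
      return new DisconnectPayload(new String(id), failover);
   }
}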
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -89,7 +89,7 @@
manager.cleanup(this);
}
- public void disconnect()
+ public void disconnect(boolean clientFailover)
{
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/RemotingService.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/RemotingService.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/RemotingService.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -45,4 +45,6 @@
void freeze();
RemotingConnection getServerSideReplicatingConnection();
+
+ void stop(boolean failoverOnServerShutdown) throws Exception;
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -30,9 +30,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
@@ -255,6 +253,11 @@
public void stop() throws Exception
{
+ stop(false);
+ }
+
+ public void stop(boolean clientFailover) throws Exception
+ {
if (!started)
{
return;
@@ -279,7 +282,7 @@
{
RemotingConnection conn = entry.connection;
- conn.disconnect();
+ conn.disconnect(clientFailover);
}
for (Acceptor acceptor : acceptors)
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/NodeManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/NodeManager.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/NodeManager.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -74,5 +74,5 @@
public abstract boolean isAwaitingFailback() throws Exception;
- public abstract void killServer();
+ public abstract boolean isBackupLive() throws Exception;
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -52,4 +52,6 @@
void notifyNodeUp(String nodeID, String sourceNodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup, int distance);
Topology getTopology();
+
+ void announceBackup() throws Exception;
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -210,7 +210,10 @@
}
catch (Exception e)
{
- log.warn("did not connect the cluster connection to other nodes", e);
+ if(started)
+ {
+ log.warn("did not connect the cluster connection to other nodes", e);
+ }
}
}
});
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -362,6 +362,7 @@
try
{
broadcastGroup.start();
+ broadcastGroup.activate();
}
catch (Exception e)
{
@@ -405,6 +406,29 @@
}
}
+ public void announceBackup() throws Exception
+ {
+ List<ClusterConnectionConfiguration> configs = this.configuration.getClusterConfigurations();
+ if(!configs.isEmpty())
+ {
+ ClusterConnectionConfiguration config = configs.get(0);
+
+ TransportConfiguration connector = configuration.getConnectorConfigurations().get(config.getConnectorName());
+
+ if (connector == null)
+ {
+ log.warn("No connecor with name '" + config.getConnectorName() +
+ "'. backup cannot be announced.");
+ return;
+ }
+ announceBackup(config, connector);
+ }
+ else
+ {
+ log.warn("no cluster connections defined, unable to announce backup");
+ }
+ }
+
private synchronized void announceNode()
{
// TODO does this really work with more than one cluster connection? I think not
@@ -784,6 +808,7 @@
{
return;
}
+ log.info("announcing backup");
backupSessionFactory = locator.connect();
backupSessionFactory.getConnection().getChannel(0, -1).send(new NodeAnnounceMessage(nodeUUID.toString(), nodeUUID.toString(), true, connector));
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -36,6 +36,14 @@
private final String SERVER_LOCK_NAME = "server.lock";
+ private static final String ACCESS_MODE = "rw";
+
+ private static final int LIVE_LOCK_POS = 1;
+
+ private static final int BACKUP_LOCK_POS = 2;
+
+ private static final int LOCK_LENGTH = 1;
+
private static final byte LIVE = 'L';
private static final byte FAILINGBACK = 'F';
@@ -52,6 +60,7 @@
private final String directory;
+
public FileLockNodeManager(final String directory)
{
this.directory = directory;
@@ -70,7 +79,7 @@
file.createNewFile();
}
- RandomAccessFile raFile = new RandomAccessFile(file, "rw");
+ RandomAccessFile raFile = new RandomAccessFile(file, ACCESS_MODE);
channel = raFile.getChannel();
@@ -92,12 +101,20 @@
return getState() == FAILINGBACK;
}
- @Override
- public void killServer()
+ public boolean isBackupLive() throws Exception
{
- System.exit(0);
+ FileLock liveAttemptLock;
+ liveAttemptLock = channel.tryLock(LIVE_LOCK_POS, LOCK_LENGTH, false);
+ if(liveAttemptLock == null)
+ {
+ return true;
+ }
+ else
+ {
+ liveAttemptLock.release();
+ return false;
+ }
}
-
@Override
public void releaseBackup() throws Exception
{
@@ -115,7 +132,7 @@
Thread.sleep(2000);
}
- liveLock = channel.lock(1, 1, false);
+ liveLock = channel.lock(LIVE_LOCK_POS, 1, false);
byte state = getState();
@@ -144,7 +161,7 @@
log.info("Waiting to become backup node");
- backupLock = channel.lock(2, 1, false);
+ backupLock = channel.lock(BACKUP_LOCK_POS, LOCK_LENGTH, false);
log.info("** got backup lock");
@@ -157,7 +174,7 @@
log.info("Waiting to obtain live lock");
- liveLock = channel.lock(1, 1, false);
+ liveLock = channel.lock(LIVE_LOCK_POS, LOCK_LENGTH, false);
log.info("Live Server Obtained live lock");
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -305,10 +305,19 @@
checkJournalDirectory();
+ initialisePart1();
+
+ if(nodeManager.isBackupLive())
+ {
+ // looks like we've failed over at some point; we need to announce that we are the backup so that
+ // when the current live server goes down, clients fail over to us
+ clusterManager.announceBackup();
+ //
+ Thread.sleep(2000);
+ }
+
nodeManager.startLiveNode();
- initialisePart1();
-
initialisePart2();
log.info("Server is now live");
@@ -362,25 +371,43 @@
nodeManager.releaseBackup();
if(configuration.isAllowAutoFailBack())
{
- //todo dont hardcode schedule timings
- scheduledPool.scheduleAtFixedRate(new Runnable()
+ class FailbackChecker implements Runnable
{
+ boolean restarting = false;
public void run()
{
try
{
- if(nodeManager.isAwaitingFailback())
+ if(!restarting && nodeManager.isAwaitingFailback())
{
- log.info("live server wants to restart, killing server");
- nodeManager.killServer();
+ log.info("live server wants to restart, restarting server in backup");
+ restarting = true;
+ Thread t = new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ stop(true);
+ configuration.setBackup(true);
+ start();
+ }
+ catch (Exception e)
+ {
+ log.info("unable to restart server, please kill and restart manually", e);
+ }
+ }
+ });
+ t.start();
}
}
catch (Exception e)
{
- log.warn("unable to kill server, please kill manually to force failback");
+ //hopefully it will work next call
}
}
- }, 1000l, 1000l, TimeUnit.MILLISECONDS);
+ }
+ scheduledPool.scheduleAtFixedRate(new FailbackChecker(), 1000l, 1000l, TimeUnit.MILLISECONDS);
}
}
catch (InterruptedException e)
@@ -586,7 +613,7 @@
{
System.out.println("HornetQServerImpl.stop");
}
- remotingService.stop();
+ remotingService.stop(permanently);
synchronized (this)
{
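
The FailbackChecker introduced above replaces killServer() with a scheduled check that, at most once, restarts the server as a backup on a separate thread so the scheduler thread is never blocked. A simplified sketch of that shape; isAwaitingFailback(), stopServer() and startAsBackup() are placeholders standing in for the NodeManager and HornetQServer calls:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FailbackCheckerSketch implements Runnable
{
   private volatile boolean restarting = false;

   public void run()
   {
      try
      {
         if (!restarting && isAwaitingFailback())
         {
            restarting = true;

            // hand the slow stop/start cycle to its own thread so the scheduler is never blocked
            new Thread(new Runnable()
            {
               public void run()
               {
                  try
                  {
                     stopServer();
                     startAsBackup();
                  }
                  catch (Exception e)
                  {
                     // could not restart automatically; a manual restart is needed
                  }
               }
            }).start();
         }
      }
      catch (Exception e)
      {
         // transient failure; the next scheduled run will retry
      }
   }

   private boolean isAwaitingFailback() { return false; }

   private void stopServer() throws Exception { }

   private void startAsBackup() throws Exception { }

   public static void main(String[] args)
   {
      ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
      pool.scheduleAtFixedRate(new FailbackCheckerSketch(), 1000L, 1000L, TimeUnit.MILLISECONDS);
   }
}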
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/InVMNodeManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/InVMNodeManager.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/InVMNodeManager.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -123,9 +123,9 @@
}
@Override
- public void killServer()
+ public boolean isBackupLive() throws Exception
{
- //todo
+ return liveLock.availablePermits() == 0;
}
private void releaseBackupNode()
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -137,8 +137,9 @@
/**
* Disconnect the connection, closing all channels
+ * @param clientFailover
*/
- void disconnect();
+ void disconnect(boolean clientFailover);
/**
* returns true if any data has been received since the last time this method was called.
JBoss hornetq SVN: r9815 - branches/2_2_0_HA_Improvements.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-10-26 04:17:00 -0400 (Tue, 26 Oct 2010)
New Revision: 9815
Modified:
branches/2_2_0_HA_Improvements/build-hornetq.xml
Log:
removed debug
Modified: branches/2_2_0_HA_Improvements/build-hornetq.xml
===================================================================
--- branches/2_2_0_HA_Improvements/build-hornetq.xml 2010-10-26 07:50:27 UTC (rev 9814)
+++ branches/2_2_0_HA_Improvements/build-hornetq.xml 2010-10-26 08:17:00 UTC (rev 9815)
@@ -1672,7 +1672,7 @@
<jvmarg value="-Dhornetq.remoting.netty.port=5446"/>
<jvmarg value="-Djnp.port=1199"/>
<jvmarg value="-Djnp.rmiPort=1198"/>
- <jvmarg line="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005"/>
+ <!--<jvmarg line="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005"/>-->
<arg line="hornetq-beans.xml"/>
<classpath path="${server.config}" />
<classpath refid="jms.standalone.server.classpath"/>
JBoss hornetq SVN: r9814 - in branches/2_2_0_HA_Improvements: src/config/common/schema and 17 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-10-26 03:50:27 -0400 (Tue, 26 Oct 2010)
New Revision: 9814
Added:
branches/2_2_0_HA_Improvements/src/config/trunk/clustered-backup/
branches/2_2_0_HA_Improvements/src/config/trunk/clustered-backup/hornetq-beans.xml
branches/2_2_0_HA_Improvements/src/config/trunk/clustered-backup/hornetq-configuration.xml
branches/2_2_0_HA_Improvements/src/config/trunk/clustered-backup/hornetq-jms.xml
branches/2_2_0_HA_Improvements/src/config/trunk/clustered-backup/hornetq-users.xml
branches/2_2_0_HA_Improvements/src/config/trunk/clustered-backup/jndi.properties
branches/2_2_0_HA_Improvements/src/config/trunk/clustered-backup/logging.properties
Removed:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/BackupConnectorConfiguration.java
Modified:
branches/2_2_0_HA_Improvements/build-hornetq.xml
branches/2_2_0_HA_Improvements/src/config/common/schema/hornetq-configuration.xsd
branches/2_2_0_HA_Improvements/src/config/trunk/clustered/hornetq-configuration.xml
branches/2_2_0_HA_Improvements/src/config/trunk/clustered/hornetq-jms.xml
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/Configuration.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/NodeManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/InVMNodeManager.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/NodeManagerAction.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/NodeManagerTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteMultipleLivesMultipleBackupsFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteSingleLiveMultipleBackupsFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServerSupport.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/TestableServer.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
Log:
added failback - also allow cluster connections to wait indefinitely for discovery - removed backup connector config
Modified: branches/2_2_0_HA_Improvements/build-hornetq.xml
===================================================================
--- branches/2_2_0_HA_Improvements/build-hornetq.xml 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/build-hornetq.xml 2010-10-26 07:50:27 UTC (rev 9814)
@@ -98,6 +98,7 @@
<property name="src.config.standalone.clustered.dir" value="${src.dir}/config/stand-alone/clustered"/>
<property name="src.config.trunk.non-clustered.dir" value="${src.dir}/config/trunk/non-clustered"/>
<property name="src.config.trunk.clustered.dir" value="${src.dir}/config/trunk/clustered"/>
+ <property name="src.config.trunk.clustered.backup.dir" value="${src.dir}/config/trunk/clustered-backup"/>
<property name="src.config.jbossas4.non-clustered.dir" value="${src.dir}/config/jboss-as-4/non-clustered"/>
<property name="src.config.jbossas4.clustered.dir" value="${src.dir}/config/jboss-as-4/clustered"/>
<property name="src.config.jbossas5.non-clustered.dir" value="${src.dir}/config/jboss-as-5/non-clustered"/>
@@ -1658,17 +1659,20 @@
<target name="runBackup1" depends="jar">
<mkdir dir="logs"/>
- <property name="server.config" value="${src.dir}/config/ha-test/backup-1"/>
- <java classname="org.hornetq.integration.bootstrap.HornetQBootstrapServer" fork="false">
+ <property name="server.config" value="${src.config.trunk.clustered.backup.dir}"/>
+ <java classname="org.hornetq.integration.bootstrap.HornetQBootstrapServer" fork="true">
<jvmarg value="-XX:+UseParallelGC"/>
<jvmarg value="-Xms512M"/>
<jvmarg value="-Xmx2048M"/>
<jvmarg value="-XX:+AggressiveOpts"/>
<jvmarg value="-XX:+UseFastAccessorMethods"/>
<jvmarg value="-Dcom.sun.management.jmxremote"/>
- <jvmarg value="-Djava.util.logging.config.file=${src.config.trunk.non-clustered.dir}/logging.properties"/>
+ <jvmarg value="-Djava.util.logging.config.file=${src.config.trunk.clustered.backup.dir}/logging.properties"/>
<jvmarg value="-Djava.library.path=${native.bin.dir}"/>
- <!--<jvmarg line="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000"/>-->
+ <jvmarg value="-Dhornetq.remoting.netty.port=5446"/>
+ <jvmarg value="-Djnp.port=1199"/>
+ <jvmarg value="-Djnp.rmiPort=1198"/>
+ <jvmarg line="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005"/>
<arg line="hornetq-beans.xml"/>
<classpath path="${server.config}" />
<classpath refid="jms.standalone.server.classpath"/>
@@ -1677,16 +1681,19 @@
<target name="runBackup2" depends="jar">
<mkdir dir="logs"/>
- <property name="server.config" value="${src.dir}/config/ha-test/backup-2"/>
- <java classname="org.hornetq.integration.bootstrap.HornetQBootstrapServer" fork="false">
+ <property name="server.config" value="${src.config.trunk.clustered.backup.dir}"/>
+ <java classname="org.hornetq.integration.bootstrap.HornetQBootstrapServer" fork="true">
<jvmarg value="-XX:+UseParallelGC"/>
<jvmarg value="-Xms512M"/>
<jvmarg value="-Xmx2048M"/>
<jvmarg value="-XX:+AggressiveOpts"/>
<jvmarg value="-XX:+UseFastAccessorMethods"/>
<jvmarg value="-Dcom.sun.management.jmxremote"/>
- <jvmarg value="-Djava.util.logging.config.file=${src.config.trunk.non-clustered.dir}/logging.properties"/>
+ <jvmarg value="-Djava.util.logging.config.file=${src.config.trunk.clustered.backup.dir}/logging.properties"/>
<jvmarg value="-Djava.library.path=${native.bin.dir}"/>
+ <jvmarg value="-Dhornetq.remoting.netty.port=5447"/>
+ <jvmarg value="-Djnp.port=1299"/>
+ <jvmarg value="-Djnp.rmiPort=1298"/>
<!--<jvmarg line="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000"/>-->
<arg line="hornetq-beans.xml"/>
<classpath path="${server.config}" />
@@ -1696,16 +1703,19 @@
<target name="runBackup3" depends="jar">
<mkdir dir="logs"/>
- <property name="server.config" value="${src.dir}/config/ha-test/backup-3"/>
- <java classname="org.hornetq.integration.bootstrap.HornetQBootstrapServer" fork="false">
+ <property name="server.config" value="${src.config.trunk.clustered.backup.dir}"/>
+ <java classname="org.hornetq.integration.bootstrap.HornetQBootstrapServer" fork="true">
<jvmarg value="-XX:+UseParallelGC"/>
<jvmarg value="-Xms512M"/>
<jvmarg value="-Xmx2048M"/>
<jvmarg value="-XX:+AggressiveOpts"/>
<jvmarg value="-XX:+UseFastAccessorMethods"/>
<jvmarg value="-Dcom.sun.management.jmxremote"/>
- <jvmarg value="-Djava.util.logging.config.file=${src.config.trunk.non-clustered.dir}/logging.properties"/>
+ <jvmarg value="-Djava.util.logging.config.file=${src.config.trunk.clustered.backup.dir}/logging.properties"/>
<jvmarg value="-Djava.library.path=${native.bin.dir}"/>
+ <jvmarg value="-Dhornetq.remoting.netty.port=5448"/>
+ <jvmarg value="-Djnp.port=1399"/>
+ <jvmarg value="-Djnp.rmiPort=1398"/>
<!--<jvmarg line="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000"/>-->
<arg line="hornetq-beans.xml"/>
<classpath path="${server.config}" />
@@ -1747,12 +1757,14 @@
<jvmarg value="-Djnp.port=${jnp.port}"/>
<jvmarg value="-Djnp.rmiPort=${jnp.rmiPort}"/>
<jvmarg value="-Dhornetq.data.dir=${hornetq.data.dir}"/>
+ <!-- <jvmarg line="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005"/>-->
<arg line="hornetq-beans.xml"/>
<classpath path="${src.config.trunk.clustered.dir}" />
<classpath refid="jms.standalone.server.classpath"/>
</java>
</target>
+
<target name="debugServer" depends="jar">
<property name="server.config" value="${src.config.trunk.non-clustered.dir}"/>
<mkdir dir="logs"/>
Modified: branches/2_2_0_HA_Improvements/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- branches/2_2_0_HA_Improvements/src/config/common/schema/hornetq-configuration.xsd 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/src/config/common/schema/hornetq-configuration.xsd 2010-10-26 07:50:27 UTC (rev 9814)
@@ -66,7 +66,9 @@
<xsd:element maxOccurs="1" minOccurs="0" ref="remoting-interceptors">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="backup" type="xsd:boolean">
- </xsd:element>
+ </xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="allow-failback" type="xsd:boolean">
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="shared-store" type="xsd:boolean">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="persist-delivery-count-before-delivery" type="xsd:boolean">
Modified: branches/2_2_0_HA_Improvements/src/config/trunk/clustered/hornetq-configuration.xml
===================================================================
--- branches/2_2_0_HA_Improvements/src/config/trunk/clustered/hornetq-configuration.xml 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/src/config/trunk/clustered/hornetq-configuration.xml 2010-10-26 07:50:27 UTC (rev 9814)
@@ -3,6 +3,8 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
<clustered>true</clustered>
+
+ <shared-store>true</shared-store>
<journal-min-files>10</journal-min-files>
@@ -12,13 +14,6 @@
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</connector>
-
- <connector name="netty-throughput">
- <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
- <param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
- <param key="port" value="${hornetq.remoting.netty.port:5455}"/>
- <param key="batch-delay" value="50"/>
- </connector>
</connectors>
<acceptors>
@@ -27,14 +22,6 @@
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</acceptor>
-
- <acceptor name="netty-throughput">
- <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
- <param key="host" value="${jboss.bind.address:localhost}"/>
- <param key="port" value="${hornetq.remoting.netty.port:5455}"/>
- <param key="batch-delay" value="50"/>
- <param key="direct-deliver" value="false"/>
- </acceptor>
</acceptors>
<broadcast-groups>
@@ -50,7 +37,7 @@
<discovery-group name="dg-group1">
<group-address>231.7.7.7</group-address>
<group-port>9876</group-port>
- <refresh-timeout>10000</refresh-timeout>
+ <refresh-timeout>60000</refresh-timeout>
</discovery-group>
</discovery-groups>
Modified: branches/2_2_0_HA_Improvements/src/config/trunk/clustered/hornetq-jms.xml
===================================================================
--- branches/2_2_0_HA_Improvements/src/config/trunk/clustered/hornetq-jms.xml 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/src/config/trunk/clustered/hornetq-jms.xml 2010-10-26 07:50:27 UTC (rev 9814)
@@ -12,16 +12,6 @@
</entries>
</connection-factory>
- <connection-factory name="NettyThroughputConnectionFactory">
- <connectors>
- <connector-ref connector-name="netty-throughput"/>
- </connectors>
- <entries>
- <entry name="/ThroughputConnectionFactory"/>
- <entry name="/XAThroughputConnectionFactory"/>
- </entries>
- </connection-factory>
-
<queue name="DLQ">
<entry name="/queue/DLQ"/>
</queue>
Copied: branches/2_2_0_HA_Improvements/src/config/trunk/clustered-backup/hornetq-beans.xml (from rev 9794, branches/2_2_0_HA_Improvements/src/config/trunk/clustered/hornetq-beans.xml)
===================================================================
--- branches/2_2_0_HA_Improvements/src/config/trunk/clustered-backup/hornetq-beans.xml (rev 0)
+++ branches/2_2_0_HA_Improvements/src/config/trunk/clustered-backup/hornetq-beans.xml 2010-10-26 07:50:27 UTC (rev 9814)
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ ~ Copyright 2009 Red Hat, Inc.
+ ~ Red Hat licenses this file to you under the Apache License, version
+ ~ 2.0 (the "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ ~ implied. See the License for the specific language governing
+ ~ permissions and limitations under the License.
+ -->
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+ <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+ <!-- JNDI server. Disable this if you don't want JNDI -->
+ <bean name="JNDIServer" class="org.jnp.server.Main">
+ <property name="namingInfo">
+ <inject bean="Naming"/>
+ </property>
+ <property name="port">${jnp.port:1199}</property>
+ <property name="bindAddress">${jnp.host:localhost}</property>
+ <property name="rmiPort">${jnp.rmiPort:1198}</property>
+ <property name="rmiBindAddress">${jnp.host:localhost}</property>
+ </bean>
+
+ <!-- MBean server -->
+ <bean name="MBeanServer" class="javax.management.MBeanServer">
+ <constructor factoryClass="java.lang.management.ManagementFactory"
+ factoryMethod="getPlatformMBeanServer"/>
+ </bean>
+
+ <!-- The core configuration -->
+ <bean name="Configuration" class="org.hornetq.core.config.impl.FileConfiguration">
+ </bean>
+
+ <!-- The security manager -->
+ <bean name="HornetQSecurityManager" class="org.hornetq.spi.core.security.HornetQSecurityManagerImpl">
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The core server -->
+ <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="Configuration"/>
+ </parameter>
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>
+ <inject bean="HornetQSecurityManager"/>
+ </parameter>
+ </constructor>
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The JMS server -->
+ <bean name="JMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="HornetQServer"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+
+</deployment>
\ No newline at end of file
Copied: branches/2_2_0_HA_Improvements/src/config/trunk/clustered-backup/hornetq-configuration.xml (from rev 9794, branches/2_2_0_HA_Improvements/src/config/trunk/clustered/hornetq-configuration.xml)
===================================================================
--- branches/2_2_0_HA_Improvements/src/config/trunk/clustered-backup/hornetq-configuration.xml (rev 0)
+++ branches/2_2_0_HA_Improvements/src/config/trunk/clustered-backup/hornetq-configuration.xml 2010-10-26 07:50:27 UTC (rev 9814)
@@ -0,0 +1,90 @@
+<!--
+ ~ Copyright 2009 Red Hat, Inc.
+ ~ Red Hat licenses this file to you under the Apache License, version
+ ~ 2.0 (the "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ ~ implied. See the License for the specific language governing
+ ~ permissions and limitations under the License.
+ -->
+
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
+
+ <clustered>true</clustered>
+
+ <backup>true</backup>
+
+ <allow-failback>true</allow-failback>
+
+ <shared-store>true</shared-store>
+
+ <journal-min-files>10</journal-min-files>
+
+ <connectors>
+ <connector name="netty">
+ <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
+ <param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
+ <param key="port" value="${hornetq.remoting.netty.port:5446}"/>
+ </connector>
+ </connectors>
+
+ <acceptors>
+ <acceptor name="netty">
+ <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
+ <param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
+ <param key="port" value="${hornetq.remoting.netty.port:5446}"/>
+ </acceptor>
+ </acceptors>
+
+ <broadcast-groups>
+ <broadcast-group name="bg-group1">
+ <group-address>231.7.7.7</group-address>
+ <group-port>9876</group-port>
+ <broadcast-period>5000</broadcast-period>
+ <connector-ref>netty</connector-ref>
+ </broadcast-group>
+ </broadcast-groups>
+
+ <discovery-groups>
+ <discovery-group name="dg-group1">
+ <group-address>231.7.7.7</group-address>
+ <group-port>9876</group-port>
+ <refresh-timeout>60000</refresh-timeout>
+ </discovery-group>
+ </discovery-groups>
+
+ <cluster-connections>
+ <cluster-connection name="my-cluster">
+ <address>jms</address>
+ <connector-ref>netty</connector-ref>
+ <discovery-group-ref discovery-group-name="dg-group1"/>
+ </cluster-connection>
+ </cluster-connections>
+
+ <security-settings>
+ <security-setting match="#">
+ <permission type="createNonDurableQueue" roles="guest"/>
+ <permission type="deleteNonDurableQueue" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+ <address-settings>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>jms.queue.DLQ</dead-letter-address>
+ <expiry-address>jms.queue.ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <max-size-bytes>10485760</max-size-bytes>
+ <message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>BLOCK</address-full-policy>
+ </address-setting>
+ </address-settings>
+
+</configuration>
Copied: branches/2_2_0_HA_Improvements/src/config/trunk/clustered-backup/hornetq-jms.xml (from rev 9794, branches/2_2_0_HA_Improvements/src/config/trunk/clustered/hornetq-jms.xml)
===================================================================
--- branches/2_2_0_HA_Improvements/src/config/trunk/clustered-backup/hornetq-jms.xml (rev 0)
+++ branches/2_2_0_HA_Improvements/src/config/trunk/clustered-backup/hornetq-jms.xml 2010-10-26 07:50:27 UTC (rev 9814)
@@ -0,0 +1,28 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+
+ <connection-factory name="ConnectionFactory">
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/ConnectionFactory"/>
+ <entry name="/XAConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <queue name="DLQ">
+ <entry name="/queue/DLQ"/>
+ </queue>
+ <queue name="ExpiryQueue">
+ <entry name="/queue/ExpiryQueue"/>
+ </queue>
+ <queue name="ExampleQueue">
+ <entry name="/queue/ExampleQueue"/>
+ </queue>
+ <topic name="ExampleTopic">
+ <entry name="/topic/ExampleTopic"/>
+ </topic>
+
+</configuration>
\ No newline at end of file
Copied: branches/2_2_0_HA_Improvements/src/config/trunk/clustered-backup/hornetq-users.xml (from rev 9794, branches/2_2_0_HA_Improvements/src/config/trunk/clustered/hornetq-users.xml)
===================================================================
--- branches/2_2_0_HA_Improvements/src/config/trunk/clustered-backup/hornetq-users.xml (rev 0)
+++ branches/2_2_0_HA_Improvements/src/config/trunk/clustered-backup/hornetq-users.xml 2010-10-26 07:50:27 UTC (rev 9814)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+ <!-- the default user. this is used where username is null-->
+ <defaultuser name="guest" password="guest">
+ <role name="guest"/>
+ </defaultuser>
+</configuration>
\ No newline at end of file
Copied: branches/2_2_0_HA_Improvements/src/config/trunk/clustered-backup/jndi.properties (from rev 9794, branches/2_2_0_HA_Improvements/src/config/trunk/clustered/jndi.properties)
===================================================================
--- branches/2_2_0_HA_Improvements/src/config/trunk/clustered-backup/jndi.properties (rev 0)
+++ branches/2_2_0_HA_Improvements/src/config/trunk/clustered-backup/jndi.properties 2010-10-26 07:50:27 UTC (rev 9814)
@@ -0,0 +1,15 @@
+#
+# Copyright 2009 Red Hat, Inc.
+# Red Hat licenses this file to you under the Apache License, version
+# 2.0 (the "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License.
+#
+
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
\ No newline at end of file
Copied: branches/2_2_0_HA_Improvements/src/config/trunk/clustered-backup/logging.properties (from rev 9794, branches/2_2_0_HA_Improvements/src/config/trunk/clustered/logging.properties)
===================================================================
--- branches/2_2_0_HA_Improvements/src/config/trunk/clustered-backup/logging.properties (rev 0)
+++ branches/2_2_0_HA_Improvements/src/config/trunk/clustered-backup/logging.properties 2010-10-26 07:50:27 UTC (rev 9814)
@@ -0,0 +1,34 @@
+############################################################
+# Default Logging Configuration File
+#
+# You can use a different file by specifying a filename
+# with the java.util.logging.config.file system property.
+# For example java -Djava.util.logging.config.file=myfile
+############################################################
+
+############################################################
+# Global properties
+############################################################
+
+# "handlers" specifies a comma separated list of log Handler
+# classes. These handlers will be installed during VM startup.
+# Note that these classes must be on the system classpath.
+# By default we only configure a ConsoleHandler, which will only
+# show messages at the INFO and above levels.
+handlers=java.util.logging.ConsoleHandler,java.util.logging.FileHandler
+java.util.logging.ConsoleHandler.formatter=org.hornetq.integration.logging.HornetQLoggerFormatter
+java.util.logging.FileHandler.level=INFO
+java.util.logging.FileHandler.pattern=logs/hornetq.log
+java.util.logging.FileHandler.formatter=org.hornetq.integration.logging.HornetQLoggerFormatter
+# Default global logging level.
+# This specifies which kinds of events are logged across
+# all loggers. For any given facility this global level
+# can be overriden by a facility specific level
+# Note that the ConsoleHandler also has a separate level
+# setting to limit messages printed to the console.
+.level= INFO
+
+############################################################
+# Handler specific properties.
+# Describes specific configuration info for Handlers.
+############################################################
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -399,11 +399,11 @@
return;
}
-
+
+ // we need to stop the factory from connecting if it is in the middle of trying to fail over before we get the lock
+ causeExit();
synchronized (createSessionLock)
{
- //we need to stopthe factory from connecting if it is in the middle aof trying to failover before we get the lock
- causeExit();
synchronized (failoverLock)
{
// work on a copied set. the session will be removed from sessions when session.close() is called
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -1655,7 +1655,7 @@
private void checkClosed() throws HornetQException
{
- if (closed)
+ if (closed || inClose)
{
throw new HornetQException(HornetQException.OBJECT_CLOSED, "Session is closed");
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -494,9 +494,9 @@
if (initialConnectors == null && discoveryGroup != null)
{
// Wait for an initial broadcast to give us at least one node in the cluster
+ long timeout = clusterConnection?0:discoveryInitialWaitTimeout;
+ boolean ok = discoveryGroup.waitForBroadcast(timeout);
- boolean ok = discoveryGroup.waitForBroadcast(discoveryInitialWaitTimeout);
-
if (!ok)
{
throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
@@ -1272,7 +1272,7 @@
log.debug("unable to connect with static connector " + connectors.get(i).initialConnector);
}
}
- if (csf == null)
+ if (csf == null && !closed)
{
throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
}
@@ -1282,7 +1282,7 @@
throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors", e);
}
- if (csf == null)
+ if (csf == null && !closed)
{
throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
}
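
The ServerLocatorImpl change above lets connections made on behalf of a cluster connection pass a timeout of 0, i.e. wait indefinitely for the first discovery broadcast, while ordinary client connections keep the configured initial wait. A tiny illustrative helper capturing that rule (names are hypothetical):

public final class DiscoveryWaitPolicy
{
   private DiscoveryWaitPolicy()
   {
   }

   // a result of 0 means "wait indefinitely" when passed to waitForBroadcast()
   public static long initialWait(final boolean clusterConnection, final long discoveryInitialWaitTimeout)
   {
      return clusterConnection ? 0 : discoveryInitialWaitTimeout;
   }
}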
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -222,7 +222,7 @@
long toWait = timeout;
- while (!received && toWait > 0)
+ while (!received && (toWait > 0 || timeout == 0))
{
try
{
@@ -232,11 +232,14 @@
{
}
- long now = System.currentTimeMillis();
+ if (timeout != 0)
+ {
+ long now = System.currentTimeMillis();
- toWait -= now - start;
+ toWait -= now - start;
- start = now;
+ start = now;
+ }
}
boolean ret = received;
Deleted: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/BackupConnectorConfiguration.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/BackupConnectorConfiguration.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/BackupConnectorConfiguration.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -1,64 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.config;
-
-import org.hornetq.api.core.TransportConfiguration;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Created by IntelliJ IDEA.
- * User: andy
- * Date: Sep 2, 2010
- * Time: 11:36:13 AM
- * To change this template use File | Settings | File Templates.
- */
-public class BackupConnectorConfiguration implements Serializable
-{
- private final List<String> staticConnectors;
-
- private final String discoveryGroupName;
-
- private String connector;
-
- public BackupConnectorConfiguration(List<String> staticConnectors, String connector)
- {
- this.staticConnectors = staticConnectors;
- this.discoveryGroupName = null;
- this.connector = connector;
- }
-
- public List<String> getStaticConnectors()
- {
- return staticConnectors;
- }
-
- public String getDiscoveryGroupName()
- {
- return discoveryGroupName;
- }
-
- public BackupConnectorConfiguration(String discoveryGroupName, String connector)
- {
- this.staticConnectors = null;
- this.discoveryGroupName = discoveryGroupName;
- this.connector = connector;
- }
-
- public String getConnector()
- {
- return connector;
- }
-}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/Configuration.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/Configuration.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/Configuration.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -51,6 +51,19 @@
void setClustered(boolean clustered);
/**
+ * returns whether a backup will auto die when a live server is failing back
+ * @return
+ */
+ public boolean isAllowAutoFailBack();
+
+ /**
+ * whether a backup will auto die when a live server is failing back
+ *
+ * @param allowAutoFailBack true if allowed
+ */
+ public void setAllowAutoFailBack(boolean allowAutoFailBack);
+
+ /**
* Returns whether delivery count is persisted before messages are delivered to the consumers.
* <br>
* Default value is {@value org.hornetq.core.config.impl.ConfigurationImpl#DEFAULT_PERSIST_DELIVERY_COUNT_BEFORE_DELIVERY}.
@@ -252,18 +265,7 @@
*/
Map<String, TransportConfiguration> getConnectorConfigurations();
-
- /**
- * sets the connectors used to get topology info from for the backup server when shared store is used.
- */
- void setBackupConnectorConfiguration(BackupConnectorConfiguration backupConnectorConfiguration);
-
/**
- * Returns the connectors used to get topology info from for the backup server when shared store is used.
- */
- BackupConnectorConfiguration getBackupConnectorConfiguration();
-
- /**
* Sets the connectors configured for this server.
*/
void setConnectorConfigurations(Map<String, TransportConfiguration> infos);
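
The new allow-auto-failback flag can also be set programmatically on a backup's Configuration. A minimal sketch, assuming an embedded shared-store backup is being configured; only setters visible in this revision are used, and the class name is illustrative.

import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;

public class BackupConfigSketch
{
   public static Configuration createBackupConfig()
   {
      // A shared-store backup that shuts itself down when the original
      // live server asks to fail back (setter added in this revision).
      Configuration conf = new ConfigurationImpl();
      conf.setClustered(true);
      conf.setBackup(true);
      conf.setSharedStore(true);
      conf.setAllowAutoFailBack(true); // DEFAULT_ALLOW_AUTO_FAILBACK is already true
      return conf;
   }
}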
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -46,6 +46,8 @@
public static final boolean DEFAULT_BACKUP = false;
+ public static final boolean DEFAULT_ALLOW_AUTO_FAILBACK = true;
+
public static final boolean DEFAULT_SHARED_STORE = false;
public static final boolean DEFAULT_FILE_DEPLOYMENT_ENABLED = false;
@@ -177,6 +179,8 @@
protected boolean backup = ConfigurationImpl.DEFAULT_BACKUP;
+ protected boolean allowAutoFailBack = ConfigurationImpl.DEFAULT_ALLOW_AUTO_FAILBACK;
+
protected boolean sharedStore = ConfigurationImpl.DEFAULT_SHARED_STORE;
protected boolean fileDeploymentEnabled = ConfigurationImpl.DEFAULT_FILE_DEPLOYMENT_ENABLED;
@@ -231,8 +235,6 @@
protected List<BroadcastGroupConfiguration> broadcastGroupConfigurations = new ArrayList<BroadcastGroupConfiguration>();
- protected BackupConnectorConfiguration backupConnectorConfiguration;
-
protected Map<String, DiscoveryGroupConfiguration> discoveryGroupConfigurations = new LinkedHashMap<String, DiscoveryGroupConfiguration>();
// Paging related attributes ------------------------------------------------------------
@@ -330,6 +332,16 @@
this.clustered = clustered;
}
+ public boolean isAllowAutoFailBack()
+ {
+ return allowAutoFailBack;
+ }
+
+ public void setAllowAutoFailBack(boolean allowAutoFailBack)
+ {
+ this.allowAutoFailBack = allowAutoFailBack;
+ }
+
public boolean isBackup()
{
return backup;
@@ -468,16 +480,6 @@
return connectorConfigs;
}
- public void setBackupConnectorConfiguration(BackupConnectorConfiguration backupConnectorConfiguration)
- {
- this.backupConnectorConfiguration = backupConnectorConfiguration;
- }
-
- public BackupConnectorConfiguration getBackupConnectorConfiguration()
- {
- return backupConnectorConfiguration;
- }
-
public void setConnectorConfigurations(final Map<String, TransportConfiguration> infos)
{
connectorConfigs = infos;
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -27,13 +27,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.core.config.BridgeConfiguration;
-import org.hornetq.core.config.BroadcastGroupConfiguration;
-import org.hornetq.core.config.ClusterConnectionConfiguration;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.CoreQueueConfiguration;
-import org.hornetq.core.config.DiscoveryGroupConfiguration;
-import org.hornetq.core.config.DivertConfiguration;
+import org.hornetq.core.config.*;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.config.impl.FileConfiguration;
import org.hornetq.core.config.impl.Validators;
@@ -150,6 +144,8 @@
config.setClustered(XMLConfigurationUtil.getBoolean(e, "clustered", config.isClustered()));
+ config.setAllowAutoFailBack(XMLConfigurationUtil.getBoolean(e, "allow-failback", config.isClustered()));
+
config.setBackup(XMLConfigurationUtil.getBoolean(e, "backup", config.isBackup()));
config.setSharedStore(XMLConfigurationUtil.getBoolean(e, "shared-store", config.isSharedStore()));
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -250,7 +250,7 @@
long start = System.currentTimeMillis();
- while (response == null && toWait > 0)
+ while (!closed && response == null && toWait > 0)
{
try
{
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/NodeManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/NodeManager.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/NodeManager.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -38,6 +38,8 @@
public abstract void stopBackup() throws Exception;
+ public abstract void releaseBackup() throws Exception;
+
private boolean isStarted = false;
protected volatile SimpleString nodeID;
@@ -69,4 +71,8 @@
{
return uuid;
}
+
+ public abstract boolean isAwaitingFailback() throws Exception;
+
+ public abstract void killServer();
}
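
The three methods added to NodeManager (releaseBackup, isAwaitingFailback, killServer) drive the fail-back handshake between a backup that has taken over and a live server that is restarting. Below is a simplified in-memory model of that handshake, purely for illustration; the real implementations later in this revision use file locks (FileLockNodeManager) or shared in-VM state (InVMNodeManager).

public class FailbackHandshakeSketch
{
   enum State { NOT_STARTED, LIVE, PAUSED, FAILING_BACK }

   private volatile State state = State.LIVE; // assume a backup has already taken over

   // Restarting live side: flag that it wants the live role back.
   public void requestFailback()
   {
      state = State.FAILING_BACK;
   }

   // Backup side: polled by the server once it has become live.
   public boolean isAwaitingFailback()
   {
      return state == State.FAILING_BACK;
   }

   // Backup side: give up the backup lock so another backup can take its place.
   public void releaseBackup()
   {
      // no-op in this toy model; FileLockNodeManager releases its file lock here
   }

   // Backup side: terminate the now-redundant backup so the original live can resume.
   public void killServer()
   {
      // FileLockNodeManager calls System.exit(0); omitted here to keep the sketch inert
   }
}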
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -229,7 +229,7 @@
}
}
- public synchronized void stop() throws Exception
+ public void stop() throws Exception
{
if (!started)
{
@@ -241,34 +241,37 @@
serverLocator.removeClusterTopologyListener(this);
}
- for (MessageFlowRecord record : records.values())
+ synchronized (this)
{
- try
+ for (MessageFlowRecord record : records.values())
{
- record.close();
+ try
+ {
+ record.close();
+ }
+ catch (Exception ignore)
+ {
+ }
}
- catch (Exception ignore)
+
+ if (serverLocator != null)
{
+ serverLocator.close();
}
- }
- if (serverLocator != null)
- {
- serverLocator.close();
- }
+ if (managementService != null)
+ {
+ TypedProperties props = new TypedProperties();
+ props.putSimpleStringProperty(new SimpleString("name"), name);
+ Notification notification = new Notification(nodeUUID.toString(),
+ NotificationType.CLUSTER_CONNECTION_STOPPED,
+ props);
+ managementService.sendNotification(notification);
+ }
- if (managementService != null)
- {
- TypedProperties props = new TypedProperties();
- props.putSimpleStringProperty(new SimpleString("name"), name);
- Notification notification = new Notification(nodeUUID.toString(),
- NotificationType.CLUSTER_CONNECTION_STOPPED,
- props);
- managementService.sendNotification(notification);
+ started = false;
}
-
- started = false;
}
public boolean isStarted()
@@ -399,7 +402,7 @@
// actually routed to at that address though
queue = server.createQueue(queueName, queueName, null, true, false);
}
-
+
createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
}
else
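
A plausible reading of the ClusterConnectionImpl.stop() change above: the method is no longer synchronized as a whole, so removing the topology listener happens outside the monitor and only the teardown of shared state (flow records, locator, notification, the started flag) is guarded. A generic, self-contained sketch of that pattern; all names are illustrative.

public class StopSketch
{
   private final Object stateLock = new Object();
   private volatile boolean started = true;

   public void stop()
   {
      if (!started)
      {
         return;
      }

      removeExternalListener();     // outside the lock: may call back into other components

      synchronized (stateLock)
      {
         closeRecordsAndLocator();  // shared state, torn down under the lock
         sendStoppedNotification();
         started = false;
      }
   }

   private void removeExternalListener() { /* illustrative */ }
   private void closeRecordsAndLocator() { /* illustrative */ }
   private void sendStoppedNotification() { /* illustrative */ }
}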
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -148,11 +148,6 @@
if (clustered)
{
- BackupConnectorConfiguration connectorConfiguration = configuration.getBackupConnectorConfiguration();
- if(connectorConfiguration != null)
- {
- deployBackupListener(connectorConfiguration);
- }
for (BroadcastGroupConfiguration config : configuration.getBroadcastGroupConfigurations())
{
deployBroadcastGroup(config);
@@ -180,49 +175,7 @@
started = true;
}
- private void deployBackupListener(BackupConnectorConfiguration connectorConfiguration)
- throws Exception
- {
- ServerLocatorInternal locator;
- if (connectorConfiguration.getDiscoveryGroupName() != null)
- {
- DiscoveryGroupConfiguration groupConfiguration = configuration.getDiscoveryGroupConfigurations().get(connectorConfiguration.getDiscoveryGroupName());
- if (groupConfiguration == null)
- {
- ClusterManagerImpl.log.warn("There is no discovery group deployed with name " + connectorConfiguration.getDiscoveryGroupName() +
- " deployed. This one will not be deployed.");
- return;
- }
- locator = new ServerLocatorImpl(true, groupConfiguration.getGroupAddress(), groupConfiguration.getGroupPort());
- }
- else
- {
- TransportConfiguration[] configs = new TransportConfiguration[connectorConfiguration.getStaticConnectors().size()];
- for (int i = 0, configsLength = configs.length; i < configsLength; i++)
- {
- configs[i] = configuration.getConnectorConfigurations().get(connectorConfiguration.getStaticConnectors().get(i));
- }
- locator = new ServerLocatorImpl(true, configs);
- }
- locator.addClusterTopologyListener(new ClusterTopologyListener()
- {
- public void nodeUP(String nodeID, String sourceNodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance)
- {
- notifyNodeUp(nodeID, sourceNodeID, connectorPair, last, distance);
- }
-
- public void nodeDown(String nodeID)
- {
- notifyNodeDown(nodeID);
- }
- });
- locator.setNodeID(nodeUUID.toString());
- locator.setReconnectAttempts(-1);
- backupSessionFactory = locator.connect();
- backupSessionFactory.getConnection().getChannel(0, -1).send(new NodeAnnounceMessage(nodeUUID.toString(), nodeUUID.toString(), true, configuration.getConnectorConfigurations().get(connectorConfiguration.getConnector())));
- }
-
public synchronized void stop() throws Exception
{
if (!started)
@@ -795,8 +748,46 @@
{
clusterConnection.start();
}
+ else
+ {
+ announceBackup(config, connector);
+ }
}
+ private void announceBackup(ClusterConnectionConfiguration config, TransportConfiguration connector) throws Exception
+ {
+ ServerLocatorInternal locator;
+
+ if (config.getStaticConnectors() != null)
+ {
+ TransportConfiguration[] tcConfigs = connectorNameListToArray(config.getStaticConnectors());
+
+ locator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(tcConfigs);
+ locator.setReconnectAttempts(-1);
+ }
+ else if (config.getDiscoveryGroupName() != null)
+ {
+ DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations()
+ .get(config.getDiscoveryGroupName());
+
+ if (dg == null)
+ {
+ ClusterManagerImpl.log.warn("No discovery group with name '" + config.getDiscoveryGroupName() +
+ "'. The cluster connection will not be deployed.");
+ }
+
+ locator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(dg.getGroupAddress(), dg.getGroupPort());
+ locator.setReconnectAttempts(-1);
+ locator.setDiscoveryInitialWaitTimeout(0);
+ }
+ else
+ {
+ return;
+ }
+ backupSessionFactory = locator.connect();
+ backupSessionFactory.getConnection().getChannel(0, -1).send(new NodeAnnounceMessage(nodeUUID.toString(), nodeUUID.toString(), true, connector));
+ }
+
private Transformer instantiateTransformer(final String transformerClassName)
{
Transformer transformer = null;
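
With BackupConnectorConfiguration removed, announceBackup() derives the backup's announcement route from its ordinary cluster-connection configuration (static connectors or a discovery group). A sketch of the backup-side configuration that feeds it, based on the constructor and setters used in the test changes later in this revision; registering the named transport connectors is elided and the class name is illustrative.

import java.util.ArrayList;
import java.util.List;

import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;

public class BackupAnnounceConfigSketch
{
   public static Configuration createBackup(String backupConnectorName, String liveConnectorName)
   {
      Configuration conf = new ConfigurationImpl();
      conf.setClustered(true);
      conf.setBackup(true);

      // The live connector(s) the backup will announce itself through.
      List<String> staticConnectors = new ArrayList<String>();
      staticConnectors.add(liveConnectorName);

      ClusterConnectionConfiguration cc =
         new ClusterConnectionConfiguration("cluster1", "jms", backupConnectorName,
                                            -1, false, false, 1, 1, staticConnectors);
      conf.getClusterConfigurations().add(cc);

      // Note: TransportConfigurations for backupConnectorName and liveConnectorName
      // must still be registered via conf.getConnectorConfigurations().put(...).
      return conf;
   }
}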
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -86,13 +86,32 @@
super.stop();
}
+ @Override
+ public boolean isAwaitingFailback() throws Exception
+ {
+ return getState() == FAILINGBACK;
+ }
+ @Override
+ public void killServer()
+ {
+ System.exit(0);
+ }
+
+ @Override
+ public void releaseBackup() throws Exception
+ {
+ releaseBackupLock();
+ }
+
+
public void awaitLiveNode() throws Exception
{
do
{
while (getState() == NOT_STARTED)
{
+ log.info("awaiting live node startup");
Thread.sleep(2000);
}
@@ -103,17 +122,17 @@
if (state == PAUSED)
{
liveLock.release();
+ log.info("awaiting live node restarting");
Thread.sleep(2000);
}
else if (state == FAILINGBACK)
{
liveLock.release();
+ log.info("awaiting live node failing back");
Thread.sleep(2000);
}
else if (state == LIVE)
{
- releaseBackupLock();
-
break;
}
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -358,6 +358,30 @@
clusterManager.activate();
log.info("Backup Server is now live");
+
+ nodeManager.releaseBackup();
+ if(configuration.isAllowAutoFailBack())
+ {
+ //todo dont hardcode schedule timings
+ scheduledPool.scheduleAtFixedRate(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ if(nodeManager.isAwaitingFailback())
+ {
+ log.info("live server wants to restart, killing server");
+ nodeManager.killServer();
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn("unable to kill server, please kill manually to force failback");
+ }
+ }
+ }, 1000l, 1000l, TimeUnit.MILLISECONDS);
+ }
}
catch (InterruptedException e)
{
@@ -685,7 +709,7 @@
try
{
- if (!threadPool.awaitTermination(30000, TimeUnit.MILLISECONDS))
+ if (!threadPool.awaitTermination(5000, TimeUnit.MILLISECONDS))
{
HornetQServerImpl.log.warn("Timed out waiting for pool to terminate");
}
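
The scheduled task added above polls the node manager once a second after a backup has activated; when the original live server asks to fail back and allow-auto-failback is enabled, the backup kills itself. A generic, self-contained model of that polling using only java.util.concurrent — the real code calls nodeManager.isAwaitingFailback() and nodeManager.killServer().

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class FailbackWatcherSketch
{
   public static void main(String[] args) throws InterruptedException
   {
      final AtomicBoolean awaitingFailback = new AtomicBoolean(false);
      final ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();

      // Check every second whether the original live wants to fail back;
      // if so, shut this (temporary) live server down.
      pool.scheduleAtFixedRate(new Runnable()
      {
         public void run()
         {
            if (awaitingFailback.get())
            {
               System.out.println("live server wants to restart, shutting down backup");
               pool.shutdown(); // the real code calls nodeManager.killServer()
            }
         }
      }, 1000L, 1000L, TimeUnit.MILLISECONDS);

      Thread.sleep(1500);
      awaitingFailback.set(true); // simulate the restarted live requesting failback
   }
}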
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/InVMNodeManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/InVMNodeManager.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/InVMNodeManager.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -69,7 +69,6 @@
}
else if (state == LIVE)
{
- releaseBackupNode();
break;
}
}
@@ -111,6 +110,24 @@
backupLock.release();
}
+ @Override
+ public void releaseBackup()
+ {
+ releaseBackupNode();
+ }
+
+ @Override
+ public boolean isAwaitingFailback() throws Exception
+ {
+ return state == FAILING_BACK;
+ }
+
+ @Override
+ public void killServer()
+ {
+ //todo
+ }
+
private void releaseBackupNode()
{
if(backupLock != null)
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/NodeManagerAction.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/NodeManagerAction.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/NodeManagerAction.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -31,12 +31,14 @@
public final static int PAUSE_LIVE = 3;
public final static int STOP_BACKUP = 4;
public final static int AWAIT_LIVE = 5;
+ public final static int RELEASE_BACKUP = 6;
public final static int HAS_LIVE = 10;
public final static int HAS_BACKUP = 11;
public final static int DOESNT_HAVE_LIVE = 12;
public final static int DOESNT_HAVE_BACKUP = 13;
+
private final int[] work;
boolean hasLiveLock = false;
@@ -77,8 +79,10 @@
case AWAIT_LIVE:
nodeManager.awaitLiveNode();
hasLiveLock = true;
+ break;
+ case RELEASE_BACKUP:
+ nodeManager.releaseBackup();
hasBackupLock = false;
- break;
case HAS_LIVE:
if (!hasLiveLock)
{
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/NodeManagerTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/NodeManagerTest.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/NodeManagerTest.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -37,22 +37,22 @@
public void testSimpleLiveAndBackup() throws Exception
{
NodeManagerAction live1 = new NodeManagerAction(START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
- NodeManagerAction backup1 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, HAS_LIVE, PAUSE_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup1 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, RELEASE_BACKUP, HAS_LIVE, PAUSE_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
performWork(live1, backup1);
}
public void testSimpleBackupAndLive() throws Exception
{
NodeManagerAction live1 = new NodeManagerAction(START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
- NodeManagerAction backup1 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, HAS_LIVE, PAUSE_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup1 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, RELEASE_BACKUP, HAS_LIVE, PAUSE_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
performWork(backup1, live1);
}
public void testSimpleLiveAnd2Backups() throws Exception
{
NodeManagerAction live1 = new NodeManagerAction(START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
- NodeManagerAction backup1 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, HAS_LIVE, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
- NodeManagerAction backup2 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, HAS_LIVE, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup1 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, RELEASE_BACKUP, HAS_LIVE, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup2 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, RELEASE_BACKUP, HAS_LIVE, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
performWork(live1, backup1, backup2);
}
@@ -60,24 +60,24 @@
public void testSimple2BackupsAndLive() throws Exception
{
NodeManagerAction live1 = new NodeManagerAction(START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
- NodeManagerAction backup1 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, HAS_LIVE, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
- NodeManagerAction backup2 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, HAS_LIVE, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup1 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, RELEASE_BACKUP, HAS_LIVE, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup2 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, RELEASE_BACKUP, HAS_LIVE, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
performWork(backup1, backup2, live1);
}
public void testSimpleLiveAnd2BackupsPaused() throws Exception
{
NodeManagerAction live1 = new NodeManagerAction(START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
- NodeManagerAction backup1 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, HAS_LIVE, PAUSE_LIVE, START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
- NodeManagerAction backup2 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, HAS_LIVE, PAUSE_LIVE, START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup1 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, RELEASE_BACKUP, HAS_LIVE, PAUSE_LIVE, START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup2 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, RELEASE_BACKUP, HAS_LIVE, PAUSE_LIVE, START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
performWork(live1, backup1, backup2);
}
public void testSimple2BackupsPausedAndLive() throws Exception
{
NodeManagerAction live1 = new NodeManagerAction(START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
- NodeManagerAction backup1 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, HAS_LIVE, PAUSE_LIVE, START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
- NodeManagerAction backup2 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, HAS_LIVE, PAUSE_LIVE, START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup1 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, RELEASE_BACKUP, HAS_LIVE, PAUSE_LIVE, START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup2 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, RELEASE_BACKUP, HAS_LIVE, PAUSE_LIVE, START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
performWork(backup1, backup2, live1);
}
@@ -100,15 +100,15 @@
public void testLiveAndBackupLiveForcesFailback() throws Exception
{
NodeManagerAction live1 = new NodeManagerAction(START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE);
- NodeManagerAction backup1 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, HAS_LIVE, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, AWAIT_LIVE, HAS_LIVE, PAUSE_LIVE);
+ NodeManagerAction backup1 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, RELEASE_BACKUP, HAS_LIVE, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, AWAIT_LIVE, HAS_LIVE, PAUSE_LIVE);
performWork(live1, backup1);
}
public void testLiveAnd2BackupsLiveForcesFailback() throws Exception
{
NodeManagerAction live1 = new NodeManagerAction(START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE);
- NodeManagerAction backup1 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, HAS_LIVE, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, AWAIT_LIVE, HAS_LIVE, CRASH_LIVE);
- NodeManagerAction backup2 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, HAS_LIVE, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, AWAIT_LIVE, HAS_LIVE, CRASH_LIVE);
+ NodeManagerAction backup1 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, RELEASE_BACKUP, HAS_LIVE, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, AWAIT_LIVE, RELEASE_BACKUP, HAS_LIVE, CRASH_LIVE);
+ NodeManagerAction backup2 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, RELEASE_BACKUP, HAS_LIVE, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, AWAIT_LIVE, RELEASE_BACKUP, HAS_LIVE, CRASH_LIVE);
performWork(live1, backup1, backup2);
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -1381,8 +1381,6 @@
configuration.getConnectorConfigurations().put(backupConfig.getName(), backupConfig);
ArrayList<String> staticConnectors = new ArrayList<String>();
staticConnectors.add(liveConfig.getName());
- BackupConnectorConfiguration bcc = new BackupConnectorConfiguration(staticConnectors, backupConfig.getName());
- configuration.setBackupConnectorConfiguration(bcc);
HornetQServer server;
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -189,6 +189,8 @@
createQueue(0, "queues.testaddress", "queue0", null, false);
addConsumer(0, 0, "queue0", null);
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+
send(0, "queues.testaddress", 10, false, null);
verifyReceiveAll(10, 0);
verifyNotReceive(0);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -35,7 +35,6 @@
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
-import org.hornetq.core.config.BackupConnectorConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.remoting.impl.invm.InVMConnector;
@@ -138,7 +137,6 @@
backupConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
ArrayList<String> staticConnectors = new ArrayList<String>();
staticConnectors.add(liveConnector.getName());
- backupConfig.setBackupConnectorConfiguration(new BackupConnectorConfiguration(staticConnectors, backupConnector.getName()));
backupServer = createBackupServer();
// FIXME
@@ -283,7 +281,7 @@
fail("backup server never started");
}
}
- System.out.println("FailoverTestBase.waitForBackup");
+ System.out.println("FailoverTestBase.waitForNewLive");
}
protected TransportConfiguration getInVMConnectorTransportConfiguration(final boolean live)
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -70,19 +70,34 @@
protected abstract boolean isNetty();
- protected int waitForBackup(long seconds, Map<Integer, TestableServer> servers, int... nodes)
+ protected int waitForNewLive(long seconds, boolean waitForNewBackup, Map<Integer, TestableServer> servers, int... nodes)
{
long time = System.currentTimeMillis();
long toWait = seconds * 1000;
+ int newLive = -1;
while (true)
{
for (int node : nodes)
{
TestableServer backupServer = servers.get(node);
- if (backupServer.isInitialised())
+ if (newLive == -1 && backupServer.isInitialised())
{
- return node;
+ newLive = node;
}
+ else if(newLive != -1)
+ {
+ if(waitForNewBackup)
+ {
+ if(node != newLive && servers.get(node).isStarted())
+ {
+ return newLive;
+ }
+ }
+ else
+ {
+ return newLive;
+ }
+ }
}
try
{
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -23,7 +23,6 @@
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
-import org.hornetq.core.config.BackupConnectorConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.server.NodeManager;
@@ -85,7 +84,7 @@
System.out.println(((ServerLocatorInternal)locator).getTopology().describe());
servers.get(0).crash(session);
- int liveAfter0 = waitForBackup(10000, servers, 1, 2);
+ int liveAfter0 = waitForNewLive(10000, true, servers, 1, 2);
ServerLocator locator2 = getServerLocator(3);
locator2.setBlockOnNonDurableSend(true);
@@ -97,7 +96,7 @@
System.setProperty("foo", "bar");
servers.get(3).crash(session2);
- int liveAfter3 = waitForBackup(10000, servers, 4, 5);
+ int liveAfter3 = waitForNewLive(10000, true, servers, 4, 5);
if (liveAfter0 == 2)
{
@@ -140,8 +139,6 @@
staticConnectors.add(liveConnector.getName());
}
TransportConfiguration backupConnector = createTransportConfiguration(isNetty(), false, generateParams(nodeid, isNetty()));
- BackupConnectorConfiguration connectorConfiguration = new BackupConnectorConfiguration(staticConnectors, backupConnector.getName());
- config1.setBackupConnectorConfiguration(connectorConfiguration);
config1.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
List<String> clusterNodes = new ArrayList<String>();
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteMultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteMultipleLivesMultipleBackupsFailoverTest.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteMultipleLivesMultipleBackupsFailoverTest.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -19,7 +19,6 @@
import java.util.Map;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.core.config.BackupConnectorConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -166,8 +165,6 @@
staticConnectors.add(liveConnector.getName());
}
TransportConfiguration backupConnector = createTransportConfiguration(true, false, generateParams(nodeid, true));
- BackupConnectorConfiguration connectorConfiguration = new BackupConnectorConfiguration(staticConnectors, backupConnector.getName());
- config1.setBackupConnectorConfiguration(connectorConfiguration);
config1.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
List<String> clusterNodes = new ArrayList<String>();
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteSingleLiveMultipleBackupsFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteSingleLiveMultipleBackupsFailoverTest.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteSingleLiveMultipleBackupsFailoverTest.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -22,7 +22,6 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
-import org.hornetq.core.config.BackupConnectorConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -73,7 +72,7 @@
servers.get(0).crash(session);
session.close();
- backupNode = waitForBackup(5, servers, 1, 2, 3);
+ backupNode = waitForNewLive(5, true, servers, 1, 2, 3);
session = sendAndConsume(sf, false);
System.out.println("restarting live node as a backup");
@@ -93,7 +92,7 @@
System.out.println("failing node " + backupNode);
servers.get(backupNode).crash(session);
session.close();
- backupNode = waitForBackup(5, servers, 0);
+ backupNode = waitForNewLive(5, true, servers, 0);
assertEquals(0, backupNode);
session = sendAndConsume(sf, false);
@@ -149,7 +148,7 @@
servers.put(liveNode, new RemoteProcessHornetQServer(SharedLiveServerConfiguration.class.getName()));
}
- @Override
+
protected void createBackupConfig(int liveNode, int nodeid, boolean createClusterConnections, int... nodes)
{
servers.put(nodeid, new RemoteProcessHornetQServer(backups.get(nodeid)));
@@ -273,8 +272,6 @@
ClusterConnectionConfiguration ccc1 = new ClusterConnectionConfiguration("cluster1", "jms", backupConnector.getName(), -1, false, false, 1, 1,
createClusterConnections? staticConnectors:pairs);
config1.getClusterConfigurations().add(ccc1);
- BackupConnectorConfiguration connectorConfiguration = new BackupConnectorConfiguration(staticConnectors, backupConnector.getName());
- config1.setBackupConnectorConfiguration(connectorConfiguration);
config1.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
System.out.println(config1.getBindingsDirectory());
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -13,20 +13,15 @@
package org.hornetq.tests.integration.cluster.failover;
-import static java.util.concurrent.TimeUnit.SECONDS;
-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
-import org.hornetq.core.config.BackupConnectorConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.server.NodeManager;
@@ -46,11 +41,11 @@
{
nodeManager = new InVMNodeManager();
createLiveConfig(0);
- createBackupConfig(0, 1,false, 0, 2, 3, 4, 5);
- createBackupConfig(0, 2,false, 0, 1, 3, 4, 5);
- createBackupConfig(0, 3,false, 0, 1, 2, 4, 5);
- createBackupConfig(0, 4, false, 0, 1, 2, 3, 5);
- createBackupConfig(0, 5, false, 0, 1, 2, 3, 4);
+ createBackupConfig(0, 1, 0, 2, 3, 4, 5);
+ createBackupConfig(0, 2, 0, 1, 3, 4, 5);
+ createBackupConfig(0, 3, 0, 1, 2, 4, 5);
+ createBackupConfig(0, 4, 0, 1, 2, 3, 5);
+ createBackupConfig(0, 5, 0, 1, 2, 3, 4);
servers.get(0).start();
servers.get(1).start();
servers.get(2).start();
@@ -69,39 +64,39 @@
ClientSession session = sendAndConsume(sf, true);
System.out.println("failing node 0");
servers.get(0).crash(session);
-
+
session.close();
- backupNode = waitForBackup(5, servers, 1, 2, 3, 4, 5);
+ backupNode = waitForNewLive(5, true, servers, 1, 2, 3, 4, 5);
session = sendAndConsume(sf, false);
System.out.println("failing node " + backupNode);
servers.get(backupNode).crash(session);
session.close();
- backupNode = waitForBackup(5, servers, 1, 2, 3, 4, 5);
+ backupNode = waitForNewLive(5, true, servers, 1, 2, 3, 4, 5);
session = sendAndConsume(sf, false);
System.out.println("failing node " + backupNode);
servers.get(backupNode).crash(session);
session.close();
- backupNode = waitForBackup(5, servers, 1, 2, 3, 4, 5);
+ backupNode = waitForNewLive(5, true, servers, 1, 2, 3, 4, 5);
session = sendAndConsume(sf, false);
System.out.println("failing node " + backupNode);
servers.get(backupNode).crash(session);
session.close();
- backupNode = waitForBackup(5, servers, 1, 2, 3, 4, 5);
+ backupNode = waitForNewLive(5, true, servers, 1, 2, 3, 4, 5);
session = sendAndConsume(sf, false);
System.out.println("failing node " + backupNode);
servers.get(backupNode).crash(session);
session.close();
- backupNode = waitForBackup(5, servers, 1, 2, 3, 4, 5);
+ backupNode = waitForNewLive(5, false, servers, 1, 2, 3, 4, 5);
session = sendAndConsume(sf, false);
session.close();
servers.get(backupNode).stop();
}
- protected void createBackupConfig(int liveNode, int nodeid, boolean createClusterConnections, int... nodes)
+ protected void createBackupConfig(int liveNode, int nodeid, int... nodes)
{
Configuration config1 = super.createDefaultConfig();
config1.getAcceptorConfigurations().clear();
@@ -119,12 +114,8 @@
staticConnectors.add(liveConnector.getName());
}
TransportConfiguration backupConnector = createTransportConfiguration(isNetty(), false, generateParams(nodeid, isNetty()));
- List<String> pairs = null;
- ClusterConnectionConfiguration ccc1 = new ClusterConnectionConfiguration("cluster1", "jms", backupConnector.getName(), -1, false, false, 1, 1,
- createClusterConnections? staticConnectors:pairs);
+ ClusterConnectionConfiguration ccc1 = new ClusterConnectionConfiguration("cluster1", "jms", backupConnector.getName(), -1, false, false, 1, 1, staticConnectors);
config1.getClusterConfigurations().add(ccc1);
- BackupConnectorConfiguration connectorConfiguration = new BackupConnectorConfiguration(staticConnectors, backupConnector.getName());
- config1.setBackupConnectorConfiguration(connectorConfiguration);
config1.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
config1.setBindingsDirectory(config1.getBindingsDirectory() + "_" + liveNode);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -36,7 +36,8 @@
private Process serverProcess;
private boolean initialised = false;
private CountDownLatch initLatch;
-
+ private boolean started;
+
public RemoteProcessHornetQServer(String configurationClassName)
{
this.configurationClassName = configurationClassName;
@@ -74,6 +75,11 @@
}
}
+ public boolean isStarted()
+ {
+ return started;
+ }
+
public void setInitialised(boolean initialised)
{
this.initialised = initialised;
@@ -146,4 +152,8 @@
// Inner classes -------------------------------------------------
+ public void setStarted(boolean init)
+ {
+ started = true;
+ }
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServerSupport.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServerSupport.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServerSupport.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -64,6 +64,10 @@
{
System.out.println("INIT:" + server.isInitialised());
}
+ else if ("STARTED?".equals(line.trim()))
+ {
+ System.out.println("STARTED:" + server.isStarted());
+ }
else if ("STOP".equals(line.trim()))
{
server.stop();
@@ -124,6 +128,11 @@
boolean init = Boolean.parseBoolean(line.substring("INIT:".length(), line.length()));
remoteProcessHornetQServer.setInitialised(init);
}
+ if (line.startsWith("STARTED:"))
+ {
+ boolean init = Boolean.parseBoolean(line.substring("STARTED:".length(), line.length()));
+ remoteProcessHornetQServer.setStarted(init);
+ }
}
}
catch (Exception e)
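
The remote-process test harness drives the forked server over a simple line protocol on stdin/stdout; this revision adds a STARTED?/STARTED: exchange alongside the existing INIT one. A self-contained sketch of that exchange; the class name is illustrative, not the actual harness.

import java.io.BufferedReader;
import java.io.PrintStream;
import java.io.StringReader;

public class LineProtocolSketch
{
   // Forked-server side: answer STARTED? with STARTED:<boolean>, mirroring the
   // handler added to RemoteProcessHornetQServerSupport above.
   static void handle(String line, boolean started, PrintStream out)
   {
      if ("STARTED?".equals(line.trim()))
      {
         out.println("STARTED:" + started);
      }
   }

   // Test side: parse the reply the way RemoteProcessHornetQServer does.
   static boolean parse(String reply)
   {
      if (reply.startsWith("STARTED:"))
      {
         return Boolean.parseBoolean(reply.substring("STARTED:".length()));
      }
      return false;
   }

   public static void main(String[] args) throws Exception
   {
      handle("STARTED?", true, System.out);                     // prints STARTED:true
      BufferedReader in = new BufferedReader(new StringReader("STARTED:true"));
      System.out.println(parse(in.readLine()));                 // prints true
   }
}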
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -53,6 +53,11 @@
//To change body of implemented methods use File | Settings | File Templates.
}
+ public boolean isStarted()
+ {
+ return server.isStarted();
+ }
+
public void start() throws Exception
{
server.start();
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/TestableServer.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/TestableServer.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/TestableServer.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -33,4 +33,6 @@
public boolean isInitialised();
void destroy();
+
+ boolean isStarted();
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2010-10-26 00:08:05 UTC (rev 9813)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2010-10-26 07:50:27 UTC (rev 9814)
@@ -37,7 +37,6 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.core.client.impl.ClientSessionInternal;
-import org.hornetq.core.config.BackupConnectorConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -337,7 +336,6 @@
backupConf.getConnectorConfigurations().put(backuptc.getName(), backuptc);
ArrayList<String> staticConnectors = new ArrayList<String>();
staticConnectors.add(livetc.getName());
- backupConf.setBackupConnectorConfiguration(new BackupConnectorConfiguration(staticConnectors, backuptc.getName()) );
backupConf.setSecurityEnabled(false);
backupConf.setJournalType(getDefaultJournalType());