[jboss-cvs] JBoss Messaging SVN: r7853 - in branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732: integration/AS5/etc/xmdesc and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Oct 14 12:15:53 EDT 2009
Author: jbertram at redhat.com
Date: 2009-10-14 12:15:53 -0400 (Wed, 14 Oct 2009)
New Revision: 7853
Added:
branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/clustering/FakeMessageSucker.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/clustering/MessageSuckerTest.java
Modified:
branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/docs/userguide/en/modules/configuration.xml
branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/integration/AS5/etc/xmdesc/ServerPeer-xmbean.xml
branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/integration/EAP4/etc/server/default/deploy/messaging-service.xml
branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/integration/EAP4/etc/xmdesc/ServerPeer-xmbean.xml
branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/src/main/org/jboss/jms/server/ServerPeer.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/server/ServerPeerConfigurationTest.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/tools/container/ServiceContainerConfiguration.java
Log:
JBPAPP-2917
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/docs/userguide/en/modules/configuration.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/docs/userguide/en/modules/configuration.xml 2009-10-14 14:36:35 UTC (rev 7852)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/docs/userguide/en/modules/configuration.xml 2009-10-14 16:15:53 UTC (rev 7853)
@@ -315,6 +315,20 @@
</warning></para>
</section>
+ <section id="conf.serverpeer.attributes.suckerconnectionretryTimes">
+ <title>SuckerConnectionRetryTimes</title>
+
+ <para>Maximum number of times a sucker's connection is retried after a failure.
+ Default is -1 (retry forever).</para>
+ </section>
+
+ <section id="conf.serverpeer.attributes.suckerconnectionretryinterval">
+ <title>SuckerConnectionRetryInterval</title>
+
+ <para>The interval in milliseconds between successive retries of a sucker's
+ failed connection. Default is 5000.</para>
+ </section>
+
<section id="conf.serverpeer.attributes.stricttck">
<title>StrictTCK</title>
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/integration/AS5/etc/xmdesc/ServerPeer-xmbean.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/integration/AS5/etc/xmdesc/ServerPeer-xmbean.xml 2009-10-14 14:36:35 UTC (rev 7852)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/integration/AS5/etc/xmdesc/ServerPeer-xmbean.xml 2009-10-14 16:15:53 UTC (rev 7853)
@@ -228,8 +228,19 @@
<name>SuckerPassword</name>
<type>java.lang.String</type>
</attribute>
+
+ <attribute access="read-write" getMethod="getSuckerConnectionRetryTimes" setMethod="setSuckerConnectionRetryTimes">
+ <description>Max times for a sucker's connection to retry in case of failure. Default is -1 (retry forever)</description>
+ <name>SuckerConnectionRetryTimes</name>
+ <type>int</type>
+ </attribute>
+
+ <attribute access="read-write" getMethod="getSuckerConnectionRetryInterval" setMethod="setSuckerConnectionRetryInterval">
+ <description>The interval in milliseconds between each retry of the failed sucker's connection</description>
+ <name>SuckerConnectionRetryInterval</name>
+ <type>int</type>
+ </attribute>
-
<!-- Managed operations -->
<operation>
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/integration/EAP4/etc/server/default/deploy/messaging-service.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/integration/EAP4/etc/server/default/deploy/messaging-service.xml 2009-10-14 14:36:35 UTC (rev 7852)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/integration/EAP4/etc/server/default/deploy/messaging-service.xml 2009-10-14 16:15:53 UTC (rev 7853)
@@ -94,6 +94,10 @@
<attribute name="EnableMessageCounters">false</attribute>
+ <attribute name="SuckerConnectionRetryTimes">-1</attribute>
+
+ <attribute name="SuckerConnectionRetryInterval">5000</attribute>
+
<!-- The password used by the message sucker connections to create connections.
THIS SHOULD ALWAYS BE CHANGED AT INSTALL TIME TO SECURE SYSTEM
<attribute name="SuckerPassword"></attribute>
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/integration/EAP4/etc/xmdesc/ServerPeer-xmbean.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/integration/EAP4/etc/xmdesc/ServerPeer-xmbean.xml 2009-10-14 14:36:35 UTC (rev 7852)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/integration/EAP4/etc/xmdesc/ServerPeer-xmbean.xml 2009-10-14 16:15:53 UTC (rev 7853)
@@ -223,6 +223,18 @@
<name>SuckerPassword</name>
<type>java.lang.String</type>
</attribute>
+
+ <attribute access="read-write" getMethod="getSuckerConnectionRetryTimes" setMethod="setSuckerConnectionRetryTimes">
+ <description>Max times for a sucker's connection to retry in case of failure. Default is -1 (retry forever)</description>
+ <name>SuckerConnectionRetryTimes</name>
+ <type>int</type>
+ </attribute>
+
+ <attribute access="read-write" getMethod="getSuckerConnectionRetryInterval" setMethod="setSuckerConnectionRetryInterval">
+ <description>The interval in milliseconds between each retry of the failed sucker's connection</description>
+ <name>SuckerConnectionRetryInterval</name>
+ <type>int</type>
+ </attribute>
<!-- Managed operations -->
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/src/main/org/jboss/jms/server/ServerPeer.java 2009-10-14 14:36:35 UTC (rev 7852)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/src/main/org/jboss/jms/server/ServerPeer.java 2009-10-14 16:15:53 UTC (rev 7853)
@@ -169,6 +169,10 @@
// For generating unique Channel ID for cluster without a shared DB
private long serverStartTime;
+ private int suckerConnectionRetryTimes = -1;
+
+ private int suckerConnectionRetryInterval = 5000;
+
// wired components
private DestinationJNDIMapper destinationJNDIMapper;
@@ -301,7 +305,7 @@
{
clusterConnectionManager = new ClusterConnectionManager(serverPeerID,
clusterPullConnectionFactoryName, defaultPreserveOrdering,
- SecurityStore.SUCKER_USER, suckerPassword);
+ SecurityStore.SUCKER_USER, suckerPassword, suckerConnectionRetryTimes, suckerConnectionRetryInterval);
clusterNotifier.registerListener(clusterConnectionManager);
}
@@ -1711,7 +1715,27 @@
return true;
}
+ public void setSuckerConnectionRetryTimes(int suckerConnectionRetryTimes)
+ {
+ this.suckerConnectionRetryTimes = suckerConnectionRetryTimes;
+ }
+ public int getSuckerConnectionRetryTimes()
+ {
+ return suckerConnectionRetryTimes;
+ }
+
+ public void setSuckerConnectionRetryInterval(int suckerConnectionRetryInterval)
+ {
+ this.suckerConnectionRetryInterval = suckerConnectionRetryInterval;
+ }
+
+ public int getSuckerConnectionRetryInterval()
+ {
+ return suckerConnectionRetryInterval;
+ }
+
+
// Inner classes --------------------------------------------------------------------------------
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2009-10-14 14:36:35 UTC (rev 7852)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2009-10-14 16:15:53 UTC (rev 7853)
@@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
+import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Session;
@@ -62,7 +63,7 @@
private static final long CLOSE_TIMEOUT = 2000;
- private boolean trace = log.isTraceEnabled();
+ private static boolean trace = log.isTraceEnabled();
private Map connections;
@@ -81,11 +82,17 @@
private String suckerUser;
private String suckerPassword;
+
+ private int maxRetry;
+
+ private int retryInterval; //5 sec
public ClusterConnectionManager(int nodeID,
String connectionFactoryUniqueName, boolean preserveOrdering,
String suckerUser,
- String suckerPassword)
+ String suckerPassword,
+ int maxRetry,
+ int retryInterval)
{
connections = new HashMap();
@@ -99,6 +106,10 @@
this.suckerPassword = suckerPassword;
+ this.maxRetry = maxRetry;
+
+ this.retryInterval = retryInterval;
+
if (trace) { log.trace("Created " + this); }
}
@@ -347,7 +358,7 @@
}
}
else if (notification.type == ClusterNotification.TYPE_UNBIND)
- {
+ {
String queueName = (String)notification.data;
if (notification.nodeID == this.nodeID)
@@ -355,7 +366,6 @@
//Local unbind
//We need to remove any suckers corresponding to remote nodes
-
removeAllSuckers(queueName);
}
else
@@ -363,7 +373,6 @@
//Remote unbind
//We need to remove the sucker corresponding to the remote queue
-
removeSucker(queueName, notification.nodeID);
}
}
@@ -400,7 +409,8 @@
{
try
{
- ConnectionInfo info = new ConnectionInfo(new JBossConnectionFactory(delegate), suckerUser, suckerPassword);
+ ConnectionInfo info = new ConnectionInfo(new JBossConnectionFactory(delegate), suckerUser,
+ suckerPassword, nid == this.nodeID, maxRetry, retryInterval);
log.trace(this + " created connection info " + info);
@@ -607,23 +617,30 @@
}
}
- class ConnectionInfo
- {
- private JBossConnectionFactory connectionFactory;
+ public static class ConnectionInfo implements ExceptionListener
+ {
+ protected JBossConnectionFactory connectionFactory;
- private JBossConnection connection;
+ protected JBossConnection connection;
- private Session session;
+ protected Session session;
- private Map suckers;
+ protected Map suckers;
- private boolean started;
+ protected boolean started;
private String suckerUser;
private String suckerPassword;
- ConnectionInfo(JBossConnectionFactory connectionFactory, String suckerUser, String suckerPassword) throws Exception
+ protected boolean isLocal;
+
+ private int maxRetry;
+
+ private int retryInterval;
+
+ public ConnectionInfo(JBossConnectionFactory connectionFactory, String suckerUser,
+ String suckerPassword, boolean isLocal, int maxRetry, int retryInterval) throws Exception
{
this.connectionFactory = connectionFactory;
@@ -632,9 +649,15 @@
this.suckerUser = suckerUser;
this.suckerPassword = suckerPassword;
+
+ this.isLocal = isLocal;
+
+ this.maxRetry = maxRetry;
+
+ this.retryInterval = retryInterval;
}
- synchronized void start() throws Exception
+ protected synchronized void start() throws Exception
{
if (started)
{
@@ -645,11 +668,17 @@
{
connection = (JBossConnection)connectionFactory.createConnection(suckerUser, suckerPassword);
+ //local connection doesn't need listener.
+ if (!isLocal)
+ {
+ connection.setExceptionListener(this);
+ }
+
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
}
- connection.start();
-
+ connection.start();
+
started = true;
}
@@ -691,7 +720,7 @@
suckers.clear();
}
- synchronized void close()
+ protected synchronized void close()
{
closeAllSuckers();
@@ -726,12 +755,12 @@
started = false;
}
- synchronized boolean hasSucker(String queueName)
+ protected synchronized boolean hasSucker(String queueName)
{
return suckers.containsKey(queueName);
}
- synchronized void addSucker(MessageSucker sucker)
+ protected synchronized void addSucker(MessageSucker sucker)
{
if (suckers.containsKey(sucker.getQueueName()))
{
@@ -746,6 +775,115 @@
MessageSucker sucker = (MessageSucker)suckers.remove(queueName);
return sucker;
- }
- }
+ }
+
+ //https://jira.jboss.org/jira/browse/JBMESSAGING-1732
+ //on exception, try to recreate all suckers.
+ public void onException(JMSException e)
+ {
+ log.warn("Connection failure detected. Clean up and retry connection. maxRetry: " + maxRetry + " retryInterval: " + retryInterval);
+ cleanupConnection();
+ retryConnection();
+ }
+
+ //first stop all the suckers
+ //then try to close the connection
+ protected synchronized void cleanupConnection()
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ Iterator iter = suckers.values().iterator();
+
+ while (iter.hasNext())
+ {
+ MessageSucker sucker = (MessageSucker)iter.next();
+
+ sucker.suspend();
+ }
+
+ Callable callable = new Callable() { public Object call()
+ {
+ try
+ {
+ connection.close();
+ }
+ catch (JMSException ignore)
+ {
+ }
+ return null;
+ } };
+
+ Callable timedCallable = new TimedCallable(callable, CLOSE_TIMEOUT);
+
+ try
+ {
+ timedCallable.call();
+ }
+ catch (Throwable t)
+ {
+ //Ignore - the server might have already closed - so this is ok
+ }
+
+ connection = null;
+
+ started = false;
+ }
+
+ protected synchronized int retryConnection()
+ {
+ int retryCount = 0;
+
+ while (((maxRetry == -1) || (retryCount < maxRetry)) && (suckers.size() > 0))
+ {
+ try
+ {
+ start();
+ break;
+ }
+ catch (Exception e)
+ {
+ retryCount++;
+ if (trace)
+ {
+ log.trace("Retrying ConnectionInfo " + this + " failed, retry count: " + retryCount, e);
+ }
+ try
+ {
+ this.wait(retryInterval);
+ }
+ catch(InterruptedException ite)
+ {
+ }
+ }
+ }
+
+ if (!started)
+ {
+ log.error("Retrying ConnectionInfo " + this + " failed after maxmum retry: " + retryCount);
+ return retryCount;
+ }
+
+ //now resume the suckers
+ Iterator iter = suckers.values().iterator();
+
+ while (iter.hasNext())
+ {
+ MessageSucker sucker = (MessageSucker)iter.next();
+
+ try
+ {
+ sucker.resume(session);
+ }
+ catch (JMSException e)
+ {
+ log.warn("Error resuming sucker " + sucker, e);
+ }
+ }
+
+ return retryCount;
+ }
+ }
}
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2009-10-14 14:36:35 UTC (rev 7852)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2009-10-14 16:15:53 UTC (rev 7853)
@@ -23,6 +23,7 @@
package org.jboss.messaging.core.impl.clusterconnection;
import javax.jms.Destination;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
@@ -77,14 +78,16 @@
private long sourceChannelID;
- private JBossQueue jbq;
+ protected JBossQueue jbq;
+ private boolean suspended = false;
+
public String toString()
{
return "MessageSucker:" + System.identityHashCode(this) + " queue:" + localQueue.getName();
}
- MessageSucker(Queue localQueue, Session sourceSession, Session localSession,
+ protected MessageSucker(Queue localQueue, Session sourceSession, Session localSession,
boolean preserveOrdering, long sourceChannelID)
{
if (trace) { log.trace("Creating message sucker, localQueue:" + localQueue + " preserveOrdering:" + preserveOrdering); }
@@ -102,7 +105,7 @@
this.sourceChannelID = sourceChannelID;
}
- synchronized void start() throws Exception
+ protected synchronized void start() throws Exception
{
if (started)
{
@@ -140,56 +143,142 @@
if (trace) { log.trace(this + " Registered sucker"); }
}
- synchronized void stop()
- {
- if (!started)
- {
- return;
- }
-
- setConsuming(false);
-
- localQueue.unregisterSucker(this);
-
- try
- {
- consumer.closing(-1);
- }
- catch (Throwable t)
- {
- // Ignore
- }
- try
- {
- consumer.close();
- }
- catch (Throwable t)
- {
- //Ignore
- }
-
- try
- {
- producer.close();
- }
- catch (Throwable t)
- {
- //Ignore
- }
+ protected void stop()
+ {
+ localQueue.unregisterSucker(this);
- sourceSession = null;
-
- localSession = null;
-
- consumer = null;
-
- clientConsumer = null;
-
- producer = null;
-
- started = false;
- }
-
+ synchronized (this)
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ setConsuming(false);
+
+ try
+ {
+ consumer.closing(-1);
+ }
+ catch (Throwable t)
+ {
+ // Ignore
+ }
+ try
+ {
+ consumer.close();
+ }
+ catch (Throwable t)
+ {
+ // Ignore
+ }
+
+ try
+ {
+ producer.close();
+ }
+ catch (Throwable t)
+ {
+ // Ignore
+ }
+
+ sourceSession = null;
+
+ localSession = null;
+
+ consumer = null;
+
+ clientConsumer = null;
+
+ producer = null;
+
+ started = false;
+ }
+ }
+
+ //the suspend stops the sucker's receiving end but doesn't unregister the sucker.
+ //we only suspend the consumer side.
+ public synchronized void suspend()
+ {
+ if (!started || suspended)
+ {
+ return;
+ }
+
+ boolean oldConsuming = consuming;
+
+ setConsuming(false);
+
+ consuming = oldConsuming;
+
+ suspended = true;
+
+ try
+ {
+ consumer.closing(-1);
+ }
+ catch (Throwable t)
+ {
+ // Ignore
+ }
+ try
+ {
+ consumer.close();
+ }
+ catch (Throwable t)
+ {
+ // Ignore
+ }
+
+ sourceSession = null;
+
+ consumer = null;
+
+ clientConsumer = null;
+ }
+
+
+ public synchronized void resume(Session srcSession) throws JMSException
+ {
+ if (!suspended)
+ {
+ return;
+ }
+
+ sourceSession = srcSession;
+
+ SessionDelegate sourcedel = ((JBossSession)sourceSession).getDelegate();
+
+ consumer = (ClientConsumerDelegate)sourcedel.createConsumerDelegate(jbq, null, false, null, false, false);
+
+ clientConsumer = ((ConsumerState)consumer.getState()).getClientConsumer();
+
+ try
+ {
+ if (consuming)
+ {
+ if (trace) { log.trace(this + " resuming client consumer"); }
+
+ clientConsumer.resume();
+ }
+ else
+ {
+ if (trace) { log.trace(this + " pausing client consumer"); }
+
+ clientConsumer.pause();
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ //We ignore the exception - we might fail to change rate when stopping a sucker for a dead server
+ }
+
+ consumer.setMessageListener(this);
+
+ suspended = false;
+ }
+
public String getQueueName()
{
return this.localQueue.getName();
@@ -199,6 +288,14 @@
{
if (trace) { log.trace(this + " setConsuming " + consume); }
+ //while suspended, we only record the requested consuming flag and do nothing else.
+ //later, on resume, the sucker is forced into the last requested consuming state.
+ if (suspended)
+ {
+ consuming = consume;
+ return;
+ }
+
try
{
if (consume && !consuming)
Copied: branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java (from rev 7808, branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java)
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java (rev 0)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java 2009-10-14 16:15:53 UTC (rev 7853)
@@ -0,0 +1,625 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.jms.clustering;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.destination.JBossQueue;
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.contract.Delivery;
+import org.jboss.messaging.core.contract.DeliveryObserver;
+import org.jboss.messaging.core.contract.Distributor;
+import org.jboss.messaging.core.contract.Filter;
+import org.jboss.messaging.core.contract.MessageReference;
+import org.jboss.messaging.core.impl.clusterconnection.MessageSucker;
+import org.jboss.messaging.core.impl.clusterconnection.ClusterConnectionManager.ConnectionInfo;
+import org.jboss.messaging.core.impl.tx.Transaction;
+
+/**
+ * A FakeClusterConnectionManager
+ *
+ * Used to test Message Sucker. We use this one to get rid of the links
+ * between ClusterConnectionManager and the ServerPeer. We only need to
+ * test the 'Client' aspect of message sucker, to see if it can correctly
+ * 'suck' messages. We don't care about the sending aspect of the suckers.
+ * That makes the test easier.
+ *
+ * @author howard
+ *
+ * Created Sep 14, 2009 1:38:09 PM
+ *
+ */
+public class FakeClusterConnectionManager
+{
+
+ public static final long CLOSE_TIMEOUT = 2000;
+
+ private Map connections;
+
+ private boolean started;
+
+ private int remoteID;
+
+ private int thisID;
+
+ private JBossConnectionFactory remoteFactory;
+
+ private String suckerUser;
+
+ private String suckerPassword;
+
+ private int maxRetry;
+
+ private int retryInterval;
+
+ public FakeClusterConnectionManager(int remoteID,
+ JBossConnectionFactory theFactory,
+ String suckerUser,
+ String suckerPassword,
+ int maxRetry,
+ int retryInterval,
+ int thisID)
+ {
+ connections = new HashMap();
+
+ this.remoteID = remoteID;
+
+ this.remoteFactory = theFactory;
+
+ this.suckerUser = suckerUser;
+
+ this.suckerPassword = suckerPassword;
+
+ this.maxRetry = maxRetry;
+
+ this.retryInterval = retryInterval;
+
+ this.thisID = thisID;
+ }
+
+ public synchronized void start() throws Exception
+ {
+ if (started)
+ {
+ return;
+ }
+
+ started = true;
+ }
+
+ public void createConnectionInfo(boolean updateJMSObject) throws Exception
+ {
+ FakeConnectionInfo info = new FakeConnectionInfo(remoteFactory, suckerUser, suckerPassword, remoteID == thisID, maxRetry, retryInterval, updateJMSObject);
+ connections.put(remoteID, info);
+ info.start();
+ }
+
+ public int getRetryTimes(int node)
+ {
+ FakeConnectionInfo info = (FakeConnectionInfo)connections.get(new Integer(node));
+ return info.getRetryTimes();
+ }
+
+ public String waitForReconnectionOK(int node)
+ {
+ FakeConnectionInfo info = (FakeConnectionInfo)connections.get(new Integer(node));
+ return info.waitForReconnectionOK();
+ }
+
+ public void resetFactory(int node, JBossConnectionFactory fact)
+ {
+ FakeConnectionInfo info = (FakeConnectionInfo)connections.get(new Integer(node));
+ info.resetFactory(fact);
+ }
+
+ public void updateQueueInSucker(int node, Queue queue) throws JMSException
+ {
+ FakeConnectionInfo info = (FakeConnectionInfo)connections.get(new Integer(node));
+ info.updateQueueInSucker(queue);
+ }
+
+ public String checkMessageSucked(int node, TextMessage[] messages) throws JMSException
+ {
+ FakeConnectionInfo info = (FakeConnectionInfo)connections.get(new Integer(node));
+ return info.checkMessageSucked(messages);
+ }
+
+ public String checkMessageNotSucked(int node) throws JMSException
+ {
+ FakeConnectionInfo info = (FakeConnectionInfo)connections.get(new Integer(node));
+ return info.checkMessageNotSucked();
+ }
+
+ public String checkConnectionFailureDetected(int node)
+ {
+ FakeConnectionInfo info = (FakeConnectionInfo)connections.get(new Integer(node));
+ return info.checkConnectionFailureDetected();
+ }
+
+ public void createSucker(Queue queue, int nid, boolean beginSuck) throws Exception
+ {
+ FakeConnectionInfo info = (FakeConnectionInfo)connections.get(new Integer(nid));
+
+ if (!info.hasSucker(queue.getQueueName()))
+ {
+ FakeMessageSucker sucker = new FakeMessageSucker(new FakeCoreQueue(queue), info.getSession(), 0, info.suckBuffer);
+
+ info.addSucker(sucker);
+
+ sucker.start();
+
+ sucker.setConsuming(beginSuck);
+ }
+ }
+
+ class FakeCoreQueue implements org.jboss.messaging.core.contract.Queue
+ {
+
+ private String queueName;
+
+ public FakeCoreQueue(Queue queue) throws JMSException
+ {
+ queueName = queue.getQueueName();
+ }
+
+ public String getName()
+ {
+ return queueName;
+ }
+
+ public void addAllToRecoveryArea(int nodeID, Map ids)
+ {
+ }
+
+ public void addToRecoveryArea(int nodeID, long messageID, String sessionID)
+ {
+ }
+
+ public int getDownCacheSize()
+ {
+ return 0;
+ }
+
+ public Filter getFilter()
+ {
+ return null;
+ }
+
+ public int getFullSize()
+ {
+ return 0;
+ }
+
+ public Distributor getLocalDistributor()
+ {
+ return null;
+ }
+
+ public int getNodeID()
+ {
+ return 0;
+ }
+
+ public int getPageSize()
+ {
+ return 0;
+ }
+
+ public long getRecoverDeliveriesTimeout()
+ {
+ return 0;
+ }
+
+ public Map getRecoveryArea()
+ {
+ return null;
+ }
+
+ public int getRecoveryMapSize()
+ {
+ return 0;
+ }
+
+ public Distributor getRemoteDistributor()
+ {
+ return null;
+ }
+
+ public Delivery handleMove(MessageReference ref, long sourceChannelID)
+ {
+ return null;
+ }
+
+ public boolean isClustered()
+ {
+ return false;
+ }
+
+ public void mergeIn(long channelID, int nodeID) throws Exception
+ {
+ }
+
+ public List recoverDeliveries(List messageIds)
+ {
+ return null;
+ }
+
+ public void registerSucker(MessageSucker sucker)
+ {
+ }
+
+ public void removeAllFromRecoveryArea(int nodeID)
+ {
+ }
+
+ public void removeFromRecoveryArea(int nodeID, long messageID)
+ {
+ }
+
+ public void removeStrandedReferences(String sessionID)
+ {
+ }
+
+ public void setPagingParams(int fullSize, int pageSize, int downCacheSize)
+ {
+ }
+
+ public boolean unregisterSucker(MessageSucker sucker)
+ {
+ return false;
+ }
+
+ public void activate()
+ {
+ }
+
+ public List browse(Filter filter)
+ {
+ return null;
+ }
+
+ public void close()
+ {
+ }
+
+ public void deactivate()
+ {
+ }
+
+ public void deliver()
+ {
+ }
+
+ public long getChannelID()
+ {
+ return 0;
+ }
+
+ public int getDeliveringCount()
+ {
+ return 0;
+ }
+
+ public int getMaxSize()
+ {
+ return 0;
+ }
+
+ public int getMessageCount()
+ {
+ return 0;
+ }
+
+ public int getMessagesAdded()
+ {
+ return 0;
+ }
+
+ public int getScheduledCount()
+ {
+ return 0;
+ }
+
+ public boolean isActive()
+ {
+ return false;
+ }
+
+ public boolean isRecoverable()
+ {
+ return false;
+ }
+
+ public void load() throws Exception
+ {
+ }
+
+ public void removeAllReferences() throws Throwable
+ {
+ }
+
+ public void setMaxSize(int newSize)
+ {
+ }
+
+ public void unload() throws Exception
+ {
+ }
+
+ public void acknowledge(Delivery d, Transaction tx) throws Throwable
+ {
+ }
+
+ public void acknowledgeNoPersist(Delivery d) throws Throwable
+ {
+ }
+
+ public void cancel(Delivery d) throws Throwable
+ {
+ }
+
+ public Delivery handle(DeliveryObserver observer, MessageReference reference, Transaction tx)
+ {
+ return null;
+ }
+ }
+
+ // Inner classes -------------------------------------------------
+
+ //Note: this is currently suitable only for tests with a single sucker!
+ class FakeConnectionInfo extends ConnectionInfo
+ {
+ private ArrayList<TextMessage> suckBuffer = new ArrayList<TextMessage>();
+
+ private Object connFailureLock = new Object();
+
+ private boolean connFailed = false;
+
+ private Object reconnLock = new Object();
+
+ private boolean reconnOK = false;
+
+ private boolean updateJMSObjects = true;
+
+ private int lastRetryTimes;
+
+ FakeConnectionInfo(JBossConnectionFactory factory, String suckerUser, String suckerPassword,
+ boolean isLocal, int maxRetry, int retryInterval, boolean updateJMS) throws Exception
+ {
+ super(factory, suckerUser, suckerPassword, isLocal, maxRetry, retryInterval);
+ updateJMSObjects = updateJMS;
+ }
+
+ public int getRetryTimes()
+ {
+ return lastRetryTimes;
+ }
+
+ public Session getSession()
+ {
+ return super.session;
+ }
+
+ public void updateQueueInSucker(Queue queue) throws JMSException
+ {
+ Iterator iter = suckers.values().iterator();
+
+ while (iter.hasNext())
+ {
+ FakeMessageSucker sucker = (FakeMessageSucker)iter.next();
+ sucker.updateQueue(new JBossQueue(queue.getQueueName()));
+ }
+
+ }
+
+ public String waitForReconnectionOK()
+ {
+ synchronized(reconnLock)
+ {
+ if (!reconnOK)
+ {
+ try
+ {
+ reconnLock.wait(20000);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ if (reconnOK)
+ {
+ reconnOK = false;
+ return null;
+ }
+ return "Reconnection failed after 10 seconds";
+ }
+ }
+
+ public String checkConnectionFailureDetected()
+ {
+ synchronized (connFailureLock)
+ {
+ if (connFailed)
+ {
+ connFailed = false;
+ return null;
+ }
+ try
+ {
+ connFailureLock.wait(10000);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ if (connFailed)
+ {
+ return null;
+ }
+ else
+ {
+ return "Connection Failure not detected with in 10 sec";
+ }
+ }
+ }
+
+ public String checkMessageSucked(TextMessage[] messages) throws JMSException
+ {
+ String result = null;
+ if (messages.length != suckBuffer.size())
+ {
+ result = "Number of sucked messages not right, expected: " + messages.length + " but was: " + suckBuffer.size();
+ return result;
+ }
+ for (int i = 0; i < messages.length; i++)
+ {
+ TextMessage msg = suckBuffer.get(i);
+ if (!messages[i].getText().equals(msg.getText()))
+ {
+ result = "Message sucked not right, expected: " + messages[i].getText() + " but was: " + msg.getText();
+ break;
+ }
+ }
+ suckBuffer.clear();
+ return result;
+ }
+
+ public String checkMessageNotSucked() throws JMSException
+ {
+ String result = null;
+ if (suckBuffer.size() > 0)
+ {
+ result = "Number of sucked messages not right, expected: 0 but was: " + suckBuffer.size();
+ return result;
+ }
+ suckBuffer.clear();
+ return result;
+ }
+
+ //https://jira.jboss.org/jira/browse/JBMESSAGING-1732
+ //on exception, try to recreate all suckers.
+ public void onException(JMSException e)
+ {
+ synchronized(connFailureLock)
+ {
+ this.connFailed = true;
+ connFailureLock.notify();
+ }
+ super.onException(e);
+ }
+
+ public synchronized void resetFactory(JBossConnectionFactory newFact)
+ {
+ connectionFactory = newFact;
+ this.notify();
+ }
+
+ protected synchronized void cleanupConnection()
+ {
+ if (updateJMSObjects)
+ {
+ connectionFactory = null;
+ }
+ super.cleanupConnection();
+ }
+
+ protected synchronized int retryConnection()
+ {
+ //regain factory: this is not the true behavior of reconnection.
+ //in the real case, the factory is expected to remain accessible. If it is not, the node has crashed or been shut down,
+ //in which case reconnection no longer applies. A new connection info will be created instead.
+ while (connectionFactory == null)
+ {
+ try
+ {
+ this.wait();
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+
+ lastRetryTimes = super.retryConnection();
+
+ synchronized(reconnLock)
+ {
+ reconnOK = super.started;
+ reconnLock.notify();
+ }
+
+ return lastRetryTimes;
+ }
+
+ public void close()
+ {
+ super.close();
+ }
+
+ public void start() throws Exception
+ {
+ super.start();
+ }
+
+ public boolean hasSucker(String queueName)
+ {
+ return super.hasSucker(queueName);
+ }
+
+ public void addSucker(MessageSucker sucker)
+ {
+ super.addSucker(sucker);
+ }
+ }
+
+ public void stop()
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ Iterator iter = connections.values().iterator();
+
+ while (iter.hasNext())
+ {
+ FakeConnectionInfo info = (FakeConnectionInfo)iter.next();
+
+ info.close();
+ }
+
+ connections.clear();
+
+ started = false;
+ }
+
+
+}
Copied: branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/clustering/FakeMessageSucker.java (from rev 7808, branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeMessageSucker.java)
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/clustering/FakeMessageSucker.java (rev 0)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/clustering/FakeMessageSucker.java 2009-10-14 16:15:53 UTC (rev 7853)
@@ -0,0 +1,116 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.jms.clustering;
+
+import java.util.List;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.jboss.jms.destination.JBossQueue;
+import org.jboss.messaging.core.contract.Queue;
+import org.jboss.messaging.core.impl.clusterconnection.MessageSucker;
+
+/**
+ * A FakeMessageSucker
+ *
+ * @author howard
+ *
+ * Created Sep 14, 2009 2:31:27 PM
+ *
+ *
+ */
+public class FakeMessageSucker extends MessageSucker
+{
+ private List<TextMessage> buffer;
+
+ private Object queueUpdateLock = new Object();
+
+ private boolean queueNotUpdated = true;
+
+ FakeMessageSucker(Queue localQueue, Session sourceSession, long sourceChannelID, List<TextMessage> buffer)
+ {
+ super(localQueue, sourceSession, sourceSession, true, sourceChannelID);
+ this.buffer = buffer;
+ }
+
+ // Public --------------------------------------------------------
+ public void start() throws Exception
+ {
+ super.start();
+ }
+
+ public void stop()
+ {
+ super.stop();
+ }
+
+ public void updateQueue(JBossQueue q)
+ {
+ synchronized (queueUpdateLock)
+ {
+ this.jbq = q;
+ queueNotUpdated = false;
+ queueUpdateLock.notify();
+ }
+ }
+
+ public void resume(Session session) throws JMSException
+ {
+ synchronized (queueUpdateLock)
+ {
+ if (queueNotUpdated)
+ {
+ try
+ {
+ queueUpdateLock.wait(20000);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ }
+ super.resume(session);
+ }
+
+ public void onMessage(Message msg)
+ {
+
+ try
+ {
+ buffer.add((TextMessage)msg);
+ msg.acknowledge();
+ }
+ catch (JMSException e)
+ {
+ }
+ }
+
+ public void resetQueue(JBossQueue newQueue)
+ {
+ jbq = newQueue;
+ }
+
+}
Copied: branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/clustering/MessageSuckerTest.java (from rev 7808, branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/MessageSuckerTest.java)
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/clustering/MessageSuckerTest.java (rev 0)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/clustering/MessageSuckerTest.java 2009-10-14 16:15:53 UTC (rev 7853)
@@ -0,0 +1,305 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.jms.clustering;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.server.SecurityStore;
+import org.jboss.jms.server.security.SecurityMetadataStore;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ * Tests how a message sucker behaves when the node it is connected to is
+ * stopped and restarted (JBMESSAGING-1732). The tests drive a
+ * FakeClusterConnectionManager against node 0.
+ *
+ * @author howard
+ *
+ * Created Sep 14, 2009 12:23:30 PM
+ *
+ *
+ */
+public class MessageSuckerTest extends ClusteringTestBase
+{
+
+   public MessageSuckerTest(String name)
+   {
+      super(name);
+   }
+
+   // https://jira.jboss.org/jira/browse/JBMESSAGING-1732
+   // Initiate a fake ClusterConnectionManager to connect to a node.
+   // Send messages to the node and check that the messages are sucked.
+   // Then kill the node and restart it, and check that messages can
+   // still be sucked.
+   public void testMessageSuckerReconnection() throws Exception
+   {
+      FakeClusterConnectionManager clusterConnMgr = null;
+      Connection conn1 = null;
+      Connection conn2 = null;
+
+      try
+      {
+         JBossConnectionFactory factory = (JBossConnectionFactory)ic[0].lookup("/ConnectionFactory");
+
+         clusterConnMgr = new FakeClusterConnectionManager(0,
+                                                           factory,
+                                                           SecurityStore.SUCKER_USER,
+                                                           SecurityMetadataStore.DEFAULT_SUCKER_USER_PASSWORD,
+                                                           -1,
+                                                           2000,
+                                                           5);
+         clusterConnMgr.start();
+
+         clusterConnMgr.createConnectionInfo(true);
+
+         clusterConnMgr.createSucker(queue[0], 0, true);
+
+         conn1 = createConnectionOnServer(factory, 0);
+
+         TextMessage[] messages = new TextMessage[1];
+
+         sendSuckTestMessages(conn1, messages, "suck-msg");
+
+         pauseIgnoringInterrupt(2000);
+
+         String result = clusterConnMgr.checkMessageSucked(0, messages);
+         assertNull(result, result);
+
+         // Now kill Node 0
+         ServerManagement.stop(0);
+
+         // Sucker connection should receive notification
+         // NOTE(review): the return value of checkConnectionFailureDetected() is discarded, so the
+         // assertion below re-checks the previous result -- confirm whether "result = ..." was intended.
+         clusterConnMgr.checkConnectionFailureDetected(0);
+         assertNull(result, result);
+
+         //sleep for 4 sec
+         pauseIgnoringInterrupt(4000);
+
+         factory = restartNodeZero();
+
+         // To simulate the real case, we need to restore the connection factory and the queue.
+         // In reality they do not need updating, as the node is not really dead, so those
+         // objects are supposed to remain valid.
+         clusterConnMgr.resetFactory(0, factory);
+         clusterConnMgr.updateQueueInSucker(0, queue[0]);
+
+         result = clusterConnMgr.waitForReconnectionOK(0);
+         assertNull(result, result);
+
+         // now send 1 more message
+         conn2 = createConnectionOnServer(factory, 0);
+
+         sendSuckTestMessages(conn2, messages, "new-suck-msg");
+
+         Thread.sleep(2000);
+
+         // should be sucked.
+         result = clusterConnMgr.checkMessageSucked(0, messages);
+         assertNull(result, result);
+      }
+      finally
+      {
+         stopManagerAndCloseConnections(clusterConnMgr, conn1, conn2);
+      }
+   }
+
+   // https://jira.jboss.org/jira/browse/JBMESSAGING-1732
+   // Initiate a fake ClusterConnectionManager to connect to a node.
+   // Send messages to the node and check that the messages are sucked. Set retry times to 2 and retryInterval 1000.
+   // Then kill the node and restart it, and check that messages can no longer be sucked.
+   public void testMessageSuckerReconnection2() throws Exception
+   {
+      FakeClusterConnectionManager clusterConnMgr = null;
+      Connection conn1 = null;
+      Connection conn2 = null;
+
+      try
+      {
+         JBossConnectionFactory factory = (JBossConnectionFactory)ic[0].lookup("/ConnectionFactory");
+
+         clusterConnMgr = new FakeClusterConnectionManager(0,
+                                                           factory,
+                                                           SecurityStore.SUCKER_USER,
+                                                           SecurityMetadataStore.DEFAULT_SUCKER_USER_PASSWORD,
+                                                           2,
+                                                           1000,
+                                                           5);
+         clusterConnMgr.start();
+
+         clusterConnMgr.createConnectionInfo(false);
+
+         clusterConnMgr.createSucker(queue[0], 0, true);
+
+         conn1 = createConnectionOnServer(factory, 0);
+
+         TextMessage[] messages = new TextMessage[1];
+
+         sendSuckTestMessages(conn1, messages, "suck-msg");
+
+         pauseIgnoringInterrupt(2000);
+
+         String result = clusterConnMgr.checkMessageSucked(0, messages);
+         assertNull(result, result);
+
+         // Now kill Node 0
+         ServerManagement.stop(0);
+
+         // Sucker connection should receive notification
+         // NOTE(review): the return value of checkConnectionFailureDetected() is discarded, so the
+         // assertion below re-checks the previous result -- confirm whether "result = ..." was intended.
+         clusterConnMgr.checkConnectionFailureDetected(0);
+         assertNull(result, result);
+
+         //sleep for 4 sec to let the retry fail
+         pauseIgnoringInterrupt(4000);
+
+         factory = restartNodeZero();
+
+         // The retries were used up while the node was down, so a failure message is expected here.
+         result = clusterConnMgr.waitForReconnectionOK(0);
+         assertNotNull(result, result);
+
+         int n = clusterConnMgr.getRetryTimes(0);
+         assertEquals(2, n);
+
+         // now send 1 more message
+         conn2 = createConnectionOnServer(factory, 0);
+
+         sendSuckTestMessages(conn2, messages, "new-suck-msg");
+
+         Thread.sleep(2000);
+
+         // should not be sucked.
+         result = clusterConnMgr.checkMessageNotSucked(0);
+         assertNull(result, result);
+
+         removeAllMessages(queue[0].getQueueName(), true, 0);
+      }
+      finally
+      {
+         stopManagerAndCloseConnections(clusterConnMgr, conn1, conn2);
+      }
+   }
+
+   // Private -------------------------------------------------------
+
+   // Creates an AUTO_ACKNOWLEDGE session and a producer for queue[0] on the given connection,
+   // then fills 'messages' with text messages named prefix + index and sends each of them.
+   private void sendSuckTestMessages(Connection conn, TextMessage[] messages, String prefix) throws Exception
+   {
+      Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      MessageProducer producer = sess.createProducer(queue[0]);
+
+      for (int i = 0; i < messages.length; i++)
+      {
+         messages[i] = sess.createTextMessage(prefix + i);
+         producer.send(messages[i]);
+      }
+   }
+
+   // Sleeps for the given number of milliseconds; an interrupt just ends the sleep early.
+   private static void pauseIgnoringInterrupt(long millis)
+   {
+      try
+      {
+         Thread.sleep(millis);
+      }
+      catch (InterruptedException e)
+      {
+         // ignore.
+      }
+   }
+
+   // Starts node 0 again, redeploys the test queue, refreshes queue[0] from JNDI and
+   // returns the connection factory looked up after the restart.
+   private JBossConnectionFactory restartNodeZero() throws Exception
+   {
+      // Now startup Node 0 again, here we clean up the DB as the
+      // message last sent won't be removed because it is sucked, not really received.
+      ServerManagement.start(0, "all", true);
+      ServerManagement.deployQueue("testDistributedQueue", 0);
+
+      queue[0] = (Queue)ic[0].lookup("queue/testDistributedQueue");
+
+      return (JBossConnectionFactory)ic[0].lookup("/ConnectionFactory");
+   }
+
+   // Stops the manager (if it was created) and then closes the connections that were opened.
+   // Called from finally so that a failed assertion does not leave the manager running.
+   private void stopManagerAndCloseConnections(FakeClusterConnectionManager clusterConnMgr,
+                                               Connection conn1,
+                                               Connection conn2) throws Exception
+   {
+      try
+      {
+         if (clusterConnMgr != null)
+         {
+            clusterConnMgr.stop();
+         }
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+   }
+
+}
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/server/ServerPeerConfigurationTest.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/server/ServerPeerConfigurationTest.java 2009-10-14 14:36:35 UTC (rev 7852)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/server/ServerPeerConfigurationTest.java 2009-10-14 16:15:53 UTC (rev 7853)
@@ -24,6 +24,7 @@
import javax.management.ObjectName;
import javax.management.RuntimeMBeanException;
+import org.jboss.jms.server.ServerPeer;
import org.jboss.test.messaging.MessagingTestCase;
import org.jboss.test.messaging.tools.ServerManagement;
import org.jboss.test.messaging.tools.container.LocalTestServer;
@@ -101,6 +102,27 @@
}
}
+ //https://jira.jboss.org/jira/browse/JBMESSAGING-1732
+ //Starts a local test server and checks the ServerPeer defaults for the sucker
+ //connection retry interval (5000) and retry times (-1).
+ public void testServerPeerAttributeDefaults() throws Exception
+ {
+    final LocalTestServer localServer = new LocalTestServer();
+
+    try
+    {
+       localServer.start("all", null, false, true);
+
+       final ServerPeer peer = localServer.getServerPeer();
+
+       final int retryInterval = peer.getSuckerConnectionRetryInterval();
+       assertEquals(5000, retryInterval);
+
+       final int retryTimes = peer.getSuckerConnectionRetryTimes();
+       assertEquals(-1, retryTimes);
+    }
+    finally
+    {
+       localServer.stop();
+    }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/tools/container/ServiceContainerConfiguration.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/tools/container/ServiceContainerConfiguration.java 2009-10-14 14:36:35 UTC (rev 7852)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP07_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/tools/container/ServiceContainerConfiguration.java 2009-10-14 16:15:53 UTC (rev 7853)
@@ -30,7 +30,6 @@
// Constants -----------------------------------------------------
public static final boolean DEFAULT_CLUSTERED_MODE = false;
-
// Static --------------------------------------------------------
public static String getHypersonicDatabase(String connectionURL)
More information about the jboss-cvs-commits
mailing list