[jboss-cvs] JBoss Messaging SVN: r7799 - in branches: Branch_JBMESSAGING_1732 and 8 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Sep 10 03:52:34 EDT 2009
Author: gaohoward
Date: 2009-09-10 03:52:34 -0400 (Thu, 10 Sep 2009)
New Revision: 7799
Added:
branches/Branch_JBMESSAGING_1732/
Modified:
branches/Branch_JBMESSAGING_1732/.classpath
branches/Branch_JBMESSAGING_1732/integration/EAP4/etc/server/default/deploy/messaging-service.xml
branches/Branch_JBMESSAGING_1732/integration/EAP4/etc/xmdesc/ServerPeer-xmbean.xml
branches/Branch_JBMESSAGING_1732/src/main/org/jboss/jms/server/ServerPeer.java
branches/Branch_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
branches/Branch_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
branches/Branch_JBMESSAGING_1732/tests/build.properties
branches/Branch_JBMESSAGING_1732/tests/etc/container.xml
branches/Branch_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/server/ServerPeerConfigurationTest.java
branches/Branch_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/tools/container/ServiceContainerConfiguration.java
Log:
a temporary branch
Copied: branches/Branch_JBMESSAGING_1732 (from rev 7798, branches/Branch_1_4)
Modified: branches/Branch_JBMESSAGING_1732/.classpath
===================================================================
--- branches/Branch_1_4/.classpath 2009-09-04 03:51:04 UTC (rev 7798)
+++ branches/Branch_JBMESSAGING_1732/.classpath 2009-09-10 07:52:34 UTC (rev 7799)
@@ -22,7 +22,7 @@
<classpathentry kind="src" path="docs/examples/topic/src"/>
<classpathentry excluding="**/.svn/**/*" kind="src" path="src/main"/>
<classpathentry excluding="**/.svn/**/*" kind="src" path="tests/src"/>
- <classpathentry kind="lib" path="thirdparty/oswego-concurrent/lib/concurrent.jar" sourcepath="/extra/work-clebert/concurrent/concurrent/src"/>
+ <classpathentry kind="lib" path="thirdparty/oswego-concurrent/lib/concurrent.jar" sourcepath="/home/howard/projects/jboss/messaging/1.4/e_cp09/thirdparty/oswego-concurrent/lib/concurrent-src.zip"/>
<classpathentry kind="lib" path="thirdparty/jgroups/lib/jgroups.jar"/>
<classpathentry kind="lib" path="thirdparty/apache-log4j/lib/log4j.jar"/>
<classpathentry kind="lib" path="thirdparty/junit/lib/junit.jar"/>
@@ -46,7 +46,7 @@
<classpathentry kind="lib" path="thirdparty/jboss/jbossts14/lib/jbossjta.jar"/>
<classpathentry kind="var" path="ANT_HOME/lib/ant.jar"/>
<classpathentry kind="var" path="ANT_HOME/lib/ant-junit.jar"/>
- <classpathentry kind="lib" path="thirdparty/jboss/common/lib/jboss-common.jar"/>
+ <classpathentry kind="lib" path="thirdparty/jboss/common/lib/jboss-common.jar" sourcepath="/home/howard/projects/jboss/messaging/1.4/e_cp09/thirdparty/jboss/common/lib/jboss-common-sources.jar"/>
<classpathentry kind="lib" path="thirdparty/jbossas/core-libs/lib/jboss-j2ee.jar"/>
<classpathentry kind="lib" path="thirdparty/jboss/aop/lib/jboss-aop.jar"/>
<classpathentry kind="lib" path="thirdparty/jbossas/core-libs/lib/jboss-system.jar"/>
Modified: branches/Branch_JBMESSAGING_1732/integration/EAP4/etc/server/default/deploy/messaging-service.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/messaging-service.xml 2009-09-04 03:51:04 UTC (rev 7798)
+++ branches/Branch_JBMESSAGING_1732/integration/EAP4/etc/server/default/deploy/messaging-service.xml 2009-09-10 07:52:34 UTC (rev 7799)
@@ -94,6 +94,10 @@
<attribute name="EnableMessageCounters">false</attribute>
+ <attribute name="SuckerConnectionRetryTimes">-1</attribute>
+
+ <attribute name="SuckerConnectionRetryInterval">5000</attribute>
+
<!-- The password used by the message sucker connections to create connections.
THIS SHOULD ALWAYS BE CHANGED AT INSTALL TIME TO SECURE SYSTEM
<attribute name="SuckerPassword"></attribute>
Modified: branches/Branch_JBMESSAGING_1732/integration/EAP4/etc/xmdesc/ServerPeer-xmbean.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/xmdesc/ServerPeer-xmbean.xml 2009-09-04 03:51:04 UTC (rev 7798)
+++ branches/Branch_JBMESSAGING_1732/integration/EAP4/etc/xmdesc/ServerPeer-xmbean.xml 2009-09-10 07:52:34 UTC (rev 7799)
@@ -223,6 +223,18 @@
<name>SuckerPassword</name>
<type>java.lang.String</type>
</attribute>
+
+ <attribute access="read-write" getMethod="getSuckerConnectionRetryTimes" setMethod="setSuckerConnectionRetryTimes">
+ <description>Max times for a sucker's connection to retry in case of failure. Default is -1 (retry forever)</description>
+ <name>SuckerConnectionRetryTimes</name>
+ <type>int</type>
+ </attribute>
+
+ <attribute access="read-write" getMethod="getSuckerConnectionRetryInterval" setMethod="setSuckerConnectionRetryInterval">
+ <description>The interval in milliseconds between each retry of the failed sucker's connection</description>
+ <name>SuckerConnectionRetryInterval</name>
+ <type>int</type>
+ </attribute>
<!-- Managed operations -->
Modified: branches/Branch_JBMESSAGING_1732/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/ServerPeer.java 2009-09-04 03:51:04 UTC (rev 7798)
+++ branches/Branch_JBMESSAGING_1732/src/main/org/jboss/jms/server/ServerPeer.java 2009-09-10 07:52:34 UTC (rev 7799)
@@ -169,6 +169,10 @@
// For generating unique Channel ID for cluster without a shared DB
private long serverStartTime;
+ private int suckerConnectionRetryTimes = -1; // max reconnect attempts for a failed sucker connection; -1 means retry forever
+
+ private int suckerConnectionRetryInterval = 5000; // pause between reconnect attempts, in milliseconds
+
// wired components
private DestinationJNDIMapper destinationJNDIMapper;
@@ -301,7 +305,7 @@
{
clusterConnectionManager = new ClusterConnectionManager(serverPeerID,
clusterPullConnectionFactoryName, defaultPreserveOrdering,
- SecurityStore.SUCKER_USER, suckerPassword);
+ SecurityStore.SUCKER_USER, suckerPassword, suckerConnectionRetryTimes, suckerConnectionRetryInterval);
clusterNotifier.registerListener(clusterConnectionManager);
}
@@ -1714,7 +1718,27 @@
return true;
}
+ public void setSuckerConnectionRetryTimes(int suckerConnectionRetryTimes) // sets the max reconnect attempts for sucker connections (-1 = retry forever)
+ {
+ this.suckerConnectionRetryTimes = suckerConnectionRetryTimes; // NOTE(review): the value is handed to ClusterConnectionManager at construction; a later change is presumably not propagated - confirm
+ }
+ public int getSuckerConnectionRetryTimes() // returns the configured max reconnect attempts (-1 = retry forever)
+ {
+ return suckerConnectionRetryTimes;
+ }
+
+ public void setSuckerConnectionRetryInterval(int suckerConnectionRetryInterval) // sets the pause between sucker reconnect attempts, in milliseconds
+ {
+ this.suckerConnectionRetryInterval = suckerConnectionRetryInterval; // NOTE(review): no range check here; the value is later passed to Thread.sleep by the connection retry loop
+ }
+
+ public int getSuckerConnectionRetryInterval() // returns the configured pause between reconnect attempts, in milliseconds
+ {
+ return suckerConnectionRetryInterval;
+ }
+
+
// Inner classes --------------------------------------------------------------------------------
Modified: branches/Branch_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2009-09-04 03:51:04 UTC (rev 7798)
+++ branches/Branch_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2009-09-10 07:52:34 UTC (rev 7799)
@@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
+import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Session;
@@ -81,11 +82,17 @@
private String suckerUser;
private String suckerPassword;
+
+ private int maxRetry; //max reconnect attempts for a failed sucker connection; -1 retries forever (value comes from the constructor)
+
+ private int retryInterval; //sleep between reconnect attempts, in milliseconds (value comes from the constructor)
public ClusterConnectionManager(int nodeID,
String connectionFactoryUniqueName, boolean preserveOrdering,
String suckerUser,
- String suckerPassword)
+ String suckerPassword,
+ int maxRetry,
+ int retryInterval)
{
connections = new HashMap();
@@ -99,6 +106,10 @@
this.suckerPassword = suckerPassword;
+ this.maxRetry = maxRetry;
+
+ this.retryInterval = retryInterval;
+
if (trace) { log.trace("Created " + this); }
}
@@ -400,7 +411,7 @@
{
try
{
- ConnectionInfo info = new ConnectionInfo(new JBossConnectionFactory(delegate), suckerUser, suckerPassword);
+ ConnectionInfo info = new ConnectionInfo(new JBossConnectionFactory(delegate), suckerUser, suckerPassword, nid == this.nodeID);
log.trace(this + " created connection info " + info);
@@ -607,7 +618,7 @@
}
}
- class ConnectionInfo
+ class ConnectionInfo implements ExceptionListener
{
private JBossConnectionFactory connectionFactory;
@@ -623,7 +634,9 @@
private String suckerPassword;
- ConnectionInfo(JBossConnectionFactory connectionFactory, String suckerUser, String suckerPassword) throws Exception
+ private boolean isLocal;
+
+ ConnectionInfo(JBossConnectionFactory connectionFactory, String suckerUser, String suckerPassword, boolean isLocal) throws Exception
{
this.connectionFactory = connectionFactory;
@@ -632,6 +645,8 @@
this.suckerUser = suckerUser;
this.suckerPassword = suckerPassword;
+
+ this.isLocal = isLocal;
}
synchronized void start() throws Exception
@@ -645,6 +660,12 @@
{
connection = (JBossConnection)connectionFactory.createConnection(suckerUser, suckerPassword);
+ //local connection doesn't need listener.
+ if (!isLocal)
+ {
+ connection.setExceptionListener(this);
+ }
+
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
}
@@ -693,7 +714,7 @@
synchronized void close()
{
- closeAllSuckers();
+ closeAllSuckers();
//Note we use a timed callable since remoting has a habit of hanging on attempting to close
//We do not want this to hang the system - especially failover
@@ -746,6 +767,112 @@
MessageSucker sucker = (MessageSucker)suckers.remove(queueName);
return sucker;
- }
- }
+ }
+
+ //https://jira.jboss.org/jira/browse/JBMESSAGING-1732
+ //ExceptionListener callback: tears down the failed connection (suspending its suckers), then reconnects and resumes them. The exception itself is neither logged nor inspected.
+ public void onException(JMSException e)
+ {
+ cleanupConnection();
+ retryConnection(); // NOTE(review): blocks the calling thread; with maxRetry == -1 this loops until start() succeeds
+ }
+
+ //Suspends every sucker held by this ConnectionInfo, then tries to close the JMS connection.
+ //No-op when not started. Afterwards connection is null and started is false, whether or not the close succeeded.
+ private synchronized void cleanupConnection()
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ Iterator iter = suckers.values().iterator();
+
+ while (iter.hasNext())
+ {
+ MessageSucker sucker = (MessageSucker)iter.next();
+
+ sucker.suspend(); // closes the sucker's consumer; the sucker stays in the suckers map
+ }
+
+ Callable callable = new Callable() { public Object call()
+ {
+ try
+ {
+ connection.close();
+ }
+ catch (JMSException ignore)
+ {
+ }
+ return null;
+ } };
+
+ Callable timedCallable = new TimedCallable(callable, CLOSE_TIMEOUT); // presumably abandons a hanging close after CLOSE_TIMEOUT - same wrapper idea as in close()
+
+ try
+ {
+ timedCallable.call();
+ }
+ catch (Throwable t)
+ {
+ //Ignore - the server might have already closed - so this is ok
+ }
+
+ connection = null;
+
+ started = false;
+ }
+
+ private synchronized void retryConnection() // calls start() until it succeeds or maxRetry attempts have failed (-1 = unlimited), then resumes every sucker on the new session
+ {
+ int retryCount = 0;
+
+ while ((maxRetry == -1) || (retryCount < maxRetry)) // with maxRetry == 0 the body never runs, so start() is not attempted at all
+ {
+ try
+ {
+ start();
+ break;
+ }
+ catch (Exception e)
+ {
+ retryCount++;
+ if (trace)
+ {
+ log.trace("Retrying ConnectionInfo " + this + " failed, retry count: " + retryCount, e);
+ }
+ try
+ {
+ Thread.sleep(retryInterval); // sleeps while holding this object's monitor, since the method is synchronized
+ }
+ catch(InterruptedException ite) // NOTE(review): the interrupt is swallowed, the loop carries on and the interrupt status is not restored
+ {
+ }
+ }
+ }
+
+ if (!started) // no successful start(); relies on start() setting started, which is not shown in this hunk
+ {
+ log.error("Retrying ConnectionInfo " + this + " failed after maxmum retry: " + retryCount);
+ return; // NOTE(review): the log text above misspells maximum as maxmum
+ }
+
+ //now resume the suckers on the session created by start(); a failure in one sucker is logged and the rest are still resumed
+ Iterator iter = suckers.values().iterator();
+
+ while (iter.hasNext())
+ {
+ MessageSucker sucker = (MessageSucker)iter.next();
+
+ try
+ {
+ sucker.resume(session);
+ }
+ catch (JMSException e)
+ {
+ log.warn("Error resuming sucker " + sucker, e);
+ }
+ }
+ }
+ }
}
Modified: branches/Branch_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2009-09-04 03:51:04 UTC (rev 7798)
+++ branches/Branch_JBMESSAGING_1732/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2009-09-10 07:52:34 UTC (rev 7799)
@@ -23,6 +23,7 @@
package org.jboss.messaging.core.impl.clusterconnection;
import javax.jms.Destination;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
@@ -79,6 +80,8 @@
private JBossQueue jbq;
+ private boolean suspended = false;
+
public String toString()
{
return "MessageSucker:" + System.identityHashCode(this) + " queue:" + localQueue.getName();
@@ -191,12 +194,76 @@
started = false;
}
}
-
+
+ //Suspends this sucker: closes its consumer and drops the session/consumer references, but does not unregister the sucker.
+ //No-op unless the sucker is started and not already suspended. Undone by resume(Session).
+ public synchronized void suspend()
+ {
+ if (!started || suspended)
+ {
+ return;
+ }
+
+ setConsuming(false); // runs while suspended is still false, so setConsuming takes its normal path rather than the suspended short-circuit
+
+ suspended = true;
+
+ try
+ {
+ consumer.closing(-1);
+ }
+ catch (Throwable t)
+ {
+ // Ignore - best-effort close (presumably the connection has already failed)
+ }
+ try
+ {
+ consumer.close();
+ }
+ catch (Throwable t)
+ {
+ // Ignore - best-effort close (presumably the connection has already failed)
+ }
+
+ sourceSession = null;
+
+ consumer = null;
+
+ clientConsumer = null;
+ }
+
+
+ public synchronized void resume(Session srcSession) throws JMSException // rebuilds the consumer on srcSession and re-applies the current consuming flag; no-op when not started
+ {
+ if (!started) // NOTE(review): suspended is not checked, so resume() on a sucker that was never suspended would replace its consumer without closing the old one - confirm callers
+ {
+ return;
+ }
+
+ sourceSession = srcSession;
+
+ SessionDelegate sourcedel = ((JBossSession)sourceSession).getDelegate();
+
+ consumer = (ClientConsumerDelegate)sourcedel.createConsumerDelegate(jbq, null, false, null, false, false);
+
+ clientConsumer = ((ConsumerState)consumer.getState()).getClientConsumer();
+
+ consumer.setMessageListener(this);
+
+ suspended = false;
+
+ boolean forcedState = consuming; // the state last requested through setConsuming
+
+ consuming = !consuming; // flip the flag so that setConsuming(forcedState) sees a change and actually applies it
+
+ setConsuming(forcedState);
+ }
+
public String getQueueName()
{
return this.localQueue.getName();
}
-
+
public synchronized void setConsuming(boolean consume)
{
if (trace) { log.trace(this + " setConsuming " + consume); }
@@ -206,6 +273,14 @@
return;
}
+ //while suspended, we only record the requested consuming state and do nothing else.
+ //later, on resume, we force the sucker into the last requested consuming state.
+ if (suspended)
+ {
+ consuming = consume;
+ return;
+ }
+
try
{
if (consume && !consuming)
Modified: branches/Branch_JBMESSAGING_1732/tests/build.properties
===================================================================
--- branches/Branch_1_4/tests/build.properties 2009-09-04 03:51:04 UTC (rev 7798)
+++ branches/Branch_JBMESSAGING_1732/tests/build.properties 2009-09-10 07:52:34 UTC (rev 7799)
@@ -2,5 +2,7 @@
# Local overrides for the builds initiated by build.sh
#
-test.bind.address=192.168.1.101
+test.bind.address=localhost
+jgroups.bind_addr=localhost
+
default.database=mysql
Modified: branches/Branch_JBMESSAGING_1732/tests/etc/container.xml
===================================================================
--- branches/Branch_1_4/tests/etc/container.xml 2009-09-04 03:51:04 UTC (rev 7798)
+++ branches/Branch_JBMESSAGING_1732/tests/etc/container.xml 2009-09-10 07:52:34 UTC (rev 7799)
@@ -35,7 +35,7 @@
<url>jdbc:mysql://localhost/messaging</url>
<driver>com.mysql.jdbc.Driver</driver>
<isolation>TRANSACTION_READ_COMMITTED</isolation>
- <username>root</username>
+ <username>sa</username>
</database-configuration>
Modified: branches/Branch_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/server/ServerPeerConfigurationTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/server/ServerPeerConfigurationTest.java 2009-09-04 03:51:04 UTC (rev 7798)
+++ branches/Branch_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/jms/server/ServerPeerConfigurationTest.java 2009-09-10 07:52:34 UTC (rev 7799)
@@ -24,6 +24,7 @@
import javax.management.ObjectName;
import javax.management.RuntimeMBeanException;
+import org.jboss.jms.server.ServerPeer;
import org.jboss.test.messaging.MessagingTestCase;
import org.jboss.test.messaging.tools.ServerManagement;
import org.jboss.test.messaging.tools.container.LocalTestServer;
@@ -143,6 +144,27 @@
server.stop();
}
}
+
+ //https://jira.jboss.org/jira/browse/JBMESSAGING-1732
+ //starts a local test server and checks the ServerPeer defaults: retry interval 5000, retry times -1
+ public void testServerPeerAttributeDefaults() throws Exception
+ {
+ LocalTestServer server = new LocalTestServer();
+
+ try
+ {
+ server.start("all", null, false, true);
+ ServerPeer sp = server.getServerPeer();
+ int interval = sp.getSuckerConnectionRetryInterval();
+ assertEquals(5000, interval);
+ int maxRetry = sp.getSuckerConnectionRetryTimes();
+ assertEquals(-1, maxRetry);
+ }
+ finally
+ {
+ server.stop(); // always stop the server, even when an assertion fails
+ }
+ }
// Package protected ---------------------------------------------
Modified: branches/Branch_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/tools/container/ServiceContainerConfiguration.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/tools/container/ServiceContainerConfiguration.java 2009-09-04 03:51:04 UTC (rev 7798)
+++ branches/Branch_JBMESSAGING_1732/tests/src/org/jboss/test/messaging/tools/container/ServiceContainerConfiguration.java 2009-09-10 07:52:34 UTC (rev 7799)
@@ -13,6 +13,7 @@
import java.util.Map;
import java.util.StringTokenizer;
+import org.jboss.logging.Logger;
import org.jboss.messaging.util.XMLUtil;
import org.w3c.dom.Element;
import org.w3c.dom.NamedNodeMap;
@@ -30,7 +31,7 @@
// Constants -----------------------------------------------------
public static final boolean DEFAULT_CLUSTERED_MODE = false;
-
+ private static final DebugLogger log = new DebugLogger("hudson.log");
// Static --------------------------------------------------------
public static String getHypersonicDatabase(String connectionURL)
@@ -219,9 +220,12 @@
private void setCurrentDatabase(String xmlConfigDatabase) // chooses the database: the test.database system property wins, otherwise the value from the XML configuration
{
database = System.getProperty("test.database");
+ // NOTE(review): the two log.log calls in this method carry an xxx123--- marker and look like temporary debugging output
+ log.log("xxx123--- database got from system property: " + database);
if (database == null) // system property not set: fall back to xmlConfigDatabase
{
database = xmlConfigDatabase;
+ log.log("xxx123--- database set to: " + database);
}
}
More information about the jboss-cvs-commits
mailing list