[jboss-cvs] JBoss Messaging SVN: r2774 - in trunk: src/main/org/jboss/jms/client/remoting and 16 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jun 12 18:43:54 EDT 2007
Author: timfox
Date: 2007-06-12 18:43:54 -0400 (Tue, 12 Jun 2007)
New Revision: 2774
Added:
trunk/src/main/org/jboss/jms/server/recovery/MessagingXAResourceWrapper.java
trunk/tests/src/org/jboss/test/messaging/jms/XAResourceRecoveryTest.java
trunk/tests/src/org/jboss/test/messaging/jms/bridge/ReconnectWithRecoveryTest.java
Modified:
trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java
trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
trunk/src/main/org/jboss/jms/server/recovery/MessagingXAResourceRecovery.java
trunk/src/main/org/jboss/jms/tx/MessagingXAResource.java
trunk/src/main/org/jboss/jms/tx/MessagingXid.java
trunk/src/main/org/jboss/jms/tx/ResourceManager.java
trunk/src/main/org/jboss/jms/tx/ResourceManagerFactory.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RoundRobinRouter.java
trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java
trunk/tests/bin/stop-rmi-server
trunk/tests/build.xml
trunk/tests/etc/jbossjta-properties.xml
trunk/tests/src/org/jboss/test/messaging/core/local/base/PagingFilteredQueueTestBase.java
trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
trunk/tests/src/org/jboss/test/messaging/jms/ClientExitTest.java
trunk/tests/src/org/jboss/test/messaging/jms/ManifestTest.java
trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java
trunk/tests/src/org/jboss/test/messaging/jms/bridge/ReconnectTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
trunk/tests/src/org/jboss/test/messaging/jms/message/ObjectMessageTest.java
trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainerConfiguration.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
trunk/tests/src/org/jboss/test/messaging/util/XMLUtilTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-845 http://jira.jboss.com/jira/browse/JBMESSAGING-985 http://jira.jboss.com/jira/browse/JBMESSAGING-760 http://jira.jboss.com/jira/browse/JBMESSAGING-962
Modified: trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -163,30 +163,37 @@
public Object handleClose(Invocation invocation) throws Throwable
{
- Object ret = invocation.invokeNext();
-
- ConnectionState state = getConnectionState(invocation);
-
- JMSRemotingConnection remotingConnection = state.getRemotingConnection();
-
- // remove the consolidated remoting connection listener
-
- ConsolidatedRemotingConnectionListener l = remotingConnection.removeConnectionListener();
- if (l != null)
- {
- l.clear();
- }
-
- // Finished with the connection - we need to shutdown callback server
- remotingConnection.stop();
-
- // Remove reference to message ID generator
- MessageIdGeneratorFactory.instance.checkInGenerator(state.getServerID());
-
- // And to resource manager
- ResourceManagerFactory.instance.checkInResourceManager(state.getServerID());
-
- return ret;
+ try
+ {
+ Object ret = invocation.invokeNext();
+
+ return ret;
+ }
+ finally
+ {
+ //Always cleanup in a finally - we need to cleanup if the server call to close fails too
+
+ ConnectionState state = getConnectionState(invocation);
+
+ JMSRemotingConnection remotingConnection = state.getRemotingConnection();
+
+ // remove the consolidated remoting connection listener
+
+ ConsolidatedRemotingConnectionListener l = remotingConnection.removeConnectionListener();
+ if (l != null)
+ {
+ l.clear();
+ }
+
+ // Finished with the connection - we need to shutdown callback server
+ remotingConnection.stop();
+
+ // Remove reference to message ID generator
+ MessageIdGeneratorFactory.instance.checkInGenerator(state.getServerID());
+
+ // And to resource manager
+ ResourceManagerFactory.instance.checkInResourceManager(state.getServerID());
+ }
}
public Object handleRegisterFailoverListener(Invocation invocation) throws Throwable
Modified: trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -287,17 +287,40 @@
log.debug(this + " started");
}
- public void stop() throws Throwable
+ public void stop()
{
log.debug(this + " closing");
// explicitly remove the callback listener, to avoid race conditions on server
// (http://jira.jboss.org/jira/browse/JBMESSAGING-535)
- client.removeListener(callbackManager);
- client.disconnect();
+ try
+ {
+ client.removeListener(callbackManager);
+ }
+ catch(Throwable ignore)
+ {
+ // very unlikely to get an exception on a local remove (I suspect badly designed API),
+ // but we're failed anyway, so we don't care too much
+
+ // Actually an exception will always be thrown here if the failure was detected by the connection
+ // validator since the validator will disconnect the client before calling the connection
+ // listener.
+ log.trace(this + " failed to cleanly remove callback manager from the client", ignore);
+ }
+
+ try
+ {
+ client.disconnect();
+ }
+ catch (Throwable ignore)
+ {
+ log.trace(this + " failed to disconnect the client", ignore);
+ }
+
client = null;
+
log.debug(this + " closed");
}
@@ -338,30 +361,7 @@
log.trace(this + " failed to set disconnect timeout", ignore);
}
- try
- {
- client.removeListener(callbackManager);
- }
- catch(Throwable ignore)
- {
- // very unlikely to get an exception on a local remove (I suspect badly designed API),
- // but we're failed anyway, so we don't care too much
-
- // Actually an exception will always be thrown here if the failure was detected by the connection
- // validator since the validator will disconnect the client before calling the connection
- // listener.
-
- log.trace(this + " failed to cleanly remove callback manager from the client", ignore);
- }
-
- try
- {
- client.disconnect();
- }
- catch (Throwable ignore)
- {
- log.trace(this + " failed to disconnect the client", ignore);
- }
+ stop();
}
/**
Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -331,7 +331,10 @@
waitForLastDelivery(lastDeliveryId);
//Important! We set the listener to null so the next ListenerRunner won't run
- setMessageListener(null);
+ if (listener != null)
+ {
+ setMessageListener(null);
+ }
//Now we wait for any current listener runners to run.
waitForOnMessageToComplete();
Modified: trunk/src/main/org/jboss/jms/server/recovery/MessagingXAResourceRecovery.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/recovery/MessagingXAResourceRecovery.java 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/src/main/org/jboss/jms/server/recovery/MessagingXAResourceRecovery.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -23,14 +23,8 @@
import java.util.StringTokenizer;
-import javax.jms.XAConnection;
-import javax.jms.XAConnectionFactory;
-import javax.jms.XASession;
-import javax.naming.Context;
-import javax.naming.InitialContext;
import javax.transaction.xa.XAResource;
-import org.jboss.jms.jndi.JMSProviderAdapter;
import org.jboss.logging.Logger;
import com.arjuna.ats.jta.recovery.XAResourceRecovery;
@@ -39,13 +33,6 @@
*
* A XAResourceRecovery instance that can be used to recover any JMS provider.
*
- *
- * This class will create a new XAConnection/XASession/XAResource on each sweep from the recovery manager.
- *
- * It can probably be optimised to keep the same XAResource between sweeps and only recreate if
- * a problem with the connection to the provider is detected, but considering that typical sweep periods
- * are of the order of 10s of seconds to several minutes, then the extra complexity of the code required
- * for that does not seem to be a good tradeoff.
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision: 1.1 $</tt>
@@ -61,17 +48,13 @@
private String providerAdaptorName;
- private JMSProviderAdapter providerAdaptor;
-
private boolean hasMore;
private String username;
private String password;
- private XAConnection conn;
-
- private XAResource res;
+ private MessagingXAResourceWrapper res;
public MessagingXAResourceRecovery()
{
@@ -92,37 +75,7 @@
}
providerAdaptorName = tok.nextToken();
-
- InitialContext ic = null;
-
- try
- {
- ic = new InitialContext();
-
- providerAdaptor = (JMSProviderAdapter)ic.lookup(providerAdaptorName);
- }
- catch (Exception e)
- {
- //Note - we only log at trace, since this is likely to happen on the first pass since, when
- //deployed in JBAS the recovery manager will typically start up before the JMSProviderLoaders
- log.trace("Failed to look up provider adaptor", e);
-
- return false;
- }
- finally
- {
- if (ic != null)
- {
- try
- {
- ic.close();
- }
- catch (Exception ignore)
- {
- }
- }
- }
-
+
//Next two (optional) parameters are the username and password to use for creating the connection
//for recovery
@@ -138,8 +91,8 @@
password = tok.nextToken();
}
- hasMore = true;
-
+ res = new MessagingXAResourceWrapper(providerAdaptorName, username, password);
+
if (log.isTraceEnabled()) { log.trace(this + " initialised"); }
return true;
@@ -148,12 +101,7 @@
public boolean hasMoreResources()
{
if (log.isTraceEnabled()) { log.trace(this + " hasMoreResources"); }
-
- if (providerAdaptor == null)
- {
- return false;
- }
-
+
/*
* The way hasMoreResources is supposed to work is as follows:
* For each "sweep" the recovery manager will call hasMoreResources, then if it returns
@@ -165,89 +113,12 @@
* In our case where we only need to return one XAResource per sweep,
* hasMoreResources should basically alternate between true and false.
*
- * And we return a new XAResource every time it is called.
- * This makes this resilient to failure, since if the network fails
- * between the XAResource and it's server, on the next pass a new one will
- * be create and if the server is back up it will work.
- * This means there is no need for an XAResourceWrapper which is a technique used in the
- * old JMSProviderXAResourceRecovery
- * The recovery manager will throw away the XAResource after every sweep.
*
*/
-
- if (hasMore)
- {
- //Get a new XAResource
-
- try
- {
- if (conn != null)
- {
- conn.close();
- }
- }
- catch (Exception ignore)
- {
- }
-
- Context ic = null;
-
- try
- {
- ic = providerAdaptor.getInitialContext();
-
- Object obj = ic.lookup(providerAdaptor.getFactoryRef());
-
- if (!(obj instanceof XAConnectionFactory))
- {
- throw new IllegalArgumentException("Connection factory from jms provider is not a XAConnectionFactory");
- }
-
- XAConnectionFactory connectionFactory = (XAConnectionFactory)obj;
-
- if (username == null)
- {
- conn = connectionFactory.createXAConnection();
- }
- else
- {
- conn = connectionFactory.createXAConnection(username, password);
- }
-
- XASession session = conn.createXASession();
-
- res = session.getXAResource();
-
- //Note the connection is closed the next time the xaresource is created or by the finalizer
-
- }
- catch (Exception e)
- {
- log.warn("Cannot create XAResource", e);
-
- hasMore = false;
- }
- finally
- {
- if (ic != null)
- {
- try
- {
- ic.close();
- }
- catch (Exception ignore)
- {
- }
- }
- }
-
- }
-
- boolean ret = hasMore;
-
+
hasMore = !hasMore;
- return ret;
+ return hasMore;
}
public XAResource getXAResource()
@@ -259,16 +130,7 @@
protected void finalize()
{
- try
- {
- if (conn != null)
- {
- conn.close();
- }
- }
- catch (Exception ignore)
- {
- }
+ res.close();
}
}
Added: trunk/src/main/org/jboss/jms/server/recovery/MessagingXAResourceWrapper.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/recovery/MessagingXAResourceWrapper.java (rev 0)
+++ trunk/src/main/org/jboss/jms/server/recovery/MessagingXAResourceWrapper.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -0,0 +1,407 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.jms.server.recovery;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.jms.jndi.JMSProviderAdapter;
+import org.jboss.logging.Logger;
+import org.jboss.util.naming.Util;
+
+/**
+ * XAResourceWrapper.
+ *
+ * Mainly from org.jboss.server.XAResourceWrapper from the JBoss AS server module
+ *
+ * The reason why we don't use that class directly is that it assumes that RM_FAIL or RM_ERR
+ * is thrown when the connection fails, but in JBM we throw XA_RETRY since we want the recovery
+ * manager to be able to retry on failure without having to retry manually
+ *
+ * @author <a href="adrian at jboss.com">Adrian Brock</a>
+ *
+ * @author <a href="tim.fox at jboss.com">Tim Fox</a>
+ *
+ * @version $Revision: 45341 $
+ */
+public class MessagingXAResourceWrapper implements XAResource, ExceptionListener
+{
+ /** The log */
+ private static final Logger log = Logger.getLogger(MessagingXAResourceWrapper.class);
+
+ /** The jms provider name */
+ private String providerName;
+
+ /** The state lock */
+ private static final Object lock = new Object();
+
+ /** The connection */
+ private XAConnection connection;
+
+ /** The delegate XAResource */
+ private XAResource delegate;
+
+ private String username;
+
+ private String password;
+
+ public MessagingXAResourceWrapper(String providerName, String username, String password)
+ {
+ this.providerName = providerName;
+
+ this.username = username;
+
+ this.password = password;
+ }
+
+ /**
+ * Get the providerName.
+ *
+ * @return the providerName.
+ */
+ public String getProviderName()
+ {
+ return providerName;
+ }
+
+ /**
+ * Set the providerName.
+ *
+ * @param providerName the providerName.
+ */
+ public void setProviderName(String providerName)
+ {
+ this.providerName = providerName;
+ }
+
+ public Xid[] recover(int flag) throws XAException
+ {
+ log.debug("Recover " + providerName);
+ XAResource xaResource = getDelegate();
+ try
+ {
+ return xaResource.recover(flag);
+ }
+ catch (XAException e)
+ {
+ throw check(e);
+ }
+ }
+
+ /**
+ * Commit the given transaction branch on the delegate XAResource.
+ *
+ * @param xid the transaction branch to commit
+ * @param onePhase passed straight through to the delegate's commit
+ * @throws XAException if the delegate cannot be obtained, or the delegate's
+ * failure after it has been passed through check()
+ */
+ public void commit(Xid xid, boolean onePhase) throws XAException
+ {
+ // Log the xid as well so the branch can be identified in the log
+ log.debug("Commit " + providerName + " xid " + xid + " onePhase=" + onePhase);
+ XAResource xaResource = getDelegate();
+ try
+ {
+ xaResource.commit(xid, onePhase);
+ }
+ catch (XAException e)
+ {
+ throw check(e);
+ }
+ }
+
+ /**
+ * Roll back the given transaction branch on the delegate XAResource.
+ *
+ * @param xid the transaction branch to roll back
+ * @throws XAException if the delegate cannot be obtained, or the delegate's
+ * failure after it has been passed through check()
+ */
+ public void rollback(Xid xid) throws XAException
+ {
+ // Log the xid as well so the branch can be identified in the log
+ log.debug("Rollback " + providerName + " xid " + xid);
+ XAResource xaResource = getDelegate();
+ try
+ {
+ xaResource.rollback(xid);
+ }
+ catch (XAException e)
+ {
+ throw check(e);
+ }
+ }
+
+ /**
+ * Tell the delegate XAResource to forget the given transaction branch.
+ *
+ * @param xid the transaction branch to forget
+ * @throws XAException if the delegate cannot be obtained, or the delegate's
+ * failure after it has been passed through check()
+ */
+ public void forget(Xid xid) throws XAException
+ {
+ // Log the xid as well so the branch can be identified in the log
+ log.debug("Forget " + providerName + " xid " + xid);
+ XAResource xaResource = getDelegate();
+ try
+ {
+ xaResource.forget(xid);
+ }
+ catch (XAException e)
+ {
+ throw check(e);
+ }
+ }
+
+ public boolean isSameRM(XAResource xaRes) throws XAException
+ {
+ if (xaRes instanceof MessagingXAResourceWrapper)
+ xaRes = ((MessagingXAResourceWrapper) xaRes).getDelegate();
+
+ XAResource xaResource = getDelegate();
+ try
+ {
+ return xaResource.isSameRM(xaRes);
+ }
+ catch (XAException e)
+ {
+ throw check(e);
+ }
+ }
+
+ public int prepare(Xid xid) throws XAException
+ {
+ XAResource xaResource = getDelegate();
+ try
+ {
+ return xaResource.prepare(xid);
+ }
+ catch (XAException e)
+ {
+ throw check(e);
+ }
+ }
+
+ public void start(Xid xid, int flags) throws XAException
+ {
+ XAResource xaResource = getDelegate();
+ try
+ {
+ xaResource.start(xid, flags);
+ }
+ catch (XAException e)
+ {
+ throw check(e);
+ }
+ }
+
+ public void end(Xid xid, int flags) throws XAException
+ {
+ XAResource xaResource = getDelegate();
+ try
+ {
+ xaResource.end(xid, flags);
+ }
+ catch (XAException e)
+ {
+ throw check(e);
+ }
+ }
+
+ public int getTransactionTimeout() throws XAException
+ {
+ XAResource xaResource = getDelegate();
+ try
+ {
+ return xaResource.getTransactionTimeout();
+ }
+ catch (XAException e)
+ {
+ throw check(e);
+ }
+ }
+
+ public boolean setTransactionTimeout(int seconds) throws XAException
+ {
+ XAResource xaResource = getDelegate();
+ try
+ {
+ return xaResource.setTransactionTimeout(seconds);
+ }
+ catch (XAException e)
+ {
+ throw check(e);
+ }
+ }
+
+ public void onException(JMSException exception)
+ {
+ log.warn("Notified of connection failure in recovery delegate for provider " + providerName, exception);
+ close();
+ }
+
+ /**
+ * Get the delegate XAResource
+ *
+ * @return the delegate
+ * @throws XAException for any problem
+ */
+ public XAResource getDelegate() throws XAException
+ {
+ XAResource result = null;
+ Exception error = null;
+ try
+ {
+ result = connect();
+ }
+ catch (Exception e)
+ {
+ log.error("********************************Failed to connect to server", e);
+ error = e;
+ }
+
+ if (result == null)
+ {
+ XAException xae = new XAException("Error trying to connect to provider " + providerName);
+ xae.errorCode = XAException.XAER_RMERR;
+ if (error != null)
+ xae.initCause(error);
+ log.debug("Cannot get delegate XAResource", xae);
+ throw xae;
+ }
+
+ return result;
+ }
+
+ /**
+ * Connect to the server if not already done so.
+ *
+ * If a delegate is already cached it is returned as is. Otherwise a new
+ * XAConnection is created (with the configured username/password when a
+ * username was given), an XASession is created on it and that session's
+ * XAResource is cached and returned. If creating the session or obtaining
+ * the XAResource fails, close() is called before the exception is rethrown.
+ *
+ * @return the delegate XAResource
+ * @throws Exception for any problem
+ */
+ protected XAResource connect() throws Exception
+ {
+ // Do we already have a valid delegate?
+ synchronized (lock)
+ {
+ if (delegate != null)
+ return delegate;
+ }
+
+ // Create the connection
+ XAConnectionFactory factory = getConnectionFactory();
+ XAConnection xaConnection;
+
+ if (username == null)
+ {
+ xaConnection = factory.createXAConnection();
+ }
+ else
+ {
+ xaConnection = factory.createXAConnection(username, password);
+ }
+
+ synchronized (lock)
+ {
+ connection = xaConnection;
+ }
+
+ // Retrieve the delegate XAResource
+ try
+ {
+ // Work on the local reference: close() may null the connection field
+ // from another thread once the lock has been released
+ XASession session = xaConnection.createXASession();
+ XAResource result = session.getXAResource();
+ synchronized (lock)
+ {
+ delegate = result;
+ }
+ // Return the local value, the delegate field is only safe to read under the lock
+ return result;
+ }
+ catch (Exception e)
+ {
+ close();
+ throw e;
+ }
+ }
+
+ /**
+ * Get the XAConnectionFactory.
+ *
+ * Looks up the JMSProviderAdapter bound under the provider name in the default
+ * InitialContext, then looks up the adapter's factory reference in the
+ * adapter's own initial context. Both contexts are closed before returning.
+ *
+ * @return the connection factory
+ * @throws IllegalArgumentException if the provider name is null
+ * @throws IllegalStateException if the provider adapter has no factory reference
+ * @throws Exception for any other problem
+ */
+ protected XAConnectionFactory getConnectionFactory() throws Exception
+ {
+ // Get the JMS Provider Adapter
+ if (providerName == null)
+ throw new IllegalArgumentException("Null provider name");
+
+ JMSProviderAdapter adapter;
+ Context ctx = new InitialContext();
+ try
+ {
+ adapter = (JMSProviderAdapter) ctx.lookup(providerName);
+ }
+ finally
+ {
+ // This context is only needed for the adapter lookup, so release it here.
+ // A failure to close must not turn a successful lookup into an error.
+ try
+ {
+ ctx.close();
+ }
+ catch (Exception ignored)
+ {
+ log.trace("Ignored error closing initial context", ignored);
+ }
+ }
+
+ // Determine the XAConnectionFactory name
+ String connectionFactoryRef = adapter.getFactoryRef();
+ if (connectionFactoryRef == null)
+ throw new IllegalStateException("Provider '" + providerName + "' has no FactoryRef");
+
+ // Lookup the connection factory
+ ctx = adapter.getInitialContext();
+ try
+ {
+ return (XAConnectionFactory) Util.lookup(ctx, connectionFactoryRef, XAConnectionFactory.class);
+ }
+ finally
+ {
+ ctx.close();
+ }
+ }
+
+ /**
+ * Close the connection
+ */
+ public void close()
+ {
+ try
+ {
+ XAConnection oldConnection = null;
+ synchronized (lock)
+ {
+ oldConnection = connection;
+ connection = null;
+ delegate = null;
+ }
+ if (oldConnection != null)
+ oldConnection.close();
+ }
+ catch (Exception ignored)
+ {
+ log.trace("Ignored error during close", ignored);
+ }
+ }
+
+ /**
+ * Check whether an XAException is fatal. If it is an RM problem
+ * we close the connection so the next call will reconnect.
+ *
+ * @param e the xa exception
+ * @return never
+ * @throws XAException always
+ */
+ protected XAException check(XAException e) throws XAException
+ {
+ if (e.errorCode == XAException.XA_RETRY)
+ {
+ log.debug("Fatal error in provider " + providerName, e);
+ close();
+ }
+ throw new XAException(XAException.XAER_RMFAIL);
+ }
+
+ protected void finalize() throws Throwable
+ {
+ close();
+ }
+}
Modified: trunk/src/main/org/jboss/jms/tx/MessagingXAResource.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/MessagingXAResource.java 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/src/main/org/jboss/jms/tx/MessagingXAResource.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -108,7 +108,11 @@
return false;
}
- return ((MessagingXAResource)xaResource).rm == this.rm;
+ boolean same = ((MessagingXAResource)xaResource).rm.getServerID() == this.rm.getServerID();
+
+ if (trace) { log.trace("Calling isSameRM, result is " + same); }
+
+ return same;
}
public void start(Xid xid, int flags) throws XAException
Modified: trunk/src/main/org/jboss/jms/tx/MessagingXid.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/MessagingXid.java 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/src/main/org/jboss/jms/tx/MessagingXid.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -137,9 +137,8 @@
public String toString()
{
- return getClass().getName() + "(GID: " + stringRep(getGlobalTransactionId()) +
- ", Branch: " + stringRep(getBranchQualifier()) +
- ", Format: " + getFormatId() + ")";
+ return "MessagingXid (" + System.identityHashCode(this) + " bq:" + stringRep(branchQualifier) +
+ " formatID:" + formatId + " gtxid:" + stringRep(globalTransactionId);
}
private String stringRep(byte[] bytes)
Modified: trunk/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ResourceManager.java 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/src/main/org/jboss/jms/tx/ResourceManager.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -70,18 +70,26 @@
private ConcurrentHashMap transactions = new ConcurrentHashMap();
+ private int serverID;
+
// Static ---------------------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(ResourceManager.class);
// Constructors ---------------------------------------------------------------------------------
- ResourceManager()
+ ResourceManager(int serverID)
{
+ this.serverID = serverID;
}
// Public ---------------------------------------------------------------------------------------
+ public int getServerID()
+ {
+ return serverID;
+ }
+
/*
* Merge another resource manager into this one - used in failover
*/
Modified: trunk/src/main/org/jboss/jms/tx/ResourceManagerFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ResourceManagerFactory.java 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/src/main/org/jboss/jms/tx/ResourceManagerFactory.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -87,7 +87,7 @@
if (h == null)
{
- h = new Holder();
+ h = new Holder(serverID);
holders.put(i, h);
}
else
@@ -164,9 +164,9 @@
{
ResourceManager rm;
- Holder()
+ Holder(int serverID)
{
- rm = new ResourceManager();
+ rm = new ResourceManager(serverID);
}
Holder(ResourceManager rm)
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -121,14 +121,11 @@
public boolean contains(Receiver queue)
{
- //FIXME - what about failed over queues??
return localQueue == queue || nonLocalQueues.contains(queue);
}
public Iterator iterator()
- {
- //FIXME - this is broken - where are the failed over queuues?
-
+ {
List queues = new ArrayList();
if (localQueue != null)
@@ -141,11 +138,6 @@
return queues.iterator();
}
- public boolean add(Receiver receiver)
- {
- return add(receiver, false);
- }
-
public boolean remove(Receiver queue)
{
if (localQueue == queue)
@@ -203,31 +195,34 @@
return localQueue;
}
- public boolean add(Receiver receiver, boolean failedOver)
+ public boolean add(Receiver receiver)
{
- if (receiver instanceof ClusteredQueue)
- {
-
- ClusteredQueue queue = (ClusteredQueue)receiver;
-
- if (queue.isLocal())
- {
- if (localQueue != null)
- {
- throw new IllegalStateException(this + " already has local queue");
- }
- localQueue = queue;
- }
- else
- {
- nonLocalQueues.add(queue);
- }
- }
- else
- {
- localQueue = (Queue)receiver;
- }
+ if (receiver instanceof ClusteredQueue)
+ {
+ ClusteredQueue queue = (ClusteredQueue)receiver;
+ if (queue.isLocal())
+ {
+ if (localQueue != null)
+ {
+ throw new IllegalStateException(this + " already has local queue");
+ }
+ localQueue = queue;
+ }
+ else
+ {
+ nonLocalQueues.add(queue);
+ }
+ }
+ else
+ {
+ if (localQueue != null)
+ {
+ throw new IllegalStateException(this + " already has local queue");
+ }
+ localQueue = (Queue)receiver;
+ }
+
return true;
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RoundRobinRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RoundRobinRouter.java 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RoundRobinRouter.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -63,7 +63,7 @@
// ArrayList<>; MUST be an arraylist for fast index access
private ArrayList queues;
- private ClusteredQueue localQueue;
+ private Queue localQueue;
private int target;
@@ -82,11 +82,11 @@
if (!queues.isEmpty())
{
- ClusteredQueue queue = (ClusteredQueue)queues.get(target);
+ Queue queue = (Queue)queues.get(target);
Delivery del = queue.handle(observer, ref, tx);
- if (trace) { log.trace(this + " routed to remote queue, it returned " + del); }
+ if (trace) { log.trace(this + " routed to queue, it returned " + del); }
incTarget();
@@ -105,14 +105,11 @@
public boolean contains(Receiver queue)
{
- //FIXME - what about failed over queues??
return queues.contains(queue);
}
public Iterator iterator()
{
- //FIXME - this is broken - where are the failed over queuues?
-
return queues.iterator();
}
@@ -151,7 +148,6 @@
public int getNumberOfReceivers()
{
- //FIXME - what about failed over queues????
return queues.size();
}
@@ -164,22 +160,32 @@
public boolean add(Receiver receiver, boolean failedOver)
{
- ClusteredQueue queue = (ClusteredQueue)receiver;
+ Queue queue = (Queue)receiver;
- if (queue.isLocal())
+ queues.add(receiver);
+
+ if (queue instanceof ClusteredQueue)
{
- if (localQueue == null)
- {
- localQueue = queue;
- }
- else
- {
- throw new IllegalStateException("Local queue already exists");
- }
+ ClusteredQueue clusteredQueue = (ClusteredQueue)queue;
+
+ if (clusteredQueue.isLocal())
+ {
+ if (localQueue != null)
+ {
+ throw new IllegalStateException(this + " already has local queue");
+ }
+ localQueue = clusteredQueue;
+ }
}
-
- queues.add(receiver);
-
+ else
+ {
+ if (localQueue != null)
+ {
+ throw new IllegalStateException(this + " already has local queue");
+ }
+ localQueue = queue;
+ }
+
return true;
}
Modified: trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -323,7 +323,24 @@
if (trace) log.trace("Destination for message[ID=" + ref.getMessage().getMessageID() + "] is: " + queue);
+ // The actual jmx queue may not have been deployed yet, so we need to activate it if so,
+ // or the handle will have no effect
+
+ boolean deactivate = false;
+
+ if (!queue.isActive())
+ {
+ queue.activate();
+
+ deactivate = true;
+ }
+
queue.handle(null, ref, tx);
+
+ if (deactivate)
+ {
+ queue.deactivate();
+ }
}
finally
{
@@ -382,7 +399,24 @@
try
{
+ // The actual jmx queue may not have been deployed yet, so we need to activate it if so,
+ // or the acknowledge will have no effect
+
+ boolean deactivate = false;
+
+ if (!queue.isActive())
+ {
+ queue.activate();
+
+ deactivate = true;
+ }
+
del.acknowledge(tx);
+
+ if (deactivate)
+ {
+ queue.deactivate();
+ }
}
catch (Throwable t)
{
Modified: trunk/tests/bin/stop-rmi-server
===================================================================
--- trunk/tests/bin/stop-rmi-server 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/tests/bin/stop-rmi-server 2007-06-12 22:43:54 UTC (rev 2774)
@@ -55,5 +55,5 @@
CLASSPATH=`cat $CLASSPATH_FILE`
-"$JAVA_HOME/java" $JAVA_OPTS -cp $CLASSPATH -Dtest.bind.address=localhost \
+"$JAVA_HOME/bin/java" $JAVA_OPTS -cp $CLASSPATH -Dtest.bind.address=localhost \
org.jboss.test.messaging.tools.jmx.rmi.StopRMIServer
Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/tests/build.xml 2007-06-12 22:43:54 UTC (rev 2774)
@@ -472,6 +472,7 @@
<exclude name="**/thirdparty/remoting/CallbackServerTimeoutTest.class"/>
<exclude name="**/thirdparty/remoting/ClientInvokerTimeoutTest.class"/>
<exclude name="**/thirdparty/remoting/SocketTransportCausalityTest.class"/>
+ <exclude name="**/jms/XAResourceRecoveryTest.class"/>
</fileset>
</batchtest>
</junit>
Modified: trunk/tests/etc/jbossjta-properties.xml
===================================================================
--- trunk/tests/etc/jbossjta-properties.xml 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/tests/etc/jbossjta-properties.xml 2007-06-12 22:43:54 UTC (rev 2774)
@@ -1,19 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
-
-<!-- Arjuna JTA config for JBoss Messaging test suite
-This config has some quicker settings (like recovery period) than you would
-probably have in a real production config, this is so the tests don't take aeons to run -->
-
<transaction-service>
-
<properties depends="common" name="arjuna">
<!--
- (Must be unique across all Arjuna instances.)
- -->
- <property name="com.arjuna.ats.arjuna.xa.nodeIdentifier" value="1"/>
- <property name="com.arjuna.ats.jta.xaRecoveryNode" value="1"/>
-
- <!--
Transaction Reaper Timeout (default is 120000 microseconds).
-->
<property
@@ -64,44 +52,47 @@
-->
<property
name="com.arjuna.ats.arjuna.objectstore.transactionSync" value="ON"/>
-
+ <!--
+ (Must be unique across all Arjuna instances.)
+ -->
+ <property name="com.arjuna.ats.arjuna.xa.nodeIdentifier" value="1"/>
<!-- property
name="com.arjuna.ats.arjuna.coordinator.actionStore"
- value="HashedActionStore"
- value="JDBCActionStore"
+ value="HashedActionStore"
+ value="JDBCActionStore"
-->
<!-- property
name="com.arjuna.ats.arjuna.objectstore.jdbcTxDbAccess"
- value="JDBCAccess"
+ value="JDBCAccess"
-->
<!-- property
name="com.arjuna.ats.arjuna.objectstore.objectStoreType"
- value="ShadowNoFileLockStore"
- value="JDBCStore"
+ value="ShadowNoFileLockStore"
+ value="JDBCStore"
-->
<!-- property
name="com.arjuna.ats.arjuna.objectstore.jdbcUserDbAccess"
- value="JDBCAccess"
+ value="JDBCAccess"
-->
<!-- property
name="com.arjuna.ats.arjuna.objectstore.jdbcPoolSizeInitial"
- value="1"
+ value="1"
-->
<!-- property
name="com.arjuna.ats.arjuna.objectstore.jdbcPoolSizeMaximum"
- value="1"
+ value="1"
-->
<!-- property
name="com.arjuna.ats.arjuna.objectstore.jdbcPoolPutConnections"
- value="false"
+ value="false"
-->
<!-- property
name="com.arjuna.ats.arjuna.internal.arjuna.objectstore.cacheStore.size"
- value=""
+ value=""
-->
<!-- property
name="com.arjuna.ats.arjuna.internal.arjuna.objectstore.cacheStore.period"
- value=""
+ value=""
-->
<!--
The location for creating temporary files, e.g., Uids.
@@ -115,15 +106,64 @@
value="var"/>
-->
</properties>
- <properties depends="arjuna" name="recoverymanager">
+ <properties name="common">
+ <!-- CLF 2.0 properties -->
+ <property name="com.arjuna.common.util.logging.DebugLevel"
+ type="System" value="0x00000000"/>
+ <property name="com.arjuna.common.util.logging.FacilityLevel"
+ type="System" value="0xffffffff"/>
+ <property name="com.arjuna.common.util.logging.VisibilityLevel"
+ type="System" value="0xffffffff"/>
+ <property name="com.arjuna.common.util.logger" type="System" value="log4j"/>
+ </properties>
+ <properties depends="arjuna" name="txoj">
+ <!--
+ (default is LockStore of installation - must be writeable!)
+ -->
+ <!--
+ <property
+ name="com.arjuna.ats.txoj.lockstore.lockStoreDir"
+ value="LockStore"/>
+ -->
+ <!--
+ (default is BasicLockStore)
+ -->
+ <property name="com.arjuna.ats.txoj.lockstore.lockStoreType" value="BasicLockStore"/>
+ <!--
+ (default is NO)
+ -->
+ <property name="com.arjuna.ats.txoj.lockstore.multipleLockStore" value="NO"/>
+ <!--
+ (default is YES)
+ -->
+ <property name="com.arjuna.ats.txoj.lockstore.singleLockStore" value="YES"/>
+ <!--
+ (default is YES)
+ -->
+ <property
+ name="com.arjuna.ats.txoj.lockstore.allowNestedLocking" value="YES"/>
+ </properties>
+ <properties depends="arjuna" name="jta">
+ <!--
+ Support subtransactions in the JTA layer?
+ Default is NO.
+ -->
+ <property name="com.arjuna.ats.jta.supportSubtransactions" value="NO"/>
+ <property name="com.arjuna.ats.jta.jtaTMImplementation" value="com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple"/>
+ <!--
+ com.arjuna.ats.internal.jta.transaction.jts.TransactionManagerImple
+ -->
+ <property name="com.arjuna.ats.jta.jtaUTImplementation" value="com.arjuna.ats.internal.jta.transaction.arjunacore.UserTransactionImple"/>
+ <!--
+ com.arjuna.ats.internal.jta.transaction.jts.UserTransactionImple
+ -->
+<property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.JBMESSAGING" value="org.jboss.jms.server.recovery.MessagingXAResourceRecovery;adaptor1"/>
- <property name="com.arjuna.ats.arjuna.xa.nodeIdentifier" value="1"/>
- <property name="com.arjuna.ats.jta.xaRecoveryNode" value="1"/>
+
+ </properties>
+ <properties depends="arjuna,txoj,jta" name="recoverymanager">
<!--
- (Must be unique across all Arjuna instances.)
- -->
- <!--
Properties used only by the RecoveryManager.
-->
<!--
@@ -135,14 +175,24 @@
Default is 120 seconds.
-->
<property
- name="com.arjuna.ats.arjuna.recovery.periodicRecoveryPeriod" value="10"/>
+ name="com.arjuna.ats.arjuna.recovery.periodicRecoveryPeriod" value="20"/>
<!--
Interval in seconds between first and second pass of periodic recovery.
Default is 10 seconds.
-->
<property
- name="com.arjuna.ats.arjuna.recovery.recoveryBackoffPeriod" value="2"/>
+ name="com.arjuna.ats.arjuna.recovery.recoveryBackoffPeriod" value="10"/>
+ <!--
+ Periodic recovery modules to use. Invoked in sort-order of names.
+ -->
+ <property name="com.arjuna.ats.arjuna.recovery.recoveryExtension0" value="com.arjuna.ats.internal.jta.recovery.arjunacore.XARecoveryModule"/>
+
+ <property name="com.arjuna.ats.arjuna.recovery.recoveryExtension1" value="com.arjuna.ats.internal.arjuna.recovery.AtomicActionRecoveryModule"/>
+
+ <property name="com.arjuna.ats.arjuna.recovery.recoveryExtension2" value="com.arjuna.ats.internal.txoj.recovery.TORecoveryModule"/>
+
+
<!--
Expired entry removal
-->
@@ -175,97 +225,7 @@
-->
<property
name="com.arjuna.ats.arjuna.recovery.transactionStatusManagerPort" value="0"/>
- <!--
- Properties used only by the RecoveryManager.
- -->
- <!--
- Periodic recovery modules to use. Invoked in sort-order of names.
- -->
-
- <property name="com.arjuna.ats.arjuna.recovery.recoveryExtension3" value="com.arjuna.ats.internal.jta.recovery.arjunacore.XARecoveryModule"/>
-
-
</properties>
- <properties name="common">
- <!-- CLF 2.0 properties -->
- <property name="com.arjuna.common.util.logging.DebugLevel"
- type="System" value="0x00000000"/>
- <property name="com.arjuna.common.util.logging.FacilityLevel"
- type="System" value="0xffffffff"/>
- <property name="com.arjuna.common.util.logging.VisibilityLevel"
- type="System" value="0xffffffff"/>
- <property name="com.arjuna.common.util.logger" type="System" value="log4j"/>
- </properties>
- <properties depends="arjuna" name="txoj">
- <!--
- (default is LockStore of installation - must be writeable!)
- -->
- <!--
- <property
- name="com.arjuna.ats.txoj.lockstore.lockStoreDir"
- value="LockStore"/>
- -->
- <!--
- (default is BasicLockStore)
- -->
- <property name="com.arjuna.ats.txoj.lockstore.lockStoreType" value="BasicLockStore"/>
- <!--
- (default is NO)
- -->
- <property name="com.arjuna.ats.txoj.lockstore.multipleLockStore" value="NO"/>
- <!--
- (default is YES)
- -->
- <property name="com.arjuna.ats.txoj.lockstore.singleLockStore" value="YES"/>
- <!--
- (default is YES)
- -->
- <property
- name="com.arjuna.ats.txoj.lockstore.allowNestedLocking" value="YES"/>
- </properties>
-
- <properties depends="arjuna" name="jta">
- <property name="com.arjuna.ats.arjuna.xa.nodeIdentifier" value="1"/>
- <property name="com.arjuna.ats.jta.xaRecoveryNode" value="1"/>
-
-
- <!-- This used when deployed in the app server and we want to do a recovery of a JMS provider, the string after the semi colon
- must match the JMS provider name from the JMSProviderAdapter -->
-
- <!--
- <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.JBMESSAGING" value="org.jboss.jms.recovery.JMSProviderXAResourceRecovery;DefaultJMSProvider"/>
--->
-
- <!-- This is used for recovery using the message bridge, in which case JMSProviderAdapters aren't deployed so the string after the semi-colon
- is first the provider name, then a comma, then the name of a properties file available on the classpath which has the server connection information, e.g:
- provider1.jndi.prop1=xxxx
- provider1.jndi.prop2=yyyy
- provider1.jndi.prop3=zzzz
- provider1.xaconnectionfactorylookup=xyz
- provider2.jndi.prop1=xxxx
- provider2.jndi.prop2=yyyy
- provider2.jndi.prop3=zzzz
- provider2.xaconnectionfactorylookup=xyz
- -->
-
- <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.JBMESSAGINGBRIDGE_SERVER0" value="org.jboss.jms.server.recovery.MessagingXAResourceRecovery;adaptor1"/>
- <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.JBMESSAGINGBRIDGE_SERVER1" value="org.jboss.jms.server.recovery.MessagingXAResourceRecovery;adaptor2"/>
-
- <!--
- Support subtransactions in the JTA layer?
- Default is NO.
- -->
- <property name="com.arjuna.ats.jta.supportSubtransactions" value="NO"/>
- <property name="com.arjuna.ats.jta.jtaTMImplementation" value="com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple"/>
- <!--
- com.arjuna.ats.internal.jta.transaction.jts.TransactionManagerImple
- -->
- <property name="com.arjuna.ats.jta.jtaUTImplementation" value="com.arjuna.ats.internal.jta.transaction.arjunacore.UserTransactionImple"/>
- <!--
- com.arjuna.ats.internal.jta.transaction.jts.UserTransactionImple
- -->
-
- </properties>
<properties depends="jta" name="jdbc">
<!--
property name="com.arjuna.ats.jdbc.isolationLevel" value="TRANSACTION_SERIALIZABLE"/>
Modified: trunk/tests/src/org/jboss/test/messaging/core/local/base/PagingFilteredQueueTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/local/base/PagingFilteredQueueTestBase.java 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/tests/src/org/jboss/test/messaging/core/local/base/PagingFilteredQueueTestBase.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -539,7 +539,7 @@
MessageReference ref = createReference(0, true, "payload");
SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
- log.info("ref is reliable:" + ref.getMessage().isReliable());
+ log.trace("ref is reliable:" + ref.getMessage().isReliable());
// non-transacted send, reliable message, one message
Delivery delivery = queue.handle(observer, ref, null);
Modified: trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -116,7 +116,6 @@
try
{
conn = cf.createTopicConnection();
- System.out.println("****** ClientID = " + conn.getClientID());
TopicSession sess = conn.createTopicSession(true, 0);
TopicPublisher pub = sess.createPublisher(topic);
pub.setDeliveryMode(DeliveryMode.PERSISTENT);
@@ -432,11 +431,8 @@
* recovery.
*/
public void testIndividualClientAcknowledge() throws Exception
- {
-
+ {
Connection conn = cf.createConnection();
- for (int i=0; i<20; i++) System.out.println("*******************************************");
- System.out.println("clientID = " + conn.getClientID());
Session producerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSess.createProducer(queue);
@@ -1213,9 +1209,7 @@
count++;
TextMessage tm = (TextMessage)m;
-
- log.info("Got message: " + tm.getText());
-
+
// Receive first three messages then recover() session
// Only last message should be redelivered
if (count == 1)
Modified: trunk/tests/src/org/jboss/test/messaging/jms/ClientExitTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ClientExitTest.java 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ClientExitTest.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -176,65 +176,10 @@
String commandLine = sb.toString();
- //System.out.println(commandLine);
-
Process process = Runtime.getRuntime().exec(commandLine);
log.trace("process: " + process);
-// final boolean verbose = true;
-//
-// final BufferedReader rs = new BufferedReader(new InputStreamReader(process.getInputStream()));
-// final BufferedReader re = new BufferedReader(new InputStreamReader(process.getErrorStream()));
-//
-// new Thread(new Runnable()
-// {
-// public void run()
-// {
-// try
-// {
-// String line;
-//
-// while((line = rs.readLine()) != null)
-// {
-// if (verbose)
-// {
-// System.out.println("GRACEFUL CLIENT STDOUT: " + line);
-// }
-// }
-// }
-// catch(Exception e)
-// {
-// log.error("exception", e);
-// }
-// }
-//
-// }, "GRACEFUL CLIENT STDOUT reader thread").start();
-//
-// new Thread(new Runnable()
-// {
-// public void run()
-// {
-// try
-// {
-// String line;
-//
-// while((line = re.readLine()) != null)
-// {
-// if (verbose)
-// {
-// System.out.println("GRACEFUL CLIENT STDERR: " + line);
-// }
-// }
-// }
-// catch(Exception e)
-// {
-// log.error("exception", e);
-// }
-// }
-//
-// }, "GRACEFUL CLIENT STDERR reader thread").start();
-
return process;
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/ManifestTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ManifestTest.java 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ManifestTest.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -102,7 +102,7 @@
Iterator itr = attrs.entrySet().iterator();
while (itr.hasNext()) {
Object item = itr.next();
- System.out.println("MANIFEST--> " + item + " : " + attrs.get(item));
+ log.trace("MANIFEST--> " + item + " : " + attrs.get(item));
}
assertEquals(attrs.getValue("Implementation-Title"), meta.getJMSProviderName());
Added: trunk/tests/src/org/jboss/test/messaging/jms/XAResourceRecoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/XAResourceRecoveryTest.java (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/jms/XAResourceRecoveryTest.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -0,0 +1,580 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms;
+
+import java.util.Hashtable;
+import java.util.Properties;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.naming.InitialContext;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
+import org.jboss.jms.jndi.JMSProviderAdapter;
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.TestJMSProviderAdaptor;
+import org.jboss.test.messaging.tools.aop.PoisonInterceptor;
+import org.jboss.test.messaging.tools.jmx.ServiceContainer;
+import org.jboss.test.messaging.tools.jndi.InVMInitialContextFactory;
+import org.jboss.tm.TxUtils;
+
+import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple;
+
+/**
+ *
+ * A XAResourceRecoveryTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class XAResourceRecoveryTest extends MessagingTestCase
+{
+ protected int nodeCount = 2;
+
+ protected ServiceContainer sc;
+
+ protected XAConnectionFactory cf0, cf1;
+
+ protected Destination queue0, queue1;
+
+ protected TransactionManager tm;
+
+ protected Transaction suspendedTx;
+
+ /**
+ * Creates the test case, passing the test name straight through to the superclass.
+ *
+ * @param name the test name handed to the superclass constructor
+ */
+ public XAResourceRecoveryTest(String name)
+ {
+ super(name);
+ }
+
+ /**
+ * Starts the remote servers, then a local service container (created with the
+ * "jbossjta" configuration), and prepares one freshly deployed, drained queue on
+ * each of servers 0 and 1. Finally installs the "adaptor1" JMS provider adaptor,
+ * built from server 1's JNDI environment, and starts the recovery manager.
+ *
+ * @throws IllegalStateException if the test is not being run in remote mode
+ * @throws Exception if any server, JNDI or JMS set-up step fails
+ */
+ protected void setUp() throws Exception
+ {
+    if (!ServerManagement.isRemote())
+    {
+       throw new IllegalStateException("This test should only be run in remote mode");
+    }
+
+    super.setUp();
+
+    log.info("Starting " + nodeCount + " servers");
+
+    for (int i = 0; i < nodeCount; i++)
+    {
+       // make sure all servers are created and started; make sure that database is zapped
+       // ONLY for the first server, the others rely on values they expect to find in shared
+       // tables; don't clear the database for those.
+       ServerManagement.start(i, "all", i == 0);
+    }
+
+    //We need a local transaction and recovery manager
+    //We must start this after the remote servers have been created or it won't
+    //have deleted the database and the recovery manager may attempt to recover transactions
+    sc = new ServiceContainer("jbossjta");
+
+    sc.start(false);
+
+    // Undeploy before deploying so each test starts from freshly deployed queues
+    ServerManagement.undeployQueue("queue0", 0);
+
+    ServerManagement.undeployQueue("queue1", 1);
+
+    ServerManagement.deployQueue("queue0", 0);
+
+    ServerManagement.deployQueue("queue1", 1);
+
+    Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
+
+    Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
+
+    InitialContext ic0 = new InitialContext(props0);
+
+    InitialContext ic1 = new InitialContext(props1);
+
+    cf0 = (XAConnectionFactory)ic0.lookup("/XAConnectionFactory");
+
+    cf1 = (XAConnectionFactory)ic1.lookup("/XAConnectionFactory");
+
+    queue0 = (Queue)ic0.lookup("/queue/queue0");
+
+    queue1 = (Queue)ic1.lookup("/queue/queue1");
+
+    InitialContext localIc = new InitialContext(InVMInitialContextFactory.getJNDIEnvironment());
+
+    tm = (TransactionManager)localIc.lookup(ServiceContainer.TRANSACTION_MANAGER_JNDI_NAME);
+
+    log.info("tm is " + tm.getClass().getName());
+    assertTrue(tm instanceof TransactionManagerImple);
+
+    drainDestination((ConnectionFactory)cf0, queue0);
+
+    drainDestination((ConnectionFactory)cf1, queue1);
+
+    // Remote mode is guaranteed by the guard at the top of this method, so no
+    // transaction is suspended here and suspendedTx is left unset.
+
+    //Now install local JMSProviderAdaptor classes
+
+    Properties p1 = new Properties();
+    p1.putAll(ServerManagement.getJNDIEnvironment(1));
+
+    JMSProviderAdapter targetAdaptor =
+       new TestJMSProviderAdaptor(p1, "/XAConnectionFactory", "adaptor1");
+
+    sc.installJMSProviderAdaptor("adaptor1", targetAdaptor);
+
+    sc.startRecoveryManager();
+
+ }
+
+ /**
+ * Undeploys the test queues, clears any transaction still associated with this
+ * thread, stops the remote servers and shuts down the local service container.
+ *
+ * The container shutdown steps and super.tearDown() are chained in finally blocks
+ * so that a failure in an earlier step cannot leave the recovery manager or the
+ * container running for the next test.
+ *
+ * @throws Exception if transaction clean-up or a shutdown step fails; the remaining
+ *         shutdown steps are still attempted first
+ */
+ protected void tearDown() throws Exception
+ {
+    try
+    {
+       try
+       {
+          ServerManagement.undeployQueue("queue0", 0);
+       }
+       catch (Exception e)
+       {
+          log.error("Failed to undeploy", e);
+       }
+
+       try
+       {
+          ServerManagement.undeployQueue("queue1", 1);
+       }
+       catch (Exception e)
+       {
+          log.error("Failed to undeploy", e);
+       }
+
+       if (TxUtils.isUncommitted(tm))
+       {
+          //roll it back
+          try
+          {
+             tm.rollback();
+          }
+          catch (Throwable ignore)
+          {
+             //The connection will probably be closed so this may well throw an exception
+          }
+       }
+       if (tm.getTransaction() != null)
+       {
+          Transaction tx = tm.suspend();
+          if (tx != null)
+          {
+             log.warn("Transaction still associated with thread " + tx + " at status " + TxUtils.getStatusAsString(tx.getStatus()));
+          }
+       }
+
+       if (suspendedTx != null)
+       {
+          tm.resume(suspendedTx);
+       }
+
+       for (int i = 0; i < nodeCount; i++)
+       {
+          try
+          {
+             if (ServerManagement.isStarted(i))
+             {
+                ServerManagement.log(ServerManagement.INFO, "Undeploying Server " + i, i);
+
+                ServerManagement.stop(i);
+             }
+          }
+          catch (Exception e)
+          {
+             log.error("Failed to stop server", e);
+          }
+       }
+
+       // NOTE(review): the kill loop starts at 1, so server 0 is stopped but never
+       // killed - presumably deliberate; confirm against ServerManagement.kill
+       for (int i = 1; i < nodeCount; i++)
+       {
+          try
+          {
+             ServerManagement.kill(i);
+          }
+          catch (Exception e)
+          {
+             log.error("Failed to kill server", e);
+          }
+       }
+    }
+    finally
+    {
+       // Always shut the local container down, even if the clean-up above failed
+       try
+       {
+          sc.uninstallJMSProviderAdaptor("adaptor1");
+       }
+       finally
+       {
+          try
+          {
+             sc.stopRecoveryManager();
+          }
+          finally
+          {
+             try
+             {
+                sc.stop();
+             }
+             finally
+             {
+                super.tearDown();
+             }
+          }
+       }
+    }
+ }
+
+ /**
+ * Sends one message to queue0 on server 0 and one to queue1 on server 1 inside a
+ * single JTA transaction, with server 1 poisoned (PoisonInterceptor.TYPE_2PC_COMMIT)
+ * before the commit. Server 1 is then restarted and the test asserts that each
+ * queue delivers exactly its one message.
+ *
+ * @throws Exception if any JMS, JTA, JNDI or server-management call fails
+ */
+ public void testRecoveryOnSend() throws Exception
+ {
+ XAConnection conn0 = null;
+
+ XAConnection conn1 = null;
+
+ Connection conn2 = null;
+
+ Connection conn3 = null;
+
+ try
+ {
+ conn0 = cf0.createXAConnection();
+
+ XASession sess0 = conn0.createXASession();
+
+ MessageProducer prod0 = sess0.createProducer(queue0);
+
+ XAResource res0 = sess0.getXAResource();
+
+
+ conn1 = cf1.createXAConnection();
+
+ XASession sess1 = conn1.createXASession();
+
+ MessageProducer prod1 = sess1.createProducer(queue1);
+
+ XAResource res1 = sess1.getXAResource();
+
+
+ // Enlist both sessions' XA resources in one transaction
+ tm.begin();
+
+ Transaction tx = tm.getTransaction();
+
+ tx.enlistResource(res0);
+
+ tx.enlistResource(res1);
+
+
+ TextMessage tm0 = sess0.createTextMessage("message0");
+
+ prod0.send(tm0);
+
+
+ TextMessage tm1 = sess1.createTextMessage("message1");
+
+ prod1.send(tm1);
+
+
+ // Poison server 1 so it crashes on commit of dest but after prepare
+
+ //This means the transaction branch on source (server 0) will get committed
+ //but the branch on dest (server 1) won't be - it will remain prepared
+ //This corresponds to a HeuristicMixedException
+
+ ServerManagement.poisonTheServer(1, PoisonInterceptor.TYPE_2PC_COMMIT);
+
+ log.info("Poisoned server");
+
+ tx.delistResource(res0, XAResource.TMSUCCESS);
+
+ tx.delistResource(res1, XAResource.TMSUCCESS);
+
+ tx.commit();
+
+ conn0.close();
+
+ conn1.close();
+
+ //Now restart the server
+
+ log.info("Restarting server");
+
+ ServerManagement.start(1, "all", false);
+
+ log.info("Restarted server");
+
+ // NOTE(review): fixed 5 second pause after the restart - presumably lets the
+ // server settle before the queue is redeployed; confirm it is still needed
+ Thread.sleep(5000);
+
+ ServerManagement.deployQueue("queue1", 1);
+
+ // Look the factory and queue up again from the restarted server
+ Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
+
+ InitialContext ic1 = new InitialContext(props1);
+
+ cf1 = (XAConnectionFactory)ic1.lookup("/XAConnectionFactory");
+
+ queue1 = (Queue)ic1.lookup("/queue/queue1");
+
+
+ // queue0 on server 0 must hold exactly message0
+ conn2 = ((ConnectionFactory)cf0).createConnection();
+
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons2 = sess2.createConsumer(queue0);
+
+ conn2.start();
+
+ TextMessage rm0 = (TextMessage)cons2.receive(2000);
+
+ assertNotNull(rm0);
+
+ assertEquals(tm0.getText(), rm0.getText());
+
+ Message m = cons2.receive(2000);
+
+ assertNull(m);
+
+ //Now even though the commit on the second server failed since the server was dead, the recovery manager should kick in
+ //eventually and recover it.
+
+ conn3 = ((ConnectionFactory)cf1).createConnection();
+
+ Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons3 = sess3.createConsumer(queue1);
+
+ conn3.start();
+
+ // Long timeout: the message only becomes available once recovery has run
+ TextMessage rm1 = (TextMessage)cons3.receive(60000);
+
+ assertNotNull(rm1);
+
+ assertEquals(tm1.getText(), rm1.getText());
+
+ m = cons3.receive(2000);
+
+ assertNull(m);
+
+ }
+ finally
+ {
+ // conn0 and conn1 are already closed on the success path; closing them again
+ // here covers failures that happen before those explicit close() calls
+ if (conn0 != null)
+ {
+ conn0.close();
+ }
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ if (conn3 != null)
+ {
+ conn3.close();
+ }
+ }
+ }
+
+ /**
+ * Pre-loads queue1 on server 1 with two messages, then inside a single JTA
+ * transaction sends one message to queue0 on server 0 and consumes the first
+ * message from queue1, with server 1 poisoned (PoisonInterceptor.TYPE_2PC_COMMIT)
+ * before the commit. Server 1 is then restarted and the test asserts that queue0
+ * delivers exactly message0 and queue1 delivers only the second message.
+ *
+ * @throws Exception if any JMS, JTA, JNDI or server-management call fails
+ */
+ public void testRecoveryOnAck() throws Exception
+ {
+ XAConnection conn0 = null;
+
+ XAConnection conn1 = null;
+
+ Connection conn2 = null;
+
+ Connection conn3 = null;
+
+ try
+ {
+ conn0 = cf0.createXAConnection();
+
+ XASession sess0 = conn0.createXASession();
+
+ MessageProducer prod0 = sess0.createProducer(queue0);
+
+ XAResource res0 = sess0.getXAResource();
+
+
+ conn1 = cf1.createXAConnection();
+
+ XASession sess1 = conn1.createXASession();
+
+ MessageConsumer cons1 = sess1.createConsumer(queue1);
+
+ XAResource res1 = sess1.getXAResource();
+
+ conn1.start();
+
+
+
+ //first send two messages to server 1
+
+ conn2 = ((ConnectionFactory)cf1).createConnection();
+
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod2 = sess2.createProducer(queue1);
+
+ // NOTE(review): the messages are created on sess1 but sent through sess2's
+ // producer - looks unintentional, though the send still goes via prod2; confirm
+ TextMessage tm1 = sess1.createTextMessage("message1");
+
+ prod2.send(tm1);
+
+ TextMessage tm2 = sess1.createTextMessage("message2");
+
+ prod2.send(tm2);
+
+ conn2.close();
+
+
+
+ // Enlist both sessions' XA resources in one transaction
+ tm.begin();
+
+ Transaction tx = tm.getTransaction();
+
+ tx.enlistResource(res0);
+
+ tx.enlistResource(res1);
+
+
+ TextMessage tm0 = sess0.createTextMessage("message0");
+
+ prod0.send(tm0);
+
+ //Consume one of the messages on dest
+
+ TextMessage rm1 = (TextMessage)cons1.receive(1000);
+
+ assertNotNull(rm1);
+
+ assertEquals(tm1.getText(), rm1.getText());
+
+
+
+ // Poison server 1 so it crashes on commit of dest but after prepare
+
+ //This means the transaction branch on source (server 0) will get committed
+ //but the branch on dest (server 1) won't be - it will remain prepared
+ //This corresponds to a HeuristicMixedException
+
+ ServerManagement.poisonTheServer(1, PoisonInterceptor.TYPE_2PC_COMMIT);
+
+ log.info("Poisoned server");
+
+ tx.delistResource(res0, XAResource.TMSUCCESS);
+
+ tx.delistResource(res1, XAResource.TMSUCCESS);
+
+ tx.commit();
+
+ conn0.close();
+
+ conn1.close();
+
+ //Now restart the server
+
+ log.info("Restarting server");
+
+ ServerManagement.start(1, "all", false);
+
+ log.info("Restarted server");
+
+ // NOTE(review): fixed 5 second pause after the restart - presumably lets the
+ // server settle before the queue is redeployed; confirm it is still needed
+ Thread.sleep(5000);
+
+ ServerManagement.deployQueue("queue1", 1);
+
+ // Look the factory and queue up again from the restarted server
+ Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
+
+ InitialContext ic1 = new InitialContext(props1);
+
+ cf1 = (XAConnectionFactory)ic1.lookup("/XAConnectionFactory");
+
+ queue1 = (Queue)ic1.lookup("/queue/queue1");
+
+
+ // queue0 on server 0 must hold exactly message0 (conn2 and sess2 are reused
+ // here for a new connection to server 0)
+ conn2 = ((ConnectionFactory)cf0).createConnection();
+
+ sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons2 = sess2.createConsumer(queue0);
+
+ conn2.start();
+
+ TextMessage rm0 = (TextMessage)cons2.receive(2000);
+
+ assertNotNull(rm0);
+
+ assertEquals(tm0.getText(), rm0.getText());
+
+ Message m = cons2.receive(2000);
+
+ assertNull(m);
+
+ //Now even though the commit on the second server failed since the server was dead, the recovery manager should kick in
+ //eventually and recover it.
+
+ conn3 = ((ConnectionFactory)cf1).createConnection();
+
+ Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons3 = sess3.createConsumer(queue1);
+
+ conn3.start();
+
+ // Long timeout: allows time for recovery to run before the receive gives up
+ TextMessage rm2 = (TextMessage)cons3.receive(60000);
+
+ assertNotNull(rm2);
+
+ //tm1 should have been acked on recovery
+
+ assertEquals(tm2.getText(), rm2.getText());
+
+ m = cons3.receive(2000);
+
+ assertNull(m);
+
+ }
+ finally
+ {
+ // conn0 and conn1 are already closed on the success path; closing them again
+ // here covers failures that happen before those explicit close() calls
+ if (conn0 != null)
+ {
+ conn0.close();
+ }
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ if (conn3 != null)
+ {
+ conn3.close();
+ }
+ }
+ }
+
+
+
+}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -36,7 +36,6 @@
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
-import javax.management.ObjectName;
import javax.naming.InitialContext;
import org.jboss.jms.server.bridge.Bridge;
@@ -352,6 +351,8 @@
msgs.add(tm.getText());
+ log.info("*** RECEIVED MESSAGE *** " + tm.getText());
+
count++;
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/bridge/ReconnectTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/ReconnectTest.java 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/ReconnectTest.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -21,14 +21,9 @@
*/
package org.jboss.test.messaging.jms.bridge;
-import java.util.Properties;
-
-import org.jboss.jms.jndi.JMSProviderAdapter;
import org.jboss.jms.server.bridge.Bridge;
import org.jboss.logging.Logger;
import org.jboss.test.messaging.tools.ServerManagement;
-import org.jboss.test.messaging.tools.TestJMSProviderAdaptor;
-import org.jboss.test.messaging.tools.aop.PoisonInterceptor;
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -53,36 +48,13 @@
fail("Test should only be run in a remote configuration");
}
- useArjuna = true;
+ useArjuna = false;
- super.setUp();
-
- //Now install local JMSProviderAdaptor classes
-
- Properties props0 = new Properties();
- props0.putAll(ServerManagement.getJNDIEnvironment(0));
-
- Properties props1 = new Properties();
- props1.putAll(ServerManagement.getJNDIEnvironment(1));
-
- JMSProviderAdapter sourceAdaptor =
- new TestJMSProviderAdaptor(props0, "/XAConnectionFactory", "adaptor1");
- JMSProviderAdapter targetAdaptor =
- new TestJMSProviderAdaptor(props1, "/XAConnectionFactory", "adaptor2");
-
- sc.installJMSProviderAdaptor("adaptor1", sourceAdaptor);
- sc.installJMSProviderAdaptor("adaptor2", targetAdaptor);
-
- sc.startRecoveryManager();
+ super.setUp();
}
protected void tearDown() throws Exception
{
- sc.stopRecoveryManager();
-
- sc.uninstallJMSProviderAdaptor("adaptor1");
- sc.uninstallJMSProviderAdaptor("adaptor2");
-
super.tearDown();
log.debug(this + " torn down");
@@ -138,18 +110,6 @@
testCrashAndReconnectDestCrashBeforePrepare(false);
}
- // Note this test will fail until http://jira.jboss.com/jira/browse/JBTM-192 is complete
- public void x_testCrashAndReconnectDestCrashOnCommit_P() throws Exception
- {
- testCrashAndReconnectDestCrashOnCommit(true);
- }
-
- // Note this test will fail until http://jira.jboss.com/jira/browse/JBTM-192 is complete
- public void x_testCrashAndReconnectDestCrashOnCommit_NP() throws Exception
- {
- testCrashAndReconnectDestCrashOnCommit(false);
- }
-
/*
* Send some messages
* Crash the destination server
@@ -266,13 +226,11 @@
//Send some messages
this.sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES / 2, persistent);
-
-
+
//verify none are received
this.checkNoneReceived(cf1, destQueue, 2000);
-
-
+
//Now crash the dest server
log.info("About to crash server");
@@ -321,114 +279,6 @@
}
}
- /*
- * Send some messages
- * Crash the server after prepare but on commit
- * Bring up the destination server
- * Send some more messages
- * Verify all messages are received
- */
- private void testCrashAndReconnectDestCrashOnCommit(boolean persistent) throws Exception
- {
- Bridge bridge = null;
-
- try
- {
- setUpAdministeredObjects(true);
-
- final int NUM_MESSAGES = 10;
-
- bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
- null, null, null, null,
- null, 1000, -1, Bridge.QOS_ONCE_AND_ONLY_ONCE,
- NUM_MESSAGES, 5000,
- null, null);
-
- bridge.start();
-
- //Send some messages
-
- sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES / 2, persistent);
-
-
- //Verify none are received
-
- checkNoneReceived(cf1, destQueue, 2000);
-
-
- //Poison server 1 so it crashes on commit of dest but after prepare
-
- //This means the transaction branch on source will get commmitted
- //but the branch on dest won't be - it will remain prepared
- //This corresponds to a HeuristicMixedException
-
- ServerManagement.poisonTheServer(1, PoisonInterceptor.TYPE_2PC_COMMIT);
-
- log.info("Poisoned server");
-
-
- //Wait for maxBatchTime to kick in so a batch is sent
- //This should cause the server to crash after prepare but before commit
-
- //Also the wait must be enough to allow transaction recovery to kick in
- //Since there will be a heuristically prepared branch on the consumer that needs to be rolled
- //back
-
- Thread.sleep(10000);
-
- //Restart the server
-
- log.info("Restarting server");
-
- ServerManagement.start(1, "all", false);
-
- log.info("Restarted server");
-
- ServerManagement.deployQueue("destQueue", 1);
-
- //Give enough time for transaction recovery to happen
- Thread.sleep(20000);
-
- log.info("Deployed queue");
-
- log.info("Slept");
-
- setUpAdministeredObjects(false);
-
-
- //Send some more messages
-
- this.sendMessages(cf0, sourceQueue, NUM_MESSAGES / 2, NUM_MESSAGES / 2, persistent);
-
- checkMessagesReceived(cf1, destQueue, Bridge.QOS_ONCE_AND_ONLY_ONCE, NUM_MESSAGES);
-
- //Make sure no messages are left in the source dest
-
- this.checkNoneReceived(cf0, sourceQueue, 5000);
-
- log.info("Got here");
-
- }
- finally
- {
- log.info("In finally");
-
- if (bridge != null)
- {
- try
- {
- bridge.stop();
- }
- catch (Exception e)
- {
- log.error("Failed to stop bridge", e);
- }
- }
- }
- }
-
-
-
// Inner classes -------------------------------------------------------------------
}
Added: trunk/tests/src/org/jboss/test/messaging/jms/bridge/ReconnectWithRecoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/ReconnectWithRecoveryTest.java (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/ReconnectWithRecoveryTest.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -0,0 +1,178 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms.bridge;
+
+import java.util.Properties;
+
+import org.jboss.jms.jndi.JMSProviderAdapter;
+import org.jboss.jms.server.bridge.Bridge;
+import org.jboss.logging.Logger;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.TestJMSProviderAdaptor;
+import org.jboss.test.messaging.tools.aop.PoisonInterceptor;
+
+/**
+ * Checks that a {@link Bridge} using QOS_ONCE_AND_ONLY_ONCE still gets every message to the
+ * destination when server 1 is poisoned to crash on the 2PC commit, relying on transaction
+ * recovery (started in {@link #setUp()}) once the server has been restarted.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class ReconnectWithRecoveryTest extends BridgeTestBase
+{
+   private static final Logger log = Logger.getLogger(ReconnectWithRecoveryTest.class);
+
+   public ReconnectWithRecoveryTest(String name)
+   {
+      super(name);
+   }
+
+   /**
+    * Fails outside a remote configuration; otherwise sets useArjuna, runs the base set-up,
+    * installs an adaptor built from server 1's JNDI environment and starts the recovery manager.
+    */
+   protected void setUp() throws Exception
+   {
+      if (!ServerManagement.isRemote())
+      {
+         fail("Test should only be run in a remote configuration");
+      }
+
+      useArjuna = true;
+      super.setUp();
+
+      //Now install local JMSProviderAdaptor classes
+      Properties props1 = new Properties();
+      props1.putAll(ServerManagement.getJNDIEnvironment(1));
+
+      JMSProviderAdapter targetAdaptor =
+         new TestJMSProviderAdaptor(props1, "/XAConnectionFactory", "adaptor1");
+      sc.installJMSProviderAdaptor("adaptor1", targetAdaptor);
+
+      sc.startRecoveryManager();
+   }
+
+   /**
+    * Undoes {@link #setUp()} in reverse order. The steps are chained through finally blocks so
+    * that a failure in one of them cannot stop the remaining clean-up from running.
+    */
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         sc.stopRecoveryManager();
+      }
+      finally
+      {
+         try
+         {
+            sc.uninstallJMSProviderAdaptor("adaptor1");
+         }
+         finally
+         {
+            super.tearDown();
+         }
+      }
+      log.debug(this + " torn down");
+   }
+
+   /*
+    * Poison the destination server so that it crashes on commit, after prepare
+    * Send some messages
+    * Bring up the destination server
+    * Wait for transaction recovery
+    * Verify all messages are received
+    */
+   public void testCrashAndReconnectDestCrashOnCommit() throws Exception
+   {
+      Bridge bridge = null;
+
+      try
+      {
+         setUpAdministeredObjects(true);
+         final int NUM_MESSAGES = 10;
+
+         bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
+                  null, null, null, null,
+                  null, 1000, -1, Bridge.QOS_ONCE_AND_ONLY_ONCE,
+                  NUM_MESSAGES, -1,
+                  null, null);
+
+         bridge.start();
+
+         //Poison server 1 so it crashes on commit of dest but after prepare
+         //This means the transaction branch on source will get committed
+         //but the branch on dest won't be - it will remain prepared
+         //This corresponds to a HeuristicMixedException
+         ServerManagement.poisonTheServer(1, PoisonInterceptor.TYPE_2PC_COMMIT);
+         log.info("Poisoned server");
+
+         //Send some messages
+         sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES, true);
+
+         //Wait a bit for the batch to be sent - this should cause the server to crash on commit
+         Thread.sleep(3000);
+
+         //Restart the server
+         log.info("Restarting server");
+         ServerManagement.start(1, "all", false);
+         log.info("Restarted server");
+
+         ServerManagement.deployQueue("destQueue", 1);
+         log.info("Deployed queue");
+
+         //Give enough time for transaction recovery to happen
+         Thread.sleep(45000);
+         log.info("Slept");
+
+         setUpAdministeredObjects(false);
+
+         checkMessagesReceived(cf1, destQueue, Bridge.QOS_ONCE_AND_ONLY_ONCE, NUM_MESSAGES);
+
+         //Make sure no messages are left in the source dest
+         this.checkNoneReceived(cf0, sourceQueue, 5000);
+         log.info("Got here");
+      }
+      finally
+      {
+         log.info("In finally");
+         if (bridge != null)
+         {
+            try
+            {
+               bridge.stop();
+            }
+            catch (Exception e)
+            {
+               log.error("Failed to stop bridge", e);
+            }
+         }
+      }
+   }
+
+   // Inner classes -------------------------------------------------------------------
+}
+
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -1076,14 +1076,11 @@
// if failover happened, this object was replaced
assertNotSame(originalRemoting, delegate.getRemotingConnection());
- //System.out.println("Kill server1"); Thread.sleep(10000);
-
message = session.createTextMessage("Hello After");
log.info(">>Sending new message");
producer.send(message);
assertEquals(txID, sessionState.getCurrentTxId());
- System.out.println("TransactionID on client = " + txID);
log.info(">>Final commit");
session.commit();
Modified: trunk/tests/src/org/jboss/test/messaging/jms/message/ObjectMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/message/ObjectMessageTest.java 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/tests/src/org/jboss/test/messaging/jms/message/ObjectMessageTest.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -150,7 +150,6 @@
if (!itemLocation.equals(classLocation) &&
itemLocation.toString().indexOf(pathIgnore) >= 0)
{
- //System.out.println("Location:" + itemLocation);
urls.add(itemLocation);
}
}
Modified: trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -37,7 +37,6 @@
import javax.management.ObjectName;
import javax.transaction.UserTransaction;
-import org.jboss.jms.jndi.JMSProviderAdapter;
import org.jboss.jms.server.DestinationManager;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.plugin.contract.MessageStore;
@@ -436,8 +435,6 @@
String classPath = System.getProperty("java.class.path");
- //System.out.println("CLASSPATH: " + classPath);
-
if (System.getProperty("os.name").equals("Linux"))
{
sb.append("-cp").append(" ").append(classPath).append(" ");
@@ -458,8 +455,6 @@
String commandLine = sb.toString();
- //System.out.println(commandLine);
-
Process process = Runtime.getRuntime().exec(commandLine);
log.trace("process: " + process);
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -21,7 +21,6 @@
*/
package org.jboss.test.messaging.tools.jmx;
-
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
@@ -33,7 +32,6 @@
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
@@ -90,13 +88,11 @@
import org.jboss.tm.TransactionManagerService;
import org.jboss.tm.TxManager;
import org.jboss.tm.usertx.client.ServerVMClientUserTransaction;
-import org.w3c.dom.Attr;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import com.arjuna.ats.arjuna.recovery.RecoveryManager;
-
/**
* An MBeanServer and a configurable set of services (TransactionManager, Remoting, etc) available
* for testing.
@@ -474,7 +470,6 @@
String transport = config.getRemotingTransport();
log.info("Remoting type: .............. " + (remoting ? transport : "DISABLED"));
- log.info("Database: ................... " + config.getDatabaseType());
log.info("Clustering mode: ............ " +
(this.isClustered() ? "CLUSTERED" : "NON-CLUSTERED"));
@@ -624,7 +619,7 @@
{
String databaseName = getDatabaseName();
- if (clustered && !getDatabaseType().equals("hsqldb"))
+ if (clustered && !getDatabaseName().equals("hsqldb"))
{
return "server/default/deploy/clustered-" + databaseName + "-persistence-service.xml";
}
@@ -883,11 +878,6 @@
return config.getDatabaseName();
}
- public String getDatabaseType()
- {
- return config.getDatabaseType();
- }
-
public String getRemotingTransport()
{
return config.getRemotingTransport();
@@ -1041,13 +1031,13 @@
private void startInVMDatabase() throws Exception
{
- if (!"hsqldb".equals(config.getDatabaseType()))
+ if (!"hsqldb".equals(config.getDatabaseName()))
{
// is an out-of-process DB, and it must be stared externally
return;
}
- log.debug("starting " + config.getDatabaseType() + " in-VM");
+ log.debug("starting " + config.getDatabaseName() + " in-VM");
String url = config.getDatabaseConnectionURL();
HsqlProperties props = new HsqlProperties();
@@ -1064,12 +1054,12 @@
hsqldbServer.setProperties(props);
hsqldbServer.start();
- log.debug("started " + config.getDatabaseType() + " in-VM");
+ log.debug("started " + config.getDatabaseName() + " in-VM");
}
private void stopInVMDatabase() throws Exception
{
- if (!"hsqldb".equals(config.getDatabaseType()))
+ if (!"hsqldb".equals(config.getDatabaseName()))
{
// is an out-of-process DB, and it must be stopped externally
return;
@@ -1184,11 +1174,11 @@
{
LocalManagedConnectionFactory mcf = new LocalManagedConnectionFactory();
-
-log.info("connection url:" + config.getDatabaseConnectionURL());
-log.info("driver:" + config.getDatabaseConnectionURL());
-log.info("username:" + config.getDatabaseUserName());
-log.info("password:" + config.getDatabasePassword());
+ log.info("connection url:" + config.getDatabaseConnectionURL());
+ log.info("driver:" + config.getDatabaseConnectionURL());
+ log.info("username:" + config.getDatabaseUserName());
+ log.info("password:" + config.getDatabasePassword());
+
mcf.setConnectionURL(config.getDatabaseConnectionURL());
mcf.setDriverClass(config.getDatabaseDriverClass());
mcf.setUserName(config.getDatabaseUserName());
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainerConfiguration.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainerConfiguration.java 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainerConfiguration.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -94,17 +94,6 @@
return database;
}
- /**
- * @return the token that follows after jdbc: in the database URL. So far, we know of
- * "hsqldb", "mysql", "oracle", "postgresql".
- *
- */
- public String getDatabaseType()
- {
- DatabaseConfiguration dbc = (DatabaseConfiguration)dbConfigurations.get(database);
- return dbc.getDatabaseType();
- }
-
public String getDatabaseConnectionURL()
{
DatabaseConfiguration dbc = (DatabaseConfiguration)dbConfigurations.get(database);
@@ -366,7 +355,6 @@
private class DatabaseConfiguration
{
private String connectionURL;
- private String type;
private String driverClass;
private String transactionIsolation;
private String username;
@@ -385,11 +373,6 @@
throw new IllegalArgumentException("Invalid connection URL: " + s);
}
- //FIXME - why the heck try and infer the database name from the connection URL
- //this is not correct
- //Different databases could be using the same url (this can be the case with sybase and mssql)
- //why not just get the database name from the system property??
- this.type = st.nextToken();
this.connectionURL = s;
}
@@ -398,26 +381,6 @@
return connectionURL;
}
- String getDatabaseType()
- {
- //Temporary kludge so I can run the test suite
- //FIXME - why the heck try and infer the database name from the connection URL
- //this is not correct
- //Different databases could be using the same url (this can be the case with sybase and mssql)
- //why not just get the database name from the system property??
-
- String theType = System.getProperty("test.database");
-
- if (theType != null)
- {
- return theType;
- }
- else
- {
- return type;
- }
- }
-
void setDatabaseDriverClass(String s)
{
this.driverClass = s;
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -139,7 +139,7 @@
sc = new ServiceContainer(containerConfig, null, serverIndex);
sc.start(clearDatabase, attrOverrides);
- if (sc.getDatabaseType().equals("hsqldb") && sc.isClustered())
+ if (sc.getDatabaseName().equals("hsqldb") && sc.isClustered())
{
throw new IllegalStateException("The test server cannot be started in clustered mode with hsqldb as a database - must use a shared database");
}
Modified: trunk/tests/src/org/jboss/test/messaging/util/XMLUtilTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/util/XMLUtilTest.java 2007-06-12 13:31:30 UTC (rev 2773)
+++ trunk/tests/src/org/jboss/test/messaging/util/XMLUtilTest.java 2007-06-12 22:43:54 UTC (rev 2774)
@@ -213,7 +213,6 @@
Element e = XMLUtil.stringToElement(s);
String tostring = XMLUtil.elementToString(e);
Element convertedAgain = XMLUtil.stringToElement(tostring);
- //System.out.println(tostring);
XMLUtil.assertEquivalent(e, convertedAgain);
}
@@ -222,7 +221,6 @@
String s = "<a b=\"something\"></a>";
Element e = XMLUtil.stringToElement(s);
String tostring = XMLUtil.elementToString(e);
- //System.out.println(tostring);
Element convertedAgain = XMLUtil.stringToElement(tostring);
XMLUtil.assertEquivalent(e, convertedAgain);
}
@@ -232,7 +230,6 @@
String s = "<a b=\"something\"/>";
Element e = XMLUtil.stringToElement(s);
String tostring = XMLUtil.elementToString(e);
- //System.out.println(tostring);
Element convertedAgain = XMLUtil.stringToElement(tostring);
XMLUtil.assertEquivalent(e, convertedAgain);
}
More information about the jboss-cvs-commits
mailing list