[jboss-cvs] JBoss Messaging SVN: r1993 - in trunk: src/main/org/jboss/jms/client/container and 17 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jan 19 13:45:03 EST 2007
Author: timfox
Date: 2007-01-19 13:45:03 -0500 (Fri, 19 Jan 2007)
New Revision: 1993
Added:
trunk/tests/src/org/jboss/test/messaging/util/prioritylinkedlist/
trunk/tests/src/org/jboss/test/messaging/util/prioritylinkedlist/PriorityLinkedListTest.java
Removed:
trunk/src/main/org/jboss/jms/recovery/BridgeXAResourceWrapper.java
trunk/src/main/org/jboss/messaging/core/refqueue/BasicPrioritizedDeque.java
trunk/src/main/org/jboss/messaging/core/refqueue/PrioritizedDeque.java
trunk/src/main/org/jboss/messaging/core/refqueue/PrioritizedDequeIterator.java
trunk/tests/src/org/jboss/test/messaging/core/refqueue/
trunk/tests/src/org/jboss/test/messaging/util/prioritylinkedlist/PrioritizedReferenceQueueTest.java
Modified:
trunk/src/etc/aop-messaging-client.xml
trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
trunk/src/main/org/jboss/jms/client/remoting/CallbackHandler.java
trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
trunk/src/main/org/jboss/jms/recovery/BridgeXAResourceRecovery.java
trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
trunk/src/main/org/jboss/jms/server/bridge/BridgeService.java
trunk/src/main/org/jboss/jms/tx/ResourceManager.java
trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
trunk/tests/build.xml
trunk/tests/etc/jbossjta-properties.xml
trunk/tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java
trunk/tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java
trunk/tests/src/org/jboss/test/messaging/jms/XATest.java
trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeMBeanTest.java
trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java
trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java
trunk/tests/src/org/jboss/test/messaging/jms/bridge/ReconnectTest.java
trunk/tests/src/org/jboss/test/messaging/jms/message/JMSPriorityHeaderTest.java
trunk/tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java
trunk/tests/src/org/jboss/test/messaging/jms/server/connectionfactory/ConnectionFactoryTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-628 http://jira.jboss.com/jira/browse/JBMESSAGING-616 http://jira.jboss.com/jira/browse/JBMESSAGING-700
Modified: trunk/src/etc/aop-messaging-client.xml
===================================================================
--- trunk/src/etc/aop-messaging-client.xml 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/src/etc/aop-messaging-client.xml 2007-01-19 18:45:03 UTC (rev 1993)
@@ -24,10 +24,12 @@
Clustered ConnectionFactory Stack
-->
+ <!-- By default no exception interceptor
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate->$implementing{org.jboss.jms.delegate.ConnectionFactoryDelegate}(..))">
<interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/>
<interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
</bind>
+ -->
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate->createConnectionDelegate(..))">
<advice name="handleCreateConnectionDelegate" aspect="org.jboss.jms.client.container.ClusteringAspect"/>
</bind>
@@ -36,10 +38,12 @@
(Non-clustered) ConnectionFactory Stack
-->
+ <!-- By default no client log or exception interceptor
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate->$implementing{org.jboss.jms.delegate.ConnectionFactoryDelegate}(..))">
<interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/>
<interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
</bind>
+ -->
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate->createConnectionDelegate(..))">
<advice name="handleCreateConnectionDelegate" aspect="org.jboss.jms.client.container.StateCreationAspect"/>
</bind>
@@ -49,9 +53,9 @@
-->
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->$implementing{org.jboss.jms.delegate.ConnectionDelegate}(..))">
- <interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/>
+ <!-- <interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/> -->
<interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>
- <interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
+ <!-- <interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/> -->
<interceptor-ref name="org.jboss.jms.client.container.ClosedInterceptor"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->start())">
@@ -99,9 +103,9 @@
-->
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->$implementing{org.jboss.jms.delegate.SessionDelegate}(..))">
- <interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/>
+ <!-- <interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/> -->
<interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>
- <interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
+ <!-- <interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/> -->
<interceptor-ref name="org.jboss.jms.client.container.ClosedInterceptor"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->createMessage())">
@@ -197,9 +201,9 @@
-->
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->$implementing{org.jboss.jms.delegate.ConsumerDelegate}(..))">
- <interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/>
+ <!-- <interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/> -->
<interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>
- <interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
+ <!-- <interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/> -->
<interceptor-ref name="org.jboss.jms.client.container.ClosedInterceptor"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->getMessageListener())">
@@ -232,9 +236,9 @@
Producer Stack
-->
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->$implementing{org.jboss.jms.delegate.ProducerDelegate}(..))">
- <interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/>
+ <!-- <interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/> -->
<interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>
- <interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
+ <!-- <interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/> -->
<interceptor-ref name="org.jboss.jms.client.container.ClosedInterceptor"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientProducerDelegate->getDeliveryMode())">
@@ -287,9 +291,9 @@
Browser Stack
-->
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientBrowserDelegate->$implementing{org.jboss.jms.delegate.BrowserDelegate}(..))">
- <interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/>
+ <!-- <interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/> -->
<interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>
- <interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
+ <!-- <interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/> -->
<interceptor-ref name="org.jboss.jms.client.container.ClosedInterceptor"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientBrowserDelegate->nextMessage())">
Modified: trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/SessionAspect.java 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/src/main/org/jboss/jms/client/container/SessionAspect.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -138,35 +138,30 @@
// delivery count information from client to server. We could just do this on the server but
// we would lose delivery count info.
- // CLIENT_ACKNOWLEDGE cannot be used with MDBs so is always safe to cancel on this session
+ // CLIENT_ACKNOWLEDGE cannot be used with MDBs (i.e. no connection consumer)
+ // so is always safe to cancel on this session
- List cancels = new ArrayList();
+ cancelDeliveries(del, state.getClientAckList());
- for(Iterator i = state.getClientAckList().iterator(); i.hasNext(); )
- {
- DeliveryInfo ack = (DeliveryInfo)i.next();
-
- DefaultCancel cancel = new DefaultCancel(ack.getMessageProxy().getDeliveryId(),
- ack.getMessageProxy().getDeliveryCount(),
- false, false);
-
- cancels.add(cancel);
- }
+ state.getClientAckList().clear();
+ }
+ else if (state.isTransacted() && !state.isXA())
+ {
+ //We need to explicitly cancel any deliveries back to the server
+ //from the resource manager, otherwise delivery count won't be updated
- if (!cancels.isEmpty())
- {
- del.cancelDeliveries(cancels);
- }
+ ConnectionState connState = (ConnectionState)state.getParent();
- state.getClientAckList().clear();
+ ResourceManager rm = connState.getResourceManager();
+
+ List dels = rm.getDeliveriesForSession(state.getSessionID());
+
+ cancelDeliveries(del, dels);
}
- //TODO - we should also cancel any deliveries remaining in any transaction for the session
- //so the delivery count gets updated to the server, and not rely on the server side close
- //cancelling them
return invocation.invokeNext();
- }
+ }
public Object handleClose(Invocation invocation) throws Throwable
{
@@ -599,6 +594,27 @@
sessionToUse.cancelDelivery(new DefaultCancel(delivery.getDeliveryID(),
delivery.getMessageProxy().getDeliveryCount(), false, false));
}
+
+ private void cancelDeliveries(SessionDelegate del, List deliveryInfos) throws Exception
+ {
+ List cancels = new ArrayList();
+
+ for (Iterator i = deliveryInfos.iterator(); i.hasNext(); )
+ {
+ DeliveryInfo ack = (DeliveryInfo)i.next();
+
+ DefaultCancel cancel = new DefaultCancel(ack.getMessageProxy().getDeliveryId(),
+ ack.getMessageProxy().getDeliveryCount(),
+ false, false);
+
+ cancels.add(cancel);
+ }
+
+ if (!cancels.isEmpty())
+ {
+ del.cancelDeliveries(cancels);
+ }
+ }
// Inner Classes -------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -51,6 +51,7 @@
private int bufferSize;
private int maxDeliveries;
private long channelID;
+ private boolean usePriorityConsumerQueue;
// Static ---------------------------------------------------------------------------------------
@@ -212,7 +213,7 @@
{
return channelID;
}
-
+
// Protected ------------------------------------------------------------------------------------
protected Client getClient()
Modified: trunk/src/main/org/jboss/jms/client/remoting/CallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackHandler.java 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackHandler.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -31,6 +31,6 @@
public interface CallbackHandler
{
- void handleMessage(Object message);
+ void handleMessage(Object message) throws Exception;
}
Modified: trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -103,7 +103,15 @@
return;
}
- handler.handleMessage(msg);
+ try
+ {
+ handler.handleMessage(msg);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to handle message", e);
+ throw new HandleCallbackException(e.getMessage(), e);
+ }
}
else if (parameter instanceof ConnectionFactoryUpdateMessage)
{
Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -23,7 +23,6 @@
import java.util.ArrayList;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import javax.jms.IllegalStateException;
@@ -40,6 +39,8 @@
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Message;
import org.jboss.messaging.util.Future;
+import org.jboss.messaging.util.prioritylinkedlist.BasicPriorityLinkedList;
+import org.jboss.messaging.util.prioritylinkedlist.PriorityLinkedList;
import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
@@ -198,7 +199,14 @@
// Attributes ----------------------------------------------------
- private LinkedList buffer;
+ /*
+ * The buffer is now a priority linked list
+ * This resolves problems whereby messages are delivered from the server side queue in
+ * correct priority order, but because the old consumer list was not a priority list
+ * then if messages were sitting waiting to be consumed on the client side, then higher
+ * priority messages might be behind lower priority messages and thus get consumed out of order
+ */
+ private PriorityLinkedList buffer;
private SessionDelegate sessionDelegate;
private ConsumerDelegate consumerDelegate;
private int consumerID;
@@ -232,7 +240,7 @@
this.maxBufferSize = bufferSize;
this.minBufferSize = bufferSize / 2;
- buffer = new LinkedList();
+ buffer = new BasicPriorityLinkedList(10);
isConnectionConsumer = isCC;
this.ackMode = ackMode;
this.sessionDelegate = sess;
@@ -252,7 +260,7 @@
* Handles a message sent from the server
* @param message The message
*/
- public void handleMessage(Object message)
+ public void handleMessage(Object message) throws Exception
{
MessageProxy msg = (MessageProxy) message;
@@ -272,7 +280,7 @@
msg.setReceived();
//Add it to the buffer
- buffer.add(msg);
+ buffer.addLast(msg, msg.getJMSPriority());
lastDeliveryId = msg.getDeliveryId();
@@ -530,11 +538,11 @@
this.consumerID = consumerId;
}
- public void addToFrontOfBuffer(MessageProxy proxy)
+ public void addToFrontOfBuffer(MessageProxy proxy) throws Exception
{
synchronized (mainLock)
{
- buffer.addFirst(proxy);
+ buffer.addFirst(proxy, proxy.getJMSPriority());
messageAdded();
}
Modified: trunk/src/main/org/jboss/jms/recovery/BridgeXAResourceRecovery.java
===================================================================
--- trunk/src/main/org/jboss/jms/recovery/BridgeXAResourceRecovery.java 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/src/main/org/jboss/jms/recovery/BridgeXAResourceRecovery.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -28,6 +28,10 @@
import java.util.Properties;
import java.util.StringTokenizer;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.naming.InitialContext;
import javax.transaction.xa.XAResource;
import org.jboss.logging.Logger;
@@ -35,11 +39,14 @@
import com.arjuna.ats.jta.recovery.XAResourceRecovery;
/**
- * @author <a href="adrian at jboss.com">Adrian Brock</a>
- * @author <a href="juha at jboss.com">Juha Lindfors</a>
- * @author <a href="tim.fox at jboss.com">Tim Fox</a>
+ *
+ * A BridgeXAResourceRecovery
*
- * @version $Revision: 1.1 $
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
*/
public class BridgeXAResourceRecovery implements XAResourceRecovery
{
@@ -47,13 +54,19 @@
private static final Logger log = Logger.getLogger(BridgeXAResourceRecovery.class);
- private BridgeXAResourceWrapper wrapper;
-
- private boolean working = false;
-
private Hashtable jndiProperties;
private String connectionFactoryLookup;
+
+ private boolean hasMore;
+
+ private String username;
+
+ private String password;
+
+ private XAConnection conn;
+
+ private XAResource res;
public BridgeXAResourceRecovery()
{
@@ -91,12 +104,16 @@
* provider1.jndi.prop3=zzzz
*
* provider1.xaconnectionfactorylookup=xyz
+ * provider1.username=bob
+ * provider1.password=blah
*
* provider2.jndi.prop1=xxxx
* provider2.jndi.prop2=yyyy
* provider2.jndi.prop3=zzzz
*
* provider2.xaconnectionfactorylookup=xyz
+ * provider2.username=xyz
+ * provider2.password=blah
*
*/
@@ -106,6 +123,10 @@
String cfKey = provider + ".xaconnectionfactorylookup";
+ String usernameKey = provider + ".username";
+
+ String passwordKey = provider + ".password";
+
jndiProperties = new Hashtable();
while (iter.hasNext())
@@ -125,6 +146,14 @@
{
connectionFactoryLookup = value;
}
+ else if (key.equals(usernameKey))
+ {
+ username = value;
+ }
+ else if (key.equals(passwordKey))
+ {
+ password = value;
+ }
}
if (connectionFactoryLookup == null)
@@ -135,6 +164,8 @@
if (log.isTraceEnabled()) { log.trace(this + " initialised"); }
+ hasMore = true;
+
return true;
}
catch (Exception e)
@@ -149,40 +180,114 @@
{
if (log.isTraceEnabled()) { log.trace(this + " hasMoreResources"); }
- // If the XAResource is already working
- if (working)
+ /*
+ * The way hasMoreResources is supposed to work is as follows:
+ * For each "sweep" the recovery manager will call hasMoreResources, then if it returns
+ * true it will call getXAResource.
+ * It will repeat that until hasMoreResources returns false.
+ * Then the sweep is over.
+ * For the next sweep hasMoreResources should return true, etc.
+ *
+ * In our case where we only need to return one XAResource per sweep,
+ * hasMoreResources should basically alternate between true and false.
+ *
+ * And we return a new XAResource every time it is called.
+ * This makes this resilient to failure, since if the network fails
+ * between the XAResource and it's server, on the next pass a new one will
+ * be create and if the server is back up it will work.
+ * This means there is no need for an XAResourceWrapper which is a technique used in the
+ * JMSProviderXAResourceRecovery
+ * The recovery manager will throw away the XAResource anyway after every sweep.
+ *
+ */
+
+ if (hasMore)
{
- log.info("Returning false");
- return false;
+ //Get a new XAResource
+
+ try
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ catch (Exception ignore)
+ {
+ }
+
+ InitialContext ic = null;
+
+ try
+ {
+ ic = new InitialContext(jndiProperties);
+
+ XAConnectionFactory connectionFactory = (XAConnectionFactory)ic.lookup(connectionFactoryLookup);
+
+ if (username == null)
+ {
+ conn = connectionFactory.createXAConnection();
+ }
+ else
+ {
+ conn = connectionFactory.createXAConnection(username, password);
+ }
+
+ XASession session = conn.createXASession();
+
+ res = session.getXAResource();
+
+ //Note the connection is closed the next time the xaresource is created or by the finalizer
+
+ }
+ catch (Exception e)
+ {
+ log.warn("Cannot create XAResource", e);
+
+ hasMore = false;
+ }
+ finally
+ {
+ if (ic != null)
+ {
+ try
+ {
+ ic.close();
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+ }
+
}
-
- // Have we initialized yet?
- if (wrapper == null)
- {
- wrapper = new BridgeXAResourceWrapper(jndiProperties, connectionFactoryLookup);
- }
-
- // Test the connection
- try
- {
- wrapper.getTransactionTimeout();
- working = true;
- }
- catch (Exception ignored)
- {
- }
- log.info("Returning: " + working);
-
- // This will return false until we get a successful connection
- return working;
+ boolean ret = hasMore;
+
+ hasMore = !hasMore;
+
+ return ret;
}
public XAResource getXAResource()
{
if (log.isTraceEnabled()) { log.trace(this + " getXAResource"); }
- return wrapper;
+ return res;
}
+
+ protected void finalize()
+ {
+ try
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
}
Deleted: trunk/src/main/org/jboss/jms/recovery/BridgeXAResourceWrapper.java
===================================================================
--- trunk/src/main/org/jboss/jms/recovery/BridgeXAResourceWrapper.java 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/src/main/org/jboss/jms/recovery/BridgeXAResourceWrapper.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -1,446 +0,0 @@
-/*
-* JBoss, Home of Professional Open Source
-* Copyright 2006, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
-package org.jboss.jms.recovery;
-
-import java.util.Hashtable;
-
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.XAConnection;
-import javax.jms.XAConnectionFactory;
-import javax.jms.XASession;
-import javax.naming.InitialContext;
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import org.jboss.logging.Logger;
-
-/**
- * BridgeXAResourceWrapper.
- *
- * @author <a href="adrian at jboss.com">Adrian Brock</a>
- * @author <a href="tim.fox at jboss.com">Tim Fox</a>
- * @version $Revision: 1.2 $
- */
-public class BridgeXAResourceWrapper implements XAResource, ExceptionListener
-{
- /** The log */
- private static final Logger log = Logger.getLogger(XAResourceWrapper.class);
-
- private boolean trace = log.isTraceEnabled();
-
- /** The state lock */
- private static final Object lock = new Object();
-
- /** The connection */
- private XAConnection connection;
-
- /** The delegate XAResource */
- private XAResource delegate;
-
- private Hashtable jndiProperties;
-
- private String connectionFactoryLookup;
-
- public BridgeXAResourceWrapper(Hashtable jndiProperties, String connectionFactoryLookup)
- {
- this.jndiProperties = jndiProperties;
-
- this.connectionFactoryLookup = connectionFactoryLookup;
- }
-
- public Xid[] recover(int flag) throws XAException
- {
- XAResource xaResource = getDelegate();
-
- if (trace) { log.trace(this + " Calling recover"); }
-
- try
- {
- return xaResource.recover(flag);
- }
- catch (XAException e)
- {
- log.info("Caught exception in recover", e);
- throw check(e);
- }
- catch (Exception e)
- {
- log.info("Caught e in recover", e);
- throw new RuntimeException(e.toString());
- }
- }
-
- public void commit(Xid xid, boolean onePhase) throws XAException
- {
- XAResource xaResource = getDelegate();
-
- if (trace) { log.trace(this + " Calling commit"); }
-
- try
- {
- xaResource.commit(xid, onePhase);
- }
- catch (XAException e)
- {
- log.info("Caught exception in commit", e);
- throw check(e);
- }
- catch (Exception e)
- {
- log.info("Caught e in commit", e);
- throw new RuntimeException(e.toString());
- }
- }
-
- public void rollback(Xid xid) throws XAException
- {
- XAResource xaResource = getDelegate();
-
- if (trace) { log.trace(this + " Calling rollback"); }
-
- try
- {
- xaResource.rollback(xid);
- }
- catch (XAException e)
- {
- log.info("Caught exception in rollback", e);
- throw check(e);
- }
- catch (Exception e)
- {
- log.info("Caught e in rollback", e);
- throw new RuntimeException(e.toString());
- }
- }
-
- public void forget(Xid xid) throws XAException
- {
- XAResource xaResource = getDelegate();
-
- if (trace) { log.trace(this + " Calling forget"); }
-
-
- try
- {
- xaResource.forget(xid);
- }
- catch (XAException e)
- {
- throw check(e);
- }
- catch (Exception e)
- {
- log.info("Caught e in forget", e);
- throw new RuntimeException(e.toString());
- }
- }
-
- public boolean isSameRM(XAResource xaRes) throws XAException
- {
- if (xaRes instanceof XAResourceWrapper)
- xaRes = ((XAResourceWrapper) xaRes).getDelegate();
-
- if (trace) { log.trace(this + " Calling isSameRM"); }
-
-
- XAResource xaResource = getDelegate();
- try
- {
- return xaResource.isSameRM(xaRes);
- }
- catch (XAException e)
- {
- throw check(e);
- }
- catch (Exception e)
- {
- log.info("Caught e in issamerm", e);
- throw new RuntimeException(e.toString());
- }
- }
-
- public int prepare(Xid xid) throws XAException
- {
- XAResource xaResource = getDelegate();
-
- if (trace) { log.trace(this + " Calling prepare"); }
-
-
- try
- {
- return xaResource.prepare(xid);
- }
- catch (XAException e)
- {
- throw check(e);
- }
- catch (Exception e)
- {
- log.info("Caught e in prepare", e);
- throw new RuntimeException(e.toString());
- }
- }
-
- public void start(Xid xid, int flags) throws XAException
- {
- XAResource xaResource = getDelegate();
-
- if (trace) { log.trace(this + " Calling start"); }
-
- try
- {
- xaResource.start(xid, flags);
- }
- catch (XAException e)
- {
- throw check(e);
- }
- catch (Exception e)
- {
- log.info("Caught e in start", e);
- throw new RuntimeException(e.toString());
- }
- }
-
- public void end(Xid xid, int flags) throws XAException
- {
- XAResource xaResource = getDelegate();
-
- if (trace) { log.trace(this + " Calling end"); }
-
- try
- {
- xaResource.end(xid, flags);
- }
- catch (XAException e)
- {
- throw check(e);
- }
- catch (Exception e)
- {
- log.info("Caught e in end", e);
- throw new RuntimeException(e.toString());
- }
- }
-
- public int getTransactionTimeout() throws XAException
- {
- XAResource xaResource = getDelegate();
-
- if (trace) { log.trace(this + " Calling getTransactionTimeout"); }
-
- try
- {
- return xaResource.getTransactionTimeout();
- }
- catch (XAException e)
- {
- throw check(e);
- }
- catch (Exception e)
- {
- log.info("Caught e in getTransactiontimeoiut", e);
- throw new RuntimeException(e.toString());
- }
- }
-
- public boolean setTransactionTimeout(int seconds) throws XAException
- {
- XAResource xaResource = getDelegate();
-
- if (trace) { log.trace(this + " Calling setTransactionTimeout"); }
- try
- {
- return xaResource.setTransactionTimeout(seconds);
- }
- catch (XAException e)
- {
- throw check(e);
- }
- catch (Exception e)
- {
- log.info("Caught e in settranactiotntimeoiut", e);
- throw new RuntimeException(e.toString());
- }
- }
-
- public void onException(JMSException exception)
- {
- log.warn("Notified of connection failure in recovery delegate", exception);
- close();
- }
-
- /**
- * Get the delegate XAResource
- *
- * @return the delegate
- * @throws XAException for any problem
- */
- public XAResource getDelegate() throws XAException
- {
- XAResource result = null;
- Exception error = null;
- try
- {
- result = connect();
- }
- catch (Exception e)
- {
- error = e;
- }
-
- if (result == null)
- {
- XAException xae = new XAException("Error trying to connect");
- xae.errorCode = XAException.XAER_RMERR;
- if (error != null)
- xae.initCause(error);
- log.debug("Cannot get delegate XAResource", xae);
- throw xae;
- }
-
- return result;
- }
-
- /**
- * Connect to the server if not already done so
- *
- * @return the delegate XAResource
- * @throws Exception for any problem
- */
- protected XAResource connect() throws Exception
- {
- // Do we already have a valid delegate?
- synchronized (lock)
- {
- if (delegate != null)
- return delegate;
- }
-
- if (trace) { log.trace(this + " Connecting"); }
-
-
- // Create the connection
- XAConnection xaConnection = getConnectionFactory().createXAConnection();
- synchronized (lock)
- {
- connection = xaConnection;
- }
-
- // Retrieve the delegate XAResource
- try
- {
- XASession session = connection.createXASession();
- XAResource result = session.getXAResource();
- synchronized (lock)
- {
- delegate = result;
- }
- return delegate;
- }
- catch (Exception e)
- {
- close();
- throw e;
- }
- }
-
- /**
- * Get the XAConnectionFactory
- *
- * @return the connection
- * @throws Exception for any problem
- */
- protected XAConnectionFactory getConnectionFactory() throws Exception
- {
- InitialContext ic = null;
-
- try
- {
- ic = new InitialContext(jndiProperties);
-
- XAConnectionFactory connectionFactory = (XAConnectionFactory)ic.lookup(connectionFactoryLookup);
-
- return connectionFactory;
- }
- finally
- {
- if (ic != null)
- {
- ic.close();
- }
- }
- }
-
- /**
- * Close the connection
- */
- public void close()
- {
- if (trace) { log.trace(this + " Close"); }
-
- try
- {
- XAConnection oldConnection = null;
- synchronized (lock)
- {
- oldConnection = connection;
- connection = null;
- delegate = null;
- }
- if (oldConnection != null)
- oldConnection.close();
- }
- catch (Exception ignored)
- {
- log.trace("Ignored error during close", ignored);
- }
- }
-
- /**
- * Check whether an XAException is fatal. If it is an RM problem
- * we close the connection so the next call will reconnect.
- *
- * @param e the xa exception
- * @return never
- * @throws XAException always
- */
- protected XAException check(XAException e) throws XAException
- {
- if (trace) { log.trace(this + " check " + e); }
-
- if (e.errorCode == XAException.XAER_RMERR || e.errorCode == XAException.XAER_RMFAIL)
- {
- log.debug("Fatal error", e);
- close();
- }
- throw e;
- }
-
- protected void finalize() throws Throwable
- {
- close();
- }
-}
Modified: trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/bridge/Bridge.java 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/src/main/org/jboss/jms/server/bridge/Bridge.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -42,6 +42,7 @@
import org.jboss.logging.Logger;
import org.jboss.messaging.core.plugin.contract.MessagingComponent;
+import org.jboss.tm.TransactionManagerLocator;
import org.jboss.tm.TxManager;
/**
@@ -271,8 +272,27 @@
checkParams();
- boolean ok = setupJMSObjectsWithRetry();
+ TransactionManager tm = getTm();
+ //There may already be a JTA transaction associated to the thread
+
+ boolean ok;
+
+ Transaction toResume = null;
+ try
+ {
+ toResume = tm.suspend();
+
+ ok = setupJMSObjects();
+ }
+ finally
+ {
+ if (toResume != null)
+ {
+ tm.resume(toResume);
+ }
+ }
+
if (ok)
{
started = true;
@@ -735,10 +755,8 @@
{
if (tm == null)
{
- //tm = TransactionManagerLocator.getInstance().locate();
+ tm = TransactionManagerLocator.getInstance().locate();
- tm = com.arjuna.ats.jta.TransactionManager.transactionManager();
-
if (tm == null)
{
throw new IllegalStateException("Cannot locate a transaction manager");
Modified: trunk/src/main/org/jboss/jms/server/bridge/BridgeService.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/bridge/BridgeService.java 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/src/main/org/jboss/jms/server/bridge/BridgeService.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -310,6 +310,8 @@
protected void startService() throws Exception
{
+ if (log.isTraceEnabled()) { log.trace("Starting bridge"); }
+
super.startService();
Properties sourceProps = null;
@@ -380,12 +382,18 @@
bridge.setDestConnectionFactoryFactory(destCff);
bridge.start();
+
+ if (log.isTraceEnabled()) { log.trace("Started bridge"); }
}
protected void stopService() throws Exception
{
+ if (log.isTraceEnabled()) { log.trace("Stopping bridge"); }
+
bridge.stop();
+
+ if (log.isTraceEnabled()) { log.trace("Stopped bridge"); }
}
// Private ---------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ResourceManager.java 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/src/main/org/jboss/jms/tx/ResourceManager.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -562,9 +562,6 @@
/*
* Rollback has occurred so we need to redeliver any unacked messages corresponding to the acks
* is in the transaction.
- * NOTE! We only do this for 1PC rollback - for 2PC rollback we MUST rollback on the server
- * but if we do this we cannot redeliver locally since then we might get the same message
- * delievered twice. Therefore we must not redeliver locally.
*
*/
private void redeliverMessages(ClientTransaction ts) throws JMSException
@@ -606,14 +603,21 @@
{
connection.sendTransaction(request);
}
- catch (TransactionRolledBackException e)
- {
- throw new MessagingXAException(XAException.XA_RBROLLBACK, "An error occurred in sending transaction and the transaction was rolled back", e);
- }
catch (Throwable t)
{
//Catch anything else
- throw new MessagingXAException(XAException.XAER_RMERR, "A Throwable was caught in sending the transaction", t);
+
+ //We assume that any error is recoverable - and the recovery manager should retry again
+ //either after the network connection has been repaired (if that was the problem), or
+ //the server has been fixed.
+
+ //(In some cases it will not be possible to fix so the user will have to manually resolve the tx)
+
+ //Therefore we throw XA_RETRY
+ //Note we DO NOT throw XAER_RMFAIL or XAER_RMERR since both if these will cause the Arjuna
+ //tx mgr NOT to retry and the transaction will have to be resolve manually.
+
+ throw new MessagingXAException(XAException.XA_RETRY, "A Throwable was caught in sending the transaction", t);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -32,12 +32,12 @@
import org.jboss.logging.Logger;
import org.jboss.messaging.core.plugin.contract.MessageStore;
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
-import org.jboss.messaging.core.refqueue.BasicPrioritizedDeque;
-import org.jboss.messaging.core.refqueue.PrioritizedDeque;
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.messaging.core.tx.TransactionException;
import org.jboss.messaging.core.tx.TxCallback;
import org.jboss.messaging.util.Future;
+import org.jboss.messaging.util.prioritylinkedlist.BasicPriorityLinkedList;
+import org.jboss.messaging.util.prioritylinkedlist.PriorityLinkedList;
import org.jboss.util.timeout.Timeout;
import org.jboss.util.timeout.TimeoutTarget;
@@ -83,7 +83,7 @@
protected volatile boolean receiversReady;
- protected PrioritizedDeque messageRefs;
+ protected PriorityLinkedList messageRefs;
protected boolean acceptReliableMessages;
@@ -141,7 +141,7 @@
this.recoverable = recoverable;
- messageRefs = new BasicPrioritizedDeque(10);
+ messageRefs = new BasicPriorityLinkedList(10);
refLock = new Object();
Modified: trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -267,8 +267,6 @@
public long reserveIDBlock(String counterName, int size) throws Exception
{
- // TODO This will need locking (e.g. SELECT ... FOR UPDATE...) in the clustered case
-
if (trace) { log.trace("Getting ID block for counter " + counterName + ", size " + size); }
if (size <= 0)
@@ -285,6 +283,8 @@
{
conn = ds.getConnection();
+ //For the clustered case - this MUST use SELECT .. FOR UPDATE or a similar
+ //construct the locks the row
String selectCounterSQL = getSQLStatement("SELECT_COUNTER");
ps = conn.prepareStatement(selectCounterSQL);
@@ -302,6 +302,10 @@
ps.close();
+ //There is a very small possibility that two threads will attempt to insert the same counter
+ //at the same time, if so, then the second one will fail eventually after a few retries by throwing
+ //a primary key violation.
+
String insertCounterSQL = getSQLStatement("INSERT_COUNTER");
ps = conn.prepareStatement(insertCounterSQL);
Deleted: trunk/src/main/org/jboss/messaging/core/refqueue/BasicPrioritizedDeque.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/refqueue/BasicPrioritizedDeque.java 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/src/main/org/jboss/messaging/core/refqueue/BasicPrioritizedDeque.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -1,193 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.refqueue;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.ListIterator;
-
-/**
- * A basic non synchronized PrioritizedDeque implementation.
- *
- * It implements this by maintaining an
- * individual LinkedList for each priority level.
- *
- * @author <a href="mailto:tim.fox at jboss.com>Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- */
-public class BasicPrioritizedDeque implements PrioritizedDeque
-{
- protected LinkedList[] linkedLists;
-
- protected int priorities;
-
- protected int size;
-
- public BasicPrioritizedDeque(int priorities)
- {
- this.priorities = priorities;
-
- initDeques();
- }
-
- public void addFirst(Object obj, int priority)
- {
- linkedLists[priority].addFirst(obj);
-
- size++;
- }
-
- public void addLast(Object obj, int priority)
- {
- linkedLists[priority].addLast(obj);
-
- size++;
- }
-
- public Object removeFirst()
- {
- Object obj = null;
-
-
- //Initially we are just using a simple prioritization algorithm:
- //Highest priority refs always get returned first.
- //This could cause starvation of lower priority refs.
-
- //TODO - A better prioritization algorithm
-
- for (int i = priorities - 1; i >= 0; i--)
- {
- LinkedList ll = linkedLists[i];
-
- if (!ll.isEmpty())
- {
- obj = ll.removeFirst();
- break;
- }
-
- }
-
- if (obj != null)
- {
- size--;
- }
-
- return obj;
- }
-
- public Object removeLast()
- {
- Object obj = null;
-
- //Initially we are just using a simple prioritization algorithm:
- //Lowest priority refs always get returned first.
-
- //TODO - A better prioritization algorithm
-
- for (int i = 0; i < priorities; i++)
- {
- LinkedList ll = linkedLists[i];
- if (!ll.isEmpty())
- {
- obj = ll.removeLast();
- }
- if (obj != null)
- {
- break;
- }
- }
-
- if (obj != null)
- {
- size--;
- }
-
- return obj;
- }
-
- public Object peekFirst()
- {
- Object obj = null;
-
- //Initially we are just using a simple prioritization algorithm:
- //Highest priority refs always get returned first.
- //This could cause starvation of lower priority refs.
-
- //TODO - A better prioritization algorithm
-
- for (int i = priorities - 1; i >= 0; i--)
- {
- LinkedList ll = linkedLists[i];
- if (!ll.isEmpty())
- {
- obj = ll.getFirst();
- }
- if (obj != null)
- {
- break;
- }
- }
-
- return obj;
- }
-
- public List getAll()
- {
- List all = new ArrayList();
- for (int i = priorities - 1; i >= 0; i--)
- {
- LinkedList deque = linkedLists[i];
- all.addAll(deque);
- }
- return all;
- }
-
- public void clear()
- {
- initDeques();
- }
-
- public int size()
- {
- return size;
- }
-
- public ListIterator iterator()
- {
- return new PrioritizedDequeIterator(linkedLists);
- }
-
- protected void initDeques()
- {
- linkedLists = new LinkedList[priorities];
- for (int i = 0; i < priorities; i++)
- {
- linkedLists[i] = new LinkedList();
- }
-
- size = 0;
- }
-
-}
Deleted: trunk/src/main/org/jboss/messaging/core/refqueue/PrioritizedDeque.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/refqueue/PrioritizedDeque.java 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/src/main/org/jboss/messaging/core/refqueue/PrioritizedDeque.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -1,55 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.refqueue;
-
-import java.util.List;
-import java.util.ListIterator;
-
-/**
- * A deque that returns objects according to a priority.<br>
- *
- * @author <a href="mailto:tim.fox at jboss.com>Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- */
-public interface PrioritizedDeque
-{
- void addFirst(Object obj, int priority);
-
- void addLast(Object obj, int priority);
-
- Object removeFirst();
-
- Object removeLast();
-
- Object peekFirst();
-
- List getAll();
-
- void clear();
-
- int size();
-
- ListIterator iterator();
-}
Deleted: trunk/src/main/org/jboss/messaging/core/refqueue/PrioritizedDequeIterator.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/refqueue/PrioritizedDequeIterator.java 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/src/main/org/jboss/messaging/core/refqueue/PrioritizedDequeIterator.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -1,116 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.refqueue;
-
-import java.util.LinkedList;
-import java.util.ListIterator;
-import java.util.NoSuchElementException;
-
-/**
- * A PrioritizedDequeIterator
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- *
- */
-class PrioritizedDequeIterator implements ListIterator
-{
- private LinkedList[] lists;
-
- private int index;
-
- private ListIterator currentIter;
-
- PrioritizedDequeIterator(LinkedList[] lists)
- {
- this.lists = lists;
-
- index = lists.length - 1;
-
- currentIter = lists[index].listIterator();
- }
-
- public void add(Object arg0)
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean hasNext()
- {
- if (currentIter.hasNext())
- {
- return true;
- }
- while (index >= 0)
- {
- if (index == 0 || currentIter.hasNext())
- {
- break;
- }
- index--;
- currentIter = lists[index].listIterator();
- }
- return currentIter.hasNext();
- }
-
- public boolean hasPrevious()
- {
- throw new UnsupportedOperationException();
- }
-
- public Object next()
- {
- if (!hasNext())
- {
- throw new NoSuchElementException();
- }
- return currentIter.next();
- }
-
- public int nextIndex()
- {
- throw new UnsupportedOperationException();
- }
-
- public Object previous()
- {
- throw new UnsupportedOperationException();
- }
-
- public int previousIndex()
- {
- throw new UnsupportedOperationException();
- }
-
- public void remove()
- {
- currentIter.remove();
- }
-
- public void set(Object obj)
- {
- throw new UnsupportedOperationException();
- }
-
-}
Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/tests/build.xml 2007-01-19 18:45:03 UTC (rev 1993)
@@ -350,12 +350,16 @@
<!-- ======================================================================================== -->
<target name="tests" depends="tests-jar, prepare-testdirs, clear-test-logs">
- <antcall target="crash-tests"/>
+ <antcall target="crash-tests"/>
<antcall target="invm-tests"/>
+
<antcall target="remote-tests"/> <!-- default remoting configuration (socket) -->
+ <!--
<antcall target="remote-tests">
<param name="test.remoting" value="http"/>
</antcall>
+
+ -->
<antcall target="clustering-tests"/>
</target>
Modified: trunk/tests/etc/jbossjta-properties.xml
===================================================================
--- trunk/tests/etc/jbossjta-properties.xml 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/tests/etc/jbossjta-properties.xml 2007-01-19 18:45:03 UTC (rev 1993)
@@ -181,10 +181,7 @@
<!--
Periodic recovery modules to use. Invoked in sort-order of names.
-->
- <!--
- <property name="com.arjuna.ats.arjuna.recovery.recoveryExtension1" value="com.arjuna.ats.internal.arjuna.recovery.AtomicActionRecoveryModule"/>
- <property name="com.arjuna.ats.arjuna.recovery.recoveryExtension2" value="com.arjuna.ats.internal.txoj.recovery.TORecoveryModule"/>
- -->
+
<property name="com.arjuna.ats.arjuna.recovery.recoveryExtension3" value="com.arjuna.ats.internal.jta.recovery.arjunacore.XARecoveryModule"/>
@@ -226,6 +223,7 @@
<property
name="com.arjuna.ats.txoj.lockstore.allowNestedLocking" value="YES"/>
</properties>
+
<properties depends="arjuna" name="jta">
<property name="com.arjuna.ats.arjuna.xa.nodeIdentifier" value="1"/>
<property name="com.arjuna.ats.jta.xaRecoveryNode" value="1"/>
@@ -249,13 +247,14 @@
provider2.jndi.prop3=zzzz
provider2.xaconnectionfactorylookup=xyz
-->
- <!--
+
<property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.JBMESSAGINGBRIDGE_SERVER0" value="org.jboss.jms.recovery.BridgeXAResourceRecovery;server0,bridge-recovery.properties"/>
<property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.JBMESSAGINGBRIDGE_SERVER1" value="org.jboss.jms.recovery.BridgeXAResourceRecovery;server1,bridge-recovery.properties"/>
--->
-<property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.JBM_TEST" value="org.jboss.test.messaging.jms.bridge.RecoveryTest.NullXAResourceRecovery"/>
<!--
+ <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.JBM_BRIDGE_TEST" value="org.jboss.test.messaging.jms.bridge.NullXAResourceRecovery"/>
+ -->
+ <!--
Support subtransactions in the JTA layer?
Default is NO.
-->
Modified: trunk/tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -287,16 +287,16 @@
assertNull(m);
- //Message should all be in the default expiry queue - let's check
+ //Message should all be in the override expiry queue - let's check
- MessageConsumer cons3 = sess.createConsumer(defaultExpiry);
+ MessageConsumer cons3 = sess.createConsumer(overrideExpiry);
for (int i = 0; i < NUM_MESSAGES; i++)
{
TextMessage tm = (TextMessage)cons3.receive(1000);
assertNotNull(tm);
-
+
assertEquals("Message:" + i, tm.getText());
}
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -1179,6 +1179,8 @@
}
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/jms/XATest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/XATest.java 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/tests/src/org/jboss/test/messaging/jms/XATest.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -2614,7 +2614,7 @@
{
if (failOnPrepare)
{
- throw new XAException(XAException.XAER_RMERR);
+ throw new XAException(XAException.XAER_RMFAIL);
}
return XAResource.XA_OK;
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeMBeanTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeMBeanTest.java 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeMBeanTest.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -42,7 +42,6 @@
/**
* A BridgeMBeanTest
*
- *
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision: 1.1 $</tt>
@@ -299,7 +298,11 @@
-1, null, null, 5000, -1,
sprops1, sprops2);
+ log.trace("Constructed bridge");
+
ServerManagement.getServer(0).invoke(on, "create", new Object[0], new String[0]);
+
+ log.trace("Created bridge");
{
String cfLookup = (String)ServerManagement.getAttribute(on, "SourceConnectionFactoryLookup");
@@ -309,7 +312,7 @@
assertEquals("/Wibble", cfLookup);
ServerManagement.setAttribute(on, "SourceConnectionFactoryLookup", "/XAConnectionFactory");
}
-
+
{
String cfLookup = (String)ServerManagement.getAttribute(on, "TargetConnectionFactoryLookup");
assertEquals("/XAConnectionFactory", cfLookup);
@@ -580,10 +583,14 @@
InitialContext icSource = new InitialContext(props1);
InitialContext icTarget = new InitialContext(props2);
+ log.trace("Checking bridged bridge");
+
checkBridged(icSource, icTarget, "/ConnectionFactory", "/ConnectionFactory",
"/queue/sourceQueue", "/queue/targetQueue");
+ log.trace("Checked bridge");
+
}
finally
{
Modified: trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -34,12 +34,15 @@
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.InitialContext;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
import org.jboss.jms.server.bridge.Bridge;
import org.jboss.jms.server.bridge.ConnectionFactoryFactory;
import org.jboss.jms.server.bridge.JNDIConnectionFactoryFactory;
import org.jboss.logging.Logger;
import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.tm.TransactionManagerLocator;
/**
* A BridgeTest
@@ -528,6 +531,11 @@
public void testParams() throws Exception
{
+ if (!ServerManagement.isRemote())
+ {
+ return;
+ }
+
try
{
ServerManagement.deployQueue("sourceQueue", 0);
@@ -730,6 +738,11 @@
public void testSelector() throws Exception
{
+ if (!ServerManagement.isRemote())
+ {
+ return;
+ }
+
Connection connSource = null;
Connection connDest = null;
@@ -871,8 +884,185 @@
}
}
+ public void testStartBridgeWithJTATransactionAlreadyRunning() throws Exception
+ {
+ if (!ServerManagement.isRemote())
+ {
+ return;
+ }
+
+ Connection connSource = null;
+
+ Connection connDest = null;
+
+ Bridge bridge = null;
+
+ Transaction toResume = null;
+
+ Transaction started = null;
+
+ TransactionManager mgr = TransactionManagerLocator.getInstance().locate();
+
+ try
+ {
+
+ toResume = mgr.suspend();
+
+ mgr.begin();
+
+ started = mgr.getTransaction();
+
+ ServerManagement.deployTopic("sourceTopic", 0);
+
+ ServerManagement.deployQueue("destQueue", 1);
+
+ Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
+
+ Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
+
+ ConnectionFactoryFactory cff0 = new JNDIConnectionFactoryFactory(props0, "/ConnectionFactory");
+
+ ConnectionFactoryFactory cff1 = new JNDIConnectionFactoryFactory(props1, "/ConnectionFactory");
+
+ InitialContext ic0 = new InitialContext(props0);
+
+ InitialContext ic1 = new InitialContext(props1);
+
+ ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
+
+ ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
+
+ Topic sourceTopic = (Topic)ic0.lookup("/topic/sourceTopic");
+
+ Queue destQueue = (Queue)ic1.lookup("/queue/destQueue");
+
+ final int BATCH_SIZE = 10;
+
+ bridge = new Bridge(cff0, cff1, sourceTopic, destQueue,
+ null, null, null, null,
+ null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
+ 1, -1,
+ null, null);
+
+ bridge.start();
+
+ connSource = cf0.createConnection();
+
+ connDest = cf1.createConnection();
+
+ Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sessSend.createProducer(sourceTopic);
+
+ for (int i = 0; i < BATCH_SIZE; i++)
+ {
+ TextMessage tm = sessSend.createTextMessage("message" + i);
+
+ prod.send(tm);
+ }
+
+ Session sessRec = connDest.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons = sessRec.createConsumer(destQueue);
+
+ connDest.start();
+
+ for (int i = 0 ; i < BATCH_SIZE; i++)
+ {
+ TextMessage tm = (TextMessage)cons.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ Message m = cons.receive(1000);
+
+ assertNull(m);
+
+ }
+ finally
+ {
+ if (started != null)
+ {
+ try
+ {
+ started.rollback();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to rollback", e);
+ }
+ }
+
+ if (toResume != null)
+ {
+ try
+ {
+ mgr.resume(toResume);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to resume", e);
+ }
+ }
+
+ if (connSource != null)
+ {
+ try
+ {
+ connSource.close();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to close connection", e);
+ }
+ }
+
+ if (connDest != null)
+ {
+ try
+ {
+ connDest.close();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to close connection", e);
+ }
+ }
+
+ if (bridge != null)
+ {
+ bridge.stop();
+ }
+
+ try
+ {
+ ServerManagement.undeployTopic("sourceTopic", 0);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to undeploy", e);
+ }
+
+ try
+ {
+ ServerManagement.undeployQueue("destQueue", 1);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to undeploy", e);
+ }
+ }
+ }
+
public void testNonDurableSubscriber() throws Exception
{
+ if (!ServerManagement.isRemote())
+ {
+ return;
+ }
+
Connection connSource = null;
Connection connDest = null;
@@ -1003,6 +1193,11 @@
public void testDurableSubscriber() throws Exception
{
+ if (!ServerManagement.isRemote())
+ {
+ return;
+ }
+
Connection connSource = null;
Connection connDest = null;
Modified: trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -64,14 +64,13 @@
// tables; don't clear the database for those.
ServerManagement.start(i, "all,-transaction,jbossjta", i == 0);
}
- }
-
- //We need a local transaction and recovery manager
- //We must start this after the remote servers have been created or it won't
- //have deleted the database and the recovery manager may attempt to recover transactions
- sc = new ServiceContainer("jbossjta");
- sc.start(false);
-
+
+ //We need a local transaction and recovery manager
+ //We must start this after the remote servers have been created or it won't
+ //have deleted the database and the recovery manager may attempt to recover transactions
+ sc = new ServiceContainer("jbossjta");
+ sc.start(false);
+ }
}
protected void tearDown() throws Exception
@@ -106,10 +105,12 @@
log.error("Failed to kill server", e);
}
}
+
+ sc.stop();
}
- sc.stop();
+
super.tearDown();
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/bridge/ReconnectTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/ReconnectTest.java 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/ReconnectTest.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -28,6 +28,7 @@
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
+import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -57,6 +58,13 @@
{
private static final Logger log = Logger.getLogger(ReconnectTest.class);
+ protected ConnectionFactoryFactory cff0, cff1;
+
+ protected ConnectionFactory cf0, cf1;
+
+ protected Destination sourceQueue, destQueue;
+
+
public ReconnectTest(String name)
{
super(name);
@@ -65,11 +73,40 @@
protected void setUp() throws Exception
{
super.setUp();
+
+ if (ServerManagement.isRemote())
+ {
+ ServerManagement.deployQueue("sourceQueue", 0);
+
+ ServerManagement.deployQueue("destQueue", 1);
+ }
+
}
protected void tearDown() throws Exception
{
super.tearDown();
+
+ if (ServerManagement.isRemote())
+ {
+ try
+ {
+ ServerManagement.undeployQueue("sourceQueue", 0);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to undeploy", e);
+ }
+
+ try
+ {
+ ServerManagement.undeployQueue("destQueue", 1);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to undeploy", e);
+ }
+ }
}
// Crash and reconnect
@@ -157,7 +194,8 @@
- public void testCrashAndReconnectDestCrashOnCommit_P() throws Exception
+ // Note this test will fail until http://jira.jboss.com/jira/browse/JBTM-192 is complete
+ public void x_testCrashAndReconnectDestCrashOnCommit_P() throws Exception
{
if (!ServerManagement.isRemote())
{
@@ -166,7 +204,8 @@
testCrashAndReconnectDestCrashOnCommit(true);
}
- public void testCrashAndReconnectDestCrashOnCommit_NP() throws Exception
+ // Note this test will fail until http://jira.jboss.com/jira/browse/JBTM-192 is complete
+ public void x_testCrashAndReconnectDestCrashOnCommit_NP() throws Exception
{
if (!ServerManagement.isRemote())
{
@@ -174,49 +213,60 @@
}
testCrashAndReconnectDestCrashOnCommit(false);
}
-
-
- /*
- * Send some messages
- * Crash the destination server
- * Bring the destination server back up
- * Send some more messages
- * Verify all messages are received
- */
- private void testCrashAndReconnectDestBasic(int qosMode, boolean persistent) throws Exception
+
+ private void setUpAdministeredObjects() throws Exception
{
- Connection connSource = null;
-
- Connection connDest = null;
-
- Bridge bridge = null;
-
+ InitialContext ic0 = null, ic1 = null;
try
{
- ServerManagement.deployQueue("sourceQueue", 0);
-
- ServerManagement.deployQueue("destQueue", 1);
-
Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
- ConnectionFactoryFactory cff0 = new JNDIConnectionFactoryFactory(props0, "/ConnectionFactory");
+ cff0 = new JNDIConnectionFactoryFactory(props0, "/ConnectionFactory");
- ConnectionFactoryFactory cff1 = new JNDIConnectionFactoryFactory(props1, "/ConnectionFactory");
+ cff1 = new JNDIConnectionFactoryFactory(props1, "/ConnectionFactory");
- InitialContext ic0 = new InitialContext(props0);
+ ic0 = new InitialContext(props0);
- InitialContext ic1 = new InitialContext(props1);
+ ic1 = new InitialContext(props1);
- ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
+ cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
- ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
+ cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
- Queue sourceQueue = (Queue)ic0.lookup("/queue/sourceQueue");
+ sourceQueue = (Queue)ic0.lookup("/queue/sourceQueue");
- Queue destQueue = (Queue)ic1.lookup("/queue/destQueue");
+ destQueue = (Queue)ic1.lookup("/queue/destQueue");
+ }
+ finally
+ {
+ if (ic0 != null)
+ {
+ ic0.close();
+ }
+ if (ic1 != null)
+ {
+ ic1.close();
+ }
+ }
+ }
+
+ /*
+ * Send some messages
+ * Crash the destination server
+ * Bring the destination server back up
+ * Send some more messages
+ * Verify all messages are received
+ */
+ private void testCrashAndReconnectDestBasic(int qosMode, boolean persistent) throws Exception
+ {
+ Bridge bridge = null;
+ try
+ {
+ setUpAdministeredObjects();
+
bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
null, null, null, null,
null, 1000, -1, qosMode,
@@ -225,41 +275,16 @@
bridge.start();
- connSource = cf0.createConnection();
-
- connDest = cf1.createConnection();
-
- Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sessSend.createProducer(sourceQueue);
-
- prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
final int NUM_MESSAGES = 10;
- for (int i = 0; i < NUM_MESSAGES / 2; i++)
- {
- TextMessage tm = sessSend.createTextMessage("message" + i);
-
- prod.send(tm);
- }
+ //Send some messages
- Session sessRec = connDest.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES /2 , persistent);
- MessageConsumer cons = sessRec.createConsumer(destQueue);
-
- connDest.start();
-
//Verify none are received
- Message m = cons.receive(1000);
+ checkNoneReceived(cf1, destQueue);
- assertNull(m);
-
- connDest.close();
-
- ic1.close();
-
//Now crash the dest server
log.info("About to crash server");
@@ -273,78 +298,31 @@
//Restart the server
+ log.info("Restarting server");
+
ServerManagement.start(1, "all", false);
ServerManagement.deployQueue("destQueue", 1);
-
- cff1 = new JNDIConnectionFactoryFactory(props1, "/ConnectionFactory");
-
- ic1 = new InitialContext(props1);
+
+ setUpAdministeredObjects();
- cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
-
- destQueue = (Queue)ic1.lookup("/queue/destQueue");
+ //Send some more messages
- connDest = cf1.createConnection();
+ log.info("Sending more messages");
- sessRec = connDest.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ sendMessages(cf0, sourceQueue, NUM_MESSAGES / 2, NUM_MESSAGES / 2, persistent);
- cons = sessRec.createConsumer(destQueue);
-
- connDest.start();
-
- //Send some more messages
-
- for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sessSend.createTextMessage("message" + i);
-
- prod.send(tm);
- }
+ Thread.sleep(3000);
- //If Qos=once and only once then all messages should be received
- //If Qos=at most then only the second half will be received
- //If Qos=dups ok then the the first half will be received twice followed by the second half
-
- checkMessagesReceived(qosMode, cons, NUM_MESSAGES);
+ checkMessagesReceived(cf1, destQueue, qosMode, NUM_MESSAGES);
//Make sure no messages are left in the source dest
- MessageConsumer cons2 = sessSend.createConsumer(sourceQueue);
-
- connSource.start();
-
- m = cons2.receive(1000);
-
- assertNull(m);
+ this.checkNoneReceived(cf0, sourceQueue);
}
finally
{
- if (connSource != null)
- {
- try
- {
- connSource.close();
- }
- catch (Exception e)
- {
- log.error("Failed to close connection", e);
- }
- }
-
- if (connDest != null)
- {
- try
- {
- connDest.close();
- }
- catch (Exception e)
- {
- log.error("Failed to close connection", e);
- }
- }
-
-
+
if (bridge != null)
{
try
@@ -355,25 +333,7 @@
{
log.error("Failed to stop bridge", e);
}
- }
-
- try
- {
- ServerManagement.undeployQueue("sourceQueue", 0);
- }
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
-
- try
- {
- ServerManagement.undeployQueue("destQueue", 1);
- }
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
+ }
}
}
@@ -389,39 +349,13 @@
* Verify all messages are received
*/
private void testCrashAndReconnectDestCrashBeforePrepare(boolean persistent) throws Exception
- {
- Connection connSource = null;
-
- Connection connDest = null;
-
+ {
Bridge bridge = null;
try
{
- ServerManagement.deployQueue("sourceQueue", 0);
+ setUpAdministeredObjects();
- ServerManagement.deployQueue("destQueue", 1);
-
- Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
-
- Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
-
- ConnectionFactoryFactory cff0 = new JNDIConnectionFactoryFactory(props0, "/ConnectionFactory");
-
- ConnectionFactoryFactory cff1 = new JNDIConnectionFactoryFactory(props1, "/ConnectionFactory");
-
- InitialContext ic0 = new InitialContext(props0);
-
- InitialContext ic1 = new InitialContext(props1);
-
- ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
-
- ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
-
- Queue sourceQueue = (Queue)ic0.lookup("/queue/sourceQueue");
-
- Queue destQueue = (Queue)ic1.lookup("/queue/destQueue");
-
bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
null, null, null, null,
null, 1000, -1, Bridge.QOS_ONCE_AND_ONLY_ONCE,
@@ -429,42 +363,19 @@
null, null);
bridge.start();
-
- connSource = cf0.createConnection();
- connDest = cf1.createConnection();
-
- Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sessSend.createProducer(sourceQueue);
-
- prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
final int NUM_MESSAGES = 10;
-
- for (int i = 0; i < NUM_MESSAGES / 2; i++)
- {
- TextMessage tm = sessSend.createTextMessage("message" + i);
- prod.send(tm);
- }
+ //Send some messages
- Session sessRec = connDest.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ this.sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES / 2, persistent);
- MessageConsumer cons = sessRec.createConsumer(destQueue);
- connDest.start();
+ //verify none are received
- //Verify none are received
+ this.checkNoneReceived(cf1, destQueue);
- Message m = cons.receive(1000);
- assertNull(m);
-
- connDest.close();
-
- ic1.close();
-
//Now crash the dest server
log.info("About to crash server");
@@ -482,73 +393,22 @@
ServerManagement.deployQueue("destQueue", 1);
- cff1 = new JNDIConnectionFactoryFactory(props1, "/ConnectionFactory");
-
- ic1 = new InitialContext(props1);
+ setUpAdministeredObjects();
- cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
-
- destQueue = (Queue)ic1.lookup("/queue/destQueue");
+ sendMessages(cf0, sourceQueue, NUM_MESSAGES / 2, NUM_MESSAGES / 2, persistent);
+
+ checkMessagesReceived(cf1, destQueue, Bridge.QOS_ONCE_AND_ONLY_ONCE, NUM_MESSAGES);
- connDest = cf1.createConnection();
-
- sessRec = connDest.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- cons = sessRec.createConsumer(destQueue);
-
- connDest.start();
-
- //Send some more messages
-
- for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sessSend.createTextMessage("message" + i);
-
- prod.send(tm);
- }
-
- checkMessagesReceived(Bridge.QOS_ONCE_AND_ONLY_ONCE, cons, NUM_MESSAGES);
-
//Make sure no messages are left in the source dest
- MessageConsumer cons2 = sessSend.createConsumer(sourceQueue);
+ checkNoneReceived(cf0, sourceQueue);
- connSource.start();
-
- m = cons2.receive(1000);
-
- assertNull(m);
-
log.info("Got here");
}
finally
{
- if (connSource != null)
- {
- try
- {
- connSource.close();
- }
- catch (Exception e)
- {
- log.error("Failed to close connection", e);
- }
- }
-
- if (connDest != null)
- {
- try
- {
- connDest.close();
- }
- catch (Exception e)
- {
- log.error("Failed to close connection", e);
- }
- }
-
-
+
if (bridge != null)
{
try
@@ -561,23 +421,6 @@
}
}
- try
- {
- ServerManagement.undeployQueue("sourceQueue", 0);
- }
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
-
- try
- {
- ServerManagement.undeployQueue("destQueue", 1);
- }
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
}
}
@@ -590,38 +433,12 @@
*/
private void testCrashAndReconnectDestCrashOnCommit(boolean persistent) throws Exception
{
- Connection connSource = null;
-
- Connection connDest = null;
-
Bridge bridge = null;
try
{
- ServerManagement.deployQueue("sourceQueue", 0);
+ setUpAdministeredObjects();
- ServerManagement.deployQueue("destQueue", 1);
-
- Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
-
- Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
-
- ConnectionFactoryFactory cff0 = new JNDIConnectionFactoryFactory(props0, "/ConnectionFactory");
-
- ConnectionFactoryFactory cff1 = new JNDIConnectionFactoryFactory(props1, "/ConnectionFactory");
-
- InitialContext ic0 = new InitialContext(props0);
-
- InitialContext ic1 = new InitialContext(props1);
-
- ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
-
- ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
-
- Queue sourceQueue = (Queue)ic0.lookup("/queue/sourceQueue");
-
- Queue destQueue = (Queue)ic1.lookup("/queue/destQueue");
-
final int NUM_MESSAGES = 10;
bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
@@ -632,42 +449,16 @@
bridge.start();
- connSource = cf0.createConnection();
+ //Send some messages
- connDest = cf1.createConnection();
+ sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES / 2, persistent);
- Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sessSend.createProducer(sourceQueue);
-
- prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- for (int i = 0; i < NUM_MESSAGES / 2; i++)
- {
- TextMessage tm = sessSend.createTextMessage("message" + i);
-
- prod.send(tm);
-
- log.info("sent message:" + tm.getJMSMessageID());
-
- }
-
- Session sessRec = connDest.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons = sessRec.createConsumer(destQueue);
-
- connDest.start();
-
+
//Verify none are received
- Message m = cons.receive(1000);
-
- assertNull(m);
+ checkNoneReceived(cf1, destQueue);
- connDest.close();
-
- ic1.close();
-
+
//Poison server 1 so it crashes on commit of dest but after prepare
//This means the transaction branch on source will get commmitted
@@ -707,78 +498,26 @@
log.info("Slept");
- cff1 = new JNDIConnectionFactoryFactory(props1, "/ConnectionFactory");
-
- ic1 = new InitialContext(props1);
+ setUpAdministeredObjects();
- cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
-
- destQueue = (Queue)ic1.lookup("/queue/destQueue");
-
- connDest = cf1.createConnection();
-
- sessRec = connDest.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- cons = sessRec.createConsumer(destQueue);
-
- connDest.start();
//Send some more messages
- for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sessSend.createTextMessage("message" + i);
-
- prod.send(tm);
-
- log.info("sent message: " + tm.getJMSMessageID());
-
- }
+ this.sendMessages(cf0, sourceQueue, NUM_MESSAGES / 2, NUM_MESSAGES / 2, persistent);
- checkMessagesReceived(Bridge.QOS_ONCE_AND_ONLY_ONCE, cons, NUM_MESSAGES);
+ checkMessagesReceived(cf1, destQueue, Bridge.QOS_ONCE_AND_ONLY_ONCE, NUM_MESSAGES);
//Make sure no messages are left in the source dest
- MessageConsumer cons2 = sessSend.createConsumer(sourceQueue);
+ this.checkNoneReceived(cf0, sourceQueue);
- connSource.start();
-
- m = cons2.receive(1000);
-
- assertNull(m);
-
log.info("Got here");
}
finally
{
- log.info("In finally");
+ log.info("In finally");
- if (connSource != null)
- {
- try
- {
- connSource.close();
- }
- catch (Exception e)
- {
- log.error("Failed to close connection", e);
- }
- }
-
- if (connDest != null)
- {
- try
- {
- connDest.close();
- }
- catch (Exception e)
- {
- log.error("Failed to close connection", e);
- }
- }
-
-
if (bridge != null)
{
try
@@ -790,69 +529,138 @@
log.error("Failed to stop bridge", e);
}
}
+ }
+ }
+
+ private void sendMessages(ConnectionFactory cf, Destination dest, int start, int numMessages, boolean persistent)
+ throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
- try
- {
- ServerManagement.undeployQueue("sourceQueue", 0);
- }
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- try
+ MessageProducer prod = sess.createProducer(dest);
+
+ prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ for (int i = start; i < start + numMessages; i++)
{
- ServerManagement.undeployQueue("destQueue", 1);
+ TextMessage tm = sess.createTextMessage("message" + i);
+
+ prod.send(tm);
}
- catch (Exception e)
+ }
+ finally
+ {
+ if (conn != null)
{
- log.error("Failed to undeploy", e);
+ conn.close();
}
- }
+ }
}
-
- private void checkMessagesReceived(int qosMode, MessageConsumer cons, int numMessages) throws Exception
+ private void checkNoneReceived(ConnectionFactory cf, Destination dest) throws Exception
{
- //Consume the messages
+ Connection conn = null;
- Set msgs = new HashSet();
-
- while (true)
+ try
{
- TextMessage tm = (TextMessage)cons.receive(2000);
+ conn = cf.createConnection();
- if (tm == null)
- {
- break;
- }
+ conn.start();
- log.info("received message:" + tm.getJMSMessageID());
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- msgs.add(tm.getText());
-
+ MessageConsumer cons = sess.createConsumer(dest);
+
+ Message m = cons.receive(2000);
+
+ assertNull(m);
+
}
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ private void checkMessagesReceived(ConnectionFactory cf, Destination dest, int qosMode, int numMessages) throws Exception
+ {
+ Connection conn = null;
- if (qosMode == Bridge.QOS_ONCE_AND_ONLY_ONCE || qosMode == Bridge.QOS_DUPLICATES_OK)
- {
- //All the messages should be received
+ try
+ {
+ conn = cf.createConnection();
- for (int i = 0; i < numMessages; i++)
+ conn.start();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons = sess.createConsumer(dest);
+
+ // Consume the messages
+
+ Set msgs = new HashSet();
+
+ log.info("checkMessagesReceived");
+
+ int count = 0;
+
+ while (true)
{
- assertTrue(msgs.contains("message" + i));
+ TextMessage tm = (TextMessage)cons.receive(2000);
+
+ if (tm == null)
+ {
+ break;
+ }
+
+ log.info("got message:" + tm.getJMSMessageID());
+
+ msgs.add(tm.getText());
+
+ count++;
+
}
- //Should be no more
- if (qosMode == Bridge.QOS_ONCE_AND_ONLY_ONCE)
+ log.info("message received " + count);
+
+ if (qosMode == Bridge.QOS_ONCE_AND_ONLY_ONCE || qosMode == Bridge.QOS_DUPLICATES_OK)
+ {
+ //All the messages should be received
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ assertTrue(msgs.contains("message" + i));
+ }
+
+ //Should be no more
+ if (qosMode == Bridge.QOS_ONCE_AND_ONLY_ONCE)
+ {
+ assertEquals(numMessages, msgs.size());
+ }
+ }
+ else if (qosMode == Bridge.QOS_AT_MOST_ONCE)
{
- assertEquals(numMessages, msgs.size());
- }
+ //No *guarantee* that any messages will be received
+ //but you still might get some depending on how/where the crash occurred
+ }
+
}
- else if (qosMode == Bridge.QOS_AT_MOST_ONCE)
+ finally
{
- //No *guarantee* that any messages will be received
- //but you still might get some depending on how/where the crash occurred
- }
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
}
// Inner classes -------------------------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/jms/message/JMSPriorityHeaderTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/message/JMSPriorityHeaderTest.java 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/tests/src/org/jboss/test/messaging/jms/message/JMSPriorityHeaderTest.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -253,7 +253,180 @@
}
+ /*
+ * If messages are sent to a queue with certain priorities, and a consumer is already open
+ * then it is likely that they will be immediately sent to the consumer.
+ * However the list in the consumer is not a priority list, so messages will be consumed in the
+ * order they reached the consumer, not in the order of priority
+ * See http://jira.jboss.com/jira/browse/JBMESSAGING-628
+ */
+ public void testMessageOrderWithConsumerBuffering() throws Exception
+ {
+ Connection conn = cf.createConnection();
+
+ conn.start();
+
+ Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sessSend.createProducer(queue);
+
+ TextMessage m0 = sessSend.createTextMessage("a");
+ TextMessage m1 = sessSend.createTextMessage("b");
+ TextMessage m2 = sessSend.createTextMessage("c");
+ TextMessage m3 = sessSend.createTextMessage("d");
+ TextMessage m4 = sessSend.createTextMessage("e");
+ TextMessage m5 = sessSend.createTextMessage("f");
+ TextMessage m6 = sessSend.createTextMessage("g");
+ TextMessage m7 = sessSend.createTextMessage("h");
+ TextMessage m8 = sessSend.createTextMessage("i");
+ TextMessage m9 = sessSend.createTextMessage("j");
+
+ Session sessReceive = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons = sessReceive.createConsumer(queue);
+
+
+ prod.send(m0, DeliveryMode.NON_PERSISTENT, 0, 0);
+ prod.send(m1, DeliveryMode.NON_PERSISTENT, 1, 0);
+ prod.send(m2, DeliveryMode.NON_PERSISTENT, 2, 0);
+ prod.send(m3, DeliveryMode.NON_PERSISTENT, 3, 0);
+ prod.send(m4, DeliveryMode.NON_PERSISTENT, 4, 0);
+ prod.send(m5, DeliveryMode.NON_PERSISTENT, 5, 0);
+ prod.send(m6, DeliveryMode.NON_PERSISTENT, 6, 0);
+ prod.send(m7, DeliveryMode.NON_PERSISTENT, 7, 0);
+ prod.send(m8, DeliveryMode.NON_PERSISTENT, 8, 0);
+ prod.send(m9, DeliveryMode.NON_PERSISTENT, 9, 0);
+
+
+ {
+ TextMessage t = (TextMessage)cons.receive(1000);
+ assertNotNull(t);
+ assertEquals("j", t.getText());
+ }
+ {
+ TextMessage t = (TextMessage)cons.receive(1000);
+ assertNotNull(t);
+ assertEquals("i", t.getText());
+ }
+ {
+ TextMessage t = (TextMessage)cons.receive(1000);
+ assertNotNull(t);
+ assertEquals("h", t.getText());
+ }
+ {
+ TextMessage t = (TextMessage)cons.receive(1000);
+ assertNotNull(t);
+ assertEquals("g", t.getText());
+ }
+ {
+ TextMessage t = (TextMessage)cons.receive(1000);
+ assertNotNull(t);
+ assertEquals("f", t.getText());
+ }
+ {
+ TextMessage t = (TextMessage)cons.receive(1000);
+ assertNotNull(t);
+ assertEquals("e", t.getText());
+ }
+ {
+ TextMessage t = (TextMessage)cons.receive(1000);
+ assertNotNull(t);
+ assertEquals("d", t.getText());
+ }
+ {
+ TextMessage t = (TextMessage)cons.receive(1000);
+ assertNotNull(t);
+ assertEquals("c", t.getText());
+ }
+ {
+ TextMessage t = (TextMessage)cons.receive(1000);
+ assertNotNull(t);
+ assertEquals("b", t.getText());
+ }
+ {
+ TextMessage t = (TextMessage)cons.receive(1000);
+ assertNotNull(t);
+ assertEquals("a", t.getText());
+ }
+ {
+ TextMessage t = (TextMessage)cons.receive(500);
+ assertNull(t);
+ }
+
+ cons.close();
+
+ cons = sessReceive.createConsumer(queue);
+
+ prod.send(m0, DeliveryMode.NON_PERSISTENT, 0, 0);
+ prod.send(m1, DeliveryMode.NON_PERSISTENT, 0, 0);
+ prod.send(m2, DeliveryMode.NON_PERSISTENT, 0, 0);
+ prod.send(m3, DeliveryMode.NON_PERSISTENT, 3, 0);
+ prod.send(m4, DeliveryMode.NON_PERSISTENT, 3, 0);
+ prod.send(m5, DeliveryMode.NON_PERSISTENT, 4, 0);
+ prod.send(m6, DeliveryMode.NON_PERSISTENT, 4, 0);
+ prod.send(m7, DeliveryMode.NON_PERSISTENT, 5, 0);
+ prod.send(m8, DeliveryMode.NON_PERSISTENT, 5, 0);
+ prod.send(m9, DeliveryMode.NON_PERSISTENT, 6, 0);
+
+ {
+ TextMessage t = (TextMessage)cons.receive(1000);
+ assertNotNull(t);
+ assertEquals("j", t.getText());
+ }
+ {
+ TextMessage t = (TextMessage)cons.receive(1000);
+ assertNotNull(t);
+ assertEquals("h", t.getText());
+ }
+ {
+ TextMessage t = (TextMessage)cons.receive(1000);
+ assertNotNull(t);
+ assertEquals("i", t.getText());
+ }
+ {
+ TextMessage t = (TextMessage)cons.receive(1000);
+ assertNotNull(t);
+ assertEquals("f", t.getText());
+ }
+ {
+ TextMessage t = (TextMessage)cons.receive(1000);
+ assertNotNull(t);
+ assertEquals("g", t.getText());
+ }
+ {
+ TextMessage t = (TextMessage)cons.receive(1000);
+ assertNotNull(t);
+ assertEquals("d", t.getText());
+ }
+ {
+ TextMessage t = (TextMessage)cons.receive(1000);
+ assertNotNull(t);
+ assertEquals("e", t.getText());
+ }
+ {
+ TextMessage t = (TextMessage)cons.receive(1000);
+ assertNotNull(t);
+ assertEquals("a", t.getText());
+ }
+ {
+ TextMessage t = (TextMessage)cons.receive(1000);
+ assertNotNull(t);
+ assertEquals("b", t.getText());
+ }
+ {
+ TextMessage t = (TextMessage)cons.receive(1000);
+ assertNotNull(t);
+ assertEquals("c", t.getText());
+ }
+ {
+ TextMessage t = (TextMessage)cons.receiveNoWait();
+ assertNull(t);
+ }
+
+ conn.close();
+ }
+
public void testSimple() throws Exception
{
Connection conn = cf.createConnection();
Modified: trunk/tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -30,11 +30,21 @@
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
import javax.naming.InitialContext;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
import org.jboss.jms.client.JBossConnectionFactory;
import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.jms.bridge.RecoveryTest.DummyXAResource;
import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.tm.TransactionManagerLocator;
/**
*
@@ -84,6 +94,8 @@
queue = (Queue)initialContext.lookup("/queue/Queue");
+ this.drainDestination(cf, queue);
+
topic = (Topic)initialContext.lookup("/topic/Topic");
}
@@ -222,6 +234,355 @@
conn.close();
}
+ public void testDeliveryCountUpdatedOnCloseTransacted() throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ Session producerSess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSess.createProducer(queue);
+
+ Session consumerSess = conn.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer consumer = consumerSess.createConsumer(queue);
+ conn.start();
+
+ TextMessage tm = producerSess.createTextMessage("message1");
+
+ producer.send(tm);
+
+ TextMessage rm = (TextMessage)consumer.receive(1000);
+
+ assertNotNull(rm);
+
+ assertEquals(tm.getText(), rm.getText());
+
+ assertEquals(1, rm.getIntProperty("JMSXDeliveryCount"));
+
+ assertFalse(rm.getJMSRedelivered());
+
+ consumerSess.rollback();
+
+ rm = (TextMessage)consumer.receive(1000);
+
+ assertNotNull(rm);
+
+ assertEquals(tm.getText(), rm.getText());
+
+ assertEquals(2, rm.getIntProperty("JMSXDeliveryCount"));
+
+ assertTrue(rm.getJMSRedelivered());
+
+ consumerSess.rollback();
+
+ rm = (TextMessage)consumer.receive(1000);
+
+ assertNotNull(rm);
+
+ assertEquals(tm.getText(), rm.getText());
+
+ assertEquals(3, rm.getIntProperty("JMSXDeliveryCount"));
+
+ assertTrue(rm.getJMSRedelivered());
+
+ //Now close the session without committing
+
+ log.info("Closing session");
+
+ consumerSess.close();
+
+ consumerSess = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+ consumer = consumerSess.createConsumer(queue);
+
+ rm = (TextMessage)consumer.receive(1000);
+
+ assertNotNull(rm);
+
+ assertEquals(tm.getText(), rm.getText());
+
+ assertEquals(4, rm.getIntProperty("JMSXDeliveryCount"));
+
+ assertTrue(rm.getJMSRedelivered());
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ public void testDeliveryCountUpdatedOnCloseClientAck() throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ Session producerSess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSess.createProducer(queue);
+
+ Session consumerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSess.createConsumer(queue);
+ conn.start();
+
+ TextMessage tm = producerSess.createTextMessage("message1");
+
+ producer.send(tm);
+
+ TextMessage rm = (TextMessage)consumer.receive(1000);
+
+ assertNotNull(rm);
+
+ assertEquals(tm.getText(), rm.getText());
+
+ assertEquals(1, rm.getIntProperty("JMSXDeliveryCount"));
+
+ assertFalse(rm.getJMSRedelivered());
+
+ consumerSess.recover();
+
+ rm = (TextMessage)consumer.receive(1000);
+
+ assertNotNull(rm);
+
+ assertEquals(tm.getText(), rm.getText());
+
+ assertEquals(2, rm.getIntProperty("JMSXDeliveryCount"));
+
+ assertTrue(rm.getJMSRedelivered());
+
+ consumerSess.recover();
+
+ rm = (TextMessage)consumer.receive(1000);
+
+ assertNotNull(rm);
+
+ assertEquals(tm.getText(), rm.getText());
+
+ assertEquals(3, rm.getIntProperty("JMSXDeliveryCount"));
+
+ assertTrue(rm.getJMSRedelivered());
+
+ //Now close the session without committing
+
+ consumerSess.close();
+
+ consumerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ consumer = consumerSess.createConsumer(queue);
+
+ rm = (TextMessage)consumer.receive(1000);
+
+ assertNotNull(rm);
+
+ assertEquals(tm.getText(), rm.getText());
+
+ assertEquals(4, rm.getIntProperty("JMSXDeliveryCount"));
+
+ assertTrue(rm.getJMSRedelivered());
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ public void testDeliveryCountUpdatedOnCloseXA() throws Exception
+ {
+ XAConnection xaConn = null;
+
+ Connection conn = null;
+
+ TransactionManager mgr = TransactionManagerLocator.getInstance().locate();
+
+ Transaction toResume = null;
+
+ Transaction tx = null;
+
+ try
+ {
+ toResume = mgr.suspend();
+
+ conn = cf.createConnection();
+
+ //Send a message
+
+ Session producerSess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSess.createProducer(queue);
+
+ TextMessage tm = producerSess.createTextMessage("message1");
+
+ producer.send(tm);
+
+
+
+ xaConn = ((XAConnectionFactory)cf).createXAConnection();
+
+ XASession consumerSess = xaConn.createXASession();
+ MessageConsumer consumer = consumerSess.createConsumer(queue);
+ xaConn.start();
+
+ DummyXAResource res = new DummyXAResource();
+
+ mgr.begin();
+
+ tx = mgr.getTransaction();
+
+ tx.enlistResource(res);
+
+ tx.enlistResource(consumerSess.getXAResource());
+
+ TextMessage rm = (TextMessage)consumer.receive(1000);
+
+ assertNotNull(rm);
+
+ assertEquals(tm.getText(), rm.getText());
+
+ assertEquals(1, rm.getIntProperty("JMSXDeliveryCount"));
+
+ assertFalse(rm.getJMSRedelivered());
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+
+ tx.delistResource(consumerSess.getXAResource(), XAResource.TMSUCCESS);
+
+ tx.rollback();
+
+ mgr.begin();
+
+ tx = mgr.getTransaction();
+
+ tx.enlistResource(res);
+
+ tx.enlistResource(consumerSess.getXAResource());
+
+ rm = (TextMessage)consumer.receive(1000);
+
+ assertNotNull(rm);
+
+ assertEquals(tm.getText(), rm.getText());
+
+ assertEquals(2, rm.getIntProperty("JMSXDeliveryCount"));
+
+ assertTrue(rm.getJMSRedelivered());
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+
+ tx.delistResource(consumerSess.getXAResource(), XAResource.TMSUCCESS);
+
+ tx.rollback();
+
+ mgr.begin();
+
+ tx = mgr.getTransaction();
+
+ tx.enlistResource(res);
+
+ tx.enlistResource(consumerSess.getXAResource());
+
+ rm = (TextMessage)consumer.receive(1000);
+
+ assertNotNull(rm);
+
+ assertEquals(tm.getText(), rm.getText());
+
+ assertEquals(3, rm.getIntProperty("JMSXDeliveryCount"));
+
+ assertTrue(rm.getJMSRedelivered());
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+
+ tx.delistResource(consumerSess.getXAResource(), XAResource.TMSUCCESS);
+
+ tx.rollback();
+
+ log.info("Closing the consumer");
+
+ //Must close consumer first
+
+ consumer.close();
+
+ consumerSess.close();
+
+ consumerSess = xaConn.createXASession();
+
+ consumer = consumerSess.createConsumer(queue);
+
+ mgr.begin();
+
+ tx = mgr.getTransaction();
+
+ tx.enlistResource(res);
+
+ tx.enlistResource(consumerSess.getXAResource());
+
+ rm = (TextMessage)consumer.receive(1000);
+
+ assertNotNull(rm);
+
+ assertEquals(tm.getText(), rm.getText());
+
+ assertEquals(4, rm.getIntProperty("JMSXDeliveryCount"));
+
+ assertTrue(rm.getJMSRedelivered());
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+
+ tx.delistResource(consumerSess.getXAResource(), XAResource.TMSUCCESS);
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+
+ if (tx != null)
+ {
+ try
+ {
+ tx.commit();
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+
+ if (toResume != null)
+ {
+ try
+ {
+ mgr.resume(toResume);
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+ }
+ }
+
+
+
+
+
class Receiver implements Runnable
{
MessageConsumer cons;
@@ -303,6 +664,61 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
+
+ static class DummyXAResource implements XAResource
+ {
+ DummyXAResource()
+ {
+ }
+
+ public void commit(Xid arg0, boolean arg1) throws XAException
+ {
+ }
+
+ public void end(Xid arg0, int arg1) throws XAException
+ {
+ }
+
+ public void forget(Xid arg0) throws XAException
+ {
+ }
+
+ public int getTransactionTimeout() throws XAException
+ {
+ return 0;
+ }
+
+ public boolean isSameRM(XAResource arg0) throws XAException
+ {
+ return false;
+ }
+
+ public int prepare(Xid arg0) throws XAException
+ {
+ return XAResource.XA_OK;
+ }
+
+ public Xid[] recover(int arg0) throws XAException
+ {
+ return null;
+ }
+
+ public void rollback(Xid arg0) throws XAException
+ {
+ }
+
+ public boolean setTransactionTimeout(int arg0) throws XAException
+ {
+ return false;
+ }
+
+ public void start(Xid arg0, int arg1) throws XAException
+ {
+
+ }
+
+ }
+
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/server/connectionfactory/ConnectionFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/server/connectionfactory/ConnectionFactoryTest.java 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/tests/src/org/jboss/test/messaging/jms/server/connectionfactory/ConnectionFactoryTest.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -21,17 +21,23 @@
*/
package org.jboss.test.messaging.jms.server.connectionfactory;
-import org.jboss.test.messaging.MessagingTestCase;
-import org.jboss.test.messaging.tools.ServerManagement;
-
-import javax.naming.InitialContext;
-import javax.naming.NameNotFoundException;
+import javax.jms.Connection;
import javax.jms.ConnectionFactory;
-import javax.jms.XAConnectionFactory;
+import javax.jms.Queue;
import javax.jms.QueueConnectionFactory;
+import javax.jms.Session;
import javax.jms.TopicConnectionFactory;
+import javax.jms.XAConnectionFactory;
import javax.management.ObjectName;
+import javax.naming.InitialContext;
+import javax.naming.NameNotFoundException;
+import org.jboss.jms.client.JBossMessageConsumer;
+import org.jboss.jms.client.delegate.ClientConsumerDelegate;
+import org.jboss.jms.client.state.ConsumerState;
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.tools.ServerManagement;
+
/**
* Tests a deployed ConnectionFactory service.
*
@@ -124,7 +130,56 @@
// OK
}
}
+
+ public void testDeploymentWithPrefetch() throws Exception
+ {
+ ServerManagement.deployQueue("testQueue");
+
+ Queue queue = (Queue)initialContext.lookup("/queue/testQueue");
+
+ String objectName = "somedomain:service=SomeConnectionFactory";
+ String[] jndiBindings = new String[] { "/SomeConnectionFactory" };
+ final int prefetchSize = 777777;
+
+ ServerManagement.deployConnectionFactory(objectName, jndiBindings, prefetchSize);
+
+ ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/SomeConnectionFactory");
+
+ assertNotNull(cf);
+ assertTrue(cf instanceof QueueConnectionFactory);
+ assertTrue(cf instanceof TopicConnectionFactory);
+
+
+ Connection conn = cf.createConnection();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ JBossMessageConsumer cons = (JBossMessageConsumer)sess.createConsumer(queue);
+
+ ClientConsumerDelegate del = (ClientConsumerDelegate)cons.getDelegate();
+
+ ConsumerState state = (ConsumerState)del.getState();
+
+ int size = state.getBufferSize();
+
+ assertEquals(prefetchSize, size);
+
+ ServerManagement.undeployConnectionFactory(new ObjectName(objectName));
+
+ ServerManagement.undeployQueue("testQueue");
+
+ try
+ {
+ initialContext.lookup("/SomeConnectionFactory");
+ fail("should throw exception");
+ }
+ catch(NameNotFoundException e)
+ {
+ // OK
+ }
+ }
+
public void testDeploymentMultipleJNDIBindings() throws Exception
{
String objectName = "somedomain:service=SomeConnectionFactory";
Copied: trunk/tests/src/org/jboss/test/messaging/util/prioritylinkedlist (from rev 1992, trunk/tests/src/org/jboss/test/messaging/core/refqueue)
Deleted: trunk/tests/src/org/jboss/test/messaging/util/prioritylinkedlist/PrioritizedReferenceQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/refqueue/PrioritizedReferenceQueueTest.java 2007-01-19 06:28:02 UTC (rev 1992)
+++ trunk/tests/src/org/jboss/test/messaging/util/prioritylinkedlist/PrioritizedReferenceQueueTest.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -1,604 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.messaging.core.refqueue;
-
-import java.util.Iterator;
-import java.util.ListIterator;
-
-import org.jboss.messaging.core.refqueue.BasicPrioritizedDeque;
-import org.jboss.test.messaging.MessagingTestCase;
-
-/**
- * @author <a href="tim.fox at jboss.com>Tim Fox</a>
- *
- * $Id$
- */
-public class PrioritizedReferenceQueueTest extends MessagingTestCase
-{
- protected BasicPrioritizedDeque deque;
-
- protected Wibble a;
- protected Wibble b;
- protected Wibble c;
- protected Wibble d;
- protected Wibble e;
- protected Wibble f;
- protected Wibble g;
- protected Wibble h;
- protected Wibble i;
- protected Wibble j;
- protected Wibble k;
- protected Wibble l;
- protected Wibble m;
- protected Wibble n;
- protected Wibble o;
- protected Wibble p;
- protected Wibble q;
- protected Wibble r;
- protected Wibble s;
- protected Wibble t;
- protected Wibble u;
- protected Wibble v;
- protected Wibble w;
- protected Wibble x;
- protected Wibble y;
- protected Wibble z;
-
- public PrioritizedReferenceQueueTest(String name)
- {
- super(name);
- }
-
- public void setUp() throws Exception
- {
- super.setUp();
-
- deque = new BasicPrioritizedDeque(10);
-
- a = new Wibble("a");
- b = new Wibble("b");
- c = new Wibble("c");
- d = new Wibble("d");
- e = new Wibble("e");
- f = new Wibble("f");
- g = new Wibble("g");
- h = new Wibble("h");
- i = new Wibble("i");
- j = new Wibble("j");
- k = new Wibble("k");
- l = new Wibble("l");
- m = new Wibble("m");
- n = new Wibble("n");
- o = new Wibble("o");
- p = new Wibble("p");
- q = new Wibble("q");
- r = new Wibble("r");
- s = new Wibble("s");
- t = new Wibble("t");
- u = new Wibble("u");
- v = new Wibble("v");
- w = new Wibble("w");
- x = new Wibble("x");
- y = new Wibble("y");
- z = new Wibble("z");
- }
-
-
- public void tearDown() throws Exception
- {
- super.tearDown();
- }
-
- public void testAddFirst() throws Exception
- {
- deque.addFirst(a, 0);
- deque.addFirst(b, 0);
- deque.addFirst(c, 0);
- deque.addFirst(d, 0);
- deque.addFirst(e, 0);
-
-
- assertEquals(e, deque.removeFirst());
- assertEquals(d, deque.removeFirst());
- assertEquals(c, deque.removeFirst());
- assertEquals(b, deque.removeFirst());
- assertEquals(a, deque.removeFirst());
- assertNull(deque.removeFirst());
- }
-
- public void testAddLast() throws Exception
- {
- deque.addLast(a, 0);
- deque.addLast(b, 0);
- deque.addLast(c, 0);
- deque.addLast(d, 0);
- deque.addLast(e, 0);
-
- assertEquals(a, deque.removeFirst());
- assertEquals(b, deque.removeFirst());
- assertEquals(c, deque.removeFirst());
- assertEquals(d, deque.removeFirst());
- assertEquals(e, deque.removeFirst());
- assertNull(deque.removeFirst());
-
- }
-
-
- public void testRemoveFirst() throws Exception
- {
- deque.addLast(a, 0);
- deque.addLast(b, 1);
- deque.addLast(c, 2);
- deque.addLast(d, 3);
- deque.addLast(e, 4);
- deque.addLast(f, 5);
- deque.addLast(g, 6);
- deque.addLast(h, 7);
- deque.addLast(i, 8);
- deque.addLast(j, 9);
-
- assertEquals(j, deque.removeFirst());
- assertEquals(i, deque.removeFirst());
- assertEquals(h, deque.removeFirst());
- assertEquals(g, deque.removeFirst());
- assertEquals(f, deque.removeFirst());
- assertEquals(e, deque.removeFirst());
- assertEquals(d, deque.removeFirst());
- assertEquals(c, deque.removeFirst());
- assertEquals(b, deque.removeFirst());
- assertEquals(a, deque.removeFirst());
-
- assertNull(deque.removeFirst());
-
- deque.addLast(a, 9);
- deque.addLast(b, 8);
- deque.addLast(c, 7);
- deque.addLast(d, 6);
- deque.addLast(e, 5);
- deque.addLast(f, 4);
- deque.addLast(g, 3);
- deque.addLast(h, 2);
- deque.addLast(i, 1);
- deque.addLast(j, 0);
-
- assertEquals(a, deque.removeFirst());
- assertEquals(b, deque.removeFirst());
- assertEquals(c, deque.removeFirst());
- assertEquals(d, deque.removeFirst());
- assertEquals(e, deque.removeFirst());
- assertEquals(f, deque.removeFirst());
- assertEquals(g, deque.removeFirst());
- assertEquals(h, deque.removeFirst());
- assertEquals(i, deque.removeFirst());
- assertEquals(j, deque.removeFirst());
-
- assertNull(deque.removeFirst());
-
- deque.addLast(a, 9);
- deque.addLast(b, 0);
- deque.addLast(c, 8);
- deque.addLast(d, 1);
- deque.addLast(e, 7);
- deque.addLast(f, 2);
- deque.addLast(g, 6);
- deque.addLast(h, 3);
- deque.addLast(i, 5);
- deque.addLast(j, 4);
-
- assertEquals(a, deque.removeFirst());
- assertEquals(c, deque.removeFirst());
- assertEquals(e, deque.removeFirst());
- assertEquals(g, deque.removeFirst());
- assertEquals(i, deque.removeFirst());
- assertEquals(j, deque.removeFirst());
- assertEquals(h, deque.removeFirst());
- assertEquals(f, deque.removeFirst());
- assertEquals(d, deque.removeFirst());
- assertEquals(b, deque.removeFirst());
-
- assertNull(deque.removeFirst());
-
- deque.addLast(a, 0);
- deque.addLast(b, 3);
- deque.addLast(c, 3);
- deque.addLast(d, 3);
- deque.addLast(e, 6);
- deque.addLast(f, 6);
- deque.addLast(g, 6);
- deque.addLast(h, 9);
- deque.addLast(i, 9);
- deque.addLast(j, 9);
-
- assertEquals(h, deque.removeFirst());
- assertEquals(i, deque.removeFirst());
- assertEquals(j, deque.removeFirst());
- assertEquals(e, deque.removeFirst());
- assertEquals(f, deque.removeFirst());
- assertEquals(g, deque.removeFirst());
- assertEquals(b, deque.removeFirst());
- assertEquals(c, deque.removeFirst());
- assertEquals(d, deque.removeFirst());
- assertEquals(a, deque.removeFirst());
-
- assertNull(deque.removeFirst());
-
- deque.addLast(a, 5);
- deque.addLast(b, 5);
- deque.addLast(c, 5);
- deque.addLast(d, 5);
- deque.addLast(e, 5);
- deque.addLast(f, 5);
- deque.addLast(g, 5);
- deque.addLast(h, 5);
- deque.addLast(i, 5);
- deque.addLast(j, 5);
-
- assertEquals(a, deque.removeFirst());
- assertEquals(b, deque.removeFirst());
- assertEquals(c, deque.removeFirst());
- assertEquals(d, deque.removeFirst());
- assertEquals(e, deque.removeFirst());
- assertEquals(f, deque.removeFirst());
- assertEquals(g, deque.removeFirst());
- assertEquals(h, deque.removeFirst());
- assertEquals(i, deque.removeFirst());
- assertEquals(j, deque.removeFirst());
-
- assertNull(deque.removeFirst());
-
- deque.addLast(j, 5);
- deque.addLast(i, 5);
- deque.addLast(h, 5);
- deque.addLast(g, 5);
- deque.addLast(f, 5);
- deque.addLast(e, 5);
- deque.addLast(d, 5);
- deque.addLast(c, 5);
- deque.addLast(b, 5);
- deque.addLast(a, 5);
-
- assertEquals(j, deque.removeFirst());
- assertEquals(i, deque.removeFirst());
- assertEquals(h, deque.removeFirst());
- assertEquals(g, deque.removeFirst());
- assertEquals(f, deque.removeFirst());
- assertEquals(e, deque.removeFirst());
- assertEquals(d, deque.removeFirst());
- assertEquals(c, deque.removeFirst());
- assertEquals(b, deque.removeFirst());
- assertEquals(a, deque.removeFirst());
-
- assertNull(deque.removeFirst());
-
- }
-
- public void testGetAll() throws Exception
- {
- deque.addLast(a, 0);
- deque.addLast(b, 3);
- deque.addLast(c, 3);
- deque.addLast(d, 3);
- deque.addLast(e, 6);
- deque.addLast(f, 6);
- deque.addLast(g, 6);
- deque.addLast(h, 9);
- deque.addLast(i, 9);
- deque.addLast(j, 9);
-
-
- Iterator iter = deque.getAll().iterator();
- int count = 0;
- while (iter.hasNext())
- {
- Object o = iter.next();
- if (count == 0)
- {
- assertEquals(h, o);
- }
- if (count == 1)
- {
- assertEquals(i, o);
- }
- if (count == 2)
- {
- assertEquals(j, o);
- }
- if (count == 3)
- {
- assertEquals(e, o);
- }
- if (count == 4)
- {
- assertEquals(f, o);
- }
- if (count == 5)
- {
- assertEquals(g, o);
- }
- if (count == 6)
- {
- assertEquals(b, o);
- }
- if (count == 7)
- {
- assertEquals(c, o);
- }
- if (count == 8)
- {
- assertEquals(d, o);
- }
- if (count == 9)
- {
- assertEquals(a, o);
- }
- count++;
- }
- assertEquals(10, count);
- }
-
- public void testIterator()
- {
- deque.addLast(a, 9);
- deque.addLast(b, 9);
- deque.addLast(c, 8);
- deque.addLast(d, 8);
- deque.addLast(e, 7);
- deque.addLast(f, 7);
- deque.addLast(g, 7);
- deque.addLast(h, 6);
- deque.addLast(i, 6);
- deque.addLast(j, 6);
- deque.addLast(k, 5);
- deque.addLast(l, 5);
- deque.addLast(m, 4);
- deque.addLast(n, 4);
- deque.addLast(o, 4);
- deque.addLast(p, 3);
- deque.addLast(q, 3);
- deque.addLast(r, 3);
- deque.addLast(s, 2);
- deque.addLast(t, 2);
- deque.addLast(u, 2);
- deque.addLast(v, 1);
- deque.addLast(w, 1);
- deque.addLast(x, 1);
- deque.addLast(y, 0);
- deque.addLast(z, 0);
-
- ListIterator iter = deque.iterator();
-
- int c = 0;
- while (iter.hasNext())
- {
- Wibble w = (Wibble)iter.next();
- c++;
- }
- assertEquals(c, 26);
-
- iter = deque.iterator();
- assertTrue(iter.hasNext());
- Wibble w = (Wibble)iter.next();
- assertEquals("a", w.s);
- w = (Wibble)iter.next();
- assertEquals("b", w.s);
- w = (Wibble)iter.next();
- assertEquals("c", w.s);
- w = (Wibble)iter.next();
- assertEquals("d", w.s);
- w = (Wibble)iter.next();
- assertEquals("e", w.s);
- w = (Wibble)iter.next();
- assertEquals("f", w.s);
- w = (Wibble)iter.next();
- assertEquals("g", w.s);
- w = (Wibble)iter.next();
- assertEquals("h", w.s);
- w = (Wibble)iter.next();
- assertEquals("i", w.s);
- w = (Wibble)iter.next();
- assertEquals("j", w.s);
- w = (Wibble)iter.next();
- assertEquals("k", w.s);
- w = (Wibble)iter.next();
- assertEquals("l", w.s);
- w = (Wibble)iter.next();
- assertEquals("m", w.s);
- w = (Wibble)iter.next();
- assertEquals("n", w.s);
- w = (Wibble)iter.next();
- assertEquals("o", w.s);
- w = (Wibble)iter.next();
- assertEquals("p", w.s);
- w = (Wibble)iter.next();
- assertEquals("q", w.s);
- w = (Wibble)iter.next();
- assertEquals("r", w.s);
- w = (Wibble)iter.next();
- assertEquals("s", w.s);
- w = (Wibble)iter.next();
- assertEquals("t", w.s);
- w = (Wibble)iter.next();
- assertEquals("u", w.s);
- w = (Wibble)iter.next();
- assertEquals("v", w.s);
- w = (Wibble)iter.next();
- assertEquals("w", w.s);
- w = (Wibble)iter.next();
- assertEquals("x", w.s);
- w = (Wibble)iter.next();
- assertEquals("y", w.s);
- w = (Wibble)iter.next();
- assertEquals("z", w.s);
- assertFalse(iter.hasNext());
-
- iter = deque.iterator();
- assertTrue(iter.hasNext());
- w = (Wibble)iter.next();
- assertEquals("a", w.s);
-
- iter.remove();
-
- w = (Wibble)iter.next();
- assertEquals("b", w.s);
- w = (Wibble)iter.next();
- assertEquals("c", w.s);
- w = (Wibble)iter.next();
- assertEquals("d", w.s);
-
- iter.remove();
-
- w = (Wibble)iter.next();
- assertEquals("e", w.s);
- w = (Wibble)iter.next();
- assertEquals("f", w.s);
- w = (Wibble)iter.next();
- assertEquals("g", w.s);
- w = (Wibble)iter.next();
- assertEquals("h", w.s);
- w = (Wibble)iter.next();
- assertEquals("i", w.s);
- w = (Wibble)iter.next();
- assertEquals("j", w.s);
-
- iter.remove();
-
- w = (Wibble)iter.next();
- assertEquals("k", w.s);
- w = (Wibble)iter.next();
- assertEquals("l", w.s);
- w = (Wibble)iter.next();
- assertEquals("m", w.s);
- w = (Wibble)iter.next();
- assertEquals("n", w.s);
- w = (Wibble)iter.next();
- assertEquals("o", w.s);
- w = (Wibble)iter.next();
- assertEquals("p", w.s);
- w = (Wibble)iter.next();
- assertEquals("q", w.s);
- w = (Wibble)iter.next();
- assertEquals("r", w.s);
- w = (Wibble)iter.next();
- assertEquals("s", w.s);
- w = (Wibble)iter.next();
- assertEquals("t", w.s);
- w = (Wibble)iter.next();
- assertEquals("u", w.s);
- w = (Wibble)iter.next();
- assertEquals("v", w.s);
- w = (Wibble)iter.next();
- assertEquals("w", w.s);
- w = (Wibble)iter.next();
- assertEquals("x", w.s);
- w = (Wibble)iter.next();
- assertEquals("y", w.s);
- w = (Wibble)iter.next();
- assertEquals("z", w.s);
- iter.remove();
- assertFalse(iter.hasNext());
-
- iter = deque.iterator();
- assertTrue(iter.hasNext());
- w = (Wibble)iter.next();
- assertEquals("b", w.s);
- w = (Wibble)iter.next();
- assertEquals("c", w.s);
- w = (Wibble)iter.next();
- assertEquals("e", w.s);
- w = (Wibble)iter.next();
- assertEquals("f", w.s);
- w = (Wibble)iter.next();
- assertEquals("g", w.s);
- w = (Wibble)iter.next();
- assertEquals("h", w.s);
- w = (Wibble)iter.next();
- assertEquals("i", w.s);
- w = (Wibble)iter.next();
- assertEquals("k", w.s);
- w = (Wibble)iter.next();
- assertEquals("l", w.s);
- w = (Wibble)iter.next();
- assertEquals("m", w.s);
- w = (Wibble)iter.next();
- assertEquals("n", w.s);
- w = (Wibble)iter.next();
- assertEquals("o", w.s);
- w = (Wibble)iter.next();
- assertEquals("p", w.s);
- w = (Wibble)iter.next();
- assertEquals("q", w.s);
- w = (Wibble)iter.next();
- assertEquals("r", w.s);
- w = (Wibble)iter.next();
- assertEquals("s", w.s);
- w = (Wibble)iter.next();
- assertEquals("t", w.s);
- w = (Wibble)iter.next();
- assertEquals("u", w.s);
- w = (Wibble)iter.next();
- assertEquals("v", w.s);
- w = (Wibble)iter.next();
- assertEquals("w", w.s);
- w = (Wibble)iter.next();
- assertEquals("x", w.s);
- w = (Wibble)iter.next();
- assertEquals("y", w.s);
- assertFalse(iter.hasNext());
-
- }
-
-
- public void testClear()
- {
- deque.addLast(a, 0);
- deque.addLast(b, 3);
- deque.addLast(c, 3);
- deque.addLast(d, 3);
- deque.addLast(e, 6);
- deque.addLast(f, 6);
- deque.addLast(g, 6);
- deque.addLast(h, 9);
- deque.addLast(i, 9);
- deque.addLast(j, 9);
-
- deque.clear();
-
- assertNull(deque.removeFirst());
-
- assertTrue(deque.getAll().isEmpty());
- }
-
- class Wibble
- {
- String s;
- Wibble(String s)
- {
- this.s = s;
- }
- public String toString()
- {
- return s;
- }
- }
-
-}
-
Copied: trunk/tests/src/org/jboss/test/messaging/util/prioritylinkedlist/PriorityLinkedListTest.java (from rev 1992, trunk/tests/src/org/jboss/test/messaging/core/refqueue/PrioritizedReferenceQueueTest.java)
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/util/prioritylinkedlist/PriorityLinkedListTest.java (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/util/prioritylinkedlist/PriorityLinkedListTest.java 2007-01-19 18:45:03 UTC (rev 1993)
@@ -0,0 +1,626 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.util.prioritylinkedlist;
+
+import java.util.Iterator;
+import java.util.ListIterator;
+
+import org.jboss.messaging.util.prioritylinkedlist.BasicPriorityLinkedList;
+import org.jboss.test.messaging.MessagingTestCase;
+
+/**
+ * @author <a href="tim.fox at jboss.com>Tim Fox</a>
+ *
+ * $Id$
+ */
+public class PriorityLinkedListTest extends MessagingTestCase
+{
+ protected BasicPriorityLinkedList list;
+
+ protected Wibble a;
+ protected Wibble b;
+ protected Wibble c;
+ protected Wibble d;
+ protected Wibble e;
+ protected Wibble f;
+ protected Wibble g;
+ protected Wibble h;
+ protected Wibble i;
+ protected Wibble j;
+ protected Wibble k;
+ protected Wibble l;
+ protected Wibble m;
+ protected Wibble n;
+ protected Wibble o;
+ protected Wibble p;
+ protected Wibble q;
+ protected Wibble r;
+ protected Wibble s;
+ protected Wibble t;
+ protected Wibble u;
+ protected Wibble v;
+ protected Wibble w;
+ protected Wibble x;
+ protected Wibble y;
+ protected Wibble z;
+
+ public PriorityLinkedListTest(String name)
+ {
+ super(name);
+ }
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ list = new BasicPriorityLinkedList(10);
+
+ a = new Wibble("a");
+ b = new Wibble("b");
+ c = new Wibble("c");
+ d = new Wibble("d");
+ e = new Wibble("e");
+ f = new Wibble("f");
+ g = new Wibble("g");
+ h = new Wibble("h");
+ i = new Wibble("i");
+ j = new Wibble("j");
+ k = new Wibble("k");
+ l = new Wibble("l");
+ m = new Wibble("m");
+ n = new Wibble("n");
+ o = new Wibble("o");
+ p = new Wibble("p");
+ q = new Wibble("q");
+ r = new Wibble("r");
+ s = new Wibble("s");
+ t = new Wibble("t");
+ u = new Wibble("u");
+ v = new Wibble("v");
+ w = new Wibble("w");
+ x = new Wibble("x");
+ y = new Wibble("y");
+ z = new Wibble("z");
+ }
+
+
+ public void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ public void testSpeed() throws Exception
+ {
+ final int NUM_MESSAGES = 1000000;
+
+ long start = System.currentTimeMillis();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ list.addLast(new Object(), i % 10);
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ Object obj = list.removeFirst();
+ }
+
+ long end = System.currentTimeMillis();
+
+ log.info("That took " + (end -start) + " ms");
+
+ }
+
+ public void testAddFirst() throws Exception
+ {
+ list.addFirst(a, 0);
+ list.addFirst(b, 0);
+ list.addFirst(c, 0);
+ list.addFirst(d, 0);
+ list.addFirst(e, 0);
+
+
+ assertEquals(e, list.removeFirst());
+ assertEquals(d, list.removeFirst());
+ assertEquals(c, list.removeFirst());
+ assertEquals(b, list.removeFirst());
+ assertEquals(a, list.removeFirst());
+ assertNull(list.removeFirst());
+ }
+
+ public void testAddLast() throws Exception
+ {
+ list.addLast(a, 0);
+ list.addLast(b, 0);
+ list.addLast(c, 0);
+ list.addLast(d, 0);
+ list.addLast(e, 0);
+
+ assertEquals(a, list.removeFirst());
+ assertEquals(b, list.removeFirst());
+ assertEquals(c, list.removeFirst());
+ assertEquals(d, list.removeFirst());
+ assertEquals(e, list.removeFirst());
+ assertNull(list.removeFirst());
+
+ }
+
+
+ public void testRemoveFirst() throws Exception
+ {
+ list.addLast(a, 0);
+ list.addLast(b, 1);
+ list.addLast(c, 2);
+ list.addLast(d, 3);
+ list.addLast(e, 4);
+ list.addLast(f, 5);
+ list.addLast(g, 6);
+ list.addLast(h, 7);
+ list.addLast(i, 8);
+ list.addLast(j, 9);
+
+ assertEquals(j, list.removeFirst());
+ assertEquals(i, list.removeFirst());
+ assertEquals(h, list.removeFirst());
+ assertEquals(g, list.removeFirst());
+ assertEquals(f, list.removeFirst());
+ assertEquals(e, list.removeFirst());
+ assertEquals(d, list.removeFirst());
+ assertEquals(c, list.removeFirst());
+ assertEquals(b, list.removeFirst());
+ assertEquals(a, list.removeFirst());
+
+ assertNull(list.removeFirst());
+
+ list.addLast(a, 9);
+ list.addLast(b, 8);
+ list.addLast(c, 7);
+ list.addLast(d, 6);
+ list.addLast(e, 5);
+ list.addLast(f, 4);
+ list.addLast(g, 3);
+ list.addLast(h, 2);
+ list.addLast(i, 1);
+ list.addLast(j, 0);
+
+ assertEquals(a, list.removeFirst());
+ assertEquals(b, list.removeFirst());
+ assertEquals(c, list.removeFirst());
+ assertEquals(d, list.removeFirst());
+ assertEquals(e, list.removeFirst());
+ assertEquals(f, list.removeFirst());
+ assertEquals(g, list.removeFirst());
+ assertEquals(h, list.removeFirst());
+ assertEquals(i, list.removeFirst());
+ assertEquals(j, list.removeFirst());
+
+ assertNull(list.removeFirst());
+
+ list.addLast(a, 9);
+ list.addLast(b, 0);
+ list.addLast(c, 8);
+ list.addLast(d, 1);
+ list.addLast(e, 7);
+ list.addLast(f, 2);
+ list.addLast(g, 6);
+ list.addLast(h, 3);
+ list.addLast(i, 5);
+ list.addLast(j, 4);
+
+ assertEquals(a, list.removeFirst());
+ assertEquals(c, list.removeFirst());
+ assertEquals(e, list.removeFirst());
+ assertEquals(g, list.removeFirst());
+ assertEquals(i, list.removeFirst());
+ assertEquals(j, list.removeFirst());
+ assertEquals(h, list.removeFirst());
+ assertEquals(f, list.removeFirst());
+ assertEquals(d, list.removeFirst());
+ assertEquals(b, list.removeFirst());
+
+ assertNull(list.removeFirst());
+
+ list.addLast(a, 0);
+ list.addLast(b, 3);
+ list.addLast(c, 3);
+ list.addLast(d, 3);
+ list.addLast(e, 6);
+ list.addLast(f, 6);
+ list.addLast(g, 6);
+ list.addLast(h, 9);
+ list.addLast(i, 9);
+ list.addLast(j, 9);
+
+ assertEquals(h, list.removeFirst());
+ assertEquals(i, list.removeFirst());
+ assertEquals(j, list.removeFirst());
+ assertEquals(e, list.removeFirst());
+ assertEquals(f, list.removeFirst());
+ assertEquals(g, list.removeFirst());
+ assertEquals(b, list.removeFirst());
+ assertEquals(c, list.removeFirst());
+ assertEquals(d, list.removeFirst());
+ assertEquals(a, list.removeFirst());
+
+ assertNull(list.removeFirst());
+
+ list.addLast(a, 5);
+ list.addLast(b, 5);
+ list.addLast(c, 5);
+ list.addLast(d, 5);
+ list.addLast(e, 5);
+ list.addLast(f, 5);
+ list.addLast(g, 5);
+ list.addLast(h, 5);
+ list.addLast(i, 5);
+ list.addLast(j, 5);
+
+ assertEquals(a, list.removeFirst());
+ assertEquals(b, list.removeFirst());
+ assertEquals(c, list.removeFirst());
+ assertEquals(d, list.removeFirst());
+ assertEquals(e, list.removeFirst());
+ assertEquals(f, list.removeFirst());
+ assertEquals(g, list.removeFirst());
+ assertEquals(h, list.removeFirst());
+ assertEquals(i, list.removeFirst());
+ assertEquals(j, list.removeFirst());
+
+ assertNull(list.removeFirst());
+
+ list.addLast(j, 5);
+ list.addLast(i, 5);
+ list.addLast(h, 5);
+ list.addLast(g, 5);
+ list.addLast(f, 5);
+ list.addLast(e, 5);
+ list.addLast(d, 5);
+ list.addLast(c, 5);
+ list.addLast(b, 5);
+ list.addLast(a, 5);
+
+ assertEquals(j, list.removeFirst());
+ assertEquals(i, list.removeFirst());
+ assertEquals(h, list.removeFirst());
+ assertEquals(g, list.removeFirst());
+ assertEquals(f, list.removeFirst());
+ assertEquals(e, list.removeFirst());
+ assertEquals(d, list.removeFirst());
+ assertEquals(c, list.removeFirst());
+ assertEquals(b, list.removeFirst());
+ assertEquals(a, list.removeFirst());
+
+ assertNull(list.removeFirst());
+
+ }
+
+ public void testGetAll() throws Exception
+ {
+ list.addLast(a, 0);
+ list.addLast(b, 3);
+ list.addLast(c, 3);
+ list.addLast(d, 3);
+ list.addLast(e, 6);
+ list.addLast(f, 6);
+ list.addLast(g, 6);
+ list.addLast(h, 9);
+ list.addLast(i, 9);
+ list.addLast(j, 9);
+
+
+ Iterator iter = list.getAll().iterator();
+ int count = 0;
+ while (iter.hasNext())
+ {
+ Object o = iter.next();
+ if (count == 0)
+ {
+ assertEquals(h, o);
+ }
+ if (count == 1)
+ {
+ assertEquals(i, o);
+ }
+ if (count == 2)
+ {
+ assertEquals(j, o);
+ }
+ if (count == 3)
+ {
+ assertEquals(e, o);
+ }
+ if (count == 4)
+ {
+ assertEquals(f, o);
+ }
+ if (count == 5)
+ {
+ assertEquals(g, o);
+ }
+ if (count == 6)
+ {
+ assertEquals(b, o);
+ }
+ if (count == 7)
+ {
+ assertEquals(c, o);
+ }
+ if (count == 8)
+ {
+ assertEquals(d, o);
+ }
+ if (count == 9)
+ {
+ assertEquals(a, o);
+ }
+ count++;
+ }
+ assertEquals(10, count);
+ }
+
+ public void testIterator()
+ {
+ list.addLast(a, 9);
+ list.addLast(b, 9);
+ list.addLast(c, 8);
+ list.addLast(d, 8);
+ list.addLast(e, 7);
+ list.addLast(f, 7);
+ list.addLast(g, 7);
+ list.addLast(h, 6);
+ list.addLast(i, 6);
+ list.addLast(j, 6);
+ list.addLast(k, 5);
+ list.addLast(l, 5);
+ list.addLast(m, 4);
+ list.addLast(n, 4);
+ list.addLast(o, 4);
+ list.addLast(p, 3);
+ list.addLast(q, 3);
+ list.addLast(r, 3);
+ list.addLast(s, 2);
+ list.addLast(t, 2);
+ list.addLast(u, 2);
+ list.addLast(v, 1);
+ list.addLast(w, 1);
+ list.addLast(x, 1);
+ list.addLast(y, 0);
+ list.addLast(z, 0);
+
+ ListIterator iter = list.iterator();
+
+ int c = 0;
+ while (iter.hasNext())
+ {
+ Wibble w = (Wibble)iter.next();
+ c++;
+ }
+ assertEquals(c, 26);
+
+ iter = list.iterator();
+ assertTrue(iter.hasNext());
+ Wibble w = (Wibble)iter.next();
+ assertEquals("a", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("b", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("c", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("d", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("e", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("f", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("g", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("h", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("i", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("j", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("k", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("l", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("m", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("n", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("o", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("p", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("q", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("r", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("s", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("t", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("u", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("v", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("w", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("x", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("y", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("z", w.s);
+ assertFalse(iter.hasNext());
+
+ iter = list.iterator();
+ assertTrue(iter.hasNext());
+ w = (Wibble)iter.next();
+ assertEquals("a", w.s);
+
+ iter.remove();
+
+ w = (Wibble)iter.next();
+ assertEquals("b", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("c", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("d", w.s);
+
+ iter.remove();
+
+ w = (Wibble)iter.next();
+ assertEquals("e", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("f", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("g", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("h", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("i", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("j", w.s);
+
+ iter.remove();
+
+ w = (Wibble)iter.next();
+ assertEquals("k", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("l", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("m", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("n", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("o", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("p", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("q", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("r", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("s", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("t", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("u", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("v", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("w", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("x", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("y", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("z", w.s);
+ iter.remove();
+ assertFalse(iter.hasNext());
+
+ iter = list.iterator();
+ assertTrue(iter.hasNext());
+ w = (Wibble)iter.next();
+ assertEquals("b", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("c", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("e", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("f", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("g", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("h", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("i", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("k", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("l", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("m", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("n", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("o", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("p", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("q", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("r", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("s", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("t", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("u", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("v", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("w", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("x", w.s);
+ w = (Wibble)iter.next();
+ assertEquals("y", w.s);
+ assertFalse(iter.hasNext());
+
+ }
+
+
+ public void testClear()
+ {
+ list.addLast(a, 0);
+ list.addLast(b, 3);
+ list.addLast(c, 3);
+ list.addLast(d, 3);
+ list.addLast(e, 6);
+ list.addLast(f, 6);
+ list.addLast(g, 6);
+ list.addLast(h, 9);
+ list.addLast(i, 9);
+ list.addLast(j, 9);
+
+ list.clear();
+
+ assertNull(list.removeFirst());
+
+ assertTrue(list.getAll().isEmpty());
+ }
+
+ class Wibble
+ {
+ String s;
+ Wibble(String s)
+ {
+ this.s = s;
+ }
+ public String toString()
+ {
+ return s;
+ }
+ }
+
+}
+
More information about the jboss-cvs-commits
mailing list