[jboss-cvs] JBoss Messaging SVN: r2848 - in trunk: src/main/org/jboss/jms/client/container and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jul 5 18:27:27 EDT 2007
Author: clebert.suconic at jboss.com
Date: 2007-07-05 18:27:27 -0400 (Thu, 05 Jul 2007)
New Revision: 2848
Removed:
trunk/tests/src/org/jboss/test/messaging/jms/clustering/RecoverDeliveryTest.java
Modified:
trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
trunk/src/main/org/jboss/jms/client/state/SessionState.java
trunk/src/main/org/jboss/jms/delegate/ConnectionDelegate.java
trunk/src/main/org/jboss/jms/delegate/ConsumerDelegate.java
trunk/src/main/org/jboss/jms/delegate/ProducerDelegate.java
trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java
Log:
Reverting the changes made for http://jira.jboss.com/jira/browse/JBMESSAGING-1006
Modified: trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java 2007-07-05 17:35:03 UTC (rev 2847)
+++ trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java 2007-07-05 22:27:27 UTC (rev 2848)
@@ -157,9 +157,6 @@
{
log.error("Failover failed", e);
- // Marking delegate as invalid!
- state.getDelegate().invalidate();
-
throw e;
}
finally
Modified: trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java 2007-07-05 17:35:03 UTC (rev 2847)
+++ trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java 2007-07-05 22:27:27 UTC (rev 2848)
@@ -51,23 +51,22 @@
public class ClosedInterceptor implements Interceptor
{
// Constants -----------------------------------------------------
-
+
private static final Logger log = Logger.getLogger(ClosedInterceptor.class);
-
+
private static final int NOT_CLOSED = 0;
private static final int IN_CLOSING = 1;
private static final int CLOSING = 2;
private static final int IN_CLOSE = 3; // performing the close
private static final int CLOSED = -1;
- private static final int INVALID = -2; // the Delegate was marked invalid by a bad failover
// Attributes ----------------------------------------------------
-
+
private boolean trace = log.isTraceEnabled();
// The current state of the object guarded by this interceptor
private int state = NOT_CLOSED;
-
+
// The inuse count
private int inUseCount;
@@ -82,10 +81,9 @@
state == IN_CLOSING ? "IN_CLOSING" :
state == CLOSING ? "CLOSING" :
state == IN_CLOSE ? "IN_CLOSE" :
- state == CLOSED ? "CLOSED" :
- state == INVALID ? "INVALID" : "UNKNOWN";
+ state == CLOSED ? "CLOSED" : "UNKNOWN";
}
-
+
// Constructors --------------------------------------------------
public ClosedInterceptor()
@@ -126,17 +124,14 @@
{
id = DelegateIdentity.getIdentity(invocation);
}
-
+
String methodName = ((MethodInvocation)invocation).getMethod().getName();
- log.trace("Invoke on ClosedInterceptor = " + methodName + " with state = " + stateToString(state));
-
boolean isClosing = methodName.equals("closing");
boolean isClose = methodName.equals("close");
- boolean isInvalidate = methodName.equals("invalidate");
-
+
if (isClosing)
- {
+ {
if (checkClosingAlreadyDone())
{
return new Long(-1);
@@ -149,22 +144,11 @@
return null;
}
}
- else if (isInvalidate)
- {
- state = INVALID;
- invalidateRelatives(invocation);
- return null;
- }
else
{
synchronized(this)
{
// object "in use", increment inUseCount
- if (state == INVALID)
- {
- throw new IllegalStateException("The delegator is invalid, look at logs as failover probably couldn't complete");
- }
- else
if (state == IN_CLOSE || state == CLOSED)
{
log.error(this + ": method " + methodName + "() did not go through, " +
@@ -200,7 +184,7 @@
throw t;
}
finally
- {
+ {
if (isClosing)
{
// We make sure we remove ourself AFTER the invocation has been made otherwise in a
@@ -208,7 +192,7 @@
// occur properly since failover would not be able to traverse the hierarchy and update
// the delegates properly
removeSelf(invocation);
-
+
closing();
}
else if (isClose)
@@ -226,7 +210,7 @@
/**
* Check the closing notification has not already been done
- *
+ *
* @return true when already closing or closed
*/
protected synchronized boolean checkClosingAlreadyDone() throws Throwable
@@ -250,7 +234,7 @@
/**
* Check the close has not already been done and
* wait for all invocations to complete
- *
+ *
* @return true when already closed
*/
protected synchronized boolean checkCloseAlreadyDone() throws Throwable
@@ -275,9 +259,9 @@
state = CLOSED;
log.debug(this + " closed");
}
-
+
/**
- * Mark the object as no longer inuse
+ * Mark the object as no longer inuse
*/
protected synchronized void done() throws Throwable
{
@@ -287,44 +271,7 @@
}
}
-
/**
- * Invalidate children
- *
- * @param invocation the invocation
- */
- protected void invalidateRelatives(Invocation invocation)
- {
- HierarchicalState state = ((DelegateSupport)invocation.getTargetObject()).getState();
-
- // We use a clone to avoid a deadlock where requests are made to close parent and child
- // concurrently
-
- Set clone;
-
- Set children = state.getChildren();
-
- if (children == null)
- {
- if (trace) { log.trace(this + " has no children"); }
- return;
- }
-
- synchronized (children)
- {
- clone = new HashSet(children);
- }
-
- // Cycle through the children this will do a depth first close
- for (Iterator i = clone.iterator(); i.hasNext();)
- {
- HierarchicalState child = (HierarchicalState)i.next();
- DelegateSupport del = (DelegateSupport)child.getDelegate();
- del.invalidate();
- }
- }
-
- /**
* Close children and remove from parent
*
* @param invocation the invocation
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java 2007-07-05 17:35:03 UTC (rev 2847)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java 2007-07-05 22:27:27 UTC (rev 2848)
@@ -71,12 +71,6 @@
// DelegateSupport overrides --------------------------------------------------------------------
- public void invalidate()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
-
public void synchronizeWith(DelegateSupport nd) throws Exception
{
super.synchronizeWith(nd);
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2007-07-05 17:35:03 UTC (rev 2847)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2007-07-05 22:27:27 UTC (rev 2848)
@@ -99,11 +99,6 @@
// DelegateSupport overrides --------------------------------------------------------------------
- public void invalidate()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
public void synchronizeWith(DelegateSupport nd) throws Exception
{
log.debug(this + " synchronizing with " + nd);
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2007-07-05 17:35:03 UTC (rev 2847)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2007-07-05 22:27:27 UTC (rev 2848)
@@ -259,11 +259,6 @@
super.synchronizeWith(newDelegate);
}
- public void invalidate()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
// Protected ------------------------------------------------------------------------------------
// Package Private ------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2007-07-05 17:35:03 UTC (rev 2847)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2007-07-05 22:27:27 UTC (rev 2848)
@@ -80,12 +80,6 @@
// DelegateSupport overrides --------------------------------------------------------------------
- public void invalidate()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
-
public void synchronizeWith(DelegateSupport nd) throws Exception
{
log.debug(this + " synchronizing with " + nd);
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java 2007-07-05 17:35:03 UTC (rev 2847)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java 2007-07-05 22:27:27 UTC (rev 2848)
@@ -57,12 +57,6 @@
// DelegateSupport overrides --------------------------------------------------------------------
- public void invalidate()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
-
public void synchronizeWith(DelegateSupport nd) throws Exception
{
super.synchronizeWith(nd);
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-07-05 17:35:03 UTC (rev 2847)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-07-05 22:27:27 UTC (rev 2848)
@@ -107,12 +107,6 @@
// DelegateSupport overrides --------------------------------------------------------------------
- public void invalidate()
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
-
public void synchronizeWith(DelegateSupport nd) throws Exception
{
log.debug(this + " synchronizing with " + nd);
Modified: trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2007-07-05 17:35:03 UTC (rev 2847)
+++ trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2007-07-05 22:27:27 UTC (rev 2848)
@@ -137,9 +137,6 @@
return id;
}
- public abstract void invalidate();
-
-
/**
* During HA events, delegates corresponding to new enpoints on the new server are created and
* the state of those delegates has to be transfered to the "failed" delegates. For example, a
Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java 2007-07-05 17:35:03 UTC (rev 2847)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java 2007-07-05 22:27:27 UTC (rev 2848)
@@ -337,15 +337,7 @@
}
log.debug(this + " sending delivery recovery " + recoveryInfos + " on failover");
- try
- {
- newDelegate.recoverDeliveries(recoveryInfos);
- }
- catch (Exception e)
- {
- log.error(e.toString(),e);
- this.getDelegate().invalidate();
- }
+ newDelegate.recoverDeliveries(recoveryInfos);
}
else
{
Modified: trunk/src/main/org/jboss/jms/delegate/ConnectionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/ConnectionDelegate.java 2007-07-05 17:35:03 UTC (rev 2847)
+++ trunk/src/main/org/jboss/jms/delegate/ConnectionDelegate.java 2007-07-05 22:27:27 UTC (rev 2848)
@@ -57,6 +57,4 @@
void registerFailoverListener(FailoverListener failoverListener);
boolean unregisterFailoverListener(FailoverListener failoverListener);
-
- void invalidate();
}
Modified: trunk/src/main/org/jboss/jms/delegate/ConsumerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/ConsumerDelegate.java 2007-07-05 17:35:03 UTC (rev 2847)
+++ trunk/src/main/org/jboss/jms/delegate/ConsumerDelegate.java 2007-07-05 22:27:27 UTC (rev 2848)
@@ -52,6 +52,4 @@
String getMessageSelector();
Message receive(long timeout) throws JMSException;
-
- void invalidate();
}
Modified: trunk/src/main/org/jboss/jms/delegate/ProducerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/ProducerDelegate.java 2007-07-05 17:35:03 UTC (rev 2847)
+++ trunk/src/main/org/jboss/jms/delegate/ProducerDelegate.java 2007-07-05 22:27:27 UTC (rev 2848)
@@ -69,6 +69,4 @@
int deliveryMode,
int priority,
long timeToLive) throws JMSException;
-
- void invalidate();
}
Modified: trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java 2007-07-05 17:35:03 UTC (rev 2847)
+++ trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java 2007-07-05 22:27:27 UTC (rev 2848)
@@ -95,6 +95,4 @@
ProducerDelegate createProducerDelegate(JBossDestination destination) throws JMSException;
void acknowledgeAll() throws JMSException;
-
- void invalidate();
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-07-05 17:35:03 UTC (rev 2847)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-07-05 22:27:27 UTC (rev 2848)
@@ -434,11 +434,6 @@
acks.add(deliveryInfo);
}
- // A result for each queue's recovery result
- // Putting results on a separate HashMap to guarantee atomicity on the execution
- // on this method
- Map resultRecoveredAck = new HashMap();
-
Iterator iter = ackMap.entrySet().iterator();
while (iter.hasNext())
@@ -486,34 +481,11 @@
Queue expiryQueueToUse =
dest.getExpiryQueue() == null ? defaultExpiryQueue : dest.getExpiryQueue();
+ int maxDeliveryAttemptsToUse =
+ dest.getMaxDeliveryAttempts() == -1 ? defaultMaxDeliveryAttempts : dest.getMaxDeliveryAttempts();
+
List dels = queue.recoverDeliveries(ids);
- resultRecoveredAck.put(queue, new Object[]{dels, acks, dlqToUse, expiryQueueToUse, dest});
- }
-
- // queue.recoverDeliveries could fail...
- // I have separated this next loop from the previous loop, as I wanted the whole recoveryDeliveries
- // to be an atomic operation. If anything goes wrong on recoverDeliveries we will keep everything
- // as it used to be.. no changes whatsoever until every single message was found on recoverDeliveries
-
- Iterator iterResults = resultRecoveredAck.entrySet().iterator();
-
-
- while (iterResults.hasNext())
- {
-
- Map.Entry entry = (Map.Entry) iterResults.next();
-
- Queue queue = (Queue)entry.getKey();
-
- Object[] value = (Object[]) entry.getValue();
-
- List dels = (List)value[0];
- List acks = (List)value[1];
- Queue dlqToUse = (Queue)value[2];
- Queue expiryQueueToUse = (Queue) value[3];
- ManagedDestination dest = (ManagedDestination) value[4];
-
Iterator iter2 = dels.iterator();
Iterator iter3 = acks.iterator();
@@ -530,9 +502,6 @@
if (trace) { log.trace(this + " Recovered delivery " + deliveryId + ", " + del); }
- int maxDeliveryAttemptsToUse =
- dest.getMaxDeliveryAttempts() == -1 ? defaultMaxDeliveryAttempts : dest.getMaxDeliveryAttempts();
-
deliveries.put(new Long(deliveryId),
new DeliveryRecord(del, -1, dlqToUse,
expiryQueueToUse, dest.getRedeliveryDelay(), maxDeliveryAttemptsToUse));
Modified: trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2007-07-05 17:35:03 UTC (rev 2847)
+++ trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2007-07-05 22:27:27 UTC (rev 2848)
@@ -375,9 +375,6 @@
List dels = new ArrayList();
- // This operation needs to be atomic.. if it fails we will rollback before throwing the exception
- ArrayList refsRemoved = new ArrayList();
-
synchronized (lock)
{
ListIterator liter = messageRefs.iterator();
@@ -394,26 +391,13 @@
// TODO we need to look in paging state too - currently not supported
//http://jira.jboss.com/jira/browse/JBMESSAGING-839
log.warn(this + " cannot find reference " + id + " (Might be paged!)");
-
- log.trace(this + " Adding references back to the list");
-
- // Adding references back to messageRefs... keeping the same order they were removed
- while (refsRemoved.size()>0)
- {
- MessageReference refAddBack = (MessageReference)refsRemoved.remove(refsRemoved.size()-1);
- log.trace("Adding " + refAddBack + " back to messageRefs before throwing exception");
- messageRefs.addFirst(refAddBack, refAddBack.getMessage().getPriority());
- }
-
- throw new IllegalStateException("Cannot find reference " + id);
+ break;
}
-
+
MessageReference ref = (MessageReference)liter.next();
if (ref.getMessage().getMessageID() == id.longValue())
{
- refsRemoved.add(ref);
-
liter.remove();
Delivery del = new SimpleDelivery(this, ref);
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2007-07-05 17:35:03 UTC (rev 2847)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2007-07-05 22:27:27 UTC (rev 2848)
@@ -98,9 +98,9 @@
{
JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
- Connection conn1 = factory.createConnection();
- Connection conn2 = factory.createConnection();
- Connection conn3 = factory.createConnection();
+ Connection conn1 = createConnectionOnServer(factory,0);
+ Connection conn2 = createConnectionOnServer(factory,1);
+ Connection conn3 = createConnectionOnServer(factory,2);
Connection[] conn = new Connection[]{conn1, conn2, conn3};
@@ -196,13 +196,13 @@
{
conn1 = createConnectionOnServer(factory, 0); //server 0
- conn2 = factory.createConnection(); //server 1
+ conn2 = createConnectionOnServer(factory, 1); //server 1
- conn3 = factory.createConnection(); //server 2
+ conn3 = createConnectionOnServer(factory, 2); //server 2
- conn4 = factory.createConnection(); //server 0
+ conn4 = createConnectionOnServer(factory, 0); //server 0
- conn5 = factory.createConnection(); //server 1
+ conn5 = createConnectionOnServer(factory, 1); //server 1
int serverID1 = getServerId(conn1);
@@ -618,74 +618,114 @@
}
- // This test needs to be removed when http://jira.jboss.org/jira/browse/JBMESSAGING-883
- // is fixed.
- //
- // This test will create two sessions on server1
- // One consumer on each session... one for queue, another to anotherQueue
- // Send 100 messages on producer1
- // Receive 50 messages on consumer1
- // Kill the server
- // Validate if the session was invalidated after failover
- // Receive 100 messages again in another consumer on server2
- // Validate if session1b was still valid.. sending and consuming messages...
- // the session should still be avlie
- //
- //
- public void testInvalidateSession() throws Exception
+ public void testFailoverWithUnackedMessagesClientAcknowledge() throws Exception
{
JBossConnectionFactory factory = (JBossConnectionFactory )ic[0].lookup("/ClusteredConnectionFactory");
- for (int i=0; i< nodeCount; i++)
- {
- ServerManagement.deployQueue("anotherQueue", i);
- }
+ ClientClusteredConnectionFactoryDelegate delegate =
+ (ClientClusteredConnectionFactoryDelegate)factory.getDelegate();
- Queue anotherQueue = (Queue)ic[1].lookup("queue/anotherQueue");
+ Set nodeIDView = ServerManagement.getServer(0).getNodeIDView();
+ assertEquals(3, nodeIDView.size());
- JBossConnection conn0 = (JBossConnection) factory.createConnection();
- JBossConnection conn1 = (JBossConnection) factory.createConnection();
- JBossConnection conn2 = (JBossConnection) factory.createConnection();
+ ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+ ClientConnectionFactoryDelegate cf1 = delegates[0];
+
+ ClientConnectionFactoryDelegate cf2 = delegates[1];
+
+ ClientConnectionFactoryDelegate cf3 = delegates[2];
+
+ int server0Id = cf1.getServerID();
+
+ int server1Id = cf2.getServerID();
+
+ int server2Id = cf3.getServerID();
+
+ log.info("server 0 id: " + server0Id);
+
+ log.info("server 1 id: " + server1Id);
+
+ log.info("server 2 id: " + server2Id);
+
+ assertEquals(0, server0Id);
+
+ assertEquals(1, server1Id);
+
+ assertEquals(2, server2Id);
+
+ Map failoverMap = delegate.getFailoverMap();
+
+ log.info(failoverMap.get(new Integer(server0Id)));
+ log.info(failoverMap.get(new Integer(server1Id)));
+ log.info(failoverMap.get(new Integer(server2Id)));
+
+ int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
+
+ // server 1 should failover onto server 2
+
+ assertEquals(server2Id, server1FailoverId);
+
+ Connection conn = null;
+
+ boolean killed = false;
+
try
{
- assertEquals(0, getServerId(conn0));
- assertEquals(1, getServerId(conn1));
- assertEquals(2, getServerId(conn2));
+ conn = createConnectionOnServer(factory, 1);
+ JBossConnection jbc = (JBossConnection)conn;
- Session session1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageProducer producer1 = session1.createProducer(queue[1]);
- MessageConsumer consumer1 = session1.createConsumer(queue[1]);
+ ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
- Session session1b = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageProducer producer1b = session1b.createProducer(anotherQueue);
- MessageConsumer consumer1b = session1b.createConsumer(anotherQueue);
+ ConnectionState state = (ConnectionState)del.getState();
- conn1.start();
+ int initialServerID = state.getServerID();
+ assertEquals(1, initialServerID);
+ Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Session session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageConsumer consumer2 = session2.createConsumer(queue[2]);
- conn2.start();
+ MessageProducer prod = sess.createProducer(queue[1]);
- for (int i=0; i<100; i++)
+ MessageConsumer cons = sess.createConsumer(queue[1]);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
- producer1.send(session1.createTextMessage("Message:" + i));
+ TextMessage tm = sess.createTextMessage("message:" + i);
+
+ prod.send(tm);
}
- for (int i=0; i<50; i++)
+ conn.start();
+
+ //Now consume half of the messages but don't ack them; these will end up in
+ //the client side toAck list
+
+ for (int i = 0; i < NUM_MESSAGES / 2; i++)
{
- TextMessage msg = (TextMessage )consumer1.receive(1000);
- assertEquals("Message:" + i, msg.getText());
+ TextMessage tm = (TextMessage)cons.receive(500);
+
+ assertNotNull(tm);
+
+ assertEquals("message:" + i, tm.getText());
}
+ //So now, messages should be in queue[1] on server 1
+ //So we now kill server 1
+ //Which should cause transparent failover of connection conn onto server 2
+
+ log.info("here we go");
+ log.info("######");
log.info("###### KILLING (CRASHING) SERVER 1");
log.info("######");
ServerManagement.kill(1);
+ killed = true;
+
long sleepTime = 30;
log.info("killed server, now waiting for " + sleepTime + " seconds");
@@ -695,270 +735,77 @@
log.info("done wait");
- assertEquals(2, getServerId(conn1));
+ state = (ConnectionState)del.getState();
- try
- {
- log.info("########################## Consuming message on failed consumer");
- Message msg = consumer1.receive(1000);
- log.info("########################## Message consumed on failed consumer! " + msg);
- // It is supposed to fail, as ACKs won't be recovered due to the other active client
- fail("Consumer on server1 was supposed to fail!");
- }
- catch (JMSException failed)
- {
- log.info("Expected exception after consumer.receive - " + failed);
- }
+ int finalServerID = state.getServerID();
- for (int i=0; i<100; i++)
- {
- TextMessage msg = (TextMessage)consumer2.receive(1000);
- log.info("Received " + msg.getText());
- }
+ log.info("final server id= " + finalServerID);
+ //server id should now be 2
- // While one session was failed... session1b is supposed to be valid
- for (int i=0; i<10; i++)
- {
- producer1b.send(session1b.createTextMessage("MessageB:" + i));
- }
+ assertEquals(2, finalServerID);
- for (int i=0; i<10; i++)
- {
- TextMessage msg = (TextMessage)consumer1b.receive(1000);
- assertNotNull(msg);
- assertEquals("MessageB:" + i, msg.getText());
- log.info("Received " + msg.getText());
- }
+ conn.start();
+ //Now should be able to consume the rest of the messages
- try
+ log.info("here1");
+
+ TextMessage tm = null;
+
+ for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
{
- session1.createConsumer(queue[1]);
- // the session was invalidated!
- fail("This call was supposed to fail!");
+ tm = (TextMessage)cons.receive(1000);
+
+ assertNotNull(tm);
+
+ log.debug("message is " + tm.getText());
+
+ assertEquals("message:" + i, tm.getText());
}
- catch (JMSException failed)
- {
- log.info("Expected exception on session1.createConsumer(queue[1])" + failed);
- }
+ log.info("here2");
- // this is not supposed to fail
- session1b.createConsumer(anotherQueue);
+ //Now should be able to acknowledge them
+ tm.acknowledge();
+ //Now check there are no more messages there
+ sess.close();
+
+ sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ cons = sess.createConsumer(queue[1]);
+
+ Message m = cons.receive(500);
+
+ assertNull(m);
+
+ log.info("got to end of test");
}
finally
{
- try { conn0.close();} catch (Throwable ignored){}
- try { conn1.close();} catch (Throwable ignored){}
- try { conn2.close();} catch (Throwable ignored){}
-
- for (int i=0; i< nodeCount; i++)
+ if (conn != null)
{
- try{ServerManagement.undeployQueue("anotherQueue", i);} catch (Throwable ignored){}
+ try
+ {
+ conn.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
}
+ // Resurrect dead server
+ if (killed)
+ {
+ ServerManagement.start(1, "all");
+ }
}
+
}
-
-
-
-// TODO: Reactivate this test when http://jira.jboss.org/jira/browse/JBMESSAGING-883 is done
-// public void testFailoverWithUnackedMessagesClientAcknowledge() throws Exception
-// {
-// JBossConnectionFactory factory = (JBossConnectionFactory )ic[0].lookup("/ClusteredConnectionFactory");
-//
-// ClientClusteredConnectionFactoryDelegate delegate =
-// (ClientClusteredConnectionFactoryDelegate)factory.getDelegate();
-//
-// Set nodeIDView = ServerManagement.getServer(0).getNodeIDView();
-// assertEquals(3, nodeIDView.size());
-//
-// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//
-// ClientConnectionFactoryDelegate cf1 = delegates[0];
-//
-// ClientConnectionFactoryDelegate cf2 = delegates[1];
-//
-// ClientConnectionFactoryDelegate cf3 = delegates[2];
-//
-// int server0Id = cf1.getServerID();
-//
-// int server1Id = cf2.getServerID();
-//
-// int server2Id = cf3.getServerID();
-//
-// log.info("server 0 id: " + server0Id);
-//
-// log.info("server 1 id: " + server1Id);
-//
-// log.info("server 2 id: " + server2Id);
-//
-// assertEquals(0, server0Id);
-//
-// assertEquals(1, server1Id);
-//
-// assertEquals(2, server2Id);
-//
-// Map failoverMap = delegate.getFailoverMap();
-//
-// log.info(failoverMap.get(new Integer(server0Id)));
-// log.info(failoverMap.get(new Integer(server1Id)));
-// log.info(failoverMap.get(new Integer(server2Id)));
-//
-// int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
-//
-// // server 1 should failover onto server 2
-//
-// assertEquals(server2Id, server1FailoverId);
-//
-// Connection conn = null;
-//
-// boolean killed = false;
-//
-// try
-// {
-// conn = factory.createConnection(); //connection on server 0
-//
-// conn.close();
-//
-// conn = factory.createConnection(); //connection on server 1
-//
-// JBossConnection jbc = (JBossConnection)conn;
-//
-// ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
-//
-// ConnectionState state = (ConnectionState)del.getState();
-//
-// int initialServerID = state.getServerID();
-//
-// assertEquals(1, initialServerID);
-//
-// Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-//
-// MessageProducer prod = sess.createProducer(queue[1]);
-//
-// MessageConsumer cons = sess.createConsumer(queue[1]);
-//
-// final int NUM_MESSAGES = 100;
-//
-// for (int i = 0; i < NUM_MESSAGES; i++)
-// {
-// TextMessage tm = sess.createTextMessage("message:" + i);
-//
-// prod.send(tm);
-// }
-//
-// conn.start();
-//
-// //Now consume half of the messages but don't ack them these will end up in
-// //client side toAck list
-//
-// for (int i = 0; i < NUM_MESSAGES / 2; i++)
-// {
-// TextMessage tm = (TextMessage)cons.receive(500);
-//
-// assertNotNull(tm);
-//
-// assertEquals("message:" + i, tm.getText());
-// }
-//
-// //So now, messages should be in queue[1] on server 1
-// //So we now kill server 1
-// //Which should cause transparent failover of connection conn onto server 1
-//
-// log.info("here we go");
-// log.info("######");
-// log.info("###### KILLING (CRASHING) SERVER 1");
-// log.info("######");
-//
-// ServerManagement.kill(1);
-//
-// killed = true;
-//
-// long sleepTime = 30;
-//
-// log.info("killed server, now waiting for " + sleepTime + " seconds");
-//
-// // NOTE: the sleep time needs to be longer than the Remoting connector's lease period
-// Thread.sleep(sleepTime * 1000);
-//
-// log.info("done wait");
-//
-// state = (ConnectionState)del.getState();
-//
-// int finalServerID = state.getServerID();
-//
-// log.info("final server id= " + finalServerID);
-//
-// //server id should now be 2
-//
-// assertEquals(2, finalServerID);
-//
-// conn.start();
-//
-// //Now should be able to consume the rest of the messages
-//
-// log.info("here1");
-//
-// TextMessage tm = null;
-//
-// for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
-// {
-// tm = (TextMessage)cons.receive(1000);
-//
-// assertNotNull(tm);
-//
-// log.debug("message is " + tm.getText());
-//
-// assertEquals("message:" + i, tm.getText());
-// }
-//
-// log.info("here2");
-//
-// //Now should be able to acknowledge them
-//
-// tm.acknowledge();
-//
-// //Now check there are no more messages there
-// sess.close();
-//
-// sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// cons = sess.createConsumer(queue[1]);
-//
-// Message m = cons.receive(500);
-//
-// assertNull(m);
-//
-// log.info("got to end of test");
-// }
-// finally
-// {
-// if (conn != null)
-// {
-// try
-// {
-// conn.close();
-// }
-// catch (Exception e)
-// {
-// e.printStackTrace();
-// }
-// }
-//
-// // Resurrect dead server
-// if (killed)
-// {
-// ServerManagement.start(1, "all");
-// }
-// }
-//
-// }
-//
/*
TODO: Reactivate this test when http://jira.jboss.org/jira/browse/JBMESSAGING-883 is done
@@ -1159,123 +1006,77 @@
Destination destination = (Destination) topic[1];
- Connection conn1 = cf.createConnection();
- Connection conn2 = cf.createConnection();
+ JBossConnection conn = (JBossConnection)createConnectionOnServer(cf, 1);
- JBossConnection conn =
- (JBossConnection) getConnection(new Connection[]{conn1, conn2}, 1);
-
conn.setClientID("testClient");
conn.start();
- JBossSession session = (JBossSession) conn.createSession(true, Session.SESSION_TRANSACTED);
- ClientSessionDelegate clientSessionDelegate = (ClientSessionDelegate) session.getDelegate();
- SessionState sessionState = (SessionState) clientSessionDelegate.getState();
+ try
+ {
- MessageConsumer consumerHA = session.createDurableSubscriber((Topic) destination, "T1");
- JBossMessageConsumer jbossConsumerHA = (JBossMessageConsumer) consumerHA;
+ JBossSession session = (JBossSession) conn.createSession(true, Session.SESSION_TRANSACTED);
+ ClientSessionDelegate clientSessionDelegate = (ClientSessionDelegate) session.getDelegate();
+ SessionState sessionState = (SessionState) clientSessionDelegate.getState();
- org.jboss.jms.client.delegate.ClientConsumerDelegate clientDelegate =
- (org.jboss.jms.client.delegate.ClientConsumerDelegate) jbossConsumerHA.getDelegate();
- ConsumerState consumerState = (ConsumerState) clientDelegate.getState();
+ MessageConsumer consumerHA = session.createDurableSubscriber((Topic) destination, "T1");
+ JBossMessageConsumer jbossConsumerHA = (JBossMessageConsumer) consumerHA;
- log.info("subscriptionName=" + consumerState.getSubscriptionName());
+ org.jboss.jms.client.delegate.ClientConsumerDelegate clientDelegate =
+ (org.jboss.jms.client.delegate.ClientConsumerDelegate) jbossConsumerHA.getDelegate();
+ ConsumerState consumerState = (ConsumerState) clientDelegate.getState();
- log.info(">>Creating Producer");
- MessageProducer producer = session.createProducer(destination);
- log.info(">>creating Message");
- Message message = session.createTextMessage("Hello Before");
- log.info(">>sending Message");
- producer.send(message);
- session.commit();
+ log.info("subscriptionName=" + consumerState.getSubscriptionName());
- receiveMessage("consumerHA", consumerHA, true, false);
+ log.info(">>Creating Producer");
+ MessageProducer producer = session.createProducer(destination);
+ log.info(">>creating Message");
+ Message message = session.createTextMessage("Hello Before");
+ log.info(">>sending Message");
+ producer.send(message);
+ session.commit();
- session.commit();
- //if (true) return;
+ receiveMessage("consumerHA", consumerHA, true, false);
- Object txID = sessionState.getCurrentTxId();
+ session.commit();
+ //if (true) return;
- producer.send(session.createTextMessage("Hello again before failover"));
+ Object txID = sessionState.getCurrentTxId();
- ClientConnectionDelegate delegate = (ClientConnectionDelegate) conn.getDelegate();
+ producer.send(session.createTextMessage("Hello again before failover"));
- JMSRemotingConnection originalRemoting = delegate.getRemotingConnection();
+ ClientConnectionDelegate delegate = (ClientConnectionDelegate) conn.getDelegate();
- ServerManagement.kill(1);
+ JMSRemotingConnection originalRemoting = delegate.getRemotingConnection();
- Thread.sleep(30000);
- // if failover happened, this object was replaced
- assertNotSame(originalRemoting, delegate.getRemotingConnection());
+ ServerManagement.kill(1);
- message = session.createTextMessage("Hello After");
- log.info(">>Sending new message");
- producer.send(message);
+ Thread.sleep(30000);
+ // if failover happened, this object was replaced
+ assertNotSame(originalRemoting, delegate.getRemotingConnection());
- assertEquals(txID, sessionState.getCurrentTxId());
- log.info(">>Final commit");
+ message = session.createTextMessage("Hello After");
+ log.info(">>Sending new message");
+ producer.send(message);
- session.commit();
+ assertEquals(txID, sessionState.getCurrentTxId());
+ log.info(">>Final commit");
- log.info("Calling alternate receiver");
- receiveMessage("consumerHA", consumerHA, true, false);
- receiveMessage("consumerHA", consumerHA, true, false);
- receiveMessage("consumerHA", consumerHA, true, true);
+ session.commit();
- session.commit();
- conn1.close();
- conn2.close();
- }
+ log.info("Calling alternate receiver");
+ receiveMessage("consumerHA", consumerHA, true, false);
+ receiveMessage("consumerHA", consumerHA, true, false);
+ receiveMessage("consumerHA", consumerHA, true, true);
- public void testInvalidate() throws Exception
- {
- JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
-
- Connection conn1 = factory.createConnection();
- JBossConnection conn2 = (JBossConnection)factory.createConnection();
- conn1.close();
-
- try
+ session.commit();
+ }
+ finally
{
- Session session = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- conn2.getDelegate().invalidate();
-
- try
+ if (conn!=null)
{
- Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- fail ("delegate supposed to fail as connection was invalidated!");
+ try { conn.close(); } catch (Throwable ignored) {}
}
- catch (javax.jms.IllegalStateException e)
- {
- log.info("Caught expected exception - " + e);
- }
-
- try
- {
- conn2.start();
- fail ("delegate supposed to fail as connection was invalidated!");
- }
- catch (javax.jms.IllegalStateException e)
- {
- log.info("Caught expected exception - " + e);
- }
-
- try
- {
- MessageConsumer consumer = session.createConsumer(queue[1]);
- fail ("delegate supposed to fail as connection was invalidated!");
- }
- catch (javax.jms.IllegalStateException e)
- {
- log.info("Caught expected exception - " + e);
- }
}
- finally
- {
- conn2.close(); // we should still be able to close invalidated clients!
- }
-
}
// Package protected ---------------------------------------------
Deleted: trunk/tests/src/org/jboss/test/messaging/jms/clustering/RecoverDeliveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/RecoverDeliveryTest.java 2007-07-05 17:35:03 UTC (rev 2847)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/RecoverDeliveryTest.java 2007-07-05 22:27:27 UTC (rev 2848)
@@ -1,168 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.test.messaging.jms.clustering;
-
-import org.jboss.jms.client.JBossConnectionFactory;
-import org.jboss.jms.client.JBossConnection;
-import org.jboss.jms.client.JBossSession;
-import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
-import org.jboss.jms.delegate.DeliveryRecovery;
-import org.jboss.jms.delegate.SessionDelegate;
-import org.jboss.jms.message.MessageProxy;
-import org.jboss.jms.exception.MessagingJMSException;
-import javax.jms.Session;
-import javax.jms.MessageProducer;
-import javax.jms.MessageConsumer;
-import javax.jms.TextMessage;
-import java.util.ArrayList;
-import java.util.Iterator;
-
-/**
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * @version <tt>$Revision$</tt>
- * $Id$
- */
-public class RecoverDeliveryTest extends ClusteringTestBase
-{
-
- // Constants ------------------------------------------------------------------------------------
-
- // Attributes -----------------------------------------------------------------------------------
-
- // Static ---------------------------------------------------------------------------------------
-
- // Constructors ---------------------------------------------------------------------------------
-
- public RecoverDeliveryTest(String name)
- {
- super(name);
- }
-
- public void testAtomicRecover() throws Exception
- {
- JBossConnectionFactory factory = (JBossConnectionFactory )ic[0].lookup("/ClusteredConnectionFactory");
-
- JBossConnection conn0 = (JBossConnection) factory.createConnection();
- assertEquals(0, getServerId(conn0));
-
- try
- {
- assertEquals(0, getServerId(conn0));
-
- JBossSession session0 = (JBossSession)conn0.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageProducer producer0 = session0.createProducer(queue[0]);
- MessageConsumer consumer0 = session0.createConsumer(queue[0]);
- conn0.start();
-
- for (int i=0; i<100; i++)
- {
- producer0.send(session0.createTextMessage("Message:" + i));
- }
-
- ArrayList list = new ArrayList();
-
- for (int i=0; i<10; i++)
- {
- Object msg = consumer0.receive(1000);
- list.add(msg);
- }
-
- conn0.close();
-
- conn0 = (JBossConnection) factory.createConnection();
- assertEquals(0, getServerId(conn0));
-
- session0 = (JBossSession)conn0.createSession(true, Session.SESSION_TRANSACTED);
- producer0 = session0.createProducer(queue[0]);
- consumer0 = session0.createConsumer(queue[0]);
-
-
- ArrayList recoveries = new ArrayList();
- for (Iterator iter = list.iterator(); iter.hasNext();)
- {
- MessageProxy msgProxy = (MessageProxy)iter.next();
- DeliveryRecovery recovery = new DeliveryRecovery(msgProxy.getDeliveryId(),
- msgProxy.getMessage().getMessageID(), queue[0].getQueueName());
- recoveries.add(recovery);
- }
- // Adding an invalid Delivery... so recoverDeliveries is supposed to fail
- recoveries.add(new DeliveryRecovery(9999,9999,queue[0].getQueueName()));
-
-
- SessionDelegate delegateSession = session0.getDelegate();
-
- log.info("Sending recoverDeliveries");
- try
- {
- delegateSession.recoverDeliveries(recoveries);
- fail("recoverDeliveries was supposed to fail!");
- }
- catch (MessagingJMSException e)
- {
- log.info("Receiving expected failure! -> " + e);
- }
-
- conn0.start();
-
- log.info("Receiving messages");
-
- // Since recoverDeliveries is supposed to be atomic now.. this is supposed to receive 100 messages
- for (int i=0; i<100; i++)
- {
- TextMessage msg = (TextMessage) consumer0.receive(1000);
-
- assertNotNull(msg);
- // testing order
- assertEquals("Message:" + i, msg.getText());
-
- log.info("Received " + msg.getText());
- }
- }
- finally
- {
- try { conn0.close();} catch (Throwable ignored){}
- }
- }
-
- // Package protected ----------------------------------------------------------------------------
-
- // Protected ------------------------------------------------------------------------------------
-
- protected void setUp() throws Exception
- {
- nodeCount = 1;
-
- super.setUp();
-
- log.debug("setup done");
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
- }
- // Private --------------------------------------------------------------------------------------
-
- // Inner classes --------------------------------------------------------------------------------
-
-}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java 2007-07-05 17:35:03 UTC (rev 2847)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java 2007-07-05 22:27:27 UTC (rev 2848)
@@ -759,7 +759,9 @@
// We poison node 1 so that it crashes after prepare but before commit is processed
ServerManagement.poisonTheServer(1, PoisonInterceptor.TYPE_2PC_COMMIT);
-
+
+
+ log.info("################################################################## Sending a commit");
tm.commit();
log.info("########");
More information about the jboss-cvs-commits mailing list