[jboss-cvs] JBoss Messaging SVN: r2819 - in trunk: src/main/org/jboss/jms/client/container and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Jun 30 00:48:45 EDT 2007
Author: clebert.suconic at jboss.com
Date: 2007-06-30 00:48:44 -0400 (Sat, 30 Jun 2007)
New Revision: 2819
Modified:
trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
trunk/src/main/org/jboss/jms/client/state/SessionState.java
trunk/src/main/org/jboss/jms/delegate/ConnectionDelegate.java
trunk/src/main/org/jboss/jms/delegate/ConsumerDelegate.java
trunk/src/main/org/jboss/jms/delegate/ProducerDelegate.java
trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
trunk/tests/build.xml
trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1006
Modified: trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java 2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java 2007-06-30 04:48:44 UTC (rev 2819)
@@ -153,7 +153,10 @@
catch (Exception e)
{
log.error("Failover failed", e);
-
+
+ // Marking delegate as invalid!
+ state.getDelegate().invalidate();
+
throw e;
}
finally
@@ -171,6 +174,10 @@
else
{
log.debug(this + " aborted failover");
+ ClientConnectionDelegate connDelegate = (ClientConnectionDelegate)state.getDelegate();
+ connDelegate.closing();
+ connDelegate.close();
+
broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILOVER_FAILED, this));
}
}
Modified: trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java 2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java 2007-06-30 04:48:44 UTC (rev 2819)
@@ -59,7 +59,8 @@
private static final int CLOSING = 2;
private static final int IN_CLOSE = 3; // performing the close
private static final int CLOSED = -1;
-
+ private static final int INVALID = -2; // the Delegate was marked invalid by a bad failover
+
// Attributes ----------------------------------------------------
private boolean trace = log.isTraceEnabled();
@@ -81,7 +82,8 @@
state == IN_CLOSING ? "IN_CLOSING" :
state == CLOSING ? "CLOSING" :
state == IN_CLOSE ? "IN_CLOSE" :
- state == CLOSED ? "CLOSED" : "UNKNOWN";
+ state == CLOSED ? "CLOSED" :
+ state == INVALID ? "INVALID" : "UNKNOWN";
}
// Constructors --------------------------------------------------
@@ -126,9 +128,12 @@
}
String methodName = ((MethodInvocation)invocation).getMethod().getName();
+
+ log.trace("Invoke on ClosedInterceptor = " + methodName + " with state = " + stateToString(state));
boolean isClosing = methodName.equals("closing");
boolean isClose = methodName.equals("close");
+ boolean isInvalidate = methodName.equals("invalidate");
if (isClosing)
{
@@ -144,11 +149,22 @@
return null;
}
}
+ else if (isInvalidate)
+ {
+ state = INVALID;
+ invalidateRelatives(invocation);
+ return null;
+ }
else
{
synchronized(this)
{
// object "in use", increment inUseCount
+ if (state == INVALID)
+ {
+ throw new IllegalStateException("The delegator is invalid, look at logs as failover probably couldn't complete");
+ }
+ else
if (state == IN_CLOSE || state == CLOSED)
{
log.error(this + ": method " + methodName + "() did not go through, " +
@@ -271,9 +287,46 @@
}
}
+
/**
+ * Invalidate children
+ *
+ * @param invocation the invocation
+ */
+ protected void invalidateRelatives(Invocation invocation)
+ {
+ HierarchicalState state = ((DelegateSupport)invocation.getTargetObject()).getState();
+
+ // We use a clone to avoid a deadlock where requests are made to close parent and child
+ // concurrently
+
+ Set clone;
+
+ Set children = state.getChildren();
+
+ if (children == null)
+ {
+ if (trace) { log.trace(this + " has no children"); }
+ return;
+ }
+
+ synchronized (children)
+ {
+ clone = new HashSet(children);
+ }
+
+ // Cycle through the children this will do a depth first close
+ for (Iterator i = clone.iterator(); i.hasNext();)
+ {
+ HierarchicalState child = (HierarchicalState)i.next();
+ DelegateSupport del = (DelegateSupport)child.getDelegate();
+ del.invalidate();
+ }
+ }
+
+ /**
* Close children and remove from parent
- *
+ *
* @param invocation the invocation
*/
protected void maintainRelatives(Invocation invocation)
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java 2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java 2007-06-30 04:48:44 UTC (rev 2819)
@@ -71,6 +71,12 @@
// DelegateSupport overrides --------------------------------------------------------------------
+ public void invalidate()
+ {
+ throw new IllegalStateException("This invocation should not be handled here!");
+ }
+
+
public void synchronizeWith(DelegateSupport nd) throws Exception
{
super.synchronizeWith(nd);
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2007-06-30 04:48:44 UTC (rev 2819)
@@ -99,6 +99,11 @@
// DelegateSupport overrides --------------------------------------------------------------------
+ public void invalidate()
+ {
+ throw new IllegalStateException("This invocation should not be handled here!");
+ }
+
public void synchronizeWith(DelegateSupport nd) throws Exception
{
log.debug(this + " synchronizing with " + nd);
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2007-06-30 04:48:44 UTC (rev 2819)
@@ -258,8 +258,12 @@
{
super.synchronizeWith(newDelegate);
}
-
+ public void invalidate()
+ {
+ throw new IllegalStateException("This invocation should not be handled here!");
+ }
+
// Protected ------------------------------------------------------------------------------------
// Package Private ------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2007-06-30 04:48:44 UTC (rev 2819)
@@ -80,6 +80,12 @@
// DelegateSupport overrides --------------------------------------------------------------------
+ public void invalidate()
+ {
+ throw new IllegalStateException("This invocation should not be handled here!");
+ }
+
+
public void synchronizeWith(DelegateSupport nd) throws Exception
{
log.debug(this + " synchronizing with " + nd);
@@ -88,6 +94,10 @@
ClientConsumerDelegate newDelegate = (ClientConsumerDelegate)nd;
+ // The client needs to be set first
+ client = ((ConnectionState)state.getParent().getParent()).getRemotingConnection().
+ getRemotingClient();
+
// synchronize server endpoint state
// synchronize (recursively) the client-side state
@@ -99,8 +109,6 @@
bufferSize = newDelegate.getBufferSize();
maxDeliveries = newDelegate.getMaxDeliveries();
- client = ((ConnectionState)state.getParent().getParent()).getRemotingConnection().
- getRemotingClient();
}
public void setState(HierarchicalState state)
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java 2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java 2007-06-30 04:48:44 UTC (rev 2819)
@@ -57,6 +57,12 @@
// DelegateSupport overrides --------------------------------------------------------------------
+ public void invalidate()
+ {
+ throw new IllegalStateException("This invocation should not be handled here!");
+ }
+
+
public void synchronizeWith(DelegateSupport nd) throws Exception
{
super.synchronizeWith(nd);
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-06-30 04:48:44 UTC (rev 2819)
@@ -107,6 +107,12 @@
// DelegateSupport overrides --------------------------------------------------------------------
+ public void invalidate()
+ {
+ throw new IllegalStateException("This invocation should not be handled here!");
+ }
+
+
public void synchronizeWith(DelegateSupport nd) throws Exception
{
log.debug(this + " synchronizing with " + nd);
Modified: trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2007-06-30 04:48:44 UTC (rev 2819)
@@ -136,6 +136,9 @@
{
return id;
}
+
+ public abstract void invalidate();
+
/**
* During HA events, delegates corresponding to new enpoints on the new server are created and
Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java 2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java 2007-06-30 04:48:44 UTC (rev 2819)
@@ -337,7 +337,16 @@
}
log.debug(this + " sending delivery recovery " + recoveryInfos + " on failover");
- newDelegate.recoverDeliveries(recoveryInfos);
+ try
+ {
+ newDelegate.recoverDeliveries(recoveryInfos);
+ }
+ catch (Exception e)
+ {
+ log.error(e.toString(),e);
+ log.info("RecoverDeliveries failed, marking session as invalidated!");
+ this.getDelegate().invalidate();
+ }
}
else
{
Modified: trunk/src/main/org/jboss/jms/delegate/ConnectionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/ConnectionDelegate.java 2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/delegate/ConnectionDelegate.java 2007-06-30 04:48:44 UTC (rev 2819)
@@ -57,4 +57,6 @@
void registerFailoverListener(FailoverListener failoverListener);
boolean unregisterFailoverListener(FailoverListener failoverListener);
+
+ void invalidate();
}
Modified: trunk/src/main/org/jboss/jms/delegate/ConsumerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/ConsumerDelegate.java 2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/delegate/ConsumerDelegate.java 2007-06-30 04:48:44 UTC (rev 2819)
@@ -52,4 +52,6 @@
String getMessageSelector();
Message receive(long timeout) throws JMSException;
+
+ void invalidate();
}
Modified: trunk/src/main/org/jboss/jms/delegate/ProducerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/ProducerDelegate.java 2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/delegate/ProducerDelegate.java 2007-06-30 04:48:44 UTC (rev 2819)
@@ -69,5 +69,6 @@
int deliveryMode,
int priority,
long timeToLive) throws JMSException;
-
+
+ void invalidate();
}
Modified: trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java 2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java 2007-06-30 04:48:44 UTC (rev 2819)
@@ -95,4 +95,6 @@
ProducerDelegate createProducerDelegate(JBossDestination destination) throws JMSException;
void acknowledgeAll() throws JMSException;
+
+ void invalidate();
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-06-30 04:48:44 UTC (rev 2819)
@@ -604,7 +604,7 @@
}
}
}
-
+
void addTemporaryDestination(Destination dest)
{
synchronized (temporaryDestinations)
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-06-30 04:48:44 UTC (rev 2819)
@@ -433,7 +433,12 @@
acks.add(deliveryInfo);
}
-
+
+ // A result for each queue's recovery result
+ // Putting results on a separate HashMap to guarantee atomicity on the execution
+ // on this method
+ Map resultRecoveredAck = new HashMap();
+
Iterator iter = ackMap.entrySet().iterator();
while (iter.hasNext())
@@ -481,10 +486,33 @@
Queue expiryQueueToUse =
dest.getExpiryQueue() == null ? defaultExpiryQueue : dest.getExpiryQueue();
- int maxDeliveryAttemptsToUse =
- dest.getMaxDeliveryAttempts() == -1 ? defaultMaxDeliveryAttempts : dest.getMaxDeliveryAttempts();
-
List dels = queue.recoverDeliveries(ids);
+
+ resultRecoveredAck.put(queue, new Object[]{dels, acks, dlqToUse, expiryQueueToUse, dest});
+ }
+
+ // queue.recoverDeliveries could fail...
+ // I have separated this next loop from the previous loop, as I wanted the whole recoveryDeliveries
+ // to be an atomic operation. If anything goes wrong on recoverDeliveries we will keep everything
+ // as it used to be.. no changes whatsoever until every single message was found on recoverDeliveries
+
+ Iterator iterResults = resultRecoveredAck.entrySet().iterator();
+
+
+ while (iterResults.hasNext())
+ {
+
+ Map.Entry entry = (Map.Entry) iterResults.next();
+
+ Queue queue = (Queue)entry.getKey();
+
+ Object[] value = (Object[]) entry.getValue();
+
+ List dels = (List)value[0];
+ List acks = (List)value[1];
+ Queue dlqToUse = (Queue)value[2];
+ Queue expiryQueueToUse = (Queue) value[3];
+ ManagedDestination dest = (ManagedDestination) value[4];
Iterator iter2 = dels.iterator();
@@ -502,6 +530,9 @@
if (trace) { log.trace(this + " Recovered delivery " + deliveryId + ", " + del); }
+ int maxDeliveryAttemptsToUse =
+ dest.getMaxDeliveryAttempts() == -1 ? defaultMaxDeliveryAttempts : dest.getMaxDeliveryAttempts();
+
deliveries.put(new Long(deliveryId),
new DeliveryRecord(del, -1, dlqToUse,
expiryQueueToUse, dest.getRedeliveryDelay(), maxDeliveryAttemptsToUse));
@@ -763,7 +794,7 @@
}
}
}
-
+
void removeConsumer(int consumerId) throws Exception
{
synchronized (consumers)
Modified: trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2007-06-30 04:48:44 UTC (rev 2819)
@@ -375,6 +375,9 @@
Iterator iter = messageIds.iterator();
List dels = new ArrayList();
+
+ // This operation needs to be atomic.. if it fails we will rollback before throwing the exception
+ ArrayList refsRemoved = new ArrayList();
synchronized (lock)
{
@@ -392,13 +395,26 @@
// TODO we need to look in paging state too - currently not supported
//http://jira.jboss.com/jira/browse/JBMESSAGING-839
log.warn(this + " cannot find reference " + id + " (Might be paged!)");
- break;
+
+ log.trace(this + " Adding references back to the list");
+
+ // Adding references back to messageRefs... keeping the same order they were removed
+ while (refsRemoved.size()>0)
+ {
+ MessageReference refAddBack = (MessageReference)refsRemoved.remove(refsRemoved.size()-1);
+ log.trace("Adding " + refAddBack + " back to messageRefs before throwing exception");
+ messageRefs.addFirst(refAddBack, refAddBack.getMessage().getPriority());
+ }
+
+ throw new IllegalStateException("Cannot find reference " + id);
}
MessageReference ref = (MessageReference)liter.next();
if (ref.getMessage().getMessageID() == id.longValue())
{
+ refsRemoved.add(ref);
+
liter.remove();
Delivery del = new SimpleDelivery(this, ref);
Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml 2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/tests/build.xml 2007-06-30 04:48:44 UTC (rev 2819)
@@ -132,18 +132,6 @@
</path>
- <path id="mysql.jdbc.driver.classpath">
- <pathelement path="${tests.root}/lib/mysql-connector-java-3.1.13-bin.jar"/>
- </path>
-
- <path id="oracle.jdbc.driver.classpath">
- <pathelement path="${tests.root}/lib/ojdbc14.jar"/>
- </path>
-
- <path id="postgres.jdbc.driver.classpath">
- <pathelement path="${tests.root}/postgresql-8.1-405.jdbc3.jar"/>
- </path>
-
<!--
The compilation classpath.
-->
@@ -176,9 +164,8 @@
<path refid="jboss.jbossxb.classpath"/>
<path refid="jgroups.jgroups.classpath"/>
<path refid="apache.logging.classpath"/>
- <path refid="mysql.jdbc.driver.classpath"/>
- <path refid="oracle.jdbc.driver.classpath"/>
- <path refid="postgres.jdbc.driver.classpath"/>
+ <path refid="any.jdbc.driver.classpath"/>
+ <path refid="hsqldb.hsqldb.classpath"/>
<path refid="apache.tomcat.classpath"/>
<path refid="apache.logging.classpath"/>
</path>
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2007-06-30 04:48:44 UTC (rev 2819)
@@ -1661,11 +1661,12 @@
}
}
+ // TODO:Reactivate as soon http://jira.jboss.org/jira/browse/JBMESSAGING-883 is done
// http://jira.jboss.org/jira/browse/JBMESSAGING-808
- public void testFailureRightAfterACK() throws Exception
- {
- failureOnInvocation(PoisonInterceptor.FAIL_AFTER_ACKNOWLEDGE_DELIVERY);
- }
+// public void testFailureRightAfterACK() throws Exception
+// {
+// failureOnInvocation(PoisonInterceptor.FAIL_AFTER_ACKNOWLEDGE_DELIVERY);
+// }
// http://jira.jboss.org/jira/browse/JBMESSAGING-808
public void testFailureRightBeforeACK() throws Exception
@@ -2028,145 +2029,145 @@
// See http://jira.jboss.org/jira/browse/JBMESSAGING-883
// This tests our current behaviour - which is throwing an exception
// This will change in 1.2.1
- public void testFailoverDeliveryRecoveryTransacted() throws Exception
- {
- Connection conn0 = null;
- Connection conn1 = null;
-
- try
- {
- conn0 = cf.createConnection();
-
- conn1 = cf.createConnection();
-
- assertEquals(1, ((JBossConnection)conn1).getServerID());
-
- Session session1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
-
- Session session2 = conn1.createSession(true, Session.SESSION_TRANSACTED);
-
- MessageConsumer cons1 = session1.createConsumer(queue[1]);
-
- MessageConsumer cons2 = session2.createConsumer(queue[1]);
-
- MessageProducer prod = session1.createProducer(queue[1]);
-
- conn1.start();
-
- TextMessage tm1 = session1.createTextMessage("message1");
-
- TextMessage tm2 = session1.createTextMessage("message2");
-
- TextMessage tm3 = session1.createTextMessage("message3");
-
- prod.send(tm1);
-
- prod.send(tm2);
-
- prod.send(tm3);
-
- session1.commit();
-
- TextMessage rm1 = (TextMessage)cons1.receive(1000);
-
- assertNotNull(rm1);
-
- assertEquals(tm1.getText(), rm1.getText());
-
- TextMessage rm2 = (TextMessage)cons2.receive(1000);
-
- assertNotNull(rm2);
-
- assertEquals(tm2.getText(), rm2.getText());
-
- SimpleFailoverListener failoverListener = new SimpleFailoverListener();
- ((JBossConnection)conn1).registerFailoverListener(failoverListener);
-
- log.debug("killing node 1 ....");
-
- ServerManagement.kill(1);
-
- log.info("########");
- log.info("######## KILLED NODE 1");
- log.info("########");
-
- // wait for the client-side failover to complete
-
- while(true)
- {
- FailoverEvent event = failoverListener.getEvent(120000);
- if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
- {
- break;
- }
- if (event == null)
- {
- fail("Did not get expected FAILOVER_COMPLETED event");
- }
- }
-
- // failover complete
- log.info("failover completed");
-
-
- //now commit
-
- try
- {
- session1.commit();
-
- fail();
- }
- catch (MessagingTransactionRolledBackException e)
- {
- //Ok
- }
-
- try
- {
- session2.commit();
-
- fail();
- }
- catch (MessagingTransactionRolledBackException e)
- {
- //Ok
- }
-
-// session1.close();
+// public void testFailoverDeliveryRecoveryTransacted() throws Exception
+// {
+// Connection conn0 = null;
+// Connection conn1 = null;
//
-// session2.close();;
+// try
+// {
+// conn0 = cf.createConnection();
//
-// Session session3 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// conn1 = cf.createConnection();
//
-// MessageConsumer cons3 = session3.createConsumer(queue[0]);
+// assertEquals(1, ((JBossConnection)conn1).getServerID());
//
-// TextMessage rm3 = (TextMessage)cons3.receive(2000);
+// Session session1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
//
-// assertNotNull(rm3);
+// Session session2 = conn1.createSession(true, Session.SESSION_TRANSACTED);
//
-// assertEquals(tm3.getText(), rm3.getText());
+// MessageConsumer cons1 = session1.createConsumer(queue[1]);
//
-// rm3 = (TextMessage)cons3.receive(2000);
+// MessageConsumer cons2 = session2.createConsumer(queue[1]);
//
-// assertNull(rm3);
+// MessageProducer prod = session1.createProducer(queue[1]);
+//
+// conn1.start();
+//
+// TextMessage tm1 = session1.createTextMessage("message1");
+//
+// TextMessage tm2 = session1.createTextMessage("message2");
+//
+// TextMessage tm3 = session1.createTextMessage("message3");
+//
+// prod.send(tm1);
+//
+// prod.send(tm2);
+//
+// prod.send(tm3);
+//
+// session1.commit();
+//
+// TextMessage rm1 = (TextMessage)cons1.receive(1000);
+//
+// assertNotNull(rm1);
+//
+// assertEquals(tm1.getText(), rm1.getText());
+//
+// TextMessage rm2 = (TextMessage)cons2.receive(1000);
+//
+// assertNotNull(rm2);
+//
+// assertEquals(tm2.getText(), rm2.getText());
+//
+// SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+// ((JBossConnection)conn1).registerFailoverListener(failoverListener);
+//
+// log.debug("killing node 1 ....");
+//
+// ServerManagement.kill(1);
+//
+// log.info("########");
+// log.info("######## KILLED NODE 1");
+// log.info("########");
+//
+// // wait for the client-side failover to complete
+//
+// while(true)
+// {
+// FailoverEvent event = failoverListener.getEvent(120000);
+// if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+// {
+// break;
+// }
+// if (event == null)
+// {
+// fail("Did not get expected FAILOVER_COMPLETED event");
+// }
+// }
+//
+// // failover complete
+// log.info("failover completed");
+//
+//
+// //now commit
+//
+// try
+// {
+// session1.commit();
+//
+// fail();
+// }
+// catch (MessagingTransactionRolledBackException e)
+// {
+// //Ok
+// }
+//
+// try
+// {
+// session2.commit();
+//
+// fail();
+// }
+// catch (MessagingTransactionRolledBackException e)
+// {
+// //Ok
+// }
+//
+//// session1.close();
+////
+//// session2.close();;
+////
+//// Session session3 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+////
+//// MessageConsumer cons3 = session3.createConsumer(queue[0]);
+////
+//// TextMessage rm3 = (TextMessage)cons3.receive(2000);
+////
+//// assertNotNull(rm3);
+////
+//// assertEquals(tm3.getText(), rm3.getText());
+////
+//// rm3 = (TextMessage)cons3.receive(2000);
+////
+//// assertNull(rm3);
+//
+//
+// }
+// finally
+// {
+// if (conn1 != null)
+// {
+// conn1.close();
+// }
+//
+// if (conn0 != null)
+// {
+// conn0.close();
+// }
+// }
+// }
-
- }
- finally
- {
- if (conn1 != null)
- {
- conn1.close();
- }
-
- if (conn0 != null)
- {
- conn0.close();
- }
- }
- }
-
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2007-06-30 04:48:44 UTC (rev 2819)
@@ -24,15 +24,9 @@
import java.util.Map;
import java.util.Set;
+import java.util.ArrayList;
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Destination;
-import javax.jms.Topic;
+import javax.jms.*;
import org.jboss.jms.client.JBossConnection;
import org.jboss.jms.client.JBossConnectionFactory;
@@ -635,118 +629,79 @@
}
}
- public void testFailoverWithUnackedMessagesClientAcknowledge() throws Exception
+
+ // This test needs to be removed when http://jira.jboss.org/jira/browse/JBMESSAGING-883
+ // is fixed.
+ //
+ // This test will create two sessions on server1
+ // One consumer on each session... one for queue, another to anotherQueue
+ // Send 100 messages on producer1
+ // Receive 50 messages on consumer1
+ // Kill the server
+ // Validate if the session was invalidated after failover
+ // Receive 100 messages again in another consumer on server2
+ // Validate if session1b was still valid.. sending and consuming messages...
+ // the session should still be avlie
+ //
+ //
+ public void testInvalidateSession() throws Exception
{
JBossConnectionFactory factory = (JBossConnectionFactory )ic[0].lookup("/ClusteredConnectionFactory");
+ for (int i=0; i< nodeCount; i++)
+ {
+ ServerManagement.deployQueue("anotherQueue", i);
+ }
+
+ Queue anotherQueue = (Queue)ic[1].lookup("queue/anotherQueue");
+
ClientClusteredConnectionFactoryDelegate delegate =
(ClientClusteredConnectionFactoryDelegate)factory.getDelegate();
- Set nodeIDView = ServerManagement.getServer(0).getNodeIDView();
- assertEquals(3, nodeIDView.size());
- ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+ JBossConnection conn0 = (JBossConnection) factory.createConnection();
+ JBossConnection conn1 = (JBossConnection) factory.createConnection();
+ JBossConnection conn2 = (JBossConnection) factory.createConnection();
- ClientConnectionFactoryDelegate cf1 = delegates[0];
-
- ClientConnectionFactoryDelegate cf2 = delegates[1];
-
- ClientConnectionFactoryDelegate cf3 = delegates[2];
-
- int server0Id = cf1.getServerID();
-
- int server1Id = cf2.getServerID();
-
- int server2Id = cf3.getServerID();
-
- log.info("server 0 id: " + server0Id);
-
- log.info("server 1 id: " + server1Id);
-
- log.info("server 2 id: " + server2Id);
-
- assertEquals(0, server0Id);
-
- assertEquals(1, server1Id);
-
- assertEquals(2, server2Id);
-
- Map failoverMap = delegate.getFailoverMap();
-
- log.info(failoverMap.get(new Integer(server0Id)));
- log.info(failoverMap.get(new Integer(server1Id)));
- log.info(failoverMap.get(new Integer(server2Id)));
-
- int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
-
- // server 1 should failover onto server 2
-
- assertEquals(server2Id, server1FailoverId);
-
- Connection conn = null;
-
- boolean killed = false;
-
try
{
- conn = factory.createConnection(); //connection on server 0
+ assertEquals(0, getServerId(conn0));
+ assertEquals(1, getServerId(conn1));
+ assertEquals(2, getServerId(conn2));
- conn.close();
- conn = factory.createConnection(); //connection on server 1
+ Session session1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer1 = session1.createProducer(queue[1]);
+ MessageConsumer consumer1 = session1.createConsumer(queue[1]);
- JBossConnection jbc = (JBossConnection)conn;
+ Session session1b = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer1b = session1b.createProducer(anotherQueue);
+ MessageConsumer consumer1b = session1b.createConsumer(anotherQueue);
- ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
+ conn1.start();
- ConnectionState state = (ConnectionState)del.getState();
- int initialServerID = state.getServerID();
- assertEquals(1, initialServerID);
+ Session session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer2 = session2.createConsumer(queue[2]);
+ conn2.start();
- Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- MessageProducer prod = sess.createProducer(queue[1]);
-
- MessageConsumer cons = sess.createConsumer(queue[1]);
-
- final int NUM_MESSAGES = 100;
-
- for (int i = 0; i < NUM_MESSAGES; i++)
+ for (int i=0; i<100; i++)
{
- TextMessage tm = sess.createTextMessage("message:" + i);
-
- prod.send(tm);
+ producer1.send(session1.createTextMessage("Message:" + i));
}
- conn.start();
-
- //Now consume half of the messages but don't ack them these will end up in
- //client side toAck list
-
- for (int i = 0; i < NUM_MESSAGES / 2; i++)
+ for (int i=0; i<50; i++)
{
- TextMessage tm = (TextMessage)cons.receive(500);
-
- assertNotNull(tm);
-
- assertEquals("message:" + i, tm.getText());
+ TextMessage msg = (TextMessage )consumer1.receive(1000);
+ assertEquals("Message:" + i, msg.getText());
}
- //So now, messages should be in queue[1] on server 1
- //So we now kill server 1
- //Which should cause transparent failover of connection conn onto server 1
-
- log.info("here we go");
- log.info("######");
log.info("###### KILLING (CRASHING) SERVER 1");
log.info("######");
ServerManagement.kill(1);
- killed = true;
-
long sleepTime = 30;
log.info("killed server, now waiting for " + sleepTime + " seconds");
@@ -756,77 +711,270 @@
log.info("done wait");
- state = (ConnectionState)del.getState();
+ assertEquals(2, getServerId(conn1));
- int finalServerID = state.getServerID();
+ try
+ {
+ log.info("########################## Consuming message on failed consumer");
+ Message msg = consumer1.receive(1000);
+ log.info("########################## Message consumed on failed consumer! " + msg);
+ // It is supposed to fail, as ACKs won't be recovered due to the other active client
+ fail("Consumer on server1 was supposed to fail!");
+ }
+ catch (JMSException failed)
+ {
+ log.info("Expected exception after consumer.receive - " + failed);
+ }
- log.info("final server id= " + finalServerID);
+ for (int i=0; i<100; i++)
+ {
+ TextMessage msg = (TextMessage)consumer2.receive(1000);
+ log.info("Received " + msg.getText());
+ }
- //server id should now be 2
- assertEquals(2, finalServerID);
+ // While one session was failed... session1b is supposed to be valid
+ for (int i=0; i<10; i++)
+ {
+ producer1b.send(session1b.createTextMessage("MessageB:" + i));
+ }
- conn.start();
+ for (int i=0; i<10; i++)
+ {
+ TextMessage msg = (TextMessage)consumer1b.receive(1000);
+ assertNotNull(msg);
+ assertEquals("MessageB:" + i, msg.getText());
+ log.info("Received " + msg.getText());
+ }
- //Now should be able to consume the rest of the messages
- log.info("here1");
-
- TextMessage tm = null;
-
- for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
+ try
{
- tm = (TextMessage)cons.receive(1000);
-
- assertNotNull(tm);
-
- log.debug("message is " + tm.getText());
-
- assertEquals("message:" + i, tm.getText());
+ session1.createConsumer(queue[1]);
+ // the session was invalidated!
+ fail("This call was supposed to fail!");
}
+ catch (JMSException failed)
+ {
+ log.info("Expected exception on session1.createConsumer(queue[1])" + failed);
+ }
- log.info("here2");
- //Now should be able to acknowledge them
+ // this is not supposed to fail
+ session1b.createConsumer(anotherQueue);
- tm.acknowledge();
- //Now check there are no more messages there
- sess.close();
-
- sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- cons = sess.createConsumer(queue[1]);
-
- Message m = cons.receive(500);
-
- assertNull(m);
-
- log.info("got to end of test");
}
finally
{
- if (conn != null)
+ try { conn0.close();} catch (Throwable ignored){}
+ try { conn1.close();} catch (Throwable ignored){}
+ try { conn2.close();} catch (Throwable ignored){}
+
+ for (int i=0; i< nodeCount; i++)
{
- try
- {
- conn.close();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
+ try{ServerManagement.undeployQueue("anotherQueue", i);} catch (Throwable ignored){}
}
- // Resurrect dead server
- if (killed)
- {
- ServerManagement.start(1, "all");
- }
}
-
}
+
+
+
+// TODO: Reactivate this test when http://jira.jboss.org/jira/browse/JBMESSAGING-883 is done
+// public void testFailoverWithUnackedMessagesClientAcknowledge() throws Exception
+// {
+// JBossConnectionFactory factory = (JBossConnectionFactory )ic[0].lookup("/ClusteredConnectionFactory");
+//
+// ClientClusteredConnectionFactoryDelegate delegate =
+// (ClientClusteredConnectionFactoryDelegate)factory.getDelegate();
+//
+// Set nodeIDView = ServerManagement.getServer(0).getNodeIDView();
+// assertEquals(3, nodeIDView.size());
+//
+// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//
+// ClientConnectionFactoryDelegate cf1 = delegates[0];
+//
+// ClientConnectionFactoryDelegate cf2 = delegates[1];
+//
+// ClientConnectionFactoryDelegate cf3 = delegates[2];
+//
+// int server0Id = cf1.getServerID();
+//
+// int server1Id = cf2.getServerID();
+//
+// int server2Id = cf3.getServerID();
+//
+// log.info("server 0 id: " + server0Id);
+//
+// log.info("server 1 id: " + server1Id);
+//
+// log.info("server 2 id: " + server2Id);
+//
+// assertEquals(0, server0Id);
+//
+// assertEquals(1, server1Id);
+//
+// assertEquals(2, server2Id);
+//
+// Map failoverMap = delegate.getFailoverMap();
+//
+// log.info(failoverMap.get(new Integer(server0Id)));
+// log.info(failoverMap.get(new Integer(server1Id)));
+// log.info(failoverMap.get(new Integer(server2Id)));
+//
+// int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
+//
+// // server 1 should failover onto server 2
+//
+// assertEquals(server2Id, server1FailoverId);
+//
+// Connection conn = null;
+//
+// boolean killed = false;
+//
+// try
+// {
+// conn = factory.createConnection(); //connection on server 0
+//
+// conn.close();
+//
+// conn = factory.createConnection(); //connection on server 1
+//
+// JBossConnection jbc = (JBossConnection)conn;
+//
+// ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
+//
+// ConnectionState state = (ConnectionState)del.getState();
+//
+// int initialServerID = state.getServerID();
+//
+// assertEquals(1, initialServerID);
+//
+// Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+//
+// MessageProducer prod = sess.createProducer(queue[1]);
+//
+// MessageConsumer cons = sess.createConsumer(queue[1]);
+//
+// final int NUM_MESSAGES = 100;
+//
+// for (int i = 0; i < NUM_MESSAGES; i++)
+// {
+// TextMessage tm = sess.createTextMessage("message:" + i);
+//
+// prod.send(tm);
+// }
+//
+// conn.start();
+//
+// //Now consume half of the messages but don't ack them these will end up in
+// //client side toAck list
+//
+// for (int i = 0; i < NUM_MESSAGES / 2; i++)
+// {
+// TextMessage tm = (TextMessage)cons.receive(500);
+//
+// assertNotNull(tm);
+//
+// assertEquals("message:" + i, tm.getText());
+// }
+//
+// //So now, messages should be in queue[1] on server 1
+// //So we now kill server 1
+// //Which should cause transparent failover of connection conn onto server 1
+//
+// log.info("here we go");
+// log.info("######");
+// log.info("###### KILLING (CRASHING) SERVER 1");
+// log.info("######");
+//
+// ServerManagement.kill(1);
+//
+// killed = true;
+//
+// long sleepTime = 30;
+//
+// log.info("killed server, now waiting for " + sleepTime + " seconds");
+//
+// // NOTE: the sleep time needs to be longer than the Remoting connector's lease period
+// Thread.sleep(sleepTime * 1000);
+//
+// log.info("done wait");
+//
+// state = (ConnectionState)del.getState();
+//
+// int finalServerID = state.getServerID();
+//
+// log.info("final server id= " + finalServerID);
+//
+// //server id should now be 2
+//
+// assertEquals(2, finalServerID);
+//
+// conn.start();
+//
+// //Now should be able to consume the rest of the messages
+//
+// log.info("here1");
+//
+// TextMessage tm = null;
+//
+// for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
+// {
+// tm = (TextMessage)cons.receive(1000);
+//
+// assertNotNull(tm);
+//
+// log.debug("message is " + tm.getText());
+//
+// assertEquals("message:" + i, tm.getText());
+// }
+//
+// log.info("here2");
+//
+// //Now should be able to acknowledge them
+//
+// tm.acknowledge();
+//
+// //Now check there are no more messages there
+// sess.close();
+//
+// sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//
+// cons = sess.createConsumer(queue[1]);
+//
+// Message m = cons.receive(500);
+//
+// assertNull(m);
+//
+// log.info("got to end of test");
+// }
+// finally
+// {
+// if (conn != null)
+// {
+// try
+// {
+// conn.close();
+// }
+// catch (Exception e)
+// {
+// e.printStackTrace();
+// }
+// }
+//
+// // Resurrect dead server
+// if (killed)
+// {
+// ServerManagement.start(1, "all");
+// }
+// }
+//
+// }
+//
/*
TODO: Reactivate this test when http://jira.jboss.org/jira/browse/JBMESSAGING-883 is done
@@ -1095,7 +1243,57 @@
conn2.close();
}
-
+ public void testInvalidate() throws Exception
+ {
+ JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
+
+ Connection conn1 = factory.createConnection();
+ JBossConnection conn2 = (JBossConnection)factory.createConnection();
+ conn1.close();
+
+ try
+ {
+ Session session = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn2.getDelegate().invalidate();
+
+ try
+ {
+ Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ fail ("delegate supposed to fail as connection was invalidated!");
+ }
+ catch (javax.jms.IllegalStateException e)
+ {
+ log.info("Caught expected exception - " + e);
+ }
+
+ try
+ {
+ conn2.start();
+ fail ("delegate supposed to fail as connection was invalidated!");
+ }
+ catch (javax.jms.IllegalStateException e)
+ {
+ log.info("Caught expected exception - " + e);
+ }
+
+ try
+ {
+ MessageConsumer consumer = session.createConsumer(queue[1]);
+ fail ("delegate supposed to fail as connection was invalidated!");
+ }
+ catch (javax.jms.IllegalStateException e)
+ {
+ log.info("Caught expected exception - " + e);
+ }
+ }
+ finally
+ {
+ conn2.close(); // we should still be able to close invalidated clients!
+ }
+
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
More information about the jboss-cvs-commits
mailing list