[jboss-cvs] JBoss Messaging SVN: r2819 - in trunk: src/main/org/jboss/jms/client/container and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Jun 30 00:48:45 EDT 2007


Author: clebert.suconic at jboss.com
Date: 2007-06-30 00:48:44 -0400 (Sat, 30 Jun 2007)
New Revision: 2819

Modified:
   trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
   trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
   trunk/src/main/org/jboss/jms/client/state/SessionState.java
   trunk/src/main/org/jboss/jms/delegate/ConnectionDelegate.java
   trunk/src/main/org/jboss/jms/delegate/ConsumerDelegate.java
   trunk/src/main/org/jboss/jms/delegate/ProducerDelegate.java
   trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
   trunk/tests/build.xml
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1006

Modified: trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java	2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java	2007-06-30 04:48:44 UTC (rev 2819)
@@ -153,7 +153,10 @@
       catch (Exception e)
       {
          log.error("Failover failed", e);
-         
+
+         // Marking delegate as invalid!
+         state.getDelegate().invalidate();
+
          throw e;
       }
       finally
@@ -171,6 +174,10 @@
          else
          {
             log.debug(this + " aborted failover");
+            ClientConnectionDelegate connDelegate = (ClientConnectionDelegate)state.getDelegate();
+            connDelegate.closing();
+            connDelegate.close();
+            
             broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILOVER_FAILED, this));
          }
       }

Modified: trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java	2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java	2007-06-30 04:48:44 UTC (rev 2819)
@@ -59,7 +59,8 @@
    private static final int CLOSING = 2;
    private static final int IN_CLOSE = 3; // performing the close
    private static final int CLOSED = -1;
-   
+   private static final int INVALID = -2; // the Delegate was marked invalid by a bad failover
+
    // Attributes ----------------------------------------------------
    
    private boolean trace = log.isTraceEnabled();
@@ -81,7 +82,8 @@
          state == IN_CLOSING ? "IN_CLOSING" :
             state == CLOSING ? "CLOSING" :
                state == IN_CLOSE ? "IN_CLOSE" :
-                  state == CLOSED ? "CLOSED" : "UNKNOWN";
+                  state == CLOSED ? "CLOSED" :
+                     state == INVALID ? "INVALID" : "UNKNOWN";
    }
    
    // Constructors --------------------------------------------------
@@ -126,9 +128,12 @@
       }
       
       String methodName = ((MethodInvocation)invocation).getMethod().getName();
+
+      log.trace("Invoke on ClosedInterceptor = " + methodName + " with state = " + stateToString(state));
         
       boolean isClosing = methodName.equals("closing");
       boolean isClose = methodName.equals("close");
+      boolean isInvalidate = methodName.equals("invalidate");
       
       if (isClosing)
       {         
@@ -144,11 +149,22 @@
             return null;
          }
       }
+      else if (isInvalidate)
+      {
+         state = INVALID;
+         invalidateRelatives(invocation);
+         return null;
+      }
       else
       {
          synchronized(this)
          {
             // object "in use", increment inUseCount
+            if (state == INVALID)
+            {
+                throw new IllegalStateException("The delegator is invalid, look at logs as failover probably couldn't complete");
+            }
+            else
             if (state == IN_CLOSE || state == CLOSED)
             {
                log.error(this + ": method " + methodName + "() did not go through, " +
@@ -271,9 +287,46 @@
       }
    }
 
+
    /**
+    * Invalidate children
+    *
+    * @param invocation the invocation
+    */
+   protected void invalidateRelatives(Invocation invocation)
+   {
+      HierarchicalState state = ((DelegateSupport)invocation.getTargetObject()).getState();
+
+      // We use a clone to avoid a deadlock where requests are made to close parent and child
+      // concurrently
+
+      Set clone;
+
+      Set children = state.getChildren();
+
+      if (children == null)
+      {
+         if (trace) { log.trace(this + " has no children"); }
+         return;
+      }
+
+      synchronized (children)
+      {
+         clone = new HashSet(children);
+      }
+
+      // Cycle through the children this will do a depth first close
+      for (Iterator i = clone.iterator(); i.hasNext();)
+      {
+         HierarchicalState child = (HierarchicalState)i.next();
+         DelegateSupport del = (DelegateSupport)child.getDelegate();
+         del.invalidate();
+      }
+   }
+
+   /**
     * Close children and remove from parent
-    * 
+    *
     * @param invocation the invocation
     */
    protected void maintainRelatives(Invocation invocation)

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java	2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java	2007-06-30 04:48:44 UTC (rev 2819)
@@ -71,6 +71,12 @@
 
    // DelegateSupport overrides --------------------------------------------------------------------
 
+   public void invalidate()
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
+
+
    public void synchronizeWith(DelegateSupport nd) throws Exception
    {
       super.synchronizeWith(nd);

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java	2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java	2007-06-30 04:48:44 UTC (rev 2819)
@@ -99,6 +99,11 @@
 
    // DelegateSupport overrides --------------------------------------------------------------------
 
+   public void invalidate()
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
+
    public void synchronizeWith(DelegateSupport nd) throws Exception
    {
       log.debug(this + " synchronizing with " + nd);

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2007-06-30 04:48:44 UTC (rev 2819)
@@ -258,8 +258,12 @@
    {
       super.synchronizeWith(newDelegate);
    }
-   
 
+   public void invalidate()
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
+
    // Protected ------------------------------------------------------------------------------------
 
    // Package Private ------------------------------------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java	2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java	2007-06-30 04:48:44 UTC (rev 2819)
@@ -80,6 +80,12 @@
 
    // DelegateSupport overrides --------------------------------------------------------------------
 
+   public void invalidate()
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
+
+
    public void synchronizeWith(DelegateSupport nd) throws Exception
    {
       log.debug(this + " synchronizing with " + nd);
@@ -88,6 +94,10 @@
 
       ClientConsumerDelegate newDelegate = (ClientConsumerDelegate)nd;
 
+      // The client needs to be set first
+      client = ((ConnectionState)state.getParent().getParent()).getRemotingConnection().
+         getRemotingClient();
+
       // synchronize server endpoint state
 
       // synchronize (recursively) the client-side state
@@ -99,8 +109,6 @@
       bufferSize = newDelegate.getBufferSize();
       maxDeliveries = newDelegate.getMaxDeliveries();
 
-      client = ((ConnectionState)state.getParent().getParent()).getRemotingConnection().
-         getRemotingClient();
    }
 
    public void setState(HierarchicalState state)

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java	2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java	2007-06-30 04:48:44 UTC (rev 2819)
@@ -57,6 +57,12 @@
 
    // DelegateSupport overrides --------------------------------------------------------------------
 
+   public void invalidate()
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
+
+
    public void synchronizeWith(DelegateSupport nd) throws Exception
    {
       super.synchronizeWith(nd);

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2007-06-30 04:48:44 UTC (rev 2819)
@@ -107,6 +107,12 @@
 
    // DelegateSupport overrides --------------------------------------------------------------------
 
+   public void invalidate()
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
+
+
    public void synchronizeWith(DelegateSupport nd) throws Exception
    {
       log.debug(this + " synchronizing with " + nd);

Modified: trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java	2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java	2007-06-30 04:48:44 UTC (rev 2819)
@@ -136,6 +136,9 @@
    {
       return id;
    }
+
+   public abstract void invalidate();
+
    
    /**
     * During HA events, delegates corresponding to new enpoints on the new server are created and

Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java	2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java	2007-06-30 04:48:44 UTC (rev 2819)
@@ -337,7 +337,16 @@
          }
 
          log.debug(this + " sending delivery recovery " + recoveryInfos + " on failover");
-         newDelegate.recoverDeliveries(recoveryInfos);
+         try
+         {
+            newDelegate.recoverDeliveries(recoveryInfos);
+         }
+         catch (Exception e)
+         {
+            log.error(e.toString(),e);
+            log.info("RecoverDeliveries failed, marking session as invalidated!");
+            this.getDelegate().invalidate();
+         }
       }
       else
       {

Modified: trunk/src/main/org/jboss/jms/delegate/ConnectionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/ConnectionDelegate.java	2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/delegate/ConnectionDelegate.java	2007-06-30 04:48:44 UTC (rev 2819)
@@ -57,4 +57,6 @@
    void registerFailoverListener(FailoverListener failoverListener);
    
    boolean unregisterFailoverListener(FailoverListener failoverListener);
+
+   void invalidate();
 }

Modified: trunk/src/main/org/jboss/jms/delegate/ConsumerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/ConsumerDelegate.java	2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/delegate/ConsumerDelegate.java	2007-06-30 04:48:44 UTC (rev 2819)
@@ -52,4 +52,6 @@
    String getMessageSelector();
 
    Message receive(long timeout) throws JMSException;
+
+   void invalidate();
 }

Modified: trunk/src/main/org/jboss/jms/delegate/ProducerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/ProducerDelegate.java	2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/delegate/ProducerDelegate.java	2007-06-30 04:48:44 UTC (rev 2819)
@@ -69,5 +69,6 @@
              int deliveryMode,
              int priority,
              long timeToLive) throws JMSException;
-     
+
+   void invalidate();
 }

Modified: trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java	2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java	2007-06-30 04:48:44 UTC (rev 2819)
@@ -95,4 +95,6 @@
    ProducerDelegate createProducerDelegate(JBossDestination destination) throws JMSException;
 
    void acknowledgeAll() throws JMSException;
+
+   void invalidate();
 }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-06-30 04:48:44 UTC (rev 2819)
@@ -604,7 +604,7 @@
          }
       }
    }
-   
+
    void addTemporaryDestination(Destination dest)
    {
       synchronized (temporaryDestinations)

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-06-30 04:48:44 UTC (rev 2819)
@@ -433,7 +433,12 @@
             
             acks.add(deliveryInfo);
          }  
-         
+
+         // A result for each queue's recovery result
+         // Putting results on a separate HashMap to guarantee atomicity on the execution
+         //   on this method
+         Map resultRecoveredAck = new HashMap();
+
          Iterator iter = ackMap.entrySet().iterator();
          
          while (iter.hasNext())
@@ -481,10 +486,33 @@
             Queue expiryQueueToUse =
                dest.getExpiryQueue() == null ? defaultExpiryQueue : dest.getExpiryQueue();
             
-            int maxDeliveryAttemptsToUse =
-               dest.getMaxDeliveryAttempts() == -1 ? defaultMaxDeliveryAttempts : dest.getMaxDeliveryAttempts();
-            
             List dels = queue.recoverDeliveries(ids);
+
+            resultRecoveredAck.put(queue, new Object[]{dels, acks, dlqToUse, expiryQueueToUse, dest});
+         }
+
+         // queue.recoverDeliveries could fail...
+         // I have separated this next loop from the previous loop, as I wanted the whole recoveryDeliveries
+         // to be an atomic operation. If anything goes wrong on recoverDeliveries we will keep everything
+         // as it used to be.. no changes whatsoever until every single message was found on recoverDeliveries
+
+         Iterator iterResults = resultRecoveredAck.entrySet().iterator();
+         
+
+         while (iterResults.hasNext())
+         {
+
+            Map.Entry entry = (Map.Entry) iterResults.next();
+
+            Queue queue = (Queue)entry.getKey();
+
+            Object[] value = (Object[]) entry.getValue();
+
+            List dels = (List)value[0];
+            List acks = (List)value[1];
+            Queue dlqToUse = (Queue)value[2];
+            Queue expiryQueueToUse = (Queue) value[3];
+            ManagedDestination dest = (ManagedDestination) value[4];
             
             Iterator iter2 = dels.iterator();
             
@@ -502,6 +530,9 @@
                
                if (trace) { log.trace(this + " Recovered delivery " + deliveryId + ", " + del); }
                
+               int maxDeliveryAttemptsToUse =
+                  dest.getMaxDeliveryAttempts() == -1 ? defaultMaxDeliveryAttempts : dest.getMaxDeliveryAttempts();
+
                deliveries.put(new Long(deliveryId),
                               new DeliveryRecord(del, -1, dlqToUse,
                                                  expiryQueueToUse, dest.getRedeliveryDelay(), maxDeliveryAttemptsToUse));
@@ -763,7 +794,7 @@
          }
       }
    }
-   
+
    void removeConsumer(int consumerId) throws Exception
    {
       synchronized (consumers)

Modified: trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2007-06-30 04:48:44 UTC (rev 2819)
@@ -375,6 +375,9 @@
       Iterator iter = messageIds.iterator();
                   
       List dels = new ArrayList();
+
+      // This operation needs to be atomic.. if it fails we will rollback before throwing the exception
+      ArrayList refsRemoved = new ArrayList();
       
       synchronized (lock)
       {
@@ -392,13 +395,26 @@
                   // TODO we need to look in paging state too - currently not supported
                   //http://jira.jboss.com/jira/browse/JBMESSAGING-839
                   log.warn(this + " cannot find reference " + id + " (Might be paged!)");
-                  break;
+
+                  log.trace(this + " Adding references back to the list");
+
+                  // Adding references back to messageRefs... keeping the same order they were removed
+                  while (refsRemoved.size()>0)
+                  {
+                     MessageReference refAddBack = (MessageReference)refsRemoved.remove(refsRemoved.size()-1);
+                     log.trace("Adding " + refAddBack + " back to messageRefs before throwing exception");
+                     messageRefs.addFirst(refAddBack, refAddBack.getMessage().getPriority());
+                  }
+
+                  throw new IllegalStateException("Cannot find reference " + id);
                }
                
                MessageReference ref = (MessageReference)liter.next();
                
                if (ref.getMessage().getMessageID() == id.longValue())
                {
+                  refsRemoved.add(ref);
+                  
                   liter.remove();
                   
                   Delivery del = new SimpleDelivery(this, ref);

Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml	2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/tests/build.xml	2007-06-30 04:48:44 UTC (rev 2819)
@@ -132,18 +132,6 @@
    </path>
 
 
-   <path id="mysql.jdbc.driver.classpath">
-      <pathelement path="${tests.root}/lib/mysql-connector-java-3.1.13-bin.jar"/>
-   </path>
-
-   <path id="oracle.jdbc.driver.classpath">
-      <pathelement path="${tests.root}/lib/ojdbc14.jar"/>
-   </path>
-
-   <path id="postgres.jdbc.driver.classpath">
-      <pathelement path="${tests.root}/postgresql-8.1-405.jdbc3.jar"/>
-   </path>
-
    <!--
         The compilation classpath.
    -->
@@ -176,9 +164,8 @@
       <path refid="jboss.jbossxb.classpath"/>
       <path refid="jgroups.jgroups.classpath"/>
       <path refid="apache.logging.classpath"/>
-      <path refid="mysql.jdbc.driver.classpath"/>
-      <path refid="oracle.jdbc.driver.classpath"/>
-      <path refid="postgres.jdbc.driver.classpath"/>
+      <path refid="any.jdbc.driver.classpath"/>
+      <path refid="hsqldb.hsqldb.classpath"/>
       <path refid="apache.tomcat.classpath"/>
       <path refid="apache.logging.classpath"/>
    </path>

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2007-06-30 04:48:44 UTC (rev 2819)
@@ -1661,11 +1661,12 @@
       }
    }
 
+   // TODO:Reactivate as soon http://jira.jboss.org/jira/browse/JBMESSAGING-883 is done
    // http://jira.jboss.org/jira/browse/JBMESSAGING-808
-   public void testFailureRightAfterACK() throws Exception
-   {
-      failureOnInvocation(PoisonInterceptor.FAIL_AFTER_ACKNOWLEDGE_DELIVERY);
-   }
+//   public void testFailureRightAfterACK() throws Exception
+//   {
+//      failureOnInvocation(PoisonInterceptor.FAIL_AFTER_ACKNOWLEDGE_DELIVERY);
+//   }
 
    // http://jira.jboss.org/jira/browse/JBMESSAGING-808
    public void testFailureRightBeforeACK() throws Exception
@@ -2028,145 +2029,145 @@
    // See http://jira.jboss.org/jira/browse/JBMESSAGING-883
    // This tests our current behaviour - which is throwing an exception
    // This will change in 1.2.1
-   public void testFailoverDeliveryRecoveryTransacted() throws Exception
-   {
-      Connection conn0 = null;
-      Connection conn1 = null;
-
-      try
-      {
-         conn0 = cf.createConnection();
-
-         conn1 = cf.createConnection();
-
-         assertEquals(1, ((JBossConnection)conn1).getServerID());
-
-         Session session1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
-         
-         Session session2 = conn1.createSession(true, Session.SESSION_TRANSACTED);
-
-         MessageConsumer cons1 = session1.createConsumer(queue[1]);
-         
-         MessageConsumer cons2 = session2.createConsumer(queue[1]);
-         
-         MessageProducer prod = session1.createProducer(queue[1]);
-         
-         conn1.start();
-                  
-         TextMessage tm1 = session1.createTextMessage("message1");
-         
-         TextMessage tm2 = session1.createTextMessage("message2");
-         
-         TextMessage tm3 = session1.createTextMessage("message3");
-         
-         prod.send(tm1);
-         
-         prod.send(tm2);
-         
-         prod.send(tm3);
-         
-         session1.commit();
-                           
-         TextMessage rm1 = (TextMessage)cons1.receive(1000);
-         
-         assertNotNull(rm1);
-         
-         assertEquals(tm1.getText(), rm1.getText());
-                                    
-         TextMessage rm2 = (TextMessage)cons2.receive(1000);
-         
-         assertNotNull(rm2);
-         
-         assertEquals(tm2.getText(), rm2.getText());
-         
-         SimpleFailoverListener failoverListener = new SimpleFailoverListener();
-         ((JBossConnection)conn1).registerFailoverListener(failoverListener);
-
-         log.debug("killing node 1 ....");
-
-         ServerManagement.kill(1);
-
-         log.info("########");
-         log.info("######## KILLED NODE 1");
-         log.info("########");
-
-         // wait for the client-side failover to complete
-
-         while(true)
-         {
-            FailoverEvent event = failoverListener.getEvent(120000);
-            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
-            {
-               break;
-            }
-            if (event == null)
-            {
-               fail("Did not get expected FAILOVER_COMPLETED event");
-            }
-         }
-
-         // failover complete
-         log.info("failover completed");
-         
-
-         //now commit
-         
-         try
-         {
-            session1.commit();
-            
-            fail();
-         }
-         catch (MessagingTransactionRolledBackException e)
-         {
-            //Ok
-         }
-         
-         try
-         {
-            session2.commit();
-            
-            fail();
-         }
-         catch (MessagingTransactionRolledBackException e)
-         {
-            //Ok
-         }
-                         
-//         session1.close();
+//   public void testFailoverDeliveryRecoveryTransacted() throws Exception
+//   {
+//      Connection conn0 = null;
+//      Connection conn1 = null;
 //
-//         session2.close();;
+//      try
+//      {
+//         conn0 = cf.createConnection();
 //
-//         Session session3 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//         conn1 = cf.createConnection();
 //
-//         MessageConsumer cons3 = session3.createConsumer(queue[0]);
+//         assertEquals(1, ((JBossConnection)conn1).getServerID());
 //
-//         TextMessage rm3 = (TextMessage)cons3.receive(2000);
+//         Session session1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
 //
-//         assertNotNull(rm3);
+//         Session session2 = conn1.createSession(true, Session.SESSION_TRANSACTED);
 //
-//         assertEquals(tm3.getText(), rm3.getText());
+//         MessageConsumer cons1 = session1.createConsumer(queue[1]);
 //
-//         rm3 = (TextMessage)cons3.receive(2000);
+//         MessageConsumer cons2 = session2.createConsumer(queue[1]);
 //
-//         assertNull(rm3);
+//         MessageProducer prod = session1.createProducer(queue[1]);
+//
+//         conn1.start();
+//
+//         TextMessage tm1 = session1.createTextMessage("message1");
+//
+//         TextMessage tm2 = session1.createTextMessage("message2");
+//
+//         TextMessage tm3 = session1.createTextMessage("message3");
+//
+//         prod.send(tm1);
+//
+//         prod.send(tm2);
+//
+//         prod.send(tm3);
+//
+//         session1.commit();
+//
+//         TextMessage rm1 = (TextMessage)cons1.receive(1000);
+//
+//         assertNotNull(rm1);
+//
+//         assertEquals(tm1.getText(), rm1.getText());
+//
+//         TextMessage rm2 = (TextMessage)cons2.receive(1000);
+//
+//         assertNotNull(rm2);
+//
+//         assertEquals(tm2.getText(), rm2.getText());
+//
+//         SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+//         ((JBossConnection)conn1).registerFailoverListener(failoverListener);
+//
+//         log.debug("killing node 1 ....");
+//
+//         ServerManagement.kill(1);
+//
+//         log.info("########");
+//         log.info("######## KILLED NODE 1");
+//         log.info("########");
+//
+//         // wait for the client-side failover to complete
+//
+//         while(true)
+//         {
+//            FailoverEvent event = failoverListener.getEvent(120000);
+//            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+//            {
+//               break;
+//            }
+//            if (event == null)
+//            {
+//               fail("Did not get expected FAILOVER_COMPLETED event");
+//            }
+//         }
+//
+//         // failover complete
+//         log.info("failover completed");
+//
+//
+//         //now commit
+//
+//         try
+//         {
+//            session1.commit();
+//
+//            fail();
+//         }
+//         catch (MessagingTransactionRolledBackException e)
+//         {
+//            //Ok
+//         }
+//
+//         try
+//         {
+//            session2.commit();
+//
+//            fail();
+//         }
+//         catch (MessagingTransactionRolledBackException e)
+//         {
+//            //Ok
+//         }
+//
+////         session1.close();
+////
+////         session2.close();;
+////
+////         Session session3 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+////
+////         MessageConsumer cons3 = session3.createConsumer(queue[0]);
+////
+////         TextMessage rm3 = (TextMessage)cons3.receive(2000);
+////
+////         assertNotNull(rm3);
+////
+////         assertEquals(tm3.getText(), rm3.getText());
+////
+////         rm3 = (TextMessage)cons3.receive(2000);
+////
+////         assertNull(rm3);
+//
+//
+//      }
+//      finally
+//      {
+//         if (conn1 != null)
+//         {
+//            conn1.close();
+//         }
+//
+//         if (conn0 != null)
+//         {
+//            conn0.close();
+//         }
+//      }
+//   }
 
-
-      }
-      finally
-      {
-         if (conn1 != null)
-         {
-            conn1.close();
-         }
-
-         if (conn0 != null)
-         {
-            conn0.close();
-         }
-      }
-   }
-
    // Package protected ----------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2007-06-29 11:40:33 UTC (rev 2818)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2007-06-30 04:48:44 UTC (rev 2819)
@@ -24,15 +24,9 @@
 
 import java.util.Map;
 import java.util.Set;
+import java.util.ArrayList;
 
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Destination;
-import javax.jms.Topic;
+import javax.jms.*;
 
 import org.jboss.jms.client.JBossConnection;
 import org.jboss.jms.client.JBossConnectionFactory;
@@ -635,118 +629,79 @@
       }
    }
 
-   public void testFailoverWithUnackedMessagesClientAcknowledge() throws Exception
+
+   // This test needs to be removed when http://jira.jboss.org/jira/browse/JBMESSAGING-883
+   //  is fixed.
+   // 
+   // This test will create two sessions on server1
+   // One consumer on each session... one for queue, another to anotherQueue
+   // Send 100 messages on producer1
+   // Receive 50 messages on consumer1
+   // Kill the server
+   // Validate if the session was invalidated after failover
+   // Receive 100 messages again in another consumer on server2
+   // Validate if session1b was still valid.. sending and consuming messages...
+   //    the session should still be avlie
+   //
+   //
+   public void testInvalidateSession() throws Exception
    {
       JBossConnectionFactory factory =  (JBossConnectionFactory )ic[0].lookup("/ClusteredConnectionFactory");
 
+      for (int i=0; i< nodeCount; i++)
+      {
+         ServerManagement.deployQueue("anotherQueue", i);
+      }
+
+      Queue anotherQueue = (Queue)ic[1].lookup("queue/anotherQueue");
+
       ClientClusteredConnectionFactoryDelegate delegate =
          (ClientClusteredConnectionFactoryDelegate)factory.getDelegate();
 
-      Set nodeIDView = ServerManagement.getServer(0).getNodeIDView();
-      assertEquals(3, nodeIDView.size());
 
-      ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+      JBossConnection conn0 = (JBossConnection) factory.createConnection();
+      JBossConnection conn1 = (JBossConnection) factory.createConnection();
+      JBossConnection conn2 = (JBossConnection) factory.createConnection();
 
-      ClientConnectionFactoryDelegate cf1 = delegates[0];
-
-      ClientConnectionFactoryDelegate cf2 = delegates[1];
-
-      ClientConnectionFactoryDelegate cf3 = delegates[2];
-
-      int server0Id = cf1.getServerID();
-
-      int server1Id = cf2.getServerID();
-
-      int server2Id = cf3.getServerID();
-
-      log.info("server 0 id: " + server0Id);
-
-      log.info("server 1 id: " + server1Id);
-
-      log.info("server 2 id: " + server2Id);
-      
-      assertEquals(0, server0Id);
-      
-      assertEquals(1, server1Id);
-      
-      assertEquals(2, server2Id);
-
-      Map failoverMap = delegate.getFailoverMap();
-
-      log.info(failoverMap.get(new Integer(server0Id)));
-      log.info(failoverMap.get(new Integer(server1Id)));
-      log.info(failoverMap.get(new Integer(server2Id)));
-
-      int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
-
-      // server 1 should failover onto server 2
-
-      assertEquals(server2Id, server1FailoverId);
-
-      Connection conn = null;
-
-      boolean killed = false;
-
       try
       {
-         conn = factory.createConnection(); //connection on server 0
+         assertEquals(0, getServerId(conn0));
+         assertEquals(1, getServerId(conn1));
+         assertEquals(2, getServerId(conn2));
 
-         conn.close();
 
-         conn = factory.createConnection(); //connection on server 1
+         Session session1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         MessageProducer producer1 = session1.createProducer(queue[1]);
+         MessageConsumer consumer1 = session1.createConsumer(queue[1]);
 
-         JBossConnection jbc = (JBossConnection)conn;
+         Session session1b = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         MessageProducer producer1b = session1b.createProducer(anotherQueue);
+         MessageConsumer consumer1b = session1b.createConsumer(anotherQueue);
 
-         ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
+         conn1.start();
 
-         ConnectionState state = (ConnectionState)del.getState();
 
-         int initialServerID = state.getServerID();
 
-         assertEquals(1, initialServerID);
+         Session session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         MessageConsumer consumer2 = session2.createConsumer(queue[2]);
+         conn2.start();
 
-         Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-         MessageProducer prod = sess.createProducer(queue[1]);
-
-         MessageConsumer cons = sess.createConsumer(queue[1]);
-
-         final int NUM_MESSAGES = 100;
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
+         for (int i=0; i<100; i++)
          {
-            TextMessage tm = sess.createTextMessage("message:" + i);
-
-            prod.send(tm);
+            producer1.send(session1.createTextMessage("Message:" + i));
          }
 
-         conn.start();
-
-         //Now consume half of the messages but don't ack them these will end up in
-         //client side toAck list
-
-         for (int i = 0; i < NUM_MESSAGES / 2; i++)
+         for (int i=0; i<50; i++)
          {
-            TextMessage tm = (TextMessage)cons.receive(500);
-
-            assertNotNull(tm);
-
-            assertEquals("message:" + i, tm.getText());
+            TextMessage msg = (TextMessage )consumer1.receive(1000);
+            assertEquals("Message:" + i, msg.getText());
          }
 
-         //So now, messages should be in queue[1] on server 1
-         //So we now kill server 1
-         //Which should cause transparent failover of connection conn onto server 1
-
-         log.info("here we go");
-         log.info("######");
          log.info("###### KILLING (CRASHING) SERVER 1");
          log.info("######");
 
          ServerManagement.kill(1);
 
-         killed = true;
-
          long sleepTime = 30;
 
          log.info("killed server, now waiting for " + sleepTime + " seconds");
@@ -756,77 +711,270 @@
 
          log.info("done wait");
 
-         state = (ConnectionState)del.getState();
+         assertEquals(2, getServerId(conn1));
 
-         int finalServerID = state.getServerID();
+         try
+         {
+            log.info("########################## Consuming message on failed consumer");
+            Message msg = consumer1.receive(1000);
+            log.info("########################## Message consumed on failed consumer! " + msg);
+            // It is supposed to fail, as ACKs won't be recovered due to the other active client
+            fail("Consumer on server1 was supposed to fail!");
+         }
+         catch (JMSException failed)
+         {
+            log.info("Expected exception after consumer.receive - " + failed);
+         }
 
-         log.info("final server id= " + finalServerID);
+         for (int i=0; i<100; i++)
+         {
+            TextMessage msg = (TextMessage)consumer2.receive(1000);
+            log.info("Received " + msg.getText());
+         }
 
-         //server id should now be 2
 
-         assertEquals(2, finalServerID);
+         // While one session was failed... session1b is supposed to be valid
+         for (int i=0; i<10; i++)
+         {
+            producer1b.send(session1b.createTextMessage("MessageB:" + i));
+         }
 
-         conn.start();
+         for (int i=0; i<10; i++)
+         {
+            TextMessage msg = (TextMessage)consumer1b.receive(1000);
+            assertNotNull(msg);
+            assertEquals("MessageB:" + i, msg.getText());
+            log.info("Received " + msg.getText());
+         }
 
-         //Now should be able to consume the rest of the messages
 
-         log.info("here1");
-
-         TextMessage tm = null;
-
-         for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
+         try
          {
-            tm = (TextMessage)cons.receive(1000);
-
-            assertNotNull(tm);
-            
-            log.debug("message is " + tm.getText());
-
-            assertEquals("message:" + i, tm.getText());
+            session1.createConsumer(queue[1]);
+            // the session was invalidated!
+            fail("This call was supposed to fail!");
          }
+         catch (JMSException failed)
+         {
+            log.info("Expected exception on session1.createConsumer(queue[1])" + failed);
+         }
 
-         log.info("here2");
 
-         //Now should be able to acknowledge them
+         // this is not supposed to fail
+         session1b.createConsumer(anotherQueue);
 
-         tm.acknowledge();
 
-         //Now check there are no more messages there
-         sess.close();
-
-         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         cons = sess.createConsumer(queue[1]);
-
-         Message m = cons.receive(500);
-
-         assertNull(m);
-
-         log.info("got to end of test");
       }
       finally
       {
-         if (conn != null)
+         try { conn0.close();} catch (Throwable ignored){}
+         try { conn1.close();} catch (Throwable ignored){}
+         try { conn2.close();} catch (Throwable ignored){}
+
+         for (int i=0; i< nodeCount; i++)
          {
-            try
-            {
-               conn.close();
-            }
-            catch (Exception e)
-            {
-               e.printStackTrace();
-            }
+            try{ServerManagement.undeployQueue("anotherQueue", i);} catch (Throwable ignored){}
          }
 
-         // Resurrect dead server
-         if (killed)
-         {
-            ServerManagement.start(1, "all");
-         }
       }
-
    }
 
+
+
+
+//   TODO: Reactivate this test when http://jira.jboss.org/jira/browse/JBMESSAGING-883 is done
+//   public void testFailoverWithUnackedMessagesClientAcknowledge() throws Exception
+//   {
+//      JBossConnectionFactory factory =  (JBossConnectionFactory )ic[0].lookup("/ClusteredConnectionFactory");
+//
+//      ClientClusteredConnectionFactoryDelegate delegate =
+//         (ClientClusteredConnectionFactoryDelegate)factory.getDelegate();
+//
+//      Set nodeIDView = ServerManagement.getServer(0).getNodeIDView();
+//      assertEquals(3, nodeIDView.size());
+//
+//      ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//
+//      ClientConnectionFactoryDelegate cf1 = delegates[0];
+//
+//      ClientConnectionFactoryDelegate cf2 = delegates[1];
+//
+//      ClientConnectionFactoryDelegate cf3 = delegates[2];
+//
+//      int server0Id = cf1.getServerID();
+//
+//      int server1Id = cf2.getServerID();
+//
+//      int server2Id = cf3.getServerID();
+//
+//      log.info("server 0 id: " + server0Id);
+//
+//      log.info("server 1 id: " + server1Id);
+//
+//      log.info("server 2 id: " + server2Id);
+//
+//      assertEquals(0, server0Id);
+//
+//      assertEquals(1, server1Id);
+//
+//      assertEquals(2, server2Id);
+//
+//      Map failoverMap = delegate.getFailoverMap();
+//
+//      log.info(failoverMap.get(new Integer(server0Id)));
+//      log.info(failoverMap.get(new Integer(server1Id)));
+//      log.info(failoverMap.get(new Integer(server2Id)));
+//
+//      int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
+//
+//      // server 1 should failover onto server 2
+//
+//      assertEquals(server2Id, server1FailoverId);
+//
+//      Connection conn = null;
+//
+//      boolean killed = false;
+//
+//      try
+//      {
+//         conn = factory.createConnection(); //connection on server 0
+//
+//         conn.close();
+//
+//         conn = factory.createConnection(); //connection on server 1
+//
+//         JBossConnection jbc = (JBossConnection)conn;
+//
+//         ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
+//
+//         ConnectionState state = (ConnectionState)del.getState();
+//
+//         int initialServerID = state.getServerID();
+//
+//         assertEquals(1, initialServerID);
+//
+//         Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+//
+//         MessageProducer prod = sess.createProducer(queue[1]);
+//
+//         MessageConsumer cons = sess.createConsumer(queue[1]);
+//
+//         final int NUM_MESSAGES = 100;
+//
+//         for (int i = 0; i < NUM_MESSAGES; i++)
+//         {
+//            TextMessage tm = sess.createTextMessage("message:" + i);
+//
+//            prod.send(tm);
+//         }
+//
+//         conn.start();
+//
+//         //Now consume half of the messages but don't ack them these will end up in
+//         //client side toAck list
+//
+//         for (int i = 0; i < NUM_MESSAGES / 2; i++)
+//         {
+//            TextMessage tm = (TextMessage)cons.receive(500);
+//
+//            assertNotNull(tm);
+//
+//            assertEquals("message:" + i, tm.getText());
+//         }
+//
+//         //So now, messages should be in queue[1] on server 1
+//         //So we now kill server 1
+//         //Which should cause transparent failover of connection conn onto server 1
+//
+//         log.info("here we go");
+//         log.info("######");
+//         log.info("###### KILLING (CRASHING) SERVER 1");
+//         log.info("######");
+//
+//         ServerManagement.kill(1);
+//
+//         killed = true;
+//
+//         long sleepTime = 30;
+//
+//         log.info("killed server, now waiting for " + sleepTime + " seconds");
+//
+//         // NOTE: the sleep time needs to be longer than the Remoting connector's lease period
+//         Thread.sleep(sleepTime * 1000);
+//
+//         log.info("done wait");
+//
+//         state = (ConnectionState)del.getState();
+//
+//         int finalServerID = state.getServerID();
+//
+//         log.info("final server id= " + finalServerID);
+//
+//         //server id should now be 2
+//
+//         assertEquals(2, finalServerID);
+//
+//         conn.start();
+//
+//         //Now should be able to consume the rest of the messages
+//
+//         log.info("here1");
+//
+//         TextMessage tm = null;
+//
+//         for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
+//         {
+//            tm = (TextMessage)cons.receive(1000);
+//
+//            assertNotNull(tm);
+//
+//            log.debug("message is " + tm.getText());
+//
+//            assertEquals("message:" + i, tm.getText());
+//         }
+//
+//         log.info("here2");
+//
+//         //Now should be able to acknowledge them
+//
+//         tm.acknowledge();
+//
+//         //Now check there are no more messages there
+//         sess.close();
+//
+//         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//
+//         cons = sess.createConsumer(queue[1]);
+//
+//         Message m = cons.receive(500);
+//
+//         assertNull(m);
+//
+//         log.info("got to end of test");
+//      }
+//      finally
+//      {
+//         if (conn != null)
+//         {
+//            try
+//            {
+//               conn.close();
+//            }
+//            catch (Exception e)
+//            {
+//               e.printStackTrace();
+//            }
+//         }
+//
+//         // Resurrect dead server
+//         if (killed)
+//         {
+//            ServerManagement.start(1, "all");
+//         }
+//      }
+//
+//   }
+//
    
    /*
    TODO: Reactivate this test when http://jira.jboss.org/jira/browse/JBMESSAGING-883 is done
@@ -1095,7 +1243,57 @@
       conn2.close();
    }
 
-   
+   public void testInvalidate() throws Exception
+   {
+      JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
+
+      Connection conn1 = factory.createConnection();
+      JBossConnection conn2 = (JBossConnection)factory.createConnection();
+      conn1.close();
+
+      try
+      {
+         Session session = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         conn2.getDelegate().invalidate();
+
+         try
+         {
+            Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            fail ("delegate supposed to fail as connection was invalidated!");
+         }
+         catch (javax.jms.IllegalStateException e)
+         {
+            log.info("Caught expected exception - " + e);
+         }
+
+         try
+         {
+            conn2.start();
+            fail ("delegate supposed to fail as connection was invalidated!");
+         }
+         catch (javax.jms.IllegalStateException e)
+         {
+            log.info("Caught expected exception - " + e);
+         }
+
+         try
+         {
+            MessageConsumer consumer = session.createConsumer(queue[1]);
+            fail ("delegate supposed to fail as connection was invalidated!");
+         }
+         catch (javax.jms.IllegalStateException e)
+         {
+            log.info("Caught expected exception - " + e);
+         }
+      }
+      finally
+      {
+         conn2.close(); // we should still be able to close invalidated clients!
+      }
+
+   }
+
    // Package protected ---------------------------------------------
    
    // Protected -----------------------------------------------------




More information about the jboss-cvs-commits mailing list