[jboss-cvs] JBoss Messaging SVN: r2468 - in trunk: src/main/org/jboss/jms/client/remoting and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Feb 27 11:48:23 EST 2007


Author: timfox
Date: 2007-02-27 11:48:23 -0500 (Tue, 27 Feb 2007)
New Revision: 2468

Modified:
   trunk/src/etc/server/default/deploy/messaging-service.xml
   trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
   trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
   trunk/src/main/org/jboss/jms/client/state/ConsumerState.java
   trunk/src/main/org/jboss/jms/client/state/SessionState.java
   trunk/src/main/org/jboss/jms/server/ServerPeer.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java
Log:
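Fixed another failover bug: the session executor is now recreated on failover (and handed to the consumer's MessageCallbackHandler) so messages queued before failover are not delivered afterwards. Also raised the default failover start/complete timeouts to 1 minute / 5 minutes, and made MultipleFailoverTest check an int "cnt" property and restart any killed servers in its finally block.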



Modified: trunk/src/etc/server/default/deploy/messaging-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/messaging-service.xml	2007-02-27 13:35:05 UTC (rev 2467)
+++ trunk/src/etc/server/default/deploy/messaging-service.xml	2007-02-27 16:48:23 UTC (rev 2468)
@@ -33,8 +33,8 @@
       <attribute name="DefaultExpiryQueue">jboss.messaging.destination:service=Queue,name=ExpiryQueue</attribute>
       <attribute name="DefaultRedeliveryDelay">0</attribute>
       <attribute name="QueueStatsSamplePeriod">5000</attribute>
-      <attribute name="FailoverStartTimeout">3000</attribute>
-      <attribute name="FailoverCompleteTimeout">12000</attribute>
+      <attribute name="FailoverStartTimeout">60000</attribute>
+      <attribute name="FailoverCompleteTimeout">300000</attribute>
       <attribute name="DefaultMessageCounterHistoryDayLimit">-1</attribute>
 
       <depends optional-attribute-name="PersistenceManager">jboss.messaging:service=PersistenceManager</depends>

Modified: trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java	2007-02-27 13:35:05 UTC (rev 2467)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java	2007-02-27 16:48:23 UTC (rev 2468)
@@ -88,7 +88,7 @@
       if (parameter instanceof ClientDelivery)
       {
          ClientDelivery dr = (ClientDelivery)parameter;
-         
+          
          Message msg = dr.getMessage();
          
          MessageProxy proxy = JBossMessage.

Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2007-02-27 13:35:05 UTC (rev 2467)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2007-02-27 16:48:23 UTC (rev 2468)
@@ -42,6 +42,7 @@
 import org.jboss.messaging.util.prioritylinkedlist.BasicPriorityLinkedList;
 import org.jboss.messaging.util.prioritylinkedlist.PriorityLinkedList;
 
+import EDU.oswego.cs.dl.util.concurrent.Executor;
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 
 /**
@@ -505,7 +506,7 @@
    /**
     * Needed for failover
     */
-   public void synchronizeWith(MessageCallbackHandler newHandler)
+   public void synchronizeWith(MessageCallbackHandler newHandler, QueuedExecutor sessionExecutor)
    {
       consumerID = newHandler.consumerID;
 
@@ -517,6 +518,8 @@
 
       buffer.clear();
       
+      this.sessionExecutor = sessionExecutor;
+      
       // need to reset toggle state
       serverSending = true;
       

Modified: trunk/src/main/org/jboss/jms/client/state/ConsumerState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/ConsumerState.java	2007-02-27 13:35:05 UTC (rev 2467)
+++ trunk/src/main/org/jboss/jms/client/state/ConsumerState.java	2007-02-27 16:48:23 UTC (rev 2468)
@@ -142,7 +142,9 @@
       MessageCallbackHandler handler = oldCallbackManager.unregisterHandler(oldConsumerID);
       MessageCallbackHandler newHandler = newCallbackManager.unregisterHandler(consumerID);
 
-      handler.synchronizeWith(newHandler);
+      SessionState sstate = (SessionState)this.getParent();
+      
+      handler.synchronizeWith(newHandler, sstate.getExecutor());
       newCallbackManager.registerHandler(consumerID, handler);
    }
 

Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java	2007-02-27 13:35:05 UTC (rev 2467)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java	2007-02-27 16:48:23 UTC (rev 2468)
@@ -133,7 +133,7 @@
          currentTxId = parent.getResourceManager().createLocalTx();
       }
 
-      executor = new QueuedExecutor(new LinkedQueue());
+      createExecutor();
 
       clientAckList = new ArrayList();
 
@@ -197,6 +197,12 @@
 
       int oldSessionID = sessionID;
       sessionID = newState.sessionID;
+      
+      // We need to clear anything waiting in the session executor - since there may be messages
+      // from before failover waiting in there and we don't want them to get delivered after failover
+      executor.shutdownAfterProcessingCurrentTask();
+      
+      createExecutor();
 
       ClientSessionDelegate newDelegate = (ClientSessionDelegate)newState.getDelegate();
 
@@ -333,9 +339,9 @@
       else
       {
          log.debug(this + " no delivery recovery info to send on failover");
-      }
+      }           
    }
-
+   
    // Public ---------------------------------------------------------------------------------------
 
    /**
@@ -441,6 +447,12 @@
 
    // Private --------------------------------------------------------------------------------------
 
+   private void createExecutor()
+   {
+      executor = new QueuedExecutor(new LinkedQueue());
+   }
+
+   
    // Inner classes --------------------------------------------------------------------------------
 
 }

Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java	2007-02-27 13:35:05 UTC (rev 2467)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java	2007-02-27 16:48:23 UTC (rev 2468)
@@ -123,9 +123,11 @@
 
    private Object failoverStatusLock;
    
-   private long failoverStartTimeout = 3000;
+   //Default is 1 minute
+   private long failoverStartTimeout = 60 * 1000;
    
-   private long failoverCompleteTimeout = 12000;
+   //Default is 5 minutes
+   private long failoverCompleteTimeout = 5 * 60 * 1000;
    
    private Map sessions;
    

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-02-27 13:35:05 UTC (rev 2467)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-02-27 16:48:23 UTC (rev 2468)
@@ -276,8 +276,7 @@
 
          Client callbackClient = callbackHandler.getCallbackClient();
 
-         ClientDelivery del = new ClientDelivery(message, id, deliveryId, ref
-                  .getDeliveryCount());
+         ClientDelivery del = new ClientDelivery(message, id, deliveryId, ref.getDeliveryCount());
 
          Callback callback = new Callback(del);
 

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java	2007-02-27 13:35:05 UTC (rev 2467)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java	2007-02-27 16:48:23 UTC (rev 2468)
@@ -228,11 +228,10 @@
          while (!killer.isDone())
          {
             TextMessage tm = sessSend.createTextMessage("message " + count);
+            tm.setIntProperty("cnt", count);
 
             prod.send(tm);
             
-            //Thread.sleep(10);
-
             if (count % 100 == 0)
             {
                log.info("sent " + count);
@@ -240,13 +239,9 @@
 
             count++;
          }
-         
-         log.info("sending done");
-         
+              
          t.join();
          
-         log.info("stopping listener");
-         
          if (killer.failed)
          {
             fail();
@@ -264,9 +259,19 @@
       }
       finally
       {
+         if (!ServerManagement.isStarted(0))
+         {
+            ServerManagement.start(0, "all");
+         }
+         
+         if (!ServerManagement.isStarted(1))
+         {
+            ServerManagement.start(1, "all");
+         }
+         
          if (conn != null)
          {
-            log.info("closing connetion");
+            log.info("closing connection");
             try
             {
                conn.close();
@@ -395,7 +400,8 @@
       {
          this.latch = latch;
       }
-           
+      
+   
       public void onMessage(Message msg)
       {
          try
@@ -404,17 +410,14 @@
             
             if (count % 100 == 0)
             {
-               log.info("Received message " + tm.getText());
+               log.info("Received message " + tm.getText() + " (" + tm + ")");
             }
             
-            if (!tm.getText().equals("message " + count))
+            if (tm.getIntProperty("cnt") != count)
             {
-               log.error("Expected message " + count + " but got " + tm.getText());
-               
+               log.error("Wrong message received " + tm.getIntProperty("cnt"));
                failed = true;
-               
-               latch.release();
-            }
+            }            
             
             count++;
          }




More information about the jboss-cvs-commits mailing list