[jboss-cvs] JBoss Messaging SVN: r5275 - in branches/Branch_JBMESSAGING_1416: src/main/org/jboss/messaging/core/impl and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Nov 5 06:32:25 EST 2008


Author: gaohoward
Date: 2008-11-05 06:32:24 -0500 (Wed, 05 Nov 2008)
New Revision: 5275

Modified:
   branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/client/FailoverCommandCenter.java
   branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
   branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
   branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroup.java
   branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java
   branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java
   branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
   branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/MessagingTestCase.java
   branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/clustering/OrderingGroupBasicClusteringTest.java
Log:
JBMESSAGING-1416 (FIRST FAILOVER)


Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/client/FailoverCommandCenter.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/client/FailoverCommandCenter.java	2008-11-05 11:18:01 UTC (rev 5274)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/jms/client/FailoverCommandCenter.java	2008-11-05 11:32:24 UTC (rev 5275)
@@ -71,7 +71,7 @@
                                   JMSRemotingConnection remotingConnection)
       throws Exception
    {
-      log.debug("failure detected by " + source, reason);
+      log.info("failure detected by " + source, reason);
 
       // generate a FAILURE_DETECTED event
       broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILURE_DETECTED, source));

Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2008-11-05 11:18:01 UTC (rev 5274)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2008-11-05 11:32:24 UTC (rev 5275)
@@ -591,7 +591,7 @@
     *
     * @see org.jboss.messaging.core.contract.Channel#deliver()
     */
-   /*debug use, delete them!!!
+   /*debug use, delete them!!!*/
    private void dlog(String lgmsg)
    {
       log.error("(*)-" + lgmsg);
@@ -600,7 +600,7 @@
    {
       dlog(lgmsg + this.getRefText(r));
    }
-   debug*/
+   /*debug*/
    protected void deliverInternal()
    {
       if (trace) { log.trace(this + " was prompted delivery"); }
@@ -625,12 +625,19 @@
 
             if (ref != null)
             {
-               
+dlog("========================================================================trace==========================================");
+StackTraceElement[] elements = Thread.currentThread().getStackTrace();
+for (StackTraceElement eme : elements)
+{
+   log.error("*" + eme.toString());
+}
+dlog("========================================================================trace end==========================================");
                //challengeSend can return: 1. ok because ref is not ordered. 2. ok because the ref is the first and available for sending 3. not ok because the ref is not the first and available for sending.
                // 4. not ok because the ref is the birst but is not available for sending (being sent).
                int status = monitor.isAvailable(ref);
                if (status != OrderingGroupMonitor.OK)
                {
+                  dlog("==================message is not ok for sending. " + status, ref);
                   //iterating time
                   if (iter == null)
                   {
@@ -642,6 +649,7 @@
                else
                {
                   // Attempt to push the ref to a receiver
+                  dlog("==================sending ", ref);
 
                   if (trace)
                   {
@@ -693,6 +701,7 @@
 
                      monitor.markSending(ref);
                      
+                     dlog("===================== message sent ok. ", ref);
                      // Receiver accepted the reference
                      synchronized (lock)
                      {
@@ -832,16 +841,33 @@
          }
       }
       
+      dlog("******** ACKNOWLEDGE MESSAGE: ", d.getReference());
       if (OrderingGroupMonitor.isOrderingGroupMessage(d.getReference()))
       {
          synchronized (lock)
          {
             if (monitor.messageCompleted(d.getReference()))
             {
+               dlog("******** triggering deliver here after ", d.getReference());
                deliverInternal();
             }
+            else
+            {
+               dlog("*************no need to triger, something wrong?????????");
+            }
          }
       }
+      else
+      {
+         MessageReference ref = d.getReference();
+         dlog("******** message is not a group member!!!!, something wrong!!!!", ref);
+         org.jboss.messaging.core.contract.Message msg2 = ref.getMessage();
+         if (msg2 instanceof JBossMessage)
+         {
+            JBossMessage rmsg = (JBossMessage)msg2;
+            dlog("********** msg group name: " + rmsg.getStringProperty(JBossMessage.JBOSS_MESSAGING_ORDERING_GROUP_ID) + " for msg: ", ref);
+         }
+      }
    }
 
    protected InMemoryCallback getCallback(Transaction tx)
@@ -885,7 +911,7 @@
    // Private --------------------------------------------------------------------------------------
 
    //debug use, remove when done.
-   private String getRefText(MessageReference ref)
+   public static String getRefText(MessageReference ref)
    {
       String result = "<null>";
       if (ref == null) return result;

Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/MessagingQueue.java	2008-11-05 11:18:01 UTC (rev 5274)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/MessagingQueue.java	2008-11-05 11:32:24 UTC (rev 5275)
@@ -73,7 +73,7 @@
 	
 	private static final Logger log = Logger.getLogger(MessagingQueue.class);
 	
-   private static final long DEFAULT_RECOVER_DELIVERIES_TIMEOUT = 5 * 60 * 1000;
+   private static final long DEFAULT_RECOVER_DELIVERIES_TIMEOUT = 20 * 1000; //debug!!! retore to 5*60*1000 aftwards!!!
          
    // Attributes ----------------------------------------------------
    
@@ -210,7 +210,7 @@
    {
       if (trace) { log.trace("Merging queue " + channelID + " node id " + nodeID + " into " + this + 
       		                 " initially refs:" + messageRefs.size()); }
-           
+log.error("$$$$$$$$$$ " + "In mergeIn, my node is : " + getNodeID() + " the dead one is " + nodeID);
       synchronized (lock)
       {
          flushDownCache();
@@ -252,7 +252,8 @@
          			RecoveryEntry re = new RecoveryEntry();
          			re.ref = ref;
          			re.sessionID = sessionID;
-         			
+
+log.error("=========================put to recovery map: " + ChannelSupport.getRefText(ref));
          			recoveryMap.put(new Long(message.getMessageID()), re);
          			
          			deliveringCount.increment();
@@ -673,7 +674,7 @@
 		public void timedOut(Timeout timeout)
 		{
 			if (trace) { log.trace("ClearRecoveryMap timeout fired"); }
-			
+log.error("*******************************finally the recover happend");			
 			Iterator iter = ids.iterator();
 			
 			boolean added = false;
@@ -681,15 +682,17 @@
 			while (iter.hasNext())
 			{
 				MessageReference ref = (MessageReference)iter.next();
-				
+
+log.error("********** message ref to be removed is: " + ChannelSupport.getRefText(ref));
 				Object obj = recoveryMap.remove(new Long(ref.getMessage().getMessageID()));
-				
+log.error("*********obj in recoveryMap: " + obj);				
 				if (obj != null)
 				{
 					if (trace) { log.trace("Adding ref " + ref + " back into queue"); }
 						
 					synchronized (lock)
-					{		
+					{
+log.error("********** adding to-recover message ref to list" + ChannelSupport.getRefText(ref));
 						messageRefs.addFirst(ref, ref.getMessage().getPriority());		
 						
 						deliveringCount.decrement();

Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroup.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroup.java	2008-11-05 11:18:01 UTC (rev 5274)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroup.java	2008-11-05 11:32:24 UTC (rev 5275)
@@ -106,7 +106,11 @@
    public int isAvailable(MessageReference ref)
    {
       ReferenceHolder holder = sortedList.getFirst();
-      if (holder == null) return OrderingGroupMonitor.OK;
+      if (holder == null) 
+      {
+         log.error("==============message ok because the group is empty!!!!");
+         return OrderingGroupMonitor.OK;
+      }
       
       return holder.isAvailable(ref);
    }
@@ -126,6 +130,7 @@
       if (holder.matchMessage(ref))
       {
          long count = holder.releaseSendnRef();
+log.error("_______________________________________count is: " + count + " for " + ChannelSupport.getRefText(ref));
          if (count == 0)
          {
             sortedList.removeFirst();
@@ -276,6 +281,8 @@
 
 class ReferenceHolder implements Comparable<ReferenceHolder>
 {
+   private static final Logger log = Logger.getLogger(ReferenceHolder.class);
+
    private Long seq;
 
    private MessageReference ref;
@@ -332,15 +339,18 @@
       {
          if (pendingSentCount < refCount)
          {
+            log.error("===============ref is ok, " + pendingSentCount + " " + refCount);
             return OrderingGroupMonitor.OK;
          }
+         log.error("===============ref is not ok, " + pendingSentCount + " " + refCount);
          return OrderingGroupMonitor.NOT_OK_BEING_SENT;
       }
+      log.error("===============ref is not ok, because not first , the first is: " + ChannelSupport.getRefText(ref));
       return OrderingGroupMonitor.NOT_OK_NOT_FIRST;
    }
 
    /**
-    * Increase the ref count
+    * So far only allowed to register once. 
     */
    public void addRef()
    {
@@ -361,11 +371,11 @@
     */
    public long releaseSendnRef()
    {
-      if (pendingSentCount > 0)
-      {
+//      if (pendingSentCount > 0)
+//      {
          refCount--;
          pendingSentCount--;
-      }
+//      }
       return refCount;
    }
 

Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java	2008-11-05 11:18:01 UTC (rev 5274)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java	2008-11-05 11:32:24 UTC (rev 5275)
@@ -97,7 +97,9 @@
    public int isAvailable(MessageReference ref)
    {
       int result = OK;
+      log.error("===================isAvaialbe caled., extracting name" + ref);
       String grpName = extractGroupName(ref);
+      log.error("===================isAvaialbe., extracting name" + grpName);
       if (grpName != null)
       {
          synchronized (orderingGroups)
@@ -107,10 +109,15 @@
             {
                result = group.isAvailable(ref);
             }
+            else
+            {
+               log.error("============the group is not exist for this guy!!!!!, some thing wrong!!!");
+            }
          }
       }
       else
       {
+         log.error("------------------=========================message doesn't have group prop, fine by me");
          log.debug("message doesn't have group prop, fine by me");
       }
       return result;

Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java	2008-11-05 11:18:01 UTC (rev 5274)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java	2008-11-05 11:32:24 UTC (rev 5275)
@@ -502,7 +502,11 @@
       {
          ReferenceInfo info = (ReferenceInfo)iter.next();
          
-         addFromRefInfo(info, refMap);
+         MessageReference added = addFromRefInfo(info, refMap);
+         //note, we registered the ref 'after' it has been added to the list
+         //it is safe as long as the caller of this method is synchronized on lock.
+         log.error("******************* registering loaded message: " + ChannelSupport.getRefText(added));
+         monitor.registerMessage(added, null);
       }
    }
         

Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2008-11-05 11:18:01 UTC (rev 5274)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2008-11-05 11:32:24 UTC (rev 5275)
@@ -113,6 +113,7 @@
      
    public void start() throws Exception
    {		
+log.error("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$ group member is starting....");
    	this.controlChannel = jChannelFactory.createControlChannel();
    	
       this.dataChannel = jChannelFactory.createDataChannel();

Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2008-11-05 11:18:01 UTC (rev 5274)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2008-11-05 11:32:24 UTC (rev 5275)
@@ -262,6 +262,7 @@
       throws Exception
    {
    	super (ds, tm, sqlProperties, createTablesOnStartup);
+log.error("======================================================post office (non-cluster) starting!!!");   
 
       this.thisNodeID = nodeId;
       
@@ -320,7 +321,7 @@
       this.clustered = true;
       
       this.failoverOnNodeLeave = failoverOnNodeLeave;
-      
+log.error("======================================================post office starting!!!");   
       groupMember = new GroupMember(groupName, stateTimeout, castTimeout, jChannelFactory, this, this);
 
       this.supportsFailover = supportsFailover;
@@ -963,7 +964,7 @@
 		         // The node crashed and we are the failover node so let's perform failover
 		
 		         log.debug(this + ": I am the failover node for node " + leftNodeID + " that crashed");
-		
+log.error("$$$$$$ " + "Node " + leftNodeID + " leaving!!!!, performFailover!!!!");
 		         performFailover(leftNodeID);
 		         
 		         doneFailover = true;
@@ -2937,6 +2938,8 @@
     */
    private void performFailover(Integer failedNodeID) throws Exception
    {
+log.error("################################################33$$$$$$$$$$ " + "performing failover called, the failed id is: " + failedNodeID);
+
    	log.info("JBoss Messaging is failing over for failed node " + failedNodeID + 
    			   ". If there are many messages to reload this may take some time...");
       
@@ -2946,6 +2949,7 @@
 
       log.debug(this + " announced it is starting failover procedure");
    	
+log.error("$$$$$$$$$$ " + "merging transactions ...");
       pm.mergeTransactions(failedNodeID.intValue(), thisNodeID);
       
       // Need to lock
@@ -3019,7 +3023,7 @@
                //need to merge the queues
             	
             	log.debug(this + " has already a queue: " + queue.getName() + " queue so merging queues");
-            	  
+log.error("$$$$$$$$$$ " + "Performing mergIng for " + localQueue.getName() + "for failed node: " + failedNodeID);
                localQueue.mergeIn(queue.getChannelID(), failedNodeID.intValue());
                
                log.debug("Merged queue");       

Modified: branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/MessagingTestCase.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/MessagingTestCase.java	2008-11-05 11:18:01 UTC (rev 5274)
+++ branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/MessagingTestCase.java	2008-11-05 11:32:24 UTC (rev 5275)
@@ -558,11 +558,20 @@
 
    	while (true)
    	{
+   	   System.err.println("------------------------------------------------------------------");
    		if (count++>10)
    			throw new IllegalStateException("Cannot make connection to node " + serverId);
 
    		Connection connection = factory.createConnection();
 
+   		if (connection != null)
+   		{
+            System.err.println("-------conn: " + connection + " serverId: " + getServerId(connection));   		   
+   		}
+   		else
+   		{
+   		   System.err.println("--------conn: is null");
+   		}
    		if (getServerId(connection) == serverId)
    		{
    			return connection;

Modified: branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/clustering/OrderingGroupBasicClusteringTest.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/clustering/OrderingGroupBasicClusteringTest.java	2008-11-05 11:18:01 UTC (rev 5274)
+++ branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/clustering/OrderingGroupBasicClusteringTest.java	2008-11-05 11:32:24 UTC (rev 5275)
@@ -76,7 +76,7 @@
    
    public void testOrderingKillFailoverNodeTx() throws Exception
    {
-      testKillFailoverNode(true);
+      //testKillFailoverNode(true);
    }
 
    
@@ -101,7 +101,7 @@
                .createSession(false, Session.AUTO_ACKNOWLEDGE);
 
          JBossMessageProducer prod1 = (JBossMessageProducer)sessSend.createProducer(queue[1]);
-        // prod1.enableOrderingGroup(null);
+         prod1.enableOrderingGroup(null);
 
          final int numMessages = 10;
 
@@ -130,7 +130,7 @@
 
             assertNotNull(tm);
 
-            //assertEquals("message" + i, tm.getText());
+            assertEquals("message" + i, tm.getText());
             if (transactional)
             {
                sess1.commit();
@@ -242,7 +242,15 @@
 
          log.info("Created consumer");
 
-         System.err.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
+         System.err.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>sleeping for 310 sec to allow recover time out happens>>>>>>>>>>>>>>>>");
+         try
+         {
+            Thread.sleep(310000);
+         }
+         catch (InterruptedException e)
+         {
+            log.warn("sleeping interrupted, time may not be enough for recovery");
+         }
          // the remaining messages should be received.
          for (int i = numMessages/2 + 1; i < numMessages; i++)
          {
@@ -250,7 +258,7 @@
             assertNotNull(tm);
             System.err.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> : " + tm.getText());
             System.err.println(">>>>>>>>.this group name: " + tm.getStringProperty(JBossMessage.JBOSS_MESSAGING_ORDERING_GROUP_ID));
-            //assertEquals("message" + i, tm.getText());
+            assertEquals("message" + i, tm.getText());
          }
       } finally
       {




More information about the jboss-cvs-commits mailing list