[jboss-cvs] JBoss Messaging SVN: r3235 - in trunk: src/main/org/jboss/jms/server/endpoint and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Oct 21 09:46:18 EDT 2007


Author: timfox
Date: 2007-10-21 09:46:15 -0400 (Sun, 21 Oct 2007)
New Revision: 3235

Modified:
   trunk/src/main/org/jboss/jms/client/state/SessionState.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java
Log:
Only replicate transacted or client ack sessions


Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java	2007-10-21 11:51:18 UTC (rev 3234)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java	2007-10-21 13:46:15 UTC (rev 3235)
@@ -285,12 +285,12 @@
          // to AUTO_ACKNOWLEDGE)
 
          log.trace(this + " is not transacted (or XA with no transaction set), " +
-            "retrieving deliveries from session state");
+                   "retrieving deliveries from session state");
 
          // We remove any unacked non-persistent messages - this is because we don't want to ack
          // them since the server won't know about them and will get confused
 
-         if (getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+         if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
          {
             for(Iterator i = getClientAckList().iterator(); i.hasNext(); )
             {
@@ -352,7 +352,12 @@
       //like remove from recovery Area refs corresponding to messages in client consumer buffers
       
       log.trace(this + " sending delivery recovery " + recoveryInfos + " on failover");
-      newDelegate.recoverDeliveries(recoveryInfos, oldSessionID);
+      
+      //Note we only recover sessions that are transacted or client ack
+      if (transacted || xa || acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
+      {
+      	newDelegate.recoverDeliveries(recoveryInfos, oldSessionID);
+      }
    }
    
    // Public ---------------------------------------------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-10-21 11:51:18 UTC (rev 3234)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-10-21 13:46:15 UTC (rev 3235)
@@ -33,6 +33,7 @@
 import javax.jms.Destination;
 import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
+import javax.jms.Session;
 
 import org.jboss.aop.AspectManager;
 import org.jboss.jms.client.delegate.ClientSessionDelegate;
@@ -239,8 +240,11 @@
            
          // create the corresponding server-side session endpoint and register it with this
          // connection endpoint instance
-         ServerSessionEndpoint ep = new ServerSessionEndpoint(sessionID, this);
          
+         //Note we only replicate transacted and client acknowledge sessions.
+         ServerSessionEndpoint ep = new ServerSessionEndpoint(sessionID, this,
+         		                     transacted || acknowledgmentMode == Session.CLIENT_ACKNOWLEDGE);
+         
          synchronized (sessions)
          {
             sessions.put(sessionID, ep);

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-10-21 11:51:18 UTC (rev 3234)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-10-21 13:46:15 UTC (rev 3235)
@@ -162,6 +162,7 @@
    private Queue defaultDLQ;
    private Queue defaultExpiryQueue;
    private boolean supportsFailover;
+   private boolean replicating;
    
    private Object deliveryLock = new Object();
       
@@ -181,12 +182,15 @@
    
    // Constructors ---------------------------------------------------------------------------------
 
-   ServerSessionEndpoint(String sessionID, ServerConnectionEndpoint connectionEndpoint) throws Exception
+   ServerSessionEndpoint(String sessionID, ServerConnectionEndpoint connectionEndpoint,
+   		                boolean replicating) throws Exception
    {
       this.id = sessionID;
 
       this.connectionEndpoint = connectionEndpoint;
       
+      this.replicating = replicating;
+      
       callbackHandler = connectionEndpoint.getCallbackHandler();
       
       sp = connectionEndpoint.getServerPeer();
@@ -465,8 +469,11 @@
       {
          Delivery del = cancelDeliveryInternal(cancel);
          
-         //Prompt delivery
-         promptDelivery((Channel)del.getObserver());
+         if (del != null)
+         {         
+	         //Prompt delivery
+	         promptDelivery((Channel)del.getObserver());
+         }
       }
       catch (Throwable t)
       {
@@ -492,7 +499,10 @@
                         
             Delivery del = cancelDeliveryInternal(cancel);
             
-            channels.add(del.getObserver());
+            if (del != null)
+            {            	
+            	channels.add(del.getObserver());
+            }
          }
                  
          if (trace) { log.trace("Cancelled deliveries"); }
@@ -920,7 +930,7 @@
    	
    	boolean gotSome = false;
    	   	
-   	if (!firstNode)
+   	if (!firstNode && replicating)
    	{	   	
 	   	if (trace) { log.trace("Now collecting"); }
 	   	   	
@@ -1348,7 +1358,9 @@
        
        Message message = delivery.getReference().getMessage();
        
-       if (!consumer.isReplicating())
+       //Note that we only replicate transacted or client acknowledge sessions
+       //There is no point in replicating AUTO_ACK or DUPS_OK
+       if (!consumer.isReplicating() || !replicating)
        {
       	 if (trace) { log.trace(this + " doing the delivery straight away"); }
       	 
@@ -1588,7 +1600,13 @@
       
       if (rec == null)
       {
-         throw new IllegalStateException("Cannot find delivery to cancel " + cancel.getDeliveryId());
+         //The delivery might not be found, if the session is not replicated (i.e. auto_ack or dups_ok)
+      	//and has failed over since recoverDeliveries won't have been called
+      	if (trace)
+      	{
+      		log.trace("Cannot find delivery to cancel, session probably failed over and is not replicated");
+      	}
+      	return null;
       }
                  
       //Note we check the flag *and* evaluate again, this is because the server and client clocks may
@@ -1730,7 +1748,10 @@
       
       if (rec == null)
       {
-         log.debug("Cannot find " + ack + " to acknowledge, it was probably acknowledged before ");
+         log.debug("Cannot find " + ack + " to acknowledge, it was probably acknowledged before");
+         
+         log.info("**** CANNOT FIND ACK TO ACKNOWLEDGED");
+         
          return;
       }
       
@@ -1738,7 +1759,7 @@
       
       //Now replicate the ack
       
-      if (rec.replicating)
+      if (rec.replicating && replicating)
       {
       	postOffice.sendReplicateAckMessage(rec.queueName, rec.del.getReference().getMessage().getMessageID());
       }

Modified: trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java	2007-10-21 11:51:18 UTC (rev 3234)
+++ trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java	2007-10-21 13:46:15 UTC (rev 3235)
@@ -148,7 +148,21 @@
    	
       Integer messageCount = (Integer)ServerManagement.getServer(server).getAttribute(destObjectName, "MessageCount");
       
-      assertEquals(0, messageCount.intValue());
+      if (messageCount.intValue() != 0)
+      {
+      	//Now delete it - prevents other tests from failing
+      	try
+      	{
+      		this.removeAllMessages(queue.getQueueName(), true, server);
+      	}
+      	catch (Exception e)
+      	{
+      		log.debug("Failed to remove all messages", e);
+      	}
+      	
+      	fail("Queue " + queue.getQueueName()  + " is not empty");
+      }
+
    }
    
    protected void checkEmpty(Topic topic) throws Exception




More information about the jboss-cvs-commits mailing list