[jboss-cvs] JBoss Messaging SVN: r3688 - in trunk: src/main/org/jboss/jms/server/endpoint and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Feb 8 14:11:24 EST 2008


Author: timfox
Date: 2008-02-08 14:11:23 -0500 (Fri, 08 Feb 2008)
New Revision: 3688

Modified:
   trunk/src/main/org/jboss/jms/client/JBossConnectionMetaData.java
   trunk/src/main/org/jboss/jms/client/JMSMessageListenerWrapper.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/MessageReference.java
   trunk/src/main/org/jboss/messaging/core/impl/MessageReferenceImpl.java
   trunk/src/main/org/jboss/messaging/core/impl/TransactionImpl.java
   trunk/tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java
Log:
Fixed some stuff related to redelivery


Modified: trunk/src/main/org/jboss/jms/client/JBossConnectionMetaData.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossConnectionMetaData.java	2008-02-08 18:08:20 UTC (rev 3687)
+++ trunk/src/main/org/jboss/jms/client/JBossConnectionMetaData.java	2008-02-08 19:11:23 UTC (rev 3688)
@@ -39,7 +39,6 @@
  */
 public class JBossConnectionMetaData implements ConnectionMetaData
 {
-
    // Constants -----------------------------------------------------
 
    // Static --------------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/client/JMSMessageListenerWrapper.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JMSMessageListenerWrapper.java	2008-02-08 18:08:20 UTC (rev 3687)
+++ trunk/src/main/org/jboss/jms/client/JMSMessageListenerWrapper.java	2008-02-08 19:11:23 UTC (rev 3688)
@@ -100,7 +100,7 @@
          if (!transactedOrClientAck)
          {            
             try
-            {
+            {                              
                session.getCoreSession().rollback();
                
                session.setRecoverCalled(true);

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java	2008-02-08 18:08:20 UTC (rev 3687)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java	2008-02-08 19:11:23 UTC (rev 3688)
@@ -21,18 +21,15 @@
  */
 package org.jboss.jms.server.endpoint;
 
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_BROWSER_RESET;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.CLOSE;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_BROWSER_HASNEXTMESSAGE;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_BROWSER_NEXTMESSAGE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_BROWSER_RESET;
 
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
-import javax.jms.IllegalStateException;
-import javax.jms.InvalidSelectorException;
-
 import org.jboss.messaging.core.Filter;
 import org.jboss.messaging.core.Message;
 import org.jboss.messaging.core.MessageReference;
@@ -40,11 +37,11 @@
 import org.jboss.messaging.core.impl.filter.FilterImpl;
 import org.jboss.messaging.core.remoting.PacketHandler;
 import org.jboss.messaging.core.remoting.PacketSender;
-import org.jboss.messaging.core.remoting.wireformat.SessionBrowserHasNextMessageResponseMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionBrowserNextMessageResponseMessage;
 import org.jboss.messaging.core.remoting.wireformat.NullPacket;
 import org.jboss.messaging.core.remoting.wireformat.Packet;
 import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.SessionBrowserHasNextMessageResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionBrowserNextMessageResponseMessage;
 import org.jboss.messaging.util.Logger;
 import org.jboss.messaging.util.MessagingException;
 
@@ -69,10 +66,10 @@
 
    // Attributes -----------------------------------------------------------------------------------
 
-   private String id;
-   private ServerSessionEndpoint session;
-   private Queue destination;
-   private Filter filter;
+   private final String id;
+   private final ServerSessionEndpoint session;
+   private final Queue destination;
+   private final Filter filter;
    private Iterator iterator;
 
    // Constructors ---------------------------------------------------------------------------------
@@ -86,15 +83,12 @@
 
 		if (messageFilter != null)
 		{	
-		   try
-		   {
-		      filter = new FilterImpl(messageFilter);
-		   }
-		   catch (Exception e)
-		   {
-		      throw new InvalidSelectorException("Invalid selector " + messageFilter);
-		   }
+		   filter = new FilterImpl(messageFilter);
 		}
+		else
+		{
+		   filter = null;
+		}
    }
 
    // BrowserEndpoint implementation ---------------------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2008-02-08 18:08:20 UTC (rev 3687)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2008-02-08 19:11:23 UTC (rev 3688)
@@ -73,31 +73,31 @@
 
    // Attributes -----------------------------------------------------------------------------------
 
-   private String id;
+   private final String id;
 
    private volatile boolean started;
 
-   private String username;
+   private final String username;
    
-   private String password;
+   private final String password;
 
-   private String remotingClientSessionID;
+   private final String remotingClientSessionID;
    
-   private String jmsClientVMID;
+   private final String jmsClientVMID;
 
-   private MessagingServer messagingServer;
+   private final MessagingServer messagingServer;
 
-   private PostOffice postOffice;
+   private final PostOffice postOffice;
    
-   private SecurityStore sm;
+   private final SecurityStore sm;
    
-   private ConnectionManager cm;
+   private final ConnectionManager cm;
 
-   private ConcurrentMap<String, ServerSessionEndpoint> sessions = new ConcurrentHashMap<String, ServerSessionEndpoint>();
+   private final ConcurrentMap<String, ServerSessionEndpoint> sessions = new ConcurrentHashMap<String, ServerSessionEndpoint>();
 
-   private Set<Queue> temporaryQueues = new ConcurrentHashSet<Queue>();
+   private final Set<Queue> temporaryQueues = new ConcurrentHashSet<Queue>();
 
-   private int prefetchSize;
+   private final int prefetchSize;
 
    // Constructors ---------------------------------------------------------------------------------
 

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-02-08 18:08:20 UTC (rev 3687)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-02-08 19:11:23 UTC (rev 3688)
@@ -139,35 +139,35 @@
    // Attributes
    // -----------------------------------------------------------------------------------
 
-   private SecurityAspect security = new SecurityAspect();
+   private final SecurityAspect security = new SecurityAspect();
 
-   private boolean trace = log.isTraceEnabled();
+   private final boolean trace = log.isTraceEnabled();
 
-   private String id;
+   private final String id;
 
-   private ServerConnectionEndpoint connectionEndpoint;
+   private final ServerConnectionEndpoint connectionEndpoint;
 
-   private MessagingServer sp;
+   private final MessagingServer sp;
 
-   private Map<String, ServerConsumerEndpoint> consumers = new ConcurrentHashMap<String, ServerConsumerEndpoint>();
+   private final Map<String, ServerConsumerEndpoint> consumers = new ConcurrentHashMap<String, ServerConsumerEndpoint>();
 
-   private Map<String, ServerBrowserEndpoint> browsers = new ConcurrentHashMap<String, ServerBrowserEndpoint>();
+   private final Map<String, ServerBrowserEndpoint> browsers = new ConcurrentHashMap<String, ServerBrowserEndpoint>();
 
-   private PostOffice postOffice;
+   private final PostOffice postOffice;
 
-   private volatile LinkedList<Delivery> deliveries = new LinkedList<Delivery>();
+   private final LinkedList<Delivery> deliveries = new LinkedList<Delivery>();
 
    private long deliveryIDSequence = 0;
 
-   ExecutorService executor = Executors.newSingleThreadExecutor();
+   private final ExecutorService executor = Executors.newSingleThreadExecutor();
 
    private Transaction tx;
 
-   private boolean autoCommitSends;
+   private final boolean autoCommitSends;
 
-   private boolean autoCommitAcks;
+   private final boolean autoCommitAcks;
 
-   private ResourceManager resourceManager;
+   private final ResourceManager resourceManager;
 
    // Constructors
    // ---------------------------------------------------------------------------------
@@ -378,6 +378,9 @@
                else
                {
                   tx.addAcknowledgement(ref);
+                  
+                  //Del count is not actually updated in storage unless it's cancelled
+                  ref.incrementDeliveryCount();
                }
 
                if (rec.getDeliveryID() == deliveryID)
@@ -413,6 +416,9 @@
                else
                {
                   tx.addAcknowledgement(ref);
+                  
+                  //Del count is not actually updated in storage unless it's cancelled
+                  ref.incrementDeliveryCount();
                }
 
                break;
@@ -800,17 +806,19 @@
 
    private void addAddress(String address) throws Exception
    {
-      if (postOffice.containsAllowableAddress(address)) { throw new MessagingException(
-            MessagingException.ADDRESS_EXISTS, "Address already exists: "
-                  + address); }
+      if (postOffice.containsAllowableAddress(address))
+      {
+         throw new MessagingException(MessagingException.ADDRESS_EXISTS, "Address already exists: " + address);
+      }
       postOffice.addAllowableAddress(address);
    }
 
    private void removeAddress(String address) throws Exception
    {
-      if (!postOffice.removeAllowableAddress(address)) { throw new MessagingException(
-            MessagingException.ADDRESS_DOES_NOT_EXIST,
-            "Address does not exist: " + address); }
+      if (!postOffice.removeAllowableAddress(address))
+      {
+         throw new MessagingException(MessagingException.ADDRESS_DOES_NOT_EXIST, "Address does not exist: " + address);
+      }
    }
 
    private void createQueue(String address, String queueName,
@@ -819,8 +827,10 @@
    {
       Binding binding = postOffice.getBinding(queueName);
 
-      if (binding != null) { throw new MessagingException(
-            MessagingException.QUEUE_EXISTS); }
+      if (binding != null)
+      {
+         throw new MessagingException(MessagingException.QUEUE_EXISTS);
+      }
 
       if (temporary)
       {
@@ -849,7 +859,10 @@
    {
       Binding binding = postOffice.removeBinding(queueName);
 
-      if (binding == null) { throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST); }
+      if (binding == null)
+      {
+         throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
+      }
 
       Queue queue = binding.getQueue();
 
@@ -869,7 +882,10 @@
    {
       Binding binding = postOffice.getBinding(queueName);
 
-      if (binding == null) { throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST); }
+      if (binding == null)
+      {
+         throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
+      }
 
       int prefetchSize = connectionEndpoint.getPrefetchSize();
 
@@ -961,8 +977,10 @@
    {
       Binding binding = postOffice.getBinding(queueName);
 
-      if (binding == null) { throw new MessagingException(
-            MessagingException.QUEUE_DOES_NOT_EXIST); }
+      if (binding == null)
+      {
+         throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
+      }
 
       String browserID = UUID.randomUUID().toString();
 
@@ -1170,8 +1188,7 @@
          }
          else
          {
-            throw new MessagingException(MessagingException.UNSUPPORTED_PACKET,
-                  "Unsupported packet " + type);
+            throw new MessagingException(MessagingException.UNSUPPORTED_PACKET, "Unsupported packet " + type);
          }
 
          // reply if necessary

Modified: trunk/src/main/org/jboss/messaging/core/MessageReference.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/MessageReference.java	2008-02-08 18:08:20 UTC (rev 3687)
+++ trunk/src/main/org/jboss/messaging/core/MessageReference.java	2008-02-08 19:11:23 UTC (rev 3688)
@@ -56,11 +56,13 @@
    
    void setDeliveryCount(int deliveryCount);        
    
+   void incrementDeliveryCount();
+   
    Queue getQueue();
    
    void acknowledge(PersistenceManager persistenceManager) throws Exception;  
    
-   void cancel(PersistenceManager persistenceManager) throws Exception;  
+   boolean cancel(PersistenceManager persistenceManager) throws Exception;  
    
    void expire(PersistenceManager persistenceManager) throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/MessageReferenceImpl.java	2008-02-08 18:08:20 UTC (rev 3687)
+++ trunk/src/main/org/jboss/messaging/core/impl/MessageReferenceImpl.java	2008-02-08 19:11:23 UTC (rev 3688)
@@ -53,12 +53,8 @@
    
    // Constructors --------------------------------------------------
 
-   /**
-    * Required by externalization.
-    */
    public MessageReferenceImpl()
    {
-      if (trace) { log.trace("Creating using default constructor"); }
    }
 
    public MessageReferenceImpl(MessageReferenceImpl other, Queue queue)
@@ -96,6 +92,11 @@
       this.deliveryCount = deliveryCount;
    }
    
+   public void incrementDeliveryCount()
+   {
+      deliveryCount++;
+   }
+   
    public long getScheduledDeliveryTime()
    {
       return scheduledDeliveryTime;
@@ -126,10 +127,8 @@
       queue.decrementDeliveringCount();
    }
    
-   public void cancel(PersistenceManager persistenceManager) throws Exception
+   public boolean cancel(PersistenceManager persistenceManager) throws Exception
    {      
-      deliveryCount++;
-      
       if (message.isDurable() && queue.isDurable())
       {
          persistenceManager.updateDeliveryCount(queue, this);
@@ -154,8 +153,14 @@
             log.warn("Message has reached maximum delivery attempts, no DLQ is configured so dropping it");
             
             acknowledge(persistenceManager);
-         }         
+         }       
+         
+         return false;
       }
+      else
+      {
+         return true;
+      }
    }
    
    public void expire(PersistenceManager persistenceManager) throws Exception

Modified: trunk/src/main/org/jboss/messaging/core/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/TransactionImpl.java	2008-02-08 18:08:20 UTC (rev 3687)
+++ trunk/src/main/org/jboss/messaging/core/impl/TransactionImpl.java	2008-02-08 19:11:23 UTC (rev 3688)
@@ -254,18 +254,16 @@
             queueMap.put(queue, list);
          }
                  
-         list.add(ref);
+         if (ref.cancel(persistenceManager))
+         {
+            list.add(ref);
+         }
       }
       
       for (Map.Entry<Queue, LinkedList<MessageReference>> entry: queueMap.entrySet())
       {                  
          LinkedList<MessageReference> refs = entry.getValue();
-         
-         for (MessageReference ref: refs)
-         {
-            ref.cancel(persistenceManager);
-         }
-                  
+                
          entry.getKey().addListFirst(refs);
       }
    }

Modified: trunk/tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java	2008-02-08 18:08:20 UTC (rev 3687)
+++ trunk/tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java	2008-02-08 19:11:23 UTC (rev 3688)
@@ -550,8 +550,7 @@
          
          assertEquals(tm.getText(), rm.getText());
          
-         //Delivery count is not hard and fast - is best effort
-         assertEquals(5, rm.getIntProperty("JMSXDeliveryCount"));
+         assertEquals(4, rm.getIntProperty("JMSXDeliveryCount"));
          
          assertTrue(rm.getJMSRedelivered());
          
@@ -575,8 +574,7 @@
          	catch (Exception ignore)
          	{         		
          	}
-         }
-         
+         }         
          if (xaConn != null)
          {
             xaConn.close();               




More information about the jboss-cvs-commits mailing list