[jboss-cvs] JBoss Messaging SVN: r5554 - in trunk: src/main/org/jboss/messaging/core/postoffice and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Dec 21 03:52:10 EST 2008


Author: timfox
Date: 2008-12-21 03:52:08 -0500 (Sun, 21 Dec 2008)
New Revision: 5554

Modified:
   trunk/.project
   trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
Log:
Some tweaks to preack


Modified: trunk/.project
===================================================================
--- trunk/.project	2008-12-20 18:31:30 UTC (rev 5553)
+++ trunk/.project	2008-12-21 08:52:08 UTC (rev 5554)
@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <projectDescription>
-	<name>jboss-messaging</name>
+	<name>trunk</name>
 	<comment></comment>
 	<projects>
 	</projects>
@@ -10,8 +10,14 @@
 			<arguments>
 			</arguments>
 		</buildCommand>
+		<buildCommand>
+			<name>net.sourceforge.metrics.builder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
 	</buildSpec>
 	<natures>
 		<nature>org.eclipse.jdt.core.javanature</nature>
+		<nature>net.sourceforge.metrics.nature</nature>
 	</natures>
 </projectDescription>

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java	2008-12-20 18:31:30 UTC (rev 5553)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java	2008-12-21 08:52:08 UTC (rev 5554)
@@ -20,13 +20,13 @@
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  */
 
-
 package org.jboss.messaging.core.postoffice;
 
 import java.util.List;
 
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.transaction.Transaction;
 
 /**
  * A Bindings
@@ -41,8 +41,10 @@
 {
    List<Binding> getBindings();
    
-   List<MessageReference> route(ServerMessage message);
+   List<MessageReference> route(final ServerMessage message);
    
+   List<MessageReference> route(ServerMessage message, Transaction tx);
+   
    void addBinding(Binding binding);
    
    void removeBinding(Binding binding);

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java	2008-12-20 18:31:30 UTC (rev 5553)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java	2008-12-21 08:52:08 UTC (rev 5554)
@@ -35,6 +35,7 @@
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.transaction.Transaction;
 
 /**
  * A BindingsImpl
@@ -83,7 +84,68 @@
          numberExclusive.decrementAndGet();
       }
    }
+   
+   public List<MessageReference> route(final ServerMessage message)
+   {
+      return route(message, null);
+   }
+   
+   public List<MessageReference> route(final ServerMessage message, final Transaction tx)
+   {
+      if (numberExclusive.get() > 0)
+      {
+         // We need to round robin
+         
+         List<MessageReference> refs = new ArrayList<MessageReference>(1);
 
+         Binding binding = getNext(message);
+            
+         if (binding != null)
+         {        
+            Queue queue = binding.getQueue();
+            
+            MessageReference reference = message.createReference(queue);
+   
+            refs.add(reference);             
+         }
+         
+         return refs;
+      }
+      else
+      {
+         // They all get the message
+         
+         // TODO - this can be optimised to avoid a copy
+         
+         if (!bindings.isEmpty())
+         {   
+            List<MessageReference> refs = new ArrayList<MessageReference>();
+   
+            for (Binding binding : bindings)
+            {
+               Queue queue = binding.getQueue();
+   
+               Filter filter = queue.getFilter();
+   
+               //Note we ignore any exclusive - this structure is concurrent so one could have been added
+               //since the initial check on number of exclusive
+               if (!binding.isExclusive() && (filter == null || filter.match(message)))
+               {
+                  MessageReference reference = message.createReference(queue);
+   
+                  refs.add(reference);
+               }
+            }
+            
+            return refs;
+         }
+         else
+         {
+            return Collections.<MessageReference>emptyList();
+         }
+      }
+   }
+   
    private Binding getNext(final ServerMessage message)
    {
       //It's not an exact round robin under concurrent access but that doesn't matter
@@ -161,60 +223,4 @@
             
       binding = null;
    }
-   
-   public List<MessageReference> route(final ServerMessage message)
-   {
-      if (numberExclusive.get() > 0)
-      {
-         // We need to round robin
-         
-         List<MessageReference> refs = new ArrayList<MessageReference>(1);
-
-         Binding binding = getNext(message);
-            
-         if (binding != null)
-         {        
-            Queue queue = binding.getQueue();
-            
-            MessageReference reference = message.createReference(queue);
-   
-            refs.add(reference);             
-         }
-         
-         return refs;
-      }
-      else
-      {
-         // They all get the message
-         
-         // TODO - this can be optimised to avoid a copy
-         
-         if (!bindings.isEmpty())
-         {   
-            List<MessageReference> refs = new ArrayList<MessageReference>();
-   
-            for (Binding binding : bindings)
-            {
-               Queue queue = binding.getQueue();
-   
-               Filter filter = queue.getFilter();
-   
-               //Note we ignore any exclusive - this structure is concurrent so one could have been added
-               //since the initial check on number of exclusive
-               if (!binding.isExclusive() && (filter == null || filter.match(message)))
-               {
-                  MessageReference reference = message.createReference(queue);
-   
-                  refs.add(reference);
-               }
-            }
-            
-            return refs;
-         }
-         else
-         {
-            return Collections.<MessageReference>emptyList();
-         }
-      }
-   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-12-20 18:31:30 UTC (rev 5553)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-12-21 08:52:08 UTC (rev 5554)
@@ -333,7 +333,7 @@
       
       if (bindings != null)
       {
-         references = bindings.route(message);
+         references = bindings.route(message, null);
 
          computePaging(address, message, references);
       }
@@ -360,7 +360,7 @@
       
       if (bindings != null)
       {
-         references = bindings.route(message);
+         references = bindings.route(message, tx);
 
          computePaging(address, message, references);
       }
@@ -400,6 +400,12 @@
                }
             }
          }
+         
+//         TODO - bindings.route does not need to return a messagereference list, instead bindings route should actually create the refs and send them to the queues
+//         then we can get rid of the deliver call too
+//                          
+//         for a transaction bindings.route
+//          
       
          if (deliver)
          {

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-12-20 18:31:30 UTC (rev 5553)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-12-21 08:52:08 UTC (rev 5554)
@@ -536,9 +536,13 @@
             
             ref.getQueue().referenceHandled();
          }
-
          
-         
+         if (preAcknowledge)
+         {
+            //With pre-ack, we ack *before* sending to the client
+            doAck(ref);
+         }
+                  
          // TODO: get rid of the instanceof by something like message.isLargeMessage()
          if (message instanceof ServerLargeMessage)
          {            
@@ -567,12 +571,8 @@
          else
          {
             sendStandardMessage(ref, message);
-            
-            if (preAcknowledge)
-            {
-               doAck(ref);
-            }
          }
+         
          return HandleStatus.HANDLED;
       }
       finally
@@ -636,15 +636,12 @@
 
       private final MessageReference ref;
 
-      private boolean sentFirstMessage = false;
+      private volatile boolean sentFirstMessage = false;
 
-      // To avoid ACK to be called twice, as sendLargeMessage could be waiting another call because of flowControl
-      private boolean acked = false;
-
       /** The current position on the message being processed */
-      private long positionPendingLargeMessage;
+      private volatile long positionPendingLargeMessage;
 
-      private SessionReceiveContinuationMessage readAheadChunk;
+      private volatile SessionReceiveContinuationMessage readAheadChunk;
 
       public LargeMessageSender(final ServerLargeMessage message, final MessageReference ref)
       {
@@ -673,7 +670,6 @@
 
             if (!sentFirstMessage)
             {
-
                sentFirstMessage = true;
 
                MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(pendingLargeMessage.getPropertiesEncodeSize()));
@@ -736,21 +732,6 @@
 
             pendingLargeMessage.releaseResources();
 
-            if (preAcknowledge && !acked)
-            {
-               acked = true;
-               try
-               {
-                  doAck(ref);
-               }
-               catch (Exception e)
-               {
-                  // Is there anything we could do here besides logging?
-                  // The message was already sent, and this shouldn't happen
-                  log.error("Error while ACKing reference " + ref, e);
-               }
-            }
-
             largeMessageSender = null;
 
             return true;

Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-12-20 18:31:30 UTC (rev 5553)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-12-21 08:52:08 UTC (rev 5554)
@@ -23,7 +23,6 @@
 
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.message.impl.MessageImpl;
 import org.jboss.messaging.core.paging.PageTransactionInfo;
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
@@ -43,6 +42,8 @@
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
+ * 
+ * TODO - this should be refactored to use transaction operations for adding, acking paging stuff etc
  */
 public class TransactionImpl implements Transaction
 {




More information about the jboss-cvs-commits mailing list