[jboss-cvs] JBoss Messaging SVN: r5806 - trunk/src/main/org/jboss/messaging/core/server/impl.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Feb 4 08:41:04 EST 2009


Author: ataylor
Date: 2009-02-04 08:41:04 -0500 (Wed, 04 Feb 2009)
New Revision: 5806

Modified:
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/SoloQueueImpl.java
Log:
refactoring

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-02-04 11:28:46 UTC (rev 5805)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-02-04 13:41:04 UTC (rev 5806)
@@ -97,7 +97,7 @@
 
    private final AtomicInteger messagesAdded = new AtomicInteger(0);
 
-   private final AtomicInteger deliveringCount = new AtomicInteger(0);
+   protected final AtomicInteger deliveringCount = new AtomicInteger(0);
 
    private final AtomicBoolean waitingToDeliver = new AtomicBoolean(false);
 
@@ -1310,78 +1310,6 @@
          deliver();
       }
    }
-
-   final void discardMessage(MessageReference ref, Transaction tx) throws Exception
-   {
-      deliveringCount.decrementAndGet();
-      PagingStore store = pagingManager.getPageStore(ref.getMessage().getDestination());
-      store.addSize(-ref.getMemoryEstimate());
-      QueueImpl queue = (QueueImpl) ref.getQueue();
-      ServerMessage msg = ref.getMessage();
-      boolean durableRef = msg.isDurable() && queue.isDurable();
-
-      if (durableRef)
-      {
-         int count = msg.decrementDurableRefCount();
-
-         if (count == 0)
-         {
-            if (tx == null)
-            {
-               storageManager.deleteMessage(msg.getMessageID());
-            }
-            else
-            {
-               storageManager.deleteMessageTransactional(tx.getID(), getPersistenceID(), msg.getMessageID());
-            }
-         }
-      }
-   }
-
-   final void discardMessage(Long id, Transaction tx) throws Exception
-   {
-      RefsOperation oper = getRefsOperation(tx);
-      Iterator<MessageReference> iterator = oper.refsToAdd.iterator();
-
-      while (iterator.hasNext())
-      {
-         MessageReference ref = iterator.next();
-
-         if (ref.getMessage().getMessageID() == id)
-         {
-            iterator.remove();
-            discardMessage(ref, tx);
-            break;
-         }
-      }
-
-   }
-
-
-   final void rediscardMessage(long id, Transaction tx) throws Exception
-   {
-      RefsOperation oper = getRefsOperation(tx);
-      Iterator<MessageReference> iterator = oper.refsToAdd.iterator();
-
-      while (iterator.hasNext())
-      {
-         MessageReference ref = iterator.next();
-
-         if (ref.getMessage().getMessageID() == id)
-         {
-            iterator.remove();
-            rediscardMessage(ref);
-            break;
-         }
-      }
-   }
-
-   final void rediscardMessage(MessageReference ref) throws Exception
-   {
-      deliveringCount.decrementAndGet();
-      PagingStore store = pagingManager.getPageStore(ref.getMessage().getDestination());
-      store.addSize(-ref.getMemoryEstimate());
-   }
    // Inner classes
    // --------------------------------------------------------------------------
 
@@ -1399,7 +1327,7 @@
       }
    }
 
-   private class RefsOperation implements TransactionOperation
+   final class RefsOperation implements TransactionOperation
    {
       List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/SoloQueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/SoloQueueImpl.java	2009-02-04 11:28:46 UTC (rev 5805)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/SoloQueueImpl.java	2009-02-04 13:41:04 UTC (rev 5806)
@@ -30,6 +30,8 @@
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.AddressSettings;
 import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.util.SimpleString;
 
 import java.util.ArrayList;
@@ -37,6 +39,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Iterator;
 import java.util.concurrent.ScheduledExecutorService;
 
 /**
@@ -47,6 +50,11 @@
 {
    private final Map<SimpleString, ServerMessage> map = new HashMap<SimpleString, ServerMessage>();
 
+   private final PagingManager pagingManager;
+
+    private final StorageManager storageManager;
+
+
    public SoloQueueImpl(final long persistenceID,
                     final SimpleString name,
                     final Filter filter,
@@ -58,6 +66,8 @@
                     final HierarchicalRepository<AddressSettings> queueSettingsRepository)
    {
       super(persistenceID, name, filter, durable, temporary, scheduledExecutor, postOffice, storageManager, queueSettingsRepository);
+      this.pagingManager  = postOffice.getPagingManager();
+      this.storageManager = storageManager;
    }
 
    public void route(final ServerMessage message, final Transaction tx) throws Exception
@@ -81,8 +91,7 @@
                   ref = removeReferenceWithID(msg.getMessageID());
                   if (ref != null)
                   {
-                     QueueImpl queue = (QueueImpl) ref.getQueue();
-                     queue.discardMessage(ref, tx);
+                     discardMessage(ref, tx);
                   }
                }
 
@@ -199,5 +208,75 @@
       super.postRollback(refs);
    }
 
+      final void discardMessage(MessageReference ref, Transaction tx) throws Exception // undoes the bookkeeping for ref (delivering count, paging-store size) and deletes the message from storage once its durable reference count reaches zero
+   {
+      deliveringCount.decrementAndGet(); // presumably the QueueImpl counter made protected in this revision - confirm
+      PagingStore store = pagingManager.getPageStore(ref.getMessage().getDestination()); // paging store looked up by the message's destination
+      store.addSize(-ref.getMemoryEstimate()); // subtract this reference's memory estimate from the store's size
+      QueueImpl queue = (QueueImpl) ref.getQueue();
+      ServerMessage msg = ref.getMessage();
+      boolean durableRef = msg.isDurable() && queue.isDurable(); // only counts as durable when both the message and its queue are durable
 
+      if (durableRef)
+      {
+         int count = msg.decrementDurableRefCount();
+
+         if (count == 0) // no durable references remain, so the stored message is deleted
+         {
+            if (tx == null) // no transaction supplied: plain (non-transactional) delete
+            {
+               storageManager.deleteMessage(msg.getMessageID());
+            }
+            else // transaction supplied: delete is issued against tx.getID() with this queue's persistence ID
+            {
+               storageManager.deleteMessageTransactional(tx.getID(), getPersistenceID(), msg.getMessageID());
+            }
+         }
+      }
+   }
+
+   final void discardMessage(Long id, Transaction tx) throws Exception // finds the first reference with message ID id in the transaction's refsToAdd list, removes it and discards it via discardMessage(ref, tx)
+   {
+      RefsOperation oper = getRefsOperation(tx);
+      Iterator<MessageReference> iterator = oper.refsToAdd.iterator(); // explicit iterator so the match can be removed while iterating
+
+      while (iterator.hasNext())
+      {
+         MessageReference ref = iterator.next();
+
+         if (ref.getMessage().getMessageID() == id) // NOTE(review): id is a boxed Long; this is a numeric compare only if getMessageID() returns primitive long (and it throws NPE on a null id) - confirm
+         {
+            iterator.remove();
+            discardMessage(ref, tx);
+            break; // stop after the first match
+         }
+      }
+
+   }
+
+
+   final void rediscardMessage(long id, Transaction tx) throws Exception // finds the first reference with message ID id in the transaction's refsToAdd list, removes it and passes it to rediscardMessage(ref)
+   {
+      RefsOperation oper = getRefsOperation(tx);
+      Iterator<MessageReference> iterator = oper.refsToAdd.iterator(); // explicit iterator so the match can be removed while iterating
+
+      while (iterator.hasNext())
+      {
+         MessageReference ref = iterator.next();
+
+         if (ref.getMessage().getMessageID() == id)
+         {
+            iterator.remove();
+            rediscardMessage(ref); // unlike discardMessage(ref, tx), this path makes no storage-manager call
+            break; // stop after the first match
+         }
+      }
+   }
+
+   final void rediscardMessage(MessageReference ref) throws Exception // reverses only the in-memory bookkeeping for ref; nothing is deleted from storage here
+   {
+      deliveringCount.decrementAndGet(); // presumably the QueueImpl counter made protected in this revision - confirm
+      PagingStore store = pagingManager.getPageStore(ref.getMessage().getDestination()); // paging store looked up by the message's destination
+      store.addSize(-ref.getMemoryEstimate()); // subtract this reference's memory estimate from the store's size
+   }
 }




More information about the jboss-cvs-commits mailing list