[jboss-cvs] JBoss Messaging SVN: r5806 - trunk/src/main/org/jboss/messaging/core/server/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Feb 4 08:41:04 EST 2009
Author: ataylor
Date: 2009-02-04 08:41:04 -0500 (Wed, 04 Feb 2009)
New Revision: 5806
Modified:
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/SoloQueueImpl.java
Log:
refactoring
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-02-04 11:28:46 UTC (rev 5805)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-02-04 13:41:04 UTC (rev 5806)
@@ -97,7 +97,7 @@
private final AtomicInteger messagesAdded = new AtomicInteger(0);
- private final AtomicInteger deliveringCount = new AtomicInteger(0);
+ protected final AtomicInteger deliveringCount = new AtomicInteger(0);
private final AtomicBoolean waitingToDeliver = new AtomicBoolean(false);
@@ -1310,78 +1310,6 @@
deliver();
}
}
-
- final void discardMessage(MessageReference ref, Transaction tx) throws Exception
- {
- deliveringCount.decrementAndGet();
- PagingStore store = pagingManager.getPageStore(ref.getMessage().getDestination());
- store.addSize(-ref.getMemoryEstimate());
- QueueImpl queue = (QueueImpl) ref.getQueue();
- ServerMessage msg = ref.getMessage();
- boolean durableRef = msg.isDurable() && queue.isDurable();
-
- if (durableRef)
- {
- int count = msg.decrementDurableRefCount();
-
- if (count == 0)
- {
- if (tx == null)
- {
- storageManager.deleteMessage(msg.getMessageID());
- }
- else
- {
- storageManager.deleteMessageTransactional(tx.getID(), getPersistenceID(), msg.getMessageID());
- }
- }
- }
- }
-
- final void discardMessage(Long id, Transaction tx) throws Exception
- {
- RefsOperation oper = getRefsOperation(tx);
- Iterator<MessageReference> iterator = oper.refsToAdd.iterator();
-
- while (iterator.hasNext())
- {
- MessageReference ref = iterator.next();
-
- if (ref.getMessage().getMessageID() == id)
- {
- iterator.remove();
- discardMessage(ref, tx);
- break;
- }
- }
-
- }
-
-
- final void rediscardMessage(long id, Transaction tx) throws Exception
- {
- RefsOperation oper = getRefsOperation(tx);
- Iterator<MessageReference> iterator = oper.refsToAdd.iterator();
-
- while (iterator.hasNext())
- {
- MessageReference ref = iterator.next();
-
- if (ref.getMessage().getMessageID() == id)
- {
- iterator.remove();
- rediscardMessage(ref);
- break;
- }
- }
- }
-
- final void rediscardMessage(MessageReference ref) throws Exception
- {
- deliveringCount.decrementAndGet();
- PagingStore store = pagingManager.getPageStore(ref.getMessage().getDestination());
- store.addSize(-ref.getMemoryEstimate());
- }
// Inner classes
// --------------------------------------------------------------------------
@@ -1399,7 +1327,7 @@
}
}
- private class RefsOperation implements TransactionOperation
+ final class RefsOperation implements TransactionOperation
{
List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/SoloQueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/SoloQueueImpl.java 2009-02-04 11:28:46 UTC (rev 5805)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/SoloQueueImpl.java 2009-02-04 13:41:04 UTC (rev 5806)
@@ -30,6 +30,8 @@
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.AddressSettings;
import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.util.SimpleString;
import java.util.ArrayList;
@@ -37,6 +39,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Iterator;
import java.util.concurrent.ScheduledExecutorService;
/**
@@ -47,6 +50,11 @@
{
private final Map<SimpleString, ServerMessage> map = new HashMap<SimpleString, ServerMessage>();
+ private final PagingManager pagingManager;
+
+ private final StorageManager storageManager;
+
+
public SoloQueueImpl(final long persistenceID,
final SimpleString name,
final Filter filter,
@@ -58,6 +66,8 @@
final HierarchicalRepository<AddressSettings> queueSettingsRepository)
{
super(persistenceID, name, filter, durable, temporary, scheduledExecutor, postOffice, storageManager, queueSettingsRepository);
+ this.pagingManager = postOffice.getPagingManager();
+ this.storageManager = storageManager;
}
public void route(final ServerMessage message, final Transaction tx) throws Exception
@@ -81,8 +91,7 @@
ref = removeReferenceWithID(msg.getMessageID());
if (ref != null)
{
- QueueImpl queue = (QueueImpl) ref.getQueue();
- queue.discardMessage(ref, tx);
+ discardMessage(ref, tx);
}
}
@@ -199,5 +208,75 @@
super.postRollback(refs);
}
+ final void discardMessage(MessageReference ref, Transaction tx) throws Exception // Discards one reference: adjusts this queue's counters and, when the last durable reference goes, asks the storage manager to delete the message.
+ {
+ deliveringCount.decrementAndGet(); // one fewer reference counted as delivering on this queue
+ PagingStore store = pagingManager.getPageStore(ref.getMessage().getDestination()); // paging store is looked up by the message's destination
+ store.addSize(-ref.getMemoryEstimate()); // subtract the reference's memory estimate from the store's size
+ QueueImpl queue = (QueueImpl) ref.getQueue(); // NOTE(review): cast is not guarded by instanceof; confirm ref.getQueue() is always a QueueImpl
+ ServerMessage msg = ref.getMessage();
+ boolean durableRef = msg.isDurable() && queue.isDurable(); // durable only when both the message and the reference's queue are durable
+ if (durableRef)
+ {
+ int count = msg.decrementDurableRefCount();
+
+ if (count == 0) // decremented durable ref count reached zero, so the stored message is deleted
+ {
+ if (tx == null) // no transaction supplied: use the non-transactional delete
+ {
+ storageManager.deleteMessage(msg.getMessageID());
+ }
+ else
+ {
+ storageManager.deleteMessageTransactional(tx.getID(), getPersistenceID(), msg.getMessageID()); // passes this queue's persistence ID, not that of ref.getQueue(); NOTE(review): confirm both are the same queue
+ }
+ }
+ }
+ }
+
+ final void discardMessage(Long id, Transaction tx) throws Exception // Finds the reference with the given message ID among tx's refsToAdd, removes it from that list and discards it via discardMessage(ref, tx).
+ {
+ RefsOperation oper = getRefsOperation(tx); // presumably the RefsOperation associated with tx; getRefsOperation is defined outside this view
+ Iterator<MessageReference> iterator = oper.refsToAdd.iterator(); // explicit iterator so the match can be removed while iterating
+
+ while (iterator.hasNext())
+ {
+ MessageReference ref = iterator.next();
+
+ if (ref.getMessage().getMessageID() == id) // NOTE(review): id is a boxed Long. If getMessageID() returns primitive long this unboxes id (NullPointerException on null); if it returns Long this compares object identity. Confirm the return type.
+ {
+ iterator.remove();
+ discardMessage(ref, tx);
+ break; // only the first matching reference is handled
+ }
+ }
+
+ }
+
+
+ final void rediscardMessage(long id, Transaction tx) throws Exception // Finds the reference with the given message ID among tx's refsToAdd, removes it from that list and passes it to rediscardMessage(ref).
+ {
+ RefsOperation oper = getRefsOperation(tx); // presumably the RefsOperation associated with tx; getRefsOperation is defined outside this view
+ Iterator<MessageReference> iterator = oper.refsToAdd.iterator(); // explicit iterator so the match can be removed while iterating
+
+ while (iterator.hasNext())
+ {
+ MessageReference ref = iterator.next();
+
+ if (ref.getMessage().getMessageID() == id) // id is a primitive long here, unlike discardMessage(Long, Transaction)
+ {
+ iterator.remove();
+ rediscardMessage(ref);
+ break; // only the first matching reference is handled
+ }
+ }
+ }
+
+ final void rediscardMessage(MessageReference ref) throws Exception // Applies the same counter and paging-size adjustments as discardMessage(ref, tx), but makes no storage manager call.
+ {
+ deliveringCount.decrementAndGet(); // one fewer reference counted as delivering on this queue
+ PagingStore store = pagingManager.getPageStore(ref.getMessage().getDestination()); // paging store is looked up by the message's destination
+ store.addSize(-ref.getMemoryEstimate()); // subtract the reference's memory estimate from the store's size
+ }
}
More information about the jboss-cvs-commits
mailing list