[jboss-cvs] JBoss Messaging SVN: r4755 - in branches/Branch_JBMESSAGING-1303: src/main/org/jboss/messaging/core/message/impl and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jul 31 05:30:40 EDT 2008
Author: jmesnil
Date: 2008-07-31 05:30:40 -0400 (Thu, 31 Jul 2008)
New Revision: 4755
Modified:
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/Queue.java
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java
branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java
Log:
JBMESSAGING-1303: Revisit management interfaces
* added method Queue.expireMessage()
* added property HDR_ORIGIN_QUEUE in MessageReferenceImpl.makeCopyForDLQOrExpiry() to store the origin queue before the message is moved to the DLQ or expiry queue
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java 2008-07-31 07:37:20 UTC (rev 4754)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java 2008-07-31 09:30:40 UTC (rev 4755)
@@ -219,17 +219,7 @@
public boolean expireMessage(long messageID) throws Exception
{
- List<MessageReference> refs = queue.list(null);
- for (MessageReference ref : refs)
- {
- if (ref.getMessage().getMessageID() == messageID)
- {
- ref.expire(storageManager, postOffice, queueSettingsRepository);
- queue.removeReferenceWithID(messageID);
- return true;
- }
- }
- return false;
+ return queue.expireMessage(messageID, storageManager, postOffice, queueSettingsRepository);
}
public int expireMessages(String filterStr) throws Exception
@@ -244,8 +234,7 @@
List<MessageReference> refs = queue.list(filter);
for (MessageReference ref : refs)
{
- ref.expire(storageManager, postOffice, queueSettingsRepository);
- queue.removeReferenceWithID(ref.getMessage().getMessageID());
+ queue.expireMessage(ref.getMessage().getMessageID(), storageManager, postOffice, queueSettingsRepository);
}
return refs.size();
} catch (MessagingException e)
@@ -285,8 +274,7 @@
if (message.getMessageID() == messageID)
{
ref.sendToDLQ(storageManager, postOffice, queueSettingsRepository);
- queue.removeReferenceWithID(messageID);
- return true;
+ return queue.deleteReference(messageID, storageManager);
}
}
return false;
@@ -309,7 +297,7 @@
message.setPriority((byte) newPriority);
// delete and add the reference so that it
// goes to the right queues for the new priority
- queue.removeReferenceWithID(messageID);
+ queue.deleteReference(messageID, storageManager);
queue.addLast(ref);
return true;
}
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-07-31 07:37:20 UTC (rev 4754)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-07-31 09:30:40 UTC (rev 4755)
@@ -58,6 +58,8 @@
public static final SimpleString HDR_ACTUAL_EXPIRY_TIME = new SimpleString("JBMActualExpiryTime");
+ public static final SimpleString HDR_ORIGIN_QUEUE = new SimpleString("JBMOriginQueue");
+
// Attributes ----------------------------------------------------
private SimpleString destination;
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/Queue.java 2008-07-31 07:37:20 UTC (rev 4754)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/Queue.java 2008-07-31 09:30:40 UTC (rev 4755)
@@ -29,6 +29,9 @@
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.FlowController;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.util.SimpleString;
@@ -118,6 +121,8 @@
boolean deleteReference(long messageID, StorageManager storageManager)
throws Exception;
+ boolean expireMessage(long messageID, StorageManager storageManager, PostOffice postOffice, HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
+
void lock();
void unlock();
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-07-31 07:37:20 UTC (rev 4754)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-07-31 09:30:40 UTC (rev 4755)
@@ -288,9 +288,11 @@
copy.setMessageID(newMessageId);
+ SimpleString originalQueue = copy.getDestination();
+ copy.putStringProperty(MessageImpl.HDR_ORIGIN_QUEUE, originalQueue);
+
// reset expiry
copy.setExpiration(0);
-
if (expiry)
{
long actualExpiryTime = System.currentTimeMillis();
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-07-31 07:37:20 UTC (rev 4754)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-07-31 09:30:40 UTC (rev 4755)
@@ -44,11 +44,14 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.FlowController;
+import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.Consumer;
import org.jboss.messaging.core.server.DistributionPolicy;
import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
import org.jboss.messaging.util.SimpleString;
@@ -403,7 +406,7 @@
public void referenceAcknowledged(MessageReference ref) throws Exception
{
deliveringCount.decrementAndGet();
-
+
sizeBytes.addAndGet(-ref.getMessage().getEncodeSize());
// if (flowController != null)
@@ -498,7 +501,7 @@
{
MessageReference ref = iter.next();
if (ref.getMessage().getMessageID() == messageID)
- {
+ {
deliveringCount.incrementAndGet();
tx.addAcknowledgement(ref);
iter.remove();
@@ -521,7 +524,25 @@
{
lock.unlock();
}
+
+ public boolean expireMessage(long messageID, StorageManager storageManager, PostOffice postOffice, HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
+ {
+ Iterator<MessageReference> iter = messageReferences.iterator();
+ while (iter.hasNext())
+ {
+ MessageReference ref = iter.next();
+ if (ref.getMessage().getMessageID() == messageID)
+ {
+ deliveringCount.incrementAndGet();
+ ref.expire(storageManager, postOffice, queueSettingsRepository);
+ iter.remove();
+ return true;
+ }
+ }
+ return false;
+ }
+
// Public
// -----------------------------------------------------------------------------
Modified: branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java 2008-07-31 07:37:20 UTC (rev 4754)
+++ branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java 2008-07-31 09:30:40 UTC (rev 4755)
@@ -540,51 +540,37 @@
{
long messageID = randomLong();
- List<MessageReference> refs = new ArrayList<MessageReference>();
- MessageReference ref = createMock(MessageReference.class);
- ServerMessage message = createMock(ServerMessage.class);
- expect(message.getMessageID()).andStubReturn(messageID);
- expect(ref.getMessage()).andReturn(message);
- refs.add(ref);
Queue queue = createMock(Queue.class);
StorageManager storageManager = createMock(StorageManager.class);
- expect(queue.list(null)).andReturn(refs);
PostOffice postOffice = createMock(PostOffice.class);
HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
- ref.expire(storageManager, postOffice, repository);
- expect(queue.removeReferenceWithID(messageID)).andReturn(true);
+ expect(queue.expireMessage(messageID, storageManager, postOffice, repository)).andReturn(true);
- replay(queue, storageManager, postOffice, repository, ref, message);
+ replay(queue, storageManager, postOffice, repository);
QueueControlMBean control = new QueueControl(queue, storageManager,
postOffice, repository);
assertTrue(control.expireMessage(messageID));
- verify(queue, storageManager, postOffice, repository, ref, message);
+ verify(queue, storageManager, postOffice, repository);
}
public void testExpireMessageWithNoMatch() throws Exception
{
long messageID = randomLong();
- List<MessageReference> refs = new ArrayList<MessageReference>();
- MessageReference ref = createMock(MessageReference.class);
- ServerMessage message = createMock(ServerMessage.class);
- expect(message.getMessageID()).andStubReturn(messageID + 1);
- expect(ref.getMessage()).andReturn(message);
- refs.add(ref);
Queue queue = createMock(Queue.class);
StorageManager storageManager = createMock(StorageManager.class);
- expect(queue.list(null)).andReturn(refs);
PostOffice postOffice = createMock(PostOffice.class);
HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
- replay(queue, storageManager, postOffice, repository, ref, message);
+ expect(queue.expireMessage(messageID, storageManager, postOffice, repository)).andReturn(false);
+ replay(queue, storageManager, postOffice, repository);
QueueControlMBean control = new QueueControl(queue, storageManager,
postOffice, repository);
assertFalse(control.expireMessage(messageID));
- verify(queue, storageManager, postOffice, repository, ref, message);
+ verify(queue, storageManager, postOffice, repository);
}
public void testExpireMessagesWithFilter() throws Exception
@@ -608,10 +594,8 @@
expect(queue.list(isA(Filter.class))).andReturn(refs);
PostOffice postOffice = createMock(PostOffice.class);
HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
- ref_1.expire(storageManager, postOffice, repository);
- ref_2.expire(storageManager, postOffice, repository);
- expect(queue.removeReferenceWithID(messageID_1)).andReturn(true);
- expect(queue.removeReferenceWithID(messageID_2)).andReturn(true);
+ expect(queue.expireMessage(messageID_1, storageManager, postOffice, repository)).andReturn(true);
+ expect(queue.expireMessage(messageID_2, storageManager, postOffice, repository)).andReturn(true);
replay(queue, storageManager, postOffice, repository, ref_1, ref_2,
message_1, message_2);
@@ -718,7 +702,7 @@
HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
Queue queue = createMock(Queue.class);
expect(queue.list(null)).andReturn(refs);
- expect(queue.removeReferenceWithID(messageID)).andReturn(true);
+ expect(queue.deleteReference(messageID, storageManager)).andReturn(true);
expect(queue.addLast(ref)).andReturn(HandleStatus.HANDLED);
replay(queue, storageManager, postOffice, repository, ref, message);
@@ -798,7 +782,7 @@
PostOffice postOffice = createMock(PostOffice.class);
HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
ref.sendToDLQ(storageManager, postOffice, repository);
- expect(queue.removeReferenceWithID(messageID)).andReturn(true);
+ expect(queue.deleteReference(messageID, storageManager)).andReturn(true);
replay(queue, storageManager, postOffice, repository, ref, message);
Modified: branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java 2008-07-31 07:37:20 UTC (rev 4754)
+++ branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java 2008-07-31 09:30:40 UTC (rev 4755)
@@ -124,6 +124,7 @@
SimpleString dlqName = new SimpleString("testDLQ");
queueSettings.setDLQ(dlqName);
Binding dlqBinding = EasyMock.createStrictMock(Binding.class);
+ EasyMock.expect(dlqBinding.getAddress()).andReturn(dlqName);
StorageManager sm = EasyMock.createNiceMock(StorageManager.class);
PostOffice po = EasyMock.createStrictMock(PostOffice.class);
HierarchicalRepository<QueueSettings> repos = EasyMock.createStrictMock(HierarchicalRepository.class);
@@ -143,14 +144,17 @@
EasyMock.expect(serverMessage.copy()).andReturn(serverMessage);
EasyMock.expect(sm.generateMessageID()).andReturn(2l);
serverMessage.setMessageID(2);
+ EasyMock.expect(serverMessage.getDestination()).andReturn(queueName);
+ serverMessage.putStringProperty(MessageImpl.HDR_ORIGIN_QUEUE, queueName);
serverMessage.setExpiration(0);
+ serverMessage.setDestination(dlqName);
EasyMock.expect(po.route(serverMessage)).andReturn(new ArrayList<MessageReference>());
EasyMock.expect(serverMessage.getDurableRefCount()).andReturn(0);
EasyMock.expect(serverMessage.decrementDurableRefCount()).andReturn(0);
EasyMock.expect(sm.generateTransactionID()).andReturn(1l);
- EasyMock.replay(sm, po, repos, serverMessage, queue);
+ EasyMock.replay(sm, po, repos, serverMessage, queue, dlqBinding);
assertFalse(messageReference.cancel(sm, po, repos));
- EasyMock.verify(sm, po, repos, serverMessage, queue);
+ EasyMock.verify(sm, po, repos, serverMessage, queue, dlqBinding);
}
public void testCancelToDLQDoesntExist() throws Exception
@@ -160,6 +164,7 @@
SimpleString dlqName = new SimpleString("testDLQ");
queueSettings.setDLQ(dlqName);
Binding dlqBinding = EasyMock.createStrictMock(Binding.class);
+ EasyMock.expect(dlqBinding.getAddress()).andReturn(dlqName);
StorageManager sm = EasyMock.createNiceMock(StorageManager.class);
PostOffice po = EasyMock.createStrictMock(PostOffice.class);
HierarchicalRepository<QueueSettings> repos = EasyMock.createStrictMock(HierarchicalRepository.class);
@@ -180,14 +185,20 @@
EasyMock.expect(serverMessage.copy()).andReturn(serverMessage);
EasyMock.expect(sm.generateMessageID()).andReturn(2l);
serverMessage.setMessageID(2);
+ EasyMock.expect(serverMessage.getDestination()).andReturn(queueName);
+ serverMessage.putStringProperty(MessageImpl.HDR_ORIGIN_QUEUE, queueName);
serverMessage.setExpiration(0);
+ serverMessage.setDestination(dlqName);
EasyMock.expect(po.route(serverMessage)).andReturn(new ArrayList<MessageReference>());
EasyMock.expect(serverMessage.getDurableRefCount()).andReturn(0);
EasyMock.expect(serverMessage.decrementDurableRefCount()).andReturn(0);
EasyMock.expect(sm.generateTransactionID()).andReturn(1l);
- EasyMock.replay(sm, po, repos, serverMessage, queue);
+
+ EasyMock.replay(sm, po, repos, serverMessage, queue, dlqBinding);
+
assertFalse(messageReference.cancel(sm, po, repos));
- EasyMock.verify(sm, po, repos, serverMessage, queue);
+
+ EasyMock.verify(sm, po, repos, serverMessage, queue, dlqBinding);
}
public void testExpire() throws Exception
@@ -244,6 +255,8 @@
EasyMock.expect(po.getBinding(expQName)).andReturn(expQBinding);
EasyMock.expect(serverMessage.copy()).andReturn(serverMessage);
serverMessage.setMessageID(2);
+ EasyMock.expect(serverMessage.getDestination()).andReturn(queueName);
+ serverMessage.putStringProperty(MessageImpl.HDR_ORIGIN_QUEUE, queueName);
serverMessage.setExpiration(0);
serverMessage.putLongProperty(EasyMock.eq(MessageImpl.HDR_ACTUAL_EXPIRY_TIME), EasyMock.anyLong());
EasyMock.expect(expQBinding.getAddress()).andStubReturn(expQName);
@@ -251,8 +264,11 @@
EasyMock.expect(po.route(serverMessage)).andReturn(new ArrayList<MessageReference>());
EasyMock.expect(serverMessage.getDurableRefCount()).andReturn(0);
EasyMock.expect(serverMessage.decrementDurableRefCount()).andReturn(0);
+
EasyMock.replay(sm, po, repos, serverMessage, queue, expQBinding);
+
messageReference.expire(sm, po, repos);
+
EasyMock.verify(sm, po, repos, serverMessage, queue, expQBinding);
}
@@ -283,6 +299,8 @@
EasyMock.expect(po.addBinding(expQName, expQName, null, true, false)).andReturn(expQBinding);
EasyMock.expect(serverMessage.copy()).andReturn(serverMessage);
serverMessage.setMessageID(2);
+ EasyMock.expect(serverMessage.getDestination()).andReturn(queueName);
+ serverMessage.putStringProperty(MessageImpl.HDR_ORIGIN_QUEUE, queueName);
serverMessage.setExpiration(0);
serverMessage.putLongProperty(EasyMock.eq(MessageImpl.HDR_ACTUAL_EXPIRY_TIME), EasyMock.anyLong());
EasyMock.expect(expQBinding.getAddress()).andStubReturn(expQName);
@@ -290,8 +308,11 @@
EasyMock.expect(po.route(serverMessage)).andReturn(new ArrayList<MessageReference>());
EasyMock.expect(serverMessage.getDurableRefCount()).andReturn(0);
EasyMock.expect(serverMessage.decrementDurableRefCount()).andReturn(0);
+
EasyMock.replay(sm, po, repos, serverMessage, queue, expQBinding);
+
messageReference.expire(sm, po, repos);
+
EasyMock.verify(sm, po, repos, serverMessage, queue, expQBinding);
}
More information about the jboss-cvs-commits
mailing list