[jboss-cvs] JBoss Messaging SVN: r4760 - in branches/Branch_JBMESSAGING-1303: src/main/org/jboss/messaging/core/server and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jul 31 10:37:29 EDT 2008
Author: jmesnil
Date: 2008-07-31 10:37:28 -0400 (Thu, 31 Jul 2008)
New Revision: 4760
Modified:
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/Queue.java
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java
branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java
branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
Log:
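JBMESSAGING-1303: Revisit management interfaces
* added method Queue.moveMessage(); QueueControl.moveMessage() now delegates to it
  instead of scanning queue.list(null) itself
* MessageReferenceImpl.move() now routes a copy of the message (message.copy()) and
  acknowledges the original reference in the same transaction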
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java 2008-07-31 13:10:12 UTC (rev 4759)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java 2008-07-31 14:37:28 UTC (rev 4760)
@@ -253,16 +253,7 @@
+ otherQueueName);
}
- List<MessageReference> refs = queue.list(null);
- for (MessageReference ref : refs)
- {
- if (ref.getMessage().getMessageID() == messageID)
- {
- ref.move(binding, storageManager, postOffice);
- return true;
- }
- }
- return false;
+ return queue.moveMessage(messageID, binding, storageManager, postOffice);
}
public boolean sendMessageToDLQ(long messageID) throws Exception
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/Queue.java 2008-07-31 13:10:12 UTC (rev 4759)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/Queue.java 2008-07-31 14:37:28 UTC (rev 4760)
@@ -28,6 +28,7 @@
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.FlowController;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.settings.HierarchicalRepository;
@@ -131,6 +132,10 @@
HierarchicalRepository<QueueSettings> queueSettingsRepository)
throws Exception;
+ boolean moveMessage(long messageID,
+ Binding toBinding, StorageManager storageManager,
+ PostOffice postOffice) throws Exception;
+
void lock();
void unlock();
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-07-31 13:10:12 UTC (rev 4759)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-07-31 14:37:28 UTC (rev 4760)
@@ -245,14 +245,12 @@
{
Transaction tx = new TransactionImpl(persistenceManager, postOffice);
- MessageReference copyRef = getMessage().createReference(otherBinding.getQueue());
- ServerMessage copyMessage = copyRef.getMessage();
-
+ ServerMessage copyMessage = message.copy();
copyMessage.setDestination(otherBinding.getAddress());
tx.addMessage(copyMessage);
- queue.deleteReference(message.getMessageID(), persistenceManager);
+ tx.addAcknowledgement(this);
tx.commit();
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-07-31 13:10:12 UTC (rev 4759)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-07-31 14:37:28 UTC (rev 4760)
@@ -43,6 +43,7 @@
import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.FlowController;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.Consumer;
@@ -515,26 +516,18 @@
return deleted;
}
- public void lock()
+ public boolean expireMessage(final long messageID,
+ final StorageManager storageManager, final PostOffice postOffice,
+ final HierarchicalRepository<QueueSettings> queueSettingsRepository)
+ throws Exception
{
- lock.lock();
- }
-
- public void unlock()
- {
- lock.unlock();
- }
-
- public boolean expireMessage(final long messageID, final StorageManager storageManager,
- final PostOffice postOffice, final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
- {
Iterator<MessageReference> iter = messageReferences.iterator();
while (iter.hasNext())
{
MessageReference ref = iter.next();
if (ref.getMessage().getMessageID() == messageID)
- {
+ {
deliveringCount.incrementAndGet();
ref.expire(storageManager, postOffice, queueSettingsRepository);
iter.remove();
@@ -543,9 +536,11 @@
}
return false;
}
-
- public boolean sendMessageToDLQ(final long messageID, final StorageManager storageManager,
- final PostOffice postOffice, final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
+
+ public boolean sendMessageToDLQ(final long messageID,
+ final StorageManager storageManager, final PostOffice postOffice,
+ final HierarchicalRepository<QueueSettings> queueSettingsRepository)
+ throws Exception
{
Iterator<MessageReference> iter = messageReferences.iterator();
@@ -553,7 +548,7 @@
{
MessageReference ref = iter.next();
if (ref.getMessage().getMessageID() == messageID)
- {
+ {
deliveringCount.incrementAndGet();
ref.sendToDLQ(storageManager, postOffice, queueSettingsRepository);
iter.remove();
@@ -563,6 +558,37 @@
return false;
}
+ public boolean moveMessage(final long messageID,
+ final Binding toBinding, final StorageManager storageManager,
+ final PostOffice postOffice) throws Exception
+ {
+ Iterator<MessageReference> iter = messageReferences.iterator();
+
+ while (iter.hasNext())
+ {
+ MessageReference ref = iter.next();
+ if (ref.getMessage().getMessageID() == messageID)
+ {
+ deliveringCount.incrementAndGet();
+ ref.move(toBinding, storageManager, postOffice);
+ iter.remove();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public void lock()
+ {
+ lock.lock();
+ }
+
+ public void unlock()
+ {
+ lock.unlock();
+ }
+
+
// Public
// -----------------------------------------------------------------------------
Modified: branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java 2008-07-31 13:10:12 UTC (rev 4759)
+++ branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java 2008-07-31 14:37:28 UTC (rev 4760)
@@ -341,27 +341,27 @@
verify(queue, storageManager, postOffice, repository);
}
-
+
public void testGetDLQ() throws Exception
{
String queueName = randomString();
final String dlqName = randomString();
-
+
Queue queue = createMock(Queue.class);
expect(queue.getName()).andReturn(new SimpleString(queueName));
StorageManager storageManager = createMock(StorageManager.class);
PostOffice postOffice = createMock(PostOffice.class);
HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
- QueueSettings queueSettings = new QueueSettings()
+ QueueSettings queueSettings = new QueueSettings()
{
@Override
public SimpleString getDLQ()
{
return new SimpleString(dlqName);
- }
+ }
};
expect(repository.getMatch(queueName)).andReturn(queueSettings);
-
+
replay(queue, storageManager, postOffice, repository);
QueueControlMBean control = new QueueControl(queue, storageManager,
@@ -375,22 +375,22 @@
{
String queueName = randomString();
final String expiryQueueName = randomString();
-
+
Queue queue = createMock(Queue.class);
expect(queue.getName()).andReturn(new SimpleString(queueName));
StorageManager storageManager = createMock(StorageManager.class);
PostOffice postOffice = createMock(PostOffice.class);
HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
- QueueSettings queueSettings = new QueueSettings()
+ QueueSettings queueSettings = new QueueSettings()
{
@Override
public SimpleString getExpiryQueue()
{
return new SimpleString(expiryQueueName);
- }
+ }
};
expect(repository.getMatch(queueName)).andReturn(queueSettings);
-
+
replay(queue, storageManager, postOffice, repository);
QueueControlMBean control = new QueueControl(queue, storageManager,
@@ -399,7 +399,7 @@
verify(queue, storageManager, postOffice, repository);
}
-
+
public void testRemoveAllMessages() throws Exception
{
Queue queue = createMock(Queue.class);
@@ -544,7 +544,9 @@
StorageManager storageManager = createMock(StorageManager.class);
PostOffice postOffice = createMock(PostOffice.class);
HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
- expect(queue.expireMessage(messageID, storageManager, postOffice, repository)).andReturn(true);
+ expect(
+ queue.expireMessage(messageID, storageManager, postOffice,
+ repository)).andReturn(true);
replay(queue, storageManager, postOffice, repository);
@@ -563,7 +565,9 @@
StorageManager storageManager = createMock(StorageManager.class);
PostOffice postOffice = createMock(PostOffice.class);
HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
- expect(queue.expireMessage(messageID, storageManager, postOffice, repository)).andReturn(false);
+ expect(
+ queue.expireMessage(messageID, storageManager, postOffice,
+ repository)).andReturn(false);
replay(queue, storageManager, postOffice, repository);
QueueControlMBean control = new QueueControl(queue, storageManager,
@@ -594,8 +598,12 @@
expect(queue.list(isA(Filter.class))).andReturn(refs);
PostOffice postOffice = createMock(PostOffice.class);
HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
- expect(queue.expireMessage(messageID_1, storageManager, postOffice, repository)).andReturn(true);
- expect(queue.expireMessage(messageID_2, storageManager, postOffice, repository)).andReturn(true);
+ expect(
+ queue.expireMessage(messageID_1, storageManager, postOffice,
+ repository)).andReturn(true);
+ expect(
+ queue.expireMessage(messageID_2, storageManager, postOffice,
+ repository)).andReturn(true);
replay(queue, storageManager, postOffice, repository, ref_1, ref_2,
message_1, message_2);
@@ -612,30 +620,23 @@
{
long messageID = randomLong();
SimpleString otherQueueName = randomSimpleString();
- List<MessageReference> refs = new ArrayList<MessageReference>();
- MessageReference ref = createMock(MessageReference.class);
- ServerMessage message = createMock(ServerMessage.class);
- expect(message.getMessageID()).andStubReturn(messageID);
- expect(ref.getMessage()).andReturn(message);
- refs.add(ref);
Queue queue = createMock(Queue.class);
Binding otherBinding = createMock(Binding.class);
StorageManager storageManager = createMock(StorageManager.class);
- expect(queue.list(null)).andReturn(refs);
PostOffice postOffice = createMock(PostOffice.class);
expect(postOffice.getBinding(otherQueueName)).andReturn(otherBinding);
HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
- ref.move(otherBinding, storageManager, postOffice);
+ expect(
+ queue.moveMessage(messageID, otherBinding, storageManager,
+ postOffice)).andReturn(true);
- replay(queue, storageManager, postOffice, repository, ref, message,
- otherBinding);
+ replay(queue, storageManager, postOffice, repository, otherBinding);
QueueControlMBean control = new QueueControl(queue, storageManager,
postOffice, repository);
assertTrue(control.moveMessage(messageID, otherQueueName.toString()));
- verify(queue, storageManager, postOffice, repository, ref, message,
- otherBinding);
+ verify(queue, storageManager, postOffice, repository, otherBinding);
}
public void testMoveMessageWithNoQueue() throws Exception
@@ -668,15 +669,14 @@
{
long messageID = randomLong();
SimpleString otherQueueName = randomSimpleString();
- List<MessageReference> refs = new ArrayList<MessageReference>();
Queue queue = createMock(Queue.class);
Binding otherBinding = createMock(Binding.class);
StorageManager storageManager = createMock(StorageManager.class);
- expect(queue.list(null)).andReturn(refs);
PostOffice postOffice = createMock(PostOffice.class);
expect(postOffice.getBinding(otherQueueName)).andReturn(otherBinding);
HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
-
+ expect(queue.moveMessage(messageID, otherBinding, storageManager, postOffice)).andReturn(false);
+
replay(queue, storageManager, postOffice, repository, otherBinding);
QueueControl control = new QueueControl(queue, storageManager,
@@ -774,7 +774,9 @@
StorageManager storageManager = createMock(StorageManager.class);
PostOffice postOffice = createMock(PostOffice.class);
HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
- expect(queue.sendMessageToDLQ(messageID, storageManager, postOffice, repository)).andReturn(true);
+ expect(
+ queue.sendMessageToDLQ(messageID, storageManager, postOffice,
+ repository)).andReturn(true);
replay(queue, storageManager, postOffice, repository);
@@ -784,7 +786,7 @@
verify(queue, storageManager, postOffice, repository);
}
-
+
public void testSendMessageToDLQWithNoMessageID() throws Exception
{
long messageID = randomLong();
@@ -793,7 +795,9 @@
StorageManager storageManager = createMock(StorageManager.class);
PostOffice postOffice = createMock(PostOffice.class);
HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
- expect(queue.sendMessageToDLQ(messageID, storageManager, postOffice, repository)).andReturn(false);
+ expect(
+ queue.sendMessageToDLQ(messageID, storageManager, postOffice,
+ repository)).andReturn(false);
replay(queue, storageManager, postOffice, repository);
@@ -803,7 +807,7 @@
verify(queue, storageManager, postOffice, repository);
}
-
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java 2008-07-31 13:10:12 UTC (rev 4759)
+++ branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java 2008-07-31 14:37:28 UTC (rev 4760)
@@ -318,34 +318,34 @@
public void testMove() throws Exception
{
- SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString toAddress = RandomUtil.randomSimpleString();
+ long tid = RandomUtil.randomLong();
long messageID = RandomUtil.randomLong();
+
Queue queue = EasyMock.createStrictMock(Queue.class);
- Binding otherBinding = EasyMock.createStrictMock(Binding.class);
- Queue otherQueue = EasyMock.createStrictMock(Queue.class);
- EasyMock.expect(otherBinding.getQueue()).andReturn(otherQueue);
- EasyMock.expect(otherBinding.getAddress()).andReturn(address);
+ Binding toBinding = EasyMock.createStrictMock(Binding.class);
+ Queue toQueue = EasyMock.createStrictMock(Queue.class);
PostOffice postOffice = EasyMock.createMock(PostOffice.class);
StorageManager persistenceManager = EasyMock.createMock(StorageManager.class);
ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
- MessageReference copyReference = EasyMock.createStrictMock(MessageReference.class);
+ MessageReferenceImpl messageReference = new DummyMessageReference(serverMessage, queue);
ServerMessage copyMessage = EasyMock.createStrictMock(ServerMessage.class);
-
- EasyMock.expect(persistenceManager.generateTransactionID()).andReturn(1l);
- EasyMock.expect(serverMessage.createReference(otherQueue)).andReturn(copyReference);
- EasyMock.expect(copyReference.getMessage()).andReturn(copyMessage);
- copyMessage.setDestination(address);
+
+ EasyMock.expect(persistenceManager.generateTransactionID()).andReturn(tid);
+ EasyMock.expect(serverMessage.copy()).andReturn(copyMessage);
+ EasyMock.expect(toBinding.getAddress()).andStubReturn(toAddress);
+ copyMessage.setDestination(toAddress);
EasyMock.expect(postOffice.route(copyMessage)).andReturn(new ArrayList<MessageReference>());
EasyMock.expect(copyMessage.getDurableRefCount()).andReturn(0);
+ EasyMock.expect(serverMessage.isDurable()).andStubReturn(false);
EasyMock.expect(serverMessage.getMessageID()).andReturn(messageID);
- EasyMock.expect(queue.deleteReference(messageID, persistenceManager)).andReturn(true);
+ queue.referenceAcknowledged(messageReference);
- EasyMock.replay(queue, otherBinding, otherQueue, postOffice, persistenceManager, serverMessage, copyReference, copyMessage);
+ EasyMock.replay(queue, toBinding, toQueue, postOffice, persistenceManager, serverMessage, copyMessage);
- MessageReferenceImpl messageReference = new DummyMessageReference(serverMessage, queue);
- messageReference.move(otherBinding, persistenceManager, postOffice);
+ messageReference.move(toBinding, persistenceManager, postOffice);
- EasyMock.verify(queue, otherBinding, otherQueue, postOffice, persistenceManager, serverMessage, copyReference, copyMessage);
+ EasyMock.verify(queue, toBinding, toQueue, postOffice, persistenceManager, serverMessage, copyMessage);
}
//we need to override the constructor for creation
Modified: branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2008-07-31 13:10:12 UTC (rev 4759)
+++ branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2008-07-31 14:37:28 UTC (rev 4760)
@@ -1510,6 +1510,47 @@
EasyMock.verify(storageManager, postOffice, queueSettingsRepository, dlqBinding);
}
+ public void testMoveMessage() throws Exception
+ {
+ long messageID = randomLong();
+ long tid = randomLong();
+ final SimpleString toQueueName = new SimpleString("toQueueName");
+ Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
+ Queue toQueue = createMock(Queue.class);
+
+ MessageReference messageReference = generateReference(queue, messageID);
+ StorageManager storageManager = EasyMock.createMock(StorageManager.class);
+ EasyMock.expect(storageManager.generateTransactionID()).andReturn(tid);
+ storageManager.storeDeleteTransactional(EasyMock.anyLong(), EasyMock.eq(messageID));
+ storageManager.commit(EasyMock.anyLong());
+ PostOffice postOffice = EasyMock.createMock(PostOffice.class);
+ Binding toBinding = EasyMock.createMock(Binding.class);
+ EasyMock.expect(toBinding.getAddress()).andStubReturn(toQueueName);
+ EasyMock.expect(toBinding.getQueue()).andStubReturn(toQueue);
+ EasyMock.expect(postOffice.route(EasyMock.isA(ServerMessage.class))).andReturn(new ArrayList<MessageReference>());
+ HierarchicalRepository<QueueSettings> queueSettingsRepository = EasyMock.createMock(HierarchicalRepository.class);
+
+ EasyMock.replay(storageManager, postOffice, queueSettingsRepository, toBinding);
+
+ assertEquals(0, queue.getMessageCount());
+ assertEquals(0, queue.getDeliveringCount());
+ assertEquals(0, queue.getSizeBytes());
+
+ queue.addLast(messageReference);
+
+ assertEquals(1, queue.getMessageCount());
+ assertEquals(0, queue.getDeliveringCount());
+ assertTrue(queue.getSizeBytes() > 0);
+
+ queue.moveMessage(messageID, toBinding, storageManager, postOffice);
+
+ assertEquals(0, queue.getMessageCount());
+ assertEquals(0, queue.getDeliveringCount());
+ assertEquals(0, queue.getSizeBytes());
+
+ EasyMock.verify(storageManager, postOffice, queueSettingsRepository, toBinding);
+ }
+
// Inner classes ---------------------------------------------------------------
class AddtoQueueRunner implements Runnable
More information about the jboss-cvs-commits mailing list