[jboss-cvs] JBoss Messaging SVN: r5094 - in trunk: examples/messaging/src/org/jboss/messaging/example and 12 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Oct 9 05:25:14 EDT 2008
Author: ataylor
Date: 2008-10-09 05:25:14 -0400 (Thu, 09 Oct 2008)
New Revision: 5094
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/
trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java
Modified:
trunk/examples/jms/src/org/jboss/jms/example/ScheduledExample.java
trunk/examples/messaging/src/org/jboss/messaging/example/ScheduledMessageExample.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
trunk/src/main/org/jboss/messaging/core/server/Queue.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ScheduledDeliveryTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
Log:
changed scheduled messge to store against the message reference
Modified: trunk/examples/jms/src/org/jboss/jms/example/ScheduledExample.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/ScheduledExample.java 2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/examples/jms/src/org/jboss/jms/example/ScheduledExample.java 2008-10-09 09:25:14 UTC (rev 5094)
@@ -61,7 +61,7 @@
Message message = session.createTextMessage("This is a text message!");
Calendar cal = Calendar.getInstance();
log.info("current time " + df.format(cal.getTime()));
- cal.roll(Calendar.SECOND, 5);
+ cal.add(Calendar.SECOND, 5);
log.info("message scheduled for " + df.format(cal.getTime()));
message.setLongProperty("JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME", cal.getTimeInMillis());
log.info("sending message to queue");
Modified: trunk/examples/messaging/src/org/jboss/messaging/example/ScheduledMessageExample.java
===================================================================
--- trunk/examples/messaging/src/org/jboss/messaging/example/ScheduledMessageExample.java 2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/examples/messaging/src/org/jboss/messaging/example/ScheduledMessageExample.java 2008-10-09 09:25:14 UTC (rev 5094)
@@ -59,7 +59,7 @@
message.getBody().putString("Hello!");
Calendar cal = Calendar.getInstance();
log.info("current time " + df.format(cal.getTime()));
- cal.roll(Calendar.SECOND, 5);
+ cal.add(Calendar.SECOND, 5);
log.info("message scheduled for " + df.format(cal.getTime()));
clientProducer.send(message, cal.getTimeInMillis());
ClientConsumer clientConsumer = clientSession.createConsumer(queue);
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-10-09 09:25:14 UTC (rev 5094)
@@ -254,6 +254,10 @@
for (MessageReference ref : refs)
{
ref.setScheduledDeliveryTime(scheduledDeliveryTime);
+ if(ref.getQueue().isDurable())
+ {
+ storageManager.storeMessageReferenceScheduledTransactional(depageTransactionID, ref.getQueue().getPersistenceID(), msg.getMessage().getMessageID(), scheduledDeliveryTime);
+ }
}
scheduledRefsToAdd.addAll(refs);
}
@@ -261,11 +265,6 @@
if (msg.getMessage().getDurableRefCount() != 0)
{
storageManager.storeMessageTransactional(depageTransactionID, msg.getMessage());
- //write the scheduled message record if needed
- if(scheduledDeliveryTime != null)
- {
- storageManager.storeMessageScheduledTransactional(depageTransactionID, msg.getMessage(), scheduledDeliveryTime);
- }
}
}
@@ -293,7 +292,7 @@
for (MessageReference ref : scheduledRefsToAdd)
{
- ref.getQueue().addScheduledDelivery(ref);
+ ref.getQueue().addLast(ref);
}
if (globalMode.get())
{
Modified: trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2008-10-09 09:25:14 UTC (rev 5094)
@@ -58,13 +58,13 @@
void storeDelete(long messageID) throws Exception;
- void storeMessageScheduled(final ServerMessage message, final long scheduledDeliveryTime) throws Exception;
+ void storeMessageReferenceScheduled(final long queueID, final long messageID, final long scheduledDeliveryTime) throws Exception;
void storeMessageTransactional(long txID, ServerMessage message) throws Exception;
void storeAcknowledgeTransactional(long txID, long queueID, long messageiD) throws Exception;
- void storeMessageScheduledTransactional(final long txID,final ServerMessage message, final long scheduledDeliveryTime) throws Exception;
+ void storeMessageReferenceScheduledTransactional(final long txID, final long queueID, final long messageID, final long scheduledDeliveryTime) throws Exception;
void storeDeleteMessageTransactional(long txID, long queueID, long messageID) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-10-09 09:25:14 UTC (rev 5094)
@@ -63,7 +63,6 @@
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -222,9 +221,10 @@
messageJournal.appendDeleteRecord(messageID);
}
- public void storeMessageScheduled(final ServerMessage message, final long scheduledDeliveryTime) throws Exception
+ public void storeMessageReferenceScheduled(final long queueID, final long messageID, final long scheduledDeliveryTime) throws Exception
{
- messageJournal.appendUpdateRecord(message.getMessageID(), SET_SCHEDULED_DELIVERY_TIME, new ScheduledDeliveryEncoding(message.getMessageID(), scheduledDeliveryTime));
+ ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(scheduledDeliveryTime, queueID);
+ messageJournal.appendUpdateRecord(messageID, SET_SCHEDULED_DELIVERY_TIME, encoding);
}
// Transactional operations
@@ -275,9 +275,10 @@
messageJournal.appendDeleteRecordTransactional(txID, recordID, null);
}
- public void storeMessageScheduledTransactional(final long txID,final ServerMessage message, final long scheduledDeliveryTime) throws Exception
+ public void storeMessageReferenceScheduledTransactional(final long txID, final long queueID, final long messageID, final long scheduledDeliveryTime) throws Exception
{
- messageJournal.appendUpdateRecordTransactional(txID, message.getMessageID(), SET_SCHEDULED_DELIVERY_TIME, new ScheduledDeliveryEncoding(message.getMessageID(), scheduledDeliveryTime));
+ ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(scheduledDeliveryTime, queueID);
+ messageJournal.appendUpdateRecordTransactional(txID, messageID, SET_SCHEDULED_DELIVERY_TIME, encoding);
}
public void storeDeleteMessageTransactional(final long txID, final long queueID, final long messageID) throws Exception
@@ -319,7 +320,6 @@
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
messageJournal.load(records, preparedTransactions);
- Map<Long, List<MessageReference>> routedRefs = new HashMap<Long, List<MessageReference>>();
for (RecordInfo record : records)
{
byte[] data = record.data;
@@ -345,7 +345,6 @@
ref.getQueue().addLast(ref);
}
- routedRefs.put(record.id, refs);
break;
}
case ACKNOWLEDGE_REF:
@@ -428,17 +427,25 @@
}
case SET_SCHEDULED_DELIVERY_TIME:
{
- ScheduledDeliveryEncoding scheduledDeliveryEncoding = new ScheduledDeliveryEncoding(record.id);
- scheduledDeliveryEncoding.decode(buff);
- List<MessageReference> refs = routedRefs.get(record.id);
- //for any references that have already been routed, we need to remove them from t he queue and re add them as scheduled
- for (MessageReference ref : refs)
- {
- ref.getQueue().removeReferenceWithID(ref.getMessage().getMessageID());
- ref.setScheduledDeliveryTime(scheduledDeliveryEncoding.getScheduledDeliveryTime());
- ref.getQueue().addScheduledDelivery(ref);
- }
+ long messageID = record.id;
+ ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding();
+
+ encoding.decode(buff);
+
+ Queue queue = queues.get(encoding.queueID);
+
+ if (queue == null)
+ {
+ throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
+ }
+ //remove the reference and then add it back in with the scheduled time set.
+ MessageReference removed = queue.removeReferenceWithID(messageID);
+
+ removed.setScheduledDeliveryTime(encoding.scheduledDeliveryTime);
+
+ queue.addLast(removed);
+
break;
}
default:
@@ -659,7 +666,6 @@
List<MessageReference> messagesToAck = new ArrayList<MessageReference>();
- Map<Long, List<MessageReference>> routedRefs = new HashMap<Long, List<MessageReference>>();
PageTransactionInfoImpl pageTransactionInfo = null;
@@ -686,8 +692,6 @@
messages.addAll(refs);
- routedRefs.put(record.id, refs);
-
break;
}
case ACKNOWLEDGE_REF:
@@ -728,15 +732,29 @@
}
case SET_SCHEDULED_DELIVERY_TIME:
{
- ScheduledDeliveryEncoding scheduledDeliveryEncoding = new ScheduledDeliveryEncoding(record.id);
- scheduledDeliveryEncoding.decode(buff);
- List<MessageReference> refs = routedRefs.get(record.id);
- //for any references that have already been routed, we need to remove them from the queue and re add them as scheduled
- for (MessageReference ref : refs)
+ long messageID = record.id;
+
+ ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding();
+
+ encoding.decode(buff);
+
+ Queue queue = queues.get(encoding.queueID);
+
+ if (queue == null)
{
- ref.setScheduledDeliveryTime(scheduledDeliveryEncoding.getScheduledDeliveryTime());
- scheduledMessages.add(ref);
+ throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
}
+
+ for (MessageReference ref : messages)
+ {
+ if(ref.getQueue().getPersistenceID() == encoding.queueID &&
+ ref.getMessage().getMessageID() == messageID)
+ {
+ ref.setScheduledDeliveryTime(encoding.scheduledDeliveryTime);
+ scheduledMessages.add(ref);
+ }
+ }
+
break;
}
default:
@@ -1009,40 +1027,36 @@
super(queueID);
}
}
- private static class ScheduledDeliveryEncoding implements EncodingSupport
+ private static class ScheduledDeliveryEncoding extends QueueEncoding
{
- long messageId;
long scheduledDeliveryTime;
- private ScheduledDeliveryEncoding(long messageId, long scheduledDeliveryTime)
+ private ScheduledDeliveryEncoding(long scheduledDeliveryTime, long queueID)
{
- this.messageId = messageId;
+ super(queueID);
this.scheduledDeliveryTime = scheduledDeliveryTime;
}
- public ScheduledDeliveryEncoding(long messageId)
+ public ScheduledDeliveryEncoding()
{
- this.messageId = messageId;
+
}
public int getEncodeSize()
{
- return 8;
+ return super.getEncodeSize() + 8;
}
public void encode(MessagingBuffer buffer)
{
+ super.encode(buffer);
buffer.putLong(scheduledDeliveryTime);
}
public void decode(MessagingBuffer buffer)
{
+ super.decode(buffer);
scheduledDeliveryTime = buffer.getLong();
}
-
- public long getScheduledDeliveryTime()
- {
- return scheduledDeliveryTime;
- }
}
}
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2008-10-09 09:25:14 UTC (rev 5094)
@@ -104,14 +104,15 @@
{
}
- public void storeAcknowledgeTransactional(long txID, long queueID,
+ public void storeMessageReferenceScheduled(long queueID, long messageID, long scheduledDeliveryTime) throws Exception
+ {
+ }
+
+ public void storeAcknowledgeTransactional(long txID, long queueID,
long messageiD) throws Exception
{
}
- public void storeMessageScheduled(ServerMessage message, long scheduledDeliveryTime) throws Exception
- {
- }
public void storeDelete(long messageID) throws Exception
{
@@ -130,7 +131,7 @@
{
}
- public void storeMessageScheduledTransactional(long txID, ServerMessage message, long scheduledDeliveryTime) throws Exception
+ public void storeMessageReferenceScheduledTransactional(long txID, long queueID, long messageID, long scheduledDeliveryTime) throws Exception
{
}
Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java 2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java 2008-10-09 09:25:14 UTC (rev 5094)
@@ -147,5 +147,4 @@
MessageReference removeFirst();
boolean consumerFailedOver();
- void addScheduledDelivery(MessageReference ref);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-10-09 09:25:14 UTC (rev 5094)
@@ -148,6 +148,7 @@
if(redeliveryDelay > 0)
{
scheduledDeliveryTime = System.currentTimeMillis() + redeliveryDelay;
+ persistenceManager.storeMessageReferenceScheduled(queue.getPersistenceID(), message.getMessageID(), scheduledDeliveryTime);
}
queue.referenceCancelled();
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-09 09:25:14 UTC (rev 5094)
@@ -185,14 +185,10 @@
MessageReference ref = iter.previous();
ServerMessage msg = ref.getMessage();
- if(ref.getScheduledDeliveryTime() <= 0)
+ if(!checkAndSchedule(ref))
{
messageReferences.addFirst(ref, msg.getPriority());
}
- else
- {
- addScheduledDelivery(ref);
- }
checkWaiting(msg.getMessageID());
}
@@ -696,13 +692,6 @@
return ref;
}
- public void addScheduledDelivery(MessageReference ref)
- {
- ScheduledDeliveryRunnable runner = new ScheduledDeliveryRunnable(ref);
- scheduledRunnables.add(runner);
- scheduleDelivery(runner, ref.getScheduledDeliveryTime());
- }
-
// Public
// -----------------------------------------------------------------------------
@@ -735,6 +724,11 @@
sizeBytes.addAndGet(ref.getMessage().getEncodeSize());
}
+ if (checkAndSchedule(ref))
+ {
+ return HandleStatus.HANDLED;
+ }
+
boolean add = false;
if (direct && !backup)
@@ -792,6 +786,31 @@
return HandleStatus.HANDLED;
}
+ private boolean checkAndSchedule(final MessageReference ref)
+ {
+ long deliveryTime = ref.getScheduledDeliveryTime();
+
+ if (deliveryTime != 0 && scheduledExecutor != null)
+ {
+ if (trace)
+ {
+ log.trace("Scheduling delivery for " + ref + " to occur at " + deliveryTime);
+ }
+
+ ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
+
+ scheduledRunnables.add(runnable);
+
+ if (!backup)
+ {
+ scheduleDelivery(runnable, deliveryTime);
+ }
+
+ return true;
+ }
+ return false;
+ }
+
private void scheduleDelivery(final ScheduledDeliveryRunnable runnable, final long deliveryTime)
{
long now = System.currentTimeMillis();
@@ -809,7 +828,6 @@
if (status == HandleStatus.HANDLED)
{
deliveringCount.incrementAndGet();
-
return HandleStatus.HANDLED;
}
else if (status == HandleStatus.NO_MATCH)
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-09 09:25:14 UTC (rev 5094)
@@ -353,13 +353,16 @@
if (msg.getDurableRefCount() != 0)
{
storageManager.storeMessage(msg);
- storageManager.storeMessageScheduled(msg, scheduledDeliveryTime);
}
for (MessageReference ref : refs)
{
+ if(ref.getQueue().isDurable())
+ {
+ storageManager.storeMessageReferenceScheduled(ref.getQueue().getPersistenceID(), msg.getMessageID(), scheduledDeliveryTime);
+ }
ref.setScheduledDeliveryTime(scheduledDeliveryTime);
- ref.getQueue().addScheduledDelivery(ref);
+ ref.getQueue().addLast(ref);
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-10-09 09:25:14 UTC (rev 5094)
@@ -173,13 +173,13 @@
{
List<MessageReference> refs = route(message);
- if (message.getDurableRefCount() != 0)
- {
- storageManager.storeMessageScheduledTransactional(id, message, scheduledDeliveryTime);
- }
for (MessageReference ref : refs)
{
scheduledReferences.put(ref, scheduledDeliveryTime);
+ if(ref.getQueue().isDurable())
+ {
+ storageManager.storeMessageReferenceScheduledTransactional(id, ref.getQueue().getPersistenceID(), message.getMessageID(), scheduledDeliveryTime);
+ }
}
}
}
@@ -303,7 +303,7 @@
else
{
ref.setScheduledDeliveryTime(scheduled);
- ref.getQueue().addScheduledDelivery(ref);
+ ref.getQueue().addLast(ref);
}
}
@@ -552,13 +552,13 @@
// This could happen when the PageStore left the pageState
List<MessageReference> refs = route(message);
- if (message.getDurableRefCount() != 0)
- {
- storageManager.storeMessageScheduledTransactional(id, message, scheduledDeliveryTime);
- }
for (MessageReference ref : refs)
{
scheduledReferences.put(ref, scheduledDeliveryTime);
+ if(ref.getQueue().isDurable())
+ {
+ storageManager.storeMessageReferenceScheduledTransactional(id, ref.getQueue().getPersistenceID(), message.getMessageID(), scheduledDeliveryTime);
+ }
}
}
}
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ScheduledDeliveryTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ScheduledDeliveryTest.java 2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ScheduledDeliveryTest.java 2008-10-09 09:25:14 UTC (rev 5094)
@@ -170,7 +170,7 @@
assertEquals(tm9.getText(), rm5.getText());
//Now the scheduled
- TextMessage rm6 = (TextMessage)cons.receive(3250);
+ TextMessage rm6 = (TextMessage)cons.receive(15250);
assertNotNull(rm6);
assertEquals(tm7.getText(), rm6.getText());
@@ -179,7 +179,7 @@
assertTrue(now2 - now >= 3000);
- TextMessage rm7 = (TextMessage)cons.receive(1250);
+ TextMessage rm7 = (TextMessage)cons.receive(16250);
assertNotNull(rm7);
assertEquals(tm6.getText(), rm7.getText());
@@ -188,7 +188,7 @@
assertTrue(now2 - now >= 4000);
- TextMessage rm8 = (TextMessage)cons.receive(1250);
+ TextMessage rm8 = (TextMessage)cons.receive(17250);
assertNotNull(rm8);
assertEquals(tm5.getText(), rm8.getText());
@@ -197,7 +197,7 @@
assertTrue(now2 - now >= 5000);
- TextMessage rm9 = (TextMessage)cons.receive(1250);
+ TextMessage rm9 = (TextMessage)cons.receive(18250);
assertNotNull(rm9);
assertEquals(tm8.getText(), rm9.getText());
@@ -206,7 +206,7 @@
assertTrue(now2 - now >= 6000);
- TextMessage rm10 = (TextMessage)cons.receive(1250);
+ TextMessage rm10 = (TextMessage)cons.receive(19250);
assertNotNull(rm10);
assertEquals(tm1.getText(), rm10.getText());
Copied: trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java (from rev 5081, trunk/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java 2008-10-09 09:25:14 UTC (rev 5094)
@@ -0,0 +1,619 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.scheduling;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.core.transaction.impl.XidImpl;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.SimpleString;
+import org.jboss.util.id.GUID;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.io.File;
+import java.util.Calendar;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ScheduledMessageTest extends UnitTestCase
+{
+ private static final String ACCEPTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory";
+
+ private static final String CONNECTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
+
+ private String journalDir = System.getProperty("java.io.tmpdir", "/tmp") + "/ScheduledMessageRecoveryTest/journal";
+
+ private String bindingsDir = System.getProperty("java.io.tmpdir", "/tmp") + "/ScheduledMessageRecoveryTest/bindings";
+
+ private String pageDir = System.getProperty("java.io.tmpdir", "/tmp") + "/ScheduledMessageRecoveryTest/page";
+
+ private SimpleString atestq = new SimpleString("ascheduledtestq");
+ private SimpleString atestq2 = new SimpleString("ascheduledtestq2");
+
+ private MessagingService messagingService;
+
+ private ConfigurationImpl configuration;
+
+ protected void setUp() throws Exception
+ {
+ File file = new File(journalDir);
+ File file2 = new File(bindingsDir);
+ File file3 = new File(pageDir);
+ deleteDirectory(file);
+ file.mkdirs();
+ deleteDirectory(file2);
+ file2.mkdirs();
+ deleteDirectory(file3);
+ file3.mkdirs();
+ configuration = new ConfigurationImpl();
+ configuration.setSecurityEnabled(false);
+ configuration.setJournalMinFiles(2);
+ configuration.setPagingDirectory(pageDir);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ if (messagingService != null)
+ {
+ try
+ {
+ messagingService.stop();
+ messagingService = null;
+ }
+ catch (Exception e)
+ {
+ //ignore
+ }
+ }
+ new File(journalDir).delete();
+ new File(bindingsDir).delete();
+ new File(pageDir).delete();
+ }
+
+ public void testRecoveredMessageDeliveredCorrectly() throws Exception
+ {
+ testMessageDeliveredCorrectly(true);
+ }
+
+ public void testMessageDeliveredCorrectly() throws Exception
+ {
+ testMessageDeliveredCorrectly(false);
+ }
+
+ public void testScheduledMessagesDeliveredCorrectly() throws Exception
+ {
+ testScheduledMessagesDeliveredCorrectly(false);
+ }
+
+ public void testRecoveredScheduledMessagesDeliveredCorrectly() throws Exception
+ {
+ testScheduledMessagesDeliveredCorrectly(true);
+ }
+
+ public void testScheduledMessagesDeliveredCorrectlyDifferentOrder() throws Exception
+ {
+ testScheduledMessagesDeliveredCorrectlyDifferentOrder(false);
+ }
+
+ public void testRecoveredScheduledMessagesDeliveredCorrectlyDifferentOrder() throws Exception
+ {
+ testScheduledMessagesDeliveredCorrectlyDifferentOrder(true);
+ }
+
+ public void testScheduledAndNormalMessagesDeliveredCorrectly() throws Exception
+ {
+ testScheduledAndNormalMessagesDeliveredCorrectly(false);
+ }
+
+ public void testRecoveredScheduledAndNormalMessagesDeliveredCorrectly() throws Exception
+ {
+ testScheduledAndNormalMessagesDeliveredCorrectly(true);
+ }
+
+ public void testTxMessageDeliveredCorrectly() throws Exception
+ {
+ testTxMessageDeliveredCorrectly(false);
+ }
+
+ public void testRecoveredTxMessageDeliveredCorrectly() throws Exception
+ {
+ testTxMessageDeliveredCorrectly(true);
+ }
+
+ public void testPagedMessageDeliveredCorrectly() throws Exception
+ {
+
+ TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+ configuration.getAcceptorConfigurations().add(transportConfig);
+ configuration.setPagingMaxGlobalSizeBytes(0);
+ messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+ //start the server
+ messagingService.start();
+ //then we create a client as normal
+ ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ ClientSession session = sessionFactory.createSession(false, true, false, false);
+ session.createQueue(atestq, atestq, null, true, true);
+ ClientProducer producer = session.createProducer(atestq);
+ ClientMessage message = createMessage(session, "m1");
+ long time = System.currentTimeMillis();
+ time+=10000;
+ producer.send(message, time);
+
+ producer.close();
+
+
+ ClientConsumer consumer = session.createConsumer(atestq);
+
+ session.start();
+
+ ClientMessage message2 = consumer.receive(10250);
+ assertTrue(System.currentTimeMillis() >= time);
+ assertEquals("m1", message2.getBody().getString());
+
+ message2.processed();
+ session.close();
+ }
+
+ public void testPagedMessageDeliveredMultipleConsumersCorrectly() throws Exception
+ {
+
+ TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+ configuration.getAcceptorConfigurations().add(transportConfig);
+ configuration.setPagingMaxGlobalSizeBytes(0);
+ messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+ //start the server
+ messagingService.start();
+ QueueSettings qs = new QueueSettings();
+ qs.setRedeliveryDelay(5000l);
+ messagingService.getServer().getQueueSettingsRepository().addMatch(atestq2.toString(), qs);
+ //then we create a client as normal
+ ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ ClientSession session = sessionFactory.createSession(false, true, false, false);
+ session.createQueue(atestq, atestq, null, true, true);
+ session.createQueue(atestq, atestq2, null, true, true);
+ ClientProducer producer = session.createProducer(atestq);
+ ClientMessage message = createMessage(session, "m1");
+ producer.send(message);
+
+ producer.close();
+
+
+ ClientConsumer consumer = session.createConsumer(atestq);
+ ClientConsumer consumer2 = session.createConsumer(atestq2);
+
+ session.start();
+ ClientMessage message3 = consumer.receive(1000);
+ ClientMessage message2 = consumer2.receive(1000);
+ assertEquals("m1", message3.getBody().getString());
+ assertEquals("m1", message2.getBody().getString());
+ long time = System.currentTimeMillis();
+ //force redelivery
+ consumer.close();
+ consumer2.close();
+ consumer = session.createConsumer(atestq);
+ consumer2 = session.createConsumer(atestq2);
+ message3 = consumer.receive(1000);
+ message2 = consumer2.receive(5250);
+ time+=5000;
+ assertTrue(System.currentTimeMillis() >= time);
+ assertEquals("m1", message3.getBody().getString());
+ assertEquals("m1", message2.getBody().getString());
+ message2.processed();
+ message3.processed();
+ session.close();
+ }
+
+ public void testPagedMessageDeliveredMultipleConsumersAfterRecoverCorrectly() throws Exception
+ {
+
+ TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+ configuration.getAcceptorConfigurations().add(transportConfig);
+ configuration.setPagingMaxGlobalSizeBytes(0);
+ messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+ //start the server
+ messagingService.start();
+ QueueSettings qs = new QueueSettings();
+ qs.setRedeliveryDelay(5000l);
+ messagingService.getServer().getQueueSettingsRepository().addMatch(atestq2.toString(), qs);
+ //then we create a client as normal
+ ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ ClientSession session = sessionFactory.createSession(false, true, false, false);
+ session.createQueue(atestq, atestq, null, true, true);
+ session.createQueue(atestq, atestq2, null, true, true);
+ ClientProducer producer = session.createProducer(atestq);
+ ClientMessage message = createMessage(session, "m1");
+ producer.send(message);
+
+ producer.close();
+
+
+ ClientConsumer consumer = session.createConsumer(atestq);
+ ClientConsumer consumer2 = session.createConsumer(atestq2);
+
+ session.start();
+ ClientMessage message3 = consumer.receive(1000);
+ ClientMessage message2 = consumer2.receive(1000);
+ assertEquals("m1", message3.getBody().getString());
+ assertEquals("m1", message2.getBody().getString());
+ long time = System.currentTimeMillis();
+ //force redelivery
+ consumer.close();
+ consumer2.close();
+ producer.close();
+ session.close();
+ messagingService.stop();
+ messagingService = null;
+ messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+ messagingService.start();
+ sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ session = sessionFactory.createSession(false, true, true, false);
+ consumer = session.createConsumer(atestq);
+ consumer2 = session.createConsumer(atestq2);
+ session.start();
+ message3 = consumer.receive(1000);
+ message2 = consumer2.receive(5250);
+ time+=5000;
+ assertTrue(System.currentTimeMillis() >= time);
+ assertEquals("m1", message3.getBody().getString());
+ assertEquals("m1", message2.getBody().getString());
+ message2.processed();
+ message3.processed();
+ session.close();
+ }
+ public void testMessageDeliveredCorrectly(boolean recover) throws Exception
+ {
+
+ TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+ configuration.getAcceptorConfigurations().add(transportConfig);
+ messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+ //start the server
+ messagingService.start();
+ //then we create a client as normal
+ ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ ClientSession session = sessionFactory.createSession(false, true, false, false);
+ session.createQueue(atestq, atestq, null, true, true);
+ ClientProducer producer = session.createProducer(atestq);
+ ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
+ System.currentTimeMillis(), (byte) 1);
+ message.getBody().putString("testINVMCoreClient");
+ message.getBody().flip();
+ message.setDurable(true);
+ long time = System.currentTimeMillis();
+ time+=10000;
+ producer.send(message, time);
+
+ if (recover)
+ {
+ producer.close();
+ session.close();
+ messagingService.stop();
+ messagingService = null;
+ messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+ messagingService.start();
+ sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ session = sessionFactory.createSession(false, true, true, false);
+ }
+ ClientConsumer consumer = session.createConsumer(atestq);
+
+ session.start();
+
+ ClientMessage message2 = consumer.receive(11000);
+ assertTrue(System.currentTimeMillis() >= time);
+ assertEquals("testINVMCoreClient", message2.getBody().getString());
+
+ message2.processed();
+ session.close();
+ }
+
+ public void testScheduledMessagesDeliveredCorrectly(boolean recover) throws Exception
+ {
+
+ TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+ configuration.getAcceptorConfigurations().add(transportConfig);
+ messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+ //start the server
+ messagingService.start();
+ //then we create a client as normal
+ ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ ClientSession session = sessionFactory.createSession(false, true, false, false);
+ session.createQueue(atestq, atestq, null, true, true);
+ ClientProducer producer = session.createProducer(atestq);
+ ClientMessage m1 = createMessage(session, "m1");
+ ClientMessage m2 = createMessage(session, "m2");
+ ClientMessage m3 = createMessage(session, "m3");
+ ClientMessage m4 = createMessage(session, "m4");
+ ClientMessage m5 = createMessage(session, "m5");
+ long time = System.currentTimeMillis();
+ time+=10000;
+ producer.send(m1, time);
+ time+=1000;
+ producer.send(m2, time);
+ time+=1000;
+ producer.send(m3, time);
+ time+=1000;
+ producer.send(m4, time);
+ time+=1000;
+ producer.send(m5, time);
+ time-=4000;
+ if (recover)
+ {
+ producer.close();
+ session.close();
+ messagingService.stop();
+ messagingService = null;
+ messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+ messagingService.start();
+
+ sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+
+ session = sessionFactory.createSession(false, true, true, false);
+ }
+
+ ClientConsumer consumer = session.createConsumer(atestq);
+
+ session.start();
+
+ ClientMessage message = consumer.receive(11000);
+ assertTrue(System.currentTimeMillis() >= time);
+ assertEquals("m1", message.getBody().getString());
+ message.processed();
+ time+=1000;
+ message = consumer.receive(1250);
+ assertTrue(System.currentTimeMillis() >= time);
+ assertEquals("m2", message.getBody().getString());
+ message.processed();
+ time+=1000;
+ message = consumer.receive(1250);
+ assertTrue(System.currentTimeMillis() >= time);
+ assertEquals("m3", message.getBody().getString());
+ message.processed();
+ time+=1000;
+ message = consumer.receive(1250);
+ assertTrue(System.currentTimeMillis() >= time);
+ assertEquals("m4", message.getBody().getString());
+ message.processed();
+ time+=1000;
+ message = consumer.receive(1250);
+ assertTrue(System.currentTimeMillis() >= time);
+ assertEquals("m5", message.getBody().getString());
+ message.processed();
+ session.close();
+ }
+
+ public void testScheduledMessagesDeliveredCorrectlyDifferentOrder(boolean recover) throws Exception
+ {
+
+ TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+ configuration.getAcceptorConfigurations().add(transportConfig);
+ messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+ //start the server
+ messagingService.start();
+ //then we create a client as normal
+ ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ ClientSession session = sessionFactory.createSession(false, true, false, false);
+ session.createQueue(atestq, atestq, null, true, true);
+ ClientProducer producer = session.createProducer(atestq);
+ ClientMessage m1 = createMessage(session, "m1");
+ ClientMessage m2 = createMessage(session, "m2");
+ ClientMessage m3 = createMessage(session, "m3");
+ ClientMessage m4 = createMessage(session, "m4");
+ ClientMessage m5 = createMessage(session, "m5");
+ long time = System.currentTimeMillis();
+ time+=10000;
+ producer.send(m1, time);
+ time+=3000;
+ producer.send(m2, time);
+ time-=2000;
+ producer.send(m3, time);
+ time+=3000;
+ producer.send(m4, time);
+ time-=2000;
+ producer.send(m5, time);
+ time-=2000;
+ ClientConsumer consumer = null;
+ if (recover)
+ {
+ producer.close();
+ session.close();
+ messagingService.stop();
+ messagingService = null;
+ messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+ messagingService.start();
+
+ sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+
+ session = sessionFactory.createSession(false, true, true, false);
+
+ }
+ consumer = session.createConsumer(atestq);
+
+ session.start();
+
+ ClientMessage message = consumer.receive(10250);
+ assertTrue(System.currentTimeMillis() >= time);
+ assertEquals("m1", message.getBody().getString());
+ message.processed();
+ time+=1000;
+ message = consumer.receive(1250);
+ assertTrue(System.currentTimeMillis() >= time);
+ assertEquals("m3", message.getBody().getString());
+ message.processed();
+ time+=1000;
+ message = consumer.receive(1250);
+ assertTrue(System.currentTimeMillis() >= time);
+ assertEquals("m5", message.getBody().getString());
+ message.processed();
+ time+=1000;
+ message = consumer.receive(1250);
+ assertTrue(System.currentTimeMillis() >= time);
+ assertEquals("m2", message.getBody().getString());
+ message.processed();
+ time+=1000;
+ message = consumer.receive(1250);
+ assertTrue(System.currentTimeMillis() >= time);
+ assertEquals("m4", message.getBody().getString());
+ message.processed();
+ session.close();
+ }
+
+ public void testScheduledAndNormalMessagesDeliveredCorrectly(boolean recover) throws Exception
+ {
+
+ TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+ configuration.getAcceptorConfigurations().add(transportConfig);
+ messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+ //start the server
+ messagingService.start();
+ //then we create a client as normal
+ ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ ClientSession session = sessionFactory.createSession(false, true, false, false);
+ session.createQueue(atestq, atestq, null, true, true);
+ ClientProducer producer = session.createProducer(atestq);
+ ClientMessage m1 = createMessage(session, "m1");
+ ClientMessage m2 = createMessage(session, "m2");
+ ClientMessage m3 = createMessage(session, "m3");
+ ClientMessage m4 = createMessage(session, "m4");
+ ClientMessage m5 = createMessage(session, "m5");
+ long time = System.currentTimeMillis();
+ time+=10000;
+ producer.send(m1, time);
+ producer.send(m2);
+ time+=1000;
+ producer.send(m3, time);
+ producer.send(m4);
+ time+=1000;
+ producer.send(m5, time);
+ time-=2000;
+ ClientConsumer consumer = null;
+ if (recover)
+ {
+ producer.close();
+ session.close();
+ messagingService.stop();
+ messagingService = null;
+ messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+ messagingService.start();
+
+ sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+
+ session = sessionFactory.createSession(false, true, true, false);
+ }
+
+ consumer = session.createConsumer(atestq);
+ session.start();
+
+ ClientMessage message = consumer.receive(1000);
+ assertEquals("m2", message.getBody().getString());
+ message.processed();
+ message = consumer.receive(1000);
+ assertEquals("m4", message.getBody().getString());
+ message.processed();
+ message = consumer.receive(10250);
+ assertTrue(System.currentTimeMillis() >= time);
+ assertEquals("m1", message.getBody().getString());
+ message.processed();
+ time+=1000;
+ message = consumer.receive(1250);
+ assertTrue(System.currentTimeMillis() >= time);
+ assertEquals("m3", message.getBody().getString());
+ message.processed();
+ time+=1000;
+ message = consumer.receive(1250);
+ assertTrue(System.currentTimeMillis() >= time);
+ assertEquals("m5", message.getBody().getString());
+ message.processed();
+ session.close();
+ }
+
+ public void testTxMessageDeliveredCorrectly(boolean recover) throws Exception
+ {
+ Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+ TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+ configuration.getAcceptorConfigurations().add(transportConfig);
+ messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+ //start the server
+ messagingService.start();
+ //then we create a client as normal
+ ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ ClientSession session = sessionFactory.createSession(true, false, false, false);
+ session.createQueue(atestq, atestq, null, true, false);
+ session.start(xid, XAResource.TMNOFLAGS);
+ ClientProducer producer = session.createProducer(atestq);
+ ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
+ System.currentTimeMillis(), (byte) 1);
+ message.getBody().putString("testINVMCoreClient");
+ message.getBody().flip();
+ message.setDurable(true);
+ Calendar cal = Calendar.getInstance();
+ cal.roll(Calendar.SECOND, 10);
+ producer.send(message, cal.getTimeInMillis());
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+ if (recover)
+ {
+ producer.close();
+ session.close();
+ messagingService.stop();
+ messagingService = null;
+ messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+ messagingService.start();
+
+ sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+
+ session = sessionFactory.createSession(true, false, false, false);
+ }
+ session.commit(xid, true);
+ ClientConsumer consumer = session.createConsumer(atestq);
+
+ session.start();
+
+ ClientMessage message2 = consumer.receive(10000);
+ assertTrue(System.currentTimeMillis() >= cal.getTimeInMillis());
+ assertEquals("testINVMCoreClient", message2.getBody().getString());
+
+ message2.processed();
+ session.close();
+ }
+
+
+ private ClientMessage createMessage(ClientSession session, String body)
+ {
+ ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
+ System.currentTimeMillis(), (byte) 1);
+ message.getBody().putString(body);
+ message.getBody().flip();
+ message.setDurable(true);
+ return message;
+ }
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-10-09 09:25:14 UTC (rev 5094)
@@ -33,6 +33,7 @@
import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerMessage;
@@ -251,11 +252,15 @@
EasyMock.expect(message.getDurableRefCount()).andReturn(1);
ref.setScheduledDeliveryTime(time);
storageManager.storeLastPage(EasyMock.anyLong(), (LastPageRecord) EasyMock.anyObject());
+ storageManager.storeMessageReferenceScheduledTransactional(EasyMock.anyLong(), EasyMock.anyLong(), EasyMock.anyLong(), EasyMock.eq(time));
storageManager.storeMessageTransactional(EasyMock.anyLong(), (ServerMessage) EasyMock.anyObject());
- storageManager.storeMessageScheduledTransactional(EasyMock.anyLong(), (ServerMessage) EasyMock.anyObject(), EasyMock.eq(time));
storageManager.commit(EasyMock.anyLong());
- EasyMock.expect(ref.getQueue()).andReturn(queue);
- queue.addScheduledDelivery(ref);
+ EasyMock.expect(ref.getQueue()).andStubReturn(queue);
+ EasyMock.expect(queue.isDurable()).andReturn(true);
+ EasyMock.expect(queue.getPersistenceID()).andStubReturn(1);
+ EasyMock.expect(message.getMessageID()).andStubReturn(2);
+ //storageManager.storeMessageReferenceScheduledTransactional(1,1,2,time);
+ EasyMock.expect(queue.addLast(ref)).andReturn(HandleStatus.HANDLED);
EasyMock.replay(spi, store, message, storageManager, po, ref, queue);
SimpleString queueName = new SimpleString("aq");
PageMessageImpl pageMessage = new PageMessageImpl(message);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2008-10-09 09:25:14 UTC (rev 5094)
@@ -752,7 +752,7 @@
refs.add(ref);
- queue.addScheduledDelivery(ref);
+ queue.addLast(ref);
}
More information about the jboss-cvs-commits
mailing list