[jboss-cvs] JBoss Messaging SVN: r5094 - in trunk: examples/messaging/src/org/jboss/messaging/example and 12 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Oct 9 05:25:14 EDT 2008


Author: ataylor
Date: 2008-10-09 05:25:14 -0400 (Thu, 09 Oct 2008)
New Revision: 5094

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/
   trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java
Modified:
   trunk/examples/jms/src/org/jboss/jms/example/ScheduledExample.java
   trunk/examples/messaging/src/org/jboss/messaging/example/ScheduledMessageExample.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
   trunk/src/main/org/jboss/messaging/core/server/Queue.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ScheduledDeliveryTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
Log:
changed scheduled message to be stored against the message reference

Modified: trunk/examples/jms/src/org/jboss/jms/example/ScheduledExample.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/ScheduledExample.java	2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/examples/jms/src/org/jboss/jms/example/ScheduledExample.java	2008-10-09 09:25:14 UTC (rev 5094)
@@ -61,7 +61,7 @@
          Message message = session.createTextMessage("This is a text message!");
          Calendar cal = Calendar.getInstance();
          log.info("current time " + df.format(cal.getTime()));
-         cal.roll(Calendar.SECOND, 5);
+         cal.add(Calendar.SECOND, 5);
          log.info("message scheduled for " + df.format(cal.getTime()));
          message.setLongProperty("JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME", cal.getTimeInMillis());
          log.info("sending message to queue");

Modified: trunk/examples/messaging/src/org/jboss/messaging/example/ScheduledMessageExample.java
===================================================================
--- trunk/examples/messaging/src/org/jboss/messaging/example/ScheduledMessageExample.java	2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/examples/messaging/src/org/jboss/messaging/example/ScheduledMessageExample.java	2008-10-09 09:25:14 UTC (rev 5094)
@@ -59,7 +59,7 @@
          message.getBody().putString("Hello!");
          Calendar cal = Calendar.getInstance();
          log.info("current time " + df.format(cal.getTime()));
-         cal.roll(Calendar.SECOND, 5);
+         cal.add(Calendar.SECOND, 5);
          log.info("message scheduled for " + df.format(cal.getTime()));
          clientProducer.send(message, cal.getTimeInMillis());
          ClientConsumer clientConsumer = clientSession.createConsumer(queue);

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-10-09 09:25:14 UTC (rev 5094)
@@ -254,6 +254,10 @@
             for (MessageReference ref : refs)
             {
                ref.setScheduledDeliveryTime(scheduledDeliveryTime);
+               if(ref.getQueue().isDurable())
+               {
+                  storageManager.storeMessageReferenceScheduledTransactional(depageTransactionID, ref.getQueue().getPersistenceID(), msg.getMessage().getMessageID(), scheduledDeliveryTime);
+               }
             }
             scheduledRefsToAdd.addAll(refs);
          }
@@ -261,11 +265,6 @@
          if (msg.getMessage().getDurableRefCount() != 0)
          {
             storageManager.storeMessageTransactional(depageTransactionID, msg.getMessage());
-            //write the scheduled message record if needed
-            if(scheduledDeliveryTime != null)
-            {
-               storageManager.storeMessageScheduledTransactional(depageTransactionID, msg.getMessage(), scheduledDeliveryTime);
-            }
          }
       }
 
@@ -293,7 +292,7 @@
 
       for (MessageReference ref : scheduledRefsToAdd)
       {
-         ref.getQueue().addScheduledDelivery(ref);
+         ref.getQueue().addLast(ref);
       }
       if (globalMode.get())
       {

Modified: trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2008-10-09 09:25:14 UTC (rev 5094)
@@ -58,13 +58,13 @@
 
    void storeDelete(long messageID) throws Exception;
 
-   void storeMessageScheduled(final ServerMessage message, final long scheduledDeliveryTime) throws Exception;
+   void storeMessageReferenceScheduled(final long queueID, final long messageID, final long scheduledDeliveryTime) throws Exception;
 
    void storeMessageTransactional(long txID, ServerMessage message) throws Exception;
 
    void storeAcknowledgeTransactional(long txID, long queueID, long messageiD) throws Exception;
 
-   void storeMessageScheduledTransactional(final long txID,final ServerMessage message, final long scheduledDeliveryTime) throws Exception;
+   void storeMessageReferenceScheduledTransactional(final long txID, final long queueID, final long messageID, final long scheduledDeliveryTime) throws Exception;
 
    void storeDeleteMessageTransactional(long txID, long queueID, long messageID) throws Exception;
 

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-10-09 09:25:14 UTC (rev 5094)
@@ -63,7 +63,6 @@
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -222,9 +221,10 @@
       messageJournal.appendDeleteRecord(messageID);
    }
 
-   public void storeMessageScheduled(final ServerMessage message, final long scheduledDeliveryTime) throws Exception
+   public void storeMessageReferenceScheduled(final long queueID, final long messageID, final long scheduledDeliveryTime) throws Exception
    {
-      messageJournal.appendUpdateRecord(message.getMessageID(), SET_SCHEDULED_DELIVERY_TIME,  new ScheduledDeliveryEncoding(message.getMessageID(), scheduledDeliveryTime));
+      ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(scheduledDeliveryTime, queueID);
+      messageJournal.appendUpdateRecord(messageID, SET_SCHEDULED_DELIVERY_TIME, encoding);
    }
 
    // Transactional operations
@@ -275,9 +275,10 @@
       messageJournal.appendDeleteRecordTransactional(txID, recordID, null);
    }
 
-   public void storeMessageScheduledTransactional(final long txID,final ServerMessage message, final long scheduledDeliveryTime) throws Exception
+   public void storeMessageReferenceScheduledTransactional(final long txID, final long queueID, final long messageID, final long scheduledDeliveryTime) throws Exception
    {
-      messageJournal.appendUpdateRecordTransactional(txID, message.getMessageID(), SET_SCHEDULED_DELIVERY_TIME,  new ScheduledDeliveryEncoding(message.getMessageID(), scheduledDeliveryTime));
+      ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(scheduledDeliveryTime, queueID);
+      messageJournal.appendUpdateRecordTransactional(txID, messageID, SET_SCHEDULED_DELIVERY_TIME,  encoding);
    }
 
    public void storeDeleteMessageTransactional(final long txID, final long queueID, final long messageID) throws Exception
@@ -319,7 +320,6 @@
       List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
 
       messageJournal.load(records, preparedTransactions);
-      Map<Long, List<MessageReference>> routedRefs = new HashMap<Long, List<MessageReference>>();
       for (RecordInfo record : records)
       {
          byte[] data = record.data;
@@ -345,7 +345,6 @@
                   ref.getQueue().addLast(ref);
                }
 
-               routedRefs.put(record.id, refs);
                break;
             }
             case ACKNOWLEDGE_REF:
@@ -428,17 +427,25 @@
             }
             case SET_SCHEDULED_DELIVERY_TIME:
             {
-               ScheduledDeliveryEncoding scheduledDeliveryEncoding = new ScheduledDeliveryEncoding(record.id);
-               scheduledDeliveryEncoding.decode(buff);
-               List<MessageReference> refs = routedRefs.get(record.id);
-               //for any references that have already been routed, we need to remove them from t he queue and re add them as scheduled
-               for (MessageReference ref : refs)
-               {
-                  ref.getQueue().removeReferenceWithID(ref.getMessage().getMessageID());
-                  ref.setScheduledDeliveryTime(scheduledDeliveryEncoding.getScheduledDeliveryTime());
-                  ref.getQueue().addScheduledDelivery(ref);
-               }
+               long messageID = record.id;
 
+               ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding();
+
+               encoding.decode(buff);
+
+               Queue queue = queues.get(encoding.queueID);
+
+                  if (queue == null)
+                  {
+                     throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
+                  }
+                  //remove the reference and then add it back in with the scheduled time set.
+                  MessageReference removed = queue.removeReferenceWithID(messageID);
+
+                  removed.setScheduledDeliveryTime(encoding.scheduledDeliveryTime);
+
+                  queue.addLast(removed);
+
                break;
             }
             default:
@@ -659,7 +666,6 @@
 
          List<MessageReference> messagesToAck = new ArrayList<MessageReference>();
 
-         Map<Long, List<MessageReference>> routedRefs = new HashMap<Long, List<MessageReference>>();
 
          PageTransactionInfoImpl pageTransactionInfo = null;
 
@@ -686,8 +692,6 @@
 
                   messages.addAll(refs);
 
-                  routedRefs.put(record.id, refs);
-
                   break;
                }
                case ACKNOWLEDGE_REF:
@@ -728,15 +732,29 @@
                }
                case SET_SCHEDULED_DELIVERY_TIME:
                {
-                  ScheduledDeliveryEncoding scheduledDeliveryEncoding = new ScheduledDeliveryEncoding(record.id);
-                  scheduledDeliveryEncoding.decode(buff);
-                  List<MessageReference> refs = routedRefs.get(record.id);
-                  //for any references that have already been routed, we need to remove them from the queue and re add them as scheduled
-                  for (MessageReference ref : refs)
+                  long messageID = record.id;
+
+                  ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding();
+
+                  encoding.decode(buff);
+
+                  Queue queue = queues.get(encoding.queueID);
+
+                  if (queue == null)
                   {
-                     ref.setScheduledDeliveryTime(scheduledDeliveryEncoding.getScheduledDeliveryTime());
-                     scheduledMessages.add(ref);
+                     throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
                   }
+
+                  for (MessageReference ref : messages)
+                  {
+                     if(ref.getQueue().getPersistenceID() == encoding.queueID &&
+                           ref.getMessage().getMessageID() == messageID)
+                     {
+                        ref.setScheduledDeliveryTime(encoding.scheduledDeliveryTime);
+                        scheduledMessages.add(ref);
+                     }
+                  }
+
                   break;
                }
                default:
@@ -1009,40 +1027,36 @@
          super(queueID);
       }
    }
-   private static class ScheduledDeliveryEncoding implements EncodingSupport
+   private static class ScheduledDeliveryEncoding extends QueueEncoding
    {
-      long messageId;
       long scheduledDeliveryTime;
 
-      private ScheduledDeliveryEncoding(long messageId, long scheduledDeliveryTime)
+      private ScheduledDeliveryEncoding(long scheduledDeliveryTime, long queueID)
       {
-         this.messageId = messageId;
+         super(queueID);
          this.scheduledDeliveryTime = scheduledDeliveryTime;
       }
 
-      public ScheduledDeliveryEncoding(long messageId)
+      public ScheduledDeliveryEncoding()
       {
-         this.messageId = messageId;
+
       }
 
       public int getEncodeSize()
       {
-         return 8;
+         return super.getEncodeSize() + 8;
       }
 
       public void encode(MessagingBuffer buffer)
       {
+         super.encode(buffer);
          buffer.putLong(scheduledDeliveryTime);
       }
 
       public void decode(MessagingBuffer buffer)
       {
+         super.decode(buffer);
          scheduledDeliveryTime = buffer.getLong();
       }
-
-      public long getScheduledDeliveryTime()
-      {
-         return scheduledDeliveryTime;
-      }
    }
 }

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2008-10-09 09:25:14 UTC (rev 5094)
@@ -104,14 +104,15 @@
 	{
 	}
 
-	public void storeAcknowledgeTransactional(long txID, long queueID,
+   public void storeMessageReferenceScheduled(long queueID, long messageID, long scheduledDeliveryTime) throws Exception
+   {
+   }
+
+   public void storeAcknowledgeTransactional(long txID, long queueID,
 			long messageiD) throws Exception
 	{
 	}
 
-   public void storeMessageScheduled(ServerMessage message, long scheduledDeliveryTime) throws Exception
-   {
-   }
 
    public void storeDelete(long messageID) throws Exception
 	{
@@ -130,7 +131,7 @@
 	{
 	}
 
-   public void storeMessageScheduledTransactional(long txID, ServerMessage message, long scheduledDeliveryTime) throws Exception
+   public void storeMessageReferenceScheduledTransactional(long txID, long queueID, long messageID, long scheduledDeliveryTime) throws Exception
    {
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java	2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java	2008-10-09 09:25:14 UTC (rev 5094)
@@ -147,5 +147,4 @@
    MessageReference removeFirst();
    
    boolean consumerFailedOver();   
-   void addScheduledDelivery(MessageReference ref);
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java	2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java	2008-10-09 09:25:14 UTC (rev 5094)
@@ -148,6 +148,7 @@
          if(redeliveryDelay > 0)
          {
             scheduledDeliveryTime = System.currentTimeMillis() + redeliveryDelay;
+            persistenceManager.storeMessageReferenceScheduled(queue.getPersistenceID(), message.getMessageID(), scheduledDeliveryTime);
          }
          queue.referenceCancelled();
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-09 09:25:14 UTC (rev 5094)
@@ -185,14 +185,10 @@
          MessageReference ref = iter.previous();
 
          ServerMessage msg = ref.getMessage();
-         if(ref.getScheduledDeliveryTime() <= 0)
+         if(!checkAndSchedule(ref))
          {
             messageReferences.addFirst(ref, msg.getPriority());
          }
-         else
-         {
-            addScheduledDelivery(ref);
-         }
 
          checkWaiting(msg.getMessageID());
       }
@@ -696,13 +692,6 @@
       return ref;
    }
 
-   public void addScheduledDelivery(MessageReference ref)
-   {
-      ScheduledDeliveryRunnable runner = new ScheduledDeliveryRunnable(ref);
-      scheduledRunnables.add(runner);
-      scheduleDelivery(runner, ref.getScheduledDeliveryTime());
-   }
-
    // Public
    // -----------------------------------------------------------------------------
 
@@ -735,6 +724,11 @@
          sizeBytes.addAndGet(ref.getMessage().getEncodeSize());
       }
 
+      if (checkAndSchedule(ref))
+      {
+         return HandleStatus.HANDLED;
+      }
+
       boolean add = false;
 
       if (direct && !backup)
@@ -792,6 +786,31 @@
       return HandleStatus.HANDLED;
    }
 
+   private boolean checkAndSchedule(final MessageReference ref)
+   {
+      long deliveryTime = ref.getScheduledDeliveryTime();
+
+      if (deliveryTime != 0 && scheduledExecutor != null)
+      {
+         if (trace)
+         {
+            log.trace("Scheduling delivery for " + ref + " to occur at " + deliveryTime);
+         }
+
+         ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
+
+         scheduledRunnables.add(runnable);
+
+         if (!backup)
+         {
+            scheduleDelivery(runnable, deliveryTime);
+         }
+
+         return true;
+      }
+      return false;
+   }
+
    private void scheduleDelivery(final ScheduledDeliveryRunnable runnable, final long deliveryTime)
    {
       long now = System.currentTimeMillis();
@@ -809,7 +828,6 @@
       if (status == HandleStatus.HANDLED)
       {
          deliveringCount.incrementAndGet();
-
          return HandleStatus.HANDLED;
       }
       else if (status == HandleStatus.NO_MATCH)

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-09 09:25:14 UTC (rev 5094)
@@ -353,13 +353,16 @@
             if (msg.getDurableRefCount() != 0)
             {
                storageManager.storeMessage(msg);
-               storageManager.storeMessageScheduled(msg, scheduledDeliveryTime);
             }
 
             for (MessageReference ref : refs)
             {
+               if(ref.getQueue().isDurable())
+               {
+                  storageManager.storeMessageReferenceScheduled(ref.getQueue().getPersistenceID(), msg.getMessageID(), scheduledDeliveryTime);
+               }
                ref.setScheduledDeliveryTime(scheduledDeliveryTime);
-               ref.getQueue().addScheduledDelivery(ref);
+               ref.getQueue().addLast(ref);
             }
          }
       }

Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-10-09 09:25:14 UTC (rev 5094)
@@ -173,13 +173,13 @@
       {
          List<MessageReference> refs = route(message);
 
-         if (message.getDurableRefCount() != 0)
-         {
-            storageManager.storeMessageScheduledTransactional(id, message, scheduledDeliveryTime);
-         }
          for (MessageReference ref : refs)
          {
             scheduledReferences.put(ref, scheduledDeliveryTime);
+            if(ref.getQueue().isDurable())
+            {
+               storageManager.storeMessageReferenceScheduledTransactional(id, ref.getQueue().getPersistenceID(), message.getMessageID(), scheduledDeliveryTime);
+            }
          }
       }
    }
@@ -303,7 +303,7 @@
          else
          {
             ref.setScheduledDeliveryTime(scheduled);
-            ref.getQueue().addScheduledDelivery(ref);
+            ref.getQueue().addLast(ref);
          }
       }
 
@@ -552,13 +552,13 @@
             // This could happen when the PageStore left the pageState
             List<MessageReference> refs = route(message);
 
-            if (message.getDurableRefCount() != 0)
-            {
-               storageManager.storeMessageScheduledTransactional(id, message, scheduledDeliveryTime);
-            }
             for (MessageReference ref : refs)
             {
                scheduledReferences.put(ref, scheduledDeliveryTime);
+               if(ref.getQueue().isDurable())
+               {
+                  storageManager.storeMessageReferenceScheduledTransactional(id, ref.getQueue().getPersistenceID(), message.getMessageID(), scheduledDeliveryTime);
+               }
             }
          }
       }

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ScheduledDeliveryTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ScheduledDeliveryTest.java	2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ScheduledDeliveryTest.java	2008-10-09 09:25:14 UTC (rev 5094)
@@ -170,7 +170,7 @@
          assertEquals(tm9.getText(), rm5.getText());
 
          //Now the scheduled
-         TextMessage rm6 = (TextMessage)cons.receive(3250);
+         TextMessage rm6 = (TextMessage)cons.receive(15250);
          assertNotNull(rm6);
          assertEquals(tm7.getText(), rm6.getText());
 
@@ -179,7 +179,7 @@
          assertTrue(now2 - now >= 3000);
 
 
-         TextMessage rm7 = (TextMessage)cons.receive(1250);
+         TextMessage rm7 = (TextMessage)cons.receive(16250);
          assertNotNull(rm7);
          assertEquals(tm6.getText(), rm7.getText());
 
@@ -188,7 +188,7 @@
          assertTrue(now2 - now >= 4000);
 
 
-         TextMessage rm8 = (TextMessage)cons.receive(1250);
+         TextMessage rm8 = (TextMessage)cons.receive(17250);
          assertNotNull(rm8);
          assertEquals(tm5.getText(), rm8.getText());
 
@@ -197,7 +197,7 @@
          assertTrue(now2 - now >= 5000);
 
 
-         TextMessage rm9 = (TextMessage)cons.receive(1250);
+         TextMessage rm9 = (TextMessage)cons.receive(18250);
          assertNotNull(rm9);
          assertEquals(tm8.getText(), rm9.getText());
 
@@ -206,7 +206,7 @@
          assertTrue(now2 - now >= 6000);
 
 
-         TextMessage rm10 = (TextMessage)cons.receive(1250);
+         TextMessage rm10 = (TextMessage)cons.receive(19250);
          assertNotNull(rm10);
          assertEquals(tm1.getText(), rm10.getText());
 

Copied: trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java (from rev 5081, trunk/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java	2008-10-09 09:25:14 UTC (rev 5094)
@@ -0,0 +1,619 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.scheduling;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.core.transaction.impl.XidImpl;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.SimpleString;
+import org.jboss.util.id.GUID;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.io.File;
+import java.util.Calendar;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ScheduledMessageTest extends UnitTestCase
+{
+   private static final String ACCEPTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory";
+
+   private static final String CONNECTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
+
+   private String journalDir = System.getProperty("java.io.tmpdir", "/tmp") + "/ScheduledMessageRecoveryTest/journal";
+
+   private String bindingsDir = System.getProperty("java.io.tmpdir", "/tmp") + "/ScheduledMessageRecoveryTest/bindings";
+
+   private String pageDir = System.getProperty("java.io.tmpdir", "/tmp") + "/ScheduledMessageRecoveryTest/page";
+
+   private SimpleString atestq = new SimpleString("ascheduledtestq");
+   private SimpleString atestq2 = new SimpleString("ascheduledtestq2");
+
+   private MessagingService messagingService;
+
+   private ConfigurationImpl configuration;
+
+   protected void setUp() throws Exception
+   {
+      File file = new File(journalDir);
+      File file2 = new File(bindingsDir);
+      File file3 = new File(pageDir);
+      deleteDirectory(file);
+      file.mkdirs();
+      deleteDirectory(file2);
+      file2.mkdirs();
+      deleteDirectory(file3);
+      file3.mkdirs();
+      configuration = new ConfigurationImpl();
+      configuration.setSecurityEnabled(false);
+      configuration.setJournalMinFiles(2);
+      configuration.setPagingDirectory(pageDir);
+   }
+
+   protected void tearDown() throws Exception
+   {
+      if (messagingService != null)
+      {
+         try
+         {
+            messagingService.stop();
+            messagingService = null;
+         }
+         catch (Exception e)
+         {
+            //ignore
+         }
+      }
+      new File(journalDir).delete();
+      new File(bindingsDir).delete();
+      new File(pageDir).delete();
+   }
+
+   public void testRecoveredMessageDeliveredCorrectly() throws Exception
+   {
+      testMessageDeliveredCorrectly(true);
+   }
+
+   public void testMessageDeliveredCorrectly() throws Exception
+   {
+      testMessageDeliveredCorrectly(false);
+   }
+
+   public void testScheduledMessagesDeliveredCorrectly() throws Exception
+   {
+      testScheduledMessagesDeliveredCorrectly(false);
+   }
+
+   public void testRecoveredScheduledMessagesDeliveredCorrectly() throws Exception
+   {
+      testScheduledMessagesDeliveredCorrectly(true);
+   }
+
+   public void testScheduledMessagesDeliveredCorrectlyDifferentOrder() throws Exception
+   {
+      testScheduledMessagesDeliveredCorrectlyDifferentOrder(false);
+   }
+
+   public void testRecoveredScheduledMessagesDeliveredCorrectlyDifferentOrder() throws Exception
+   {
+      testScheduledMessagesDeliveredCorrectlyDifferentOrder(true);
+   }
+
+   public void testScheduledAndNormalMessagesDeliveredCorrectly() throws Exception
+   {
+      testScheduledAndNormalMessagesDeliveredCorrectly(false);
+   }
+
+   public void testRecoveredScheduledAndNormalMessagesDeliveredCorrectly() throws Exception
+   {
+      testScheduledAndNormalMessagesDeliveredCorrectly(true);
+   }
+
+   public void testTxMessageDeliveredCorrectly() throws Exception
+   {
+      testTxMessageDeliveredCorrectly(false);
+   }
+
+   public void testRecoveredTxMessageDeliveredCorrectly() throws Exception
+   {
+      testTxMessageDeliveredCorrectly(true);
+   }
+
+   public void testPagedMessageDeliveredCorrectly() throws Exception
+   {
+
+      TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+      configuration.setPagingMaxGlobalSizeBytes(0);
+      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      //start the server
+      messagingService.start();
+      //then we create a client as normal
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+      ClientSession session = sessionFactory.createSession(false, true, false, false);
+      session.createQueue(atestq, atestq, null, true, true);
+      ClientProducer producer = session.createProducer(atestq);
+      ClientMessage message = createMessage(session, "m1");
+      long time = System.currentTimeMillis();
+      time+=10000;
+      producer.send(message, time);
+
+      producer.close();
+
+
+      ClientConsumer consumer = session.createConsumer(atestq);
+
+      session.start();
+
+      ClientMessage message2 = consumer.receive(10250);
+      assertTrue(System.currentTimeMillis() >= time);
+      assertEquals("m1", message2.getBody().getString());
+
+      message2.processed();
+      session.close();
+   }
+
+   /**
+    * Sends one message that is consumed from both {@code atestq} and {@code atestq2}, where only
+    * {@code atestq2} is given a 5 second redelivery delay. Both consumers are closed without
+    * processing the message, then re-created; the test checks that both receive the message
+    * again and that at least 5 seconds have passed once the {@code atestq2} copy has arrived.
+    *
+    * @throws Exception if the server or the client fails; this fails the test
+    */
+   public void testPagedMessageDeliveredMultipleConsumersCorrectly() throws Exception
+   {
+
+      TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+      // NOTE(review): a zero global size limit presumably forces every message to be paged - confirm
+      configuration.setPagingMaxGlobalSizeBytes(0);
+      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      //start the server
+      messagingService.start();
+      QueueSettings qs = new QueueSettings();
+      qs.setRedeliveryDelay(5000L);
+      messagingService.getServer().getQueueSettingsRepository().addMatch(atestq2.toString(), qs);
+      //then we create a client as normal
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+      ClientSession session = sessionFactory.createSession(false, true, false, false);
+      session.createQueue(atestq, atestq, null, true, true);
+      session.createQueue(atestq, atestq2, null, true, true);
+      ClientProducer producer = session.createProducer(atestq);
+      ClientMessage message = createMessage(session, "m1");
+      producer.send(message);
+
+      producer.close();
+
+
+      ClientConsumer consumer = session.createConsumer(atestq);
+      ClientConsumer consumer2 = session.createConsumer(atestq2);
+
+      session.start();
+      ClientMessage message3 = consumer.receive(1000);
+      ClientMessage message2 = consumer2.receive(1000);
+      //fail with a clear message rather than a NullPointerException if nothing arrived in time
+      assertNotNull("first delivery on atestq timed out", message3);
+      assertNotNull("first delivery on atestq2 timed out", message2);
+      assertEquals("m1", message3.getBody().getString());
+      assertEquals("m1", message2.getBody().getString());
+      long time = System.currentTimeMillis();
+      //force redelivery
+      consumer.close();
+      consumer2.close();
+      consumer = session.createConsumer(atestq);
+      consumer2 = session.createConsumer(atestq2);
+      message3 = consumer.receive(1000);
+      message2 = consumer2.receive(5250);
+      assertNotNull("redelivery on atestq timed out", message3);
+      assertNotNull("redelivery on atestq2 timed out", message2);
+      //the atestq2 copy must not come back before its 5 second redelivery delay has elapsed
+      time+=5000;
+      assertTrue(System.currentTimeMillis() >= time);
+      assertEquals("m1", message3.getBody().getString());
+      assertEquals("m1", message2.getBody().getString());
+      message2.processed();
+      message3.processed();
+      session.close();
+   }
+
+   /**
+    * Same scenario as the two-consumer redelivery test, but with a server restart in the middle:
+    * one message is consumed from both {@code atestq} and {@code atestq2} (only {@code atestq2}
+    * has a 5 second redelivery delay), the consumers and session are closed without processing
+    * it, the server is stopped and a new one is started over the same journal and bindings
+    * directories. The test then checks that both queues deliver the message again and that at
+    * least 5 seconds have passed once the {@code atestq2} copy has arrived.
+    *
+    * @throws Exception if the server or the client fails; this fails the test
+    */
+   public void testPagedMessageDeliveredMultipleConsumersAfterRecoverCorrectly() throws Exception
+   {
+
+      TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+      // NOTE(review): a zero global size limit presumably forces every message to be paged - confirm
+      configuration.setPagingMaxGlobalSizeBytes(0);
+      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      //start the server
+      messagingService.start();
+      QueueSettings qs = new QueueSettings();
+      qs.setRedeliveryDelay(5000L);
+      messagingService.getServer().getQueueSettingsRepository().addMatch(atestq2.toString(), qs);
+      //then we create a client as normal
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+      ClientSession session = sessionFactory.createSession(false, true, false, false);
+      session.createQueue(atestq, atestq, null, true, true);
+      session.createQueue(atestq, atestq2, null, true, true);
+      ClientProducer producer = session.createProducer(atestq);
+      ClientMessage message = createMessage(session, "m1");
+      producer.send(message);
+
+      producer.close();
+
+
+      ClientConsumer consumer = session.createConsumer(atestq);
+      ClientConsumer consumer2 = session.createConsumer(atestq2);
+
+      session.start();
+      ClientMessage message3 = consumer.receive(1000);
+      ClientMessage message2 = consumer2.receive(1000);
+      //fail with a clear message rather than a NullPointerException if nothing arrived in time
+      assertNotNull("first delivery on atestq timed out", message3);
+      assertNotNull("first delivery on atestq2 timed out", message2);
+      assertEquals("m1", message3.getBody().getString());
+      assertEquals("m1", message2.getBody().getString());
+      long time = System.currentTimeMillis();
+      //force redelivery (the producer was already closed right after the send)
+      consumer.close();
+      consumer2.close();
+      session.close();
+      //restart the server over the same journal and bindings directories
+      messagingService.stop();
+      messagingService = null;
+      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      messagingService.start();
+      sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+      session = sessionFactory.createSession(false, true, true, false);
+      consumer = session.createConsumer(atestq);
+      consumer2 = session.createConsumer(atestq2);
+      session.start();
+      message3 = consumer.receive(1000);
+      message2 = consumer2.receive(5250);
+      assertNotNull("redelivery on atestq timed out after restart", message3);
+      assertNotNull("redelivery on atestq2 timed out after restart", message2);
+      //the atestq2 copy must not come back before its 5 second redelivery delay has elapsed
+      time+=5000;
+      assertTrue(System.currentTimeMillis() >= time);
+      assertEquals("m1", message3.getBody().getString());
+      assertEquals("m1", message2.getBody().getString());
+      message2.processed();
+      message3.processed();
+      session.close();
+   }
+   /**
+    * Sends a single durable message scheduled 10 seconds ahead and checks that it is received
+    * with the expected body and not before its scheduled time.
+    *
+    * @param recover when {@code true}, the producer and session are closed, the server is stopped
+    *                and a new one is started over the same journal and bindings directories before
+    *                the message is consumed
+    * @throws Exception if the server or the client fails; this fails the test
+    */
+   public void testMessageDeliveredCorrectly(boolean recover) throws Exception
+   {
+
+      TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      //start the server
+      messagingService.start();
+      //then we create a client as normal
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+      ClientSession session = sessionFactory.createSession(false, true, false, false);
+      session.createQueue(atestq, atestq, null, true, true);
+      ClientProducer producer = session.createProducer(atestq);
+      ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
+                                                          System.currentTimeMillis(), (byte) 1);
+      message.getBody().putString("testINVMCoreClient");
+      message.getBody().flip();
+      message.setDurable(true);
+      //schedule the message 10 seconds from now
+      long time = System.currentTimeMillis();
+      time+=10000;
+      producer.send(message, time);
+
+      if (recover)
+      {
+         //restart the server so the scheduled message has to be reloaded from storage
+         producer.close();
+         session.close();
+         messagingService.stop();
+         messagingService = null;
+         messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+         messagingService.start();
+         sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+         session = sessionFactory.createSession(false, true, true, false);
+      }
+      ClientConsumer consumer = session.createConsumer(atestq);
+
+      session.start();
+
+      ClientMessage message2 = consumer.receive(11000);
+      //fail with a clear message rather than a NullPointerException if nothing arrived in time
+      assertNotNull("scheduled message was not received within 11000ms", message2);
+      assertTrue(System.currentTimeMillis() >= time);
+      assertEquals("testINVMCoreClient", message2.getBody().getString());
+
+      message2.processed();
+      session.close();
+   }
+
+   /**
+    * Sends five messages m1..m5 scheduled at 10, 11, 12, 13 and 14 seconds from now and checks
+    * that they are received in that order, each one not before its own scheduled time.
+    *
+    * @param recover when {@code true}, the producer and session are closed, the server is stopped
+    *                and a new one is started over the same journal and bindings directories before
+    *                the messages are consumed
+    * @throws Exception if the server or the client fails; this fails the test
+    */
+   public void testScheduledMessagesDeliveredCorrectly(boolean recover) throws Exception
+   {
+
+      TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      //start the server
+      messagingService.start();
+      //then we create a client as normal
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+      ClientSession session = sessionFactory.createSession(false, true, false, false);
+      session.createQueue(atestq, atestq, null, true, true);
+      ClientProducer producer = session.createProducer(atestq);
+      ClientMessage m1 = createMessage(session, "m1");
+      ClientMessage m2 = createMessage(session, "m2");
+      ClientMessage m3 = createMessage(session, "m3");
+      ClientMessage m4 = createMessage(session, "m4");
+      ClientMessage m5 = createMessage(session, "m5");
+      long time = System.currentTimeMillis();
+      time+=10000;
+      producer.send(m1, time);
+      time+=1000;
+      producer.send(m2, time);
+      time+=1000;
+      producer.send(m3, time);
+      time+=1000;
+      producer.send(m4, time);
+      time+=1000;
+      producer.send(m5, time);
+      //rewind to m1's scheduled time; it is advanced again by 1s per message while consuming
+      time-=4000;
+      if (recover)
+      {
+         producer.close();
+         session.close();
+         messagingService.stop();
+         messagingService = null;
+         messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+         messagingService.start();
+
+         sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+
+         session = sessionFactory.createSession(false, true, true, false);
+      }
+
+      ClientConsumer consumer = session.createConsumer(atestq);
+
+      session.start();
+
+      String[] expectedBodies = {"m1", "m2", "m3", "m4", "m5"};
+      for (int i = 0; i < expectedBodies.length; i++)
+      {
+         //the first message is up to 10s away, every later one is due 1s after its predecessor
+         long timeout = i == 0 ? 11000 : 1250;
+         ClientMessage message = consumer.receive(timeout);
+         //fail with a clear message rather than a NullPointerException if nothing arrived in time
+         assertNotNull(expectedBodies[i] + " was not received within " + timeout + "ms", message);
+         assertTrue(System.currentTimeMillis() >= time);
+         assertEquals(expectedBodies[i], message.getBody().getString());
+         message.processed();
+         time+=1000;
+      }
+      session.close();
+   }
+
+   public void testScheduledMessagesDeliveredCorrectlyDifferentOrder(boolean recover) throws Exception
+   {
+
+      TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      //start the server
+      messagingService.start();
+      //then we create a client as normal
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+      ClientSession session = sessionFactory.createSession(false, true, false, false);
+      session.createQueue(atestq, atestq, null, true, true);
+      ClientProducer producer = session.createProducer(atestq);
+      ClientMessage m1 = createMessage(session, "m1");
+      ClientMessage m2 = createMessage(session, "m2");
+      ClientMessage m3 = createMessage(session, "m3");
+      ClientMessage m4 = createMessage(session, "m4");
+      ClientMessage m5 = createMessage(session, "m5");
+      long time = System.currentTimeMillis();
+      time+=10000;
+      producer.send(m1, time);
+      time+=3000;
+      producer.send(m2, time);
+      time-=2000;
+      producer.send(m3, time);
+      time+=3000;
+      producer.send(m4, time);
+      time-=2000;
+      producer.send(m5, time);
+      time-=2000;
+      ClientConsumer consumer = null;
+      if (recover)
+      {
+         producer.close();
+         session.close();
+         messagingService.stop();
+         messagingService = null;
+         messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+         messagingService.start();
+
+         sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+
+         session = sessionFactory.createSession(false, true, true, false);
+
+      }
+      consumer = session.createConsumer(atestq);
+
+      session.start();
+
+      ClientMessage message = consumer.receive(10250);
+      assertTrue(System.currentTimeMillis() >= time);
+      assertEquals("m1", message.getBody().getString());
+      message.processed();
+      time+=1000;
+      message = consumer.receive(1250);
+      assertTrue(System.currentTimeMillis() >= time);
+      assertEquals("m3", message.getBody().getString());
+      message.processed();
+      time+=1000;
+      message = consumer.receive(1250);
+      assertTrue(System.currentTimeMillis() >= time);
+      assertEquals("m5", message.getBody().getString());
+      message.processed();
+      time+=1000;
+      message = consumer.receive(1250);
+      assertTrue(System.currentTimeMillis() >= time);
+      assertEquals("m2", message.getBody().getString());
+      message.processed();
+      time+=1000;
+      message = consumer.receive(1250);
+      assertTrue(System.currentTimeMillis() >= time);
+      assertEquals("m4", message.getBody().getString());
+      message.processed();
+      session.close();
+   }
+
+   /**
+    * Interleaves scheduled and unscheduled sends (m1 at +10s, m2 unscheduled, m3 at +11s,
+    * m4 unscheduled, m5 at +12s) and checks that the unscheduled messages m2 and m4 are received
+    * first, followed by m1, m3 and m5, each scheduled one not before its own scheduled time.
+    *
+    * @param recover when {@code true}, the producer and session are closed, the server is stopped
+    *                and a new one is started over the same journal and bindings directories before
+    *                the messages are consumed
+    * @throws Exception if the server or the client fails; this fails the test
+    */
+   public void testScheduledAndNormalMessagesDeliveredCorrectly(boolean recover) throws Exception
+   {
+
+      TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      //start the server
+      messagingService.start();
+      //then we create a client as normal
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+      ClientSession session = sessionFactory.createSession(false, true, false, false);
+      session.createQueue(atestq, atestq, null, true, true);
+      ClientProducer producer = session.createProducer(atestq);
+      ClientMessage m1 = createMessage(session, "m1");
+      ClientMessage m2 = createMessage(session, "m2");
+      ClientMessage m3 = createMessage(session, "m3");
+      ClientMessage m4 = createMessage(session, "m4");
+      ClientMessage m5 = createMessage(session, "m5");
+      long time = System.currentTimeMillis();
+      time+=10000;
+      producer.send(m1, time);
+      producer.send(m2);
+      time+=1000;
+      producer.send(m3, time);
+      producer.send(m4);
+      time+=1000;
+      producer.send(m5, time);
+      //rewind to m1's scheduled time; it is advanced again by 1s per scheduled message below
+      time-=2000;
+      if (recover)
+      {
+         producer.close();
+         session.close();
+         messagingService.stop();
+         messagingService = null;
+         messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+         messagingService.start();
+
+         sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+
+         session = sessionFactory.createSession(false, true, true, false);
+      }
+
+      ClientConsumer consumer = session.createConsumer(atestq);
+      session.start();
+
+      //the unscheduled messages are expected first, with no lower bound on their arrival time
+      String[] unscheduledBodies = {"m2", "m4"};
+      for (int i = 0; i < unscheduledBodies.length; i++)
+      {
+         ClientMessage message = consumer.receive(1000);
+         //fail with a clear message rather than a NullPointerException if nothing arrived in time
+         assertNotNull(unscheduledBodies[i] + " was not received within 1000ms", message);
+         assertEquals(unscheduledBodies[i], message.getBody().getString());
+         message.processed();
+      }
+
+      String[] scheduledBodies = {"m1", "m3", "m5"};
+      for (int i = 0; i < scheduledBodies.length; i++)
+      {
+         //the first scheduled message is up to 10s away, later ones are due 1s after the previous
+         long timeout = i == 0 ? 10250 : 1250;
+         ClientMessage message = consumer.receive(timeout);
+         assertNotNull(scheduledBodies[i] + " was not received within " + timeout + "ms", message);
+         assertTrue(System.currentTimeMillis() >= time);
+         assertEquals(scheduledBodies[i], message.getBody().getString());
+         message.processed();
+         time+=1000;
+      }
+      session.close();
+   }
+
+   /**
+    * Sends one durable message scheduled 10 seconds ahead inside an XA transaction, prepares the
+    * transaction, commits it, and checks that the message is received with the expected body and
+    * not before its scheduled time.
+    *
+    * @param recover when {@code true}, the producer and session are closed after the prepare, the
+    *                server is stopped and a new one is started over the same journal and bindings
+    *                directories, and the commit is issued on a new session
+    * @throws Exception if the server or the client fails; this fails the test
+    */
+   public void testTxMessageDeliveredCorrectly(boolean recover) throws Exception
+   {
+      Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+      TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      //start the server
+      messagingService.start();
+      //then we create a client as normal
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+      ClientSession session = sessionFactory.createSession(true, false, false, false);
+      session.createQueue(atestq, atestq, null, true, false);
+      session.start(xid, XAResource.TMNOFLAGS);
+      ClientProducer producer = session.createProducer(atestq);
+      ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
+                                                          System.currentTimeMillis(), (byte) 1);
+      message.getBody().putString("testINVMCoreClient");
+      message.getBody().flip();
+      message.setDurable(true);
+      Calendar cal = Calendar.getInstance();
+      //add(), not roll(): roll() never carries into the minute, so when the current second is
+      //50 or later the "scheduled" time would wrap into the past and the timing assertion
+      //below would pass without testing anything
+      cal.add(Calendar.SECOND, 10);
+      producer.send(message, cal.getTimeInMillis());
+      session.end(xid, XAResource.TMSUCCESS);
+      session.prepare(xid);
+      if (recover)
+      {
+         //restart the server between prepare and commit
+         producer.close();
+         session.close();
+         messagingService.stop();
+         messagingService = null;
+         messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+         messagingService.start();
+
+         sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+
+         session = sessionFactory.createSession(true, false, false, false);
+      }
+      session.commit(xid, true);
+      ClientConsumer consumer = session.createConsumer(atestq);
+
+      session.start();
+
+      ClientMessage message2 = consumer.receive(10000);
+      //fail with a clear message rather than a NullPointerException if nothing arrived in time
+      assertNotNull("scheduled transactional message was not received within 10000ms", message2);
+      assertTrue(System.currentTimeMillis() >= cal.getTimeInMillis());
+      assertEquals("testINVMCoreClient", message2.getBody().getString());
+
+      message2.processed();
+      session.close();
+   }
+
+
+   private ClientMessage createMessage(ClientSession session, String body)
+   {
+      ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
+                                                          System.currentTimeMillis(), (byte) 1);
+      message.getBody().putString(body);
+      message.getBody().flip();
+      message.setDurable(true);
+      return message;
+   }
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java	2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java	2008-10-09 09:25:14 UTC (rev 5094)
@@ -33,6 +33,7 @@
 import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.HandleStatus;
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerMessage;
@@ -251,11 +252,15 @@
       EasyMock.expect(message.getDurableRefCount()).andReturn(1);
       ref.setScheduledDeliveryTime(time);
       storageManager.storeLastPage(EasyMock.anyLong(), (LastPageRecord) EasyMock.anyObject());
+      storageManager.storeMessageReferenceScheduledTransactional(EasyMock.anyLong(), EasyMock.anyLong(), EasyMock.anyLong(), EasyMock.eq(time));
       storageManager.storeMessageTransactional(EasyMock.anyLong(), (ServerMessage) EasyMock.anyObject());
-      storageManager.storeMessageScheduledTransactional(EasyMock.anyLong(), (ServerMessage) EasyMock.anyObject(), EasyMock.eq(time));
       storageManager.commit(EasyMock.anyLong());
-      EasyMock.expect(ref.getQueue()).andReturn(queue);
-      queue.addScheduledDelivery(ref);
+      EasyMock.expect(ref.getQueue()).andStubReturn(queue);
+      EasyMock.expect(queue.isDurable()).andReturn(true);
+      EasyMock.expect(queue.getPersistenceID()).andStubReturn(1);
+      EasyMock.expect(message.getMessageID()).andStubReturn(2);
+      //storageManager.storeMessageReferenceScheduledTransactional(1,1,2,time);
+      EasyMock.expect(queue.addLast(ref)).andReturn(HandleStatus.HANDLED);
       EasyMock.replay(spi, store, message, storageManager, po, ref, queue);
       SimpleString queueName = new SimpleString("aq");
       PageMessageImpl pageMessage = new PageMessageImpl(message);

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2008-10-09 03:09:55 UTC (rev 5093)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2008-10-09 09:25:14 UTC (rev 5094)
@@ -752,7 +752,7 @@
 
          refs.add(ref);
 
-         queue.addScheduledDelivery(ref);
+         queue.addLast(ref);
       }
 
 




More information about the jboss-cvs-commits mailing list