[jboss-cvs] JBoss Messaging SVN: r5074 - in trunk: examples/jms and 20 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Oct 6 10:27:34 EDT 2008


Author: ataylor
Date: 2008-10-06 10:27:34 -0400 (Mon, 06 Oct 2008)
New Revision: 5074

Added:
   trunk/examples/jms/src/org/jboss/jms/example/ScheduledExample.java
   trunk/examples/messaging/src/org/jboss/messaging/example/ScheduledMessageExample.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionScheduledSendMessage.java
   trunk/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java
Modified:
   trunk/build-messaging.xml
   trunk/build.xml
   trunk/examples/jms/build.xml
   trunk/examples/messaging/build.xml
   trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/jboss/messaging/core/paging/PageMessage.java
   trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
   trunk/src/main/org/jboss/messaging/core/server/Queue.java
   trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java
   trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
   trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
   trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java
   trunk/src/main/org/jboss/messaging/jms/client/JBossMessageProducer.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1203 - re enabled scheduling

Modified: trunk/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/build-messaging.xml	2008-10-06 14:27:34 UTC (rev 5074)
@@ -968,6 +968,10 @@
       <ant dir="${examples.dir}/jms" antfile="build.xml" target="messageGroupingExample"/>
    </target>
 
+   <target name="scheduledExample" depends="client-jar">
+      <ant dir="${examples.dir}/jms" antfile="build.xml" target="scheduledExample"/>
+   </target>
+
    <target name="SimpleClient" depends="client-jar">
       <ant dir="${examples.dir}/messaging" antfile="build.xml" target="SimpleClient"/>
    </target>
@@ -986,8 +990,11 @@
        
    <target name="WildCardClient" depends="jar, client-jar">
          <ant dir="${examples.dir}/messaging" antfile="build.xml" target="WildCardClient"/>
+   </target>
+
+   <target name="ScheduledMessageExample" depends="jar, client-jar">
+         <ant dir="${examples.dir}/messaging" antfile="build.xml" target="ScheduledMessageExample"/>
       </target>
-
    <!-- Performance examples -->
 
    <target name="perfListener" depends="client-jar">

Modified: trunk/build.xml
===================================================================
--- trunk/build.xml	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/build.xml	2008-10-06 14:27:34 UTC (rev 5074)
@@ -194,10 +194,18 @@
       <ant antfile="build-messaging.xml" target="wildcardExample"/>
    </target>
 
+   <target name="ScheduledMessageExample" depends="createthirdparty">
+      <ant antfile="build-messaging.xml" target="ScheduledMessageExample"/>
+   </target>
+
    <target name="messageGroupingExample" depends="createthirdparty">
       <ant antfile="build-messaging.xml" target="messageGroupingExample"/>
    </target>
 
+   <target name="scheduledExample" depends="createthirdparty">
+      <ant antfile="build-messaging.xml" target="scheduledExample"/>
+   </target>
+
    <target name="perfListener" depends="createthirdparty">
       <ant antfile="build-messaging.xml" target="perfListener"/>
    </target>

Modified: trunk/examples/jms/build.xml
===================================================================
--- trunk/examples/jms/build.xml	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/examples/jms/build.xml	2008-10-06 14:27:34 UTC (rev 5074)
@@ -135,6 +135,12 @@
       </java>
    </target>
 
+   <target name="scheduledExample" depends="compile" description="-> point to point example using a queue">
+      <java classname="org.jboss.jms.example.ScheduledExample" fork="true">
+         <classpath refid="runtime.classpath"/>
+      </java>
+   </target>
+
    <target name="echo-params">
       <echo>
 ***********************************************************************************

Added: trunk/examples/jms/src/org/jboss/jms/example/ScheduledExample.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/ScheduledExample.java	                        (rev 0)
+++ trunk/examples/jms/src/org/jboss/jms/example/ScheduledExample.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -0,0 +1,95 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.example;
+
+import org.jboss.messaging.core.logging.Logger;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.InitialContext;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+
+/** JMS example: sends a text message scheduled 5 seconds ahead, then waits up to 7 seconds to receive it.
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ScheduledExample
+{
+   final static Logger log = Logger.getLogger(ScheduledExample.class);
+   /** Runs the example; any failure is printed via printStackTrace rather than rethrown. */
+   public static void main(final String[] args)
+   {
+      DateFormat df = new SimpleDateFormat("hh:mm:ss");
+      Connection connection = null;
+      try
+      {
+         //create an initial context, env will be picked up from jndi.properties
+         InitialContext initialContext = new InitialContext();
+         Queue queue = (Queue) initialContext.lookup("/queue/testQueue");
+         ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");
+
+         connection = cf.createConnection();
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(queue);
+         Message message = session.createTextMessage("This is a text message!");
+         Calendar cal = Calendar.getInstance();
+         log.info("current time " + df.format(cal.getTime()));
+         cal.add(Calendar.SECOND, 5); //add, not roll: roll never carries into the minute, so it could yield a time in the past
+         log.info("message scheduled for " + df.format(cal.getTime()));
+         message.setLongProperty("JBM_SCHEDULED_DELIVERY_TIME", cal.getTimeInMillis());
+         log.info("sending message to queue");
+         producer.send(message);
+         //receive with a 7 second timeout so the wait outlasts the 5 second delivery delay
+         MessageConsumer messageConsumer = session.createConsumer(queue);
+         connection.start();
+         TextMessage message2 = (TextMessage) messageConsumer.receive(7000);
+         log.info("message received at " + df.format(Calendar.getInstance().getTime()));
+         log.info("message = " + message2.getText());
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+      finally
+      {
+         if(connection != null)
+         {
+            try
+            {
+               connection.close();
+            }
+            catch (JMSException e)
+            {
+               e.printStackTrace();
+            }
+         }
+      }
+   }
+}

Modified: trunk/examples/messaging/build.xml
===================================================================
--- trunk/examples/messaging/build.xml	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/examples/messaging/build.xml	2008-10-06 14:27:34 UTC (rev 5074)
@@ -109,4 +109,10 @@
       </java>
    </target>
 
+   <target name="ScheduledMessageExample" depends="compile">
+      <java classname="org.jboss.messaging.example.ScheduledMessageExample" fork="true">
+         <classpath refid="runtime.classpath"/>
+      </java>
+   </target>
+
 </project>
\ No newline at end of file

Added: trunk/examples/messaging/src/org/jboss/messaging/example/ScheduledMessageExample.java
===================================================================
--- trunk/examples/messaging/src/org/jboss/messaging/example/ScheduledMessageExample.java	                        (rev 0)
+++ trunk/examples/messaging/src/org/jboss/messaging/example/ScheduledMessageExample.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -0,0 +1,89 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.example;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.util.SimpleString;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+
+/** Core client example: sends a message scheduled 5 seconds ahead, then waits up to 7 seconds to receive it.
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ScheduledMessageExample
+{
+   private static final  Logger log = Logger.getLogger(ScheduledMessageExample.class);
+   public static void main(final String[] args)
+   {
+      ClientSession clientSession = null;
+      DateFormat df = new SimpleDateFormat("hh:mm:ss");
+      try
+      {
+         ClientSessionFactory sessionFactory =
+            new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory"));
+         clientSession = sessionFactory.createSession(false, true, true, false);
+         SimpleString queue = new SimpleString("queuejms.testQueue");
+         ClientProducer clientProducer = clientSession.createProducer(queue);
+         ClientMessage message = clientSession.createClientMessage(JBossTextMessage.TYPE, false, 0,
+                                                       System.currentTimeMillis(), (byte) 1);
+         message.getBody().putString("Hello!");
+         Calendar cal = Calendar.getInstance();
+         log.info("current time " + df.format(cal.getTime()));
+         cal.add(Calendar.SECOND, 5); //add, not roll: roll never carries into the minute, so it could yield a time in the past
+         log.info("message scheduled for " + df.format(cal.getTime()));
+         clientProducer.send(message, cal.getTimeInMillis()); //second argument is the scheduled delivery time in millis
+         ClientConsumer clientConsumer = clientSession.createConsumer(queue);
+         clientSession.start();
+         ClientMessage msg = clientConsumer.receive(7000);
+         log.info("message received at " + df.format(Calendar.getInstance().getTime()));
+         msg.processed();
+      }
+      catch(Exception e)
+      {
+         e.printStackTrace();
+      }
+      finally
+      {
+         if (clientSession != null)
+         {
+            try
+            {
+               clientSession.close();
+            }
+            catch (MessagingException ignore) //a failure while closing is deliberately ignored
+            {
+            }
+         }
+      }
+   }
+}

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -36,7 +36,11 @@
 	void send(ClientMessage message) throws MessagingException;
 	
    void send(SimpleString address, ClientMessage message) throws MessagingException;
+   
+   void send(final ClientMessage msg, long scheduleDeliveryTime) throws MessagingException;
 
+   void send(final SimpleString address, final ClientMessage msg, long scheduleDeliveryTime) throws MessagingException;
+
    void sendManagement(ClientMessage mngmntMessage) throws MessagingException;
 
    void registerAcknowledgementHandler(AcknowledgementHandler handler);

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -19,6 +19,7 @@
 import org.jboss.messaging.core.message.impl.MessageImpl;
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionScheduledSendMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
 import org.jboss.messaging.util.SimpleString;
@@ -120,17 +121,31 @@
    {
       checkClosed();
 
-      doSend(null, msg);
+      doSend(null, msg, 0);
    }
 
    public void send(final SimpleString address, final ClientMessage msg) throws MessagingException
    {
       checkClosed();
 
-      doSend(address, msg);
+      doSend(address, msg, 0);
    }
 
-   // use a special wireformat packet to send management message (on the server-side they are
+    public void send(final ClientMessage msg, long scheduleDeliveryTime) throws MessagingException
+   {
+      checkClosed();
+
+      doSend(null, msg, scheduleDeliveryTime);
+   }
+
+   public void send(final SimpleString address, final ClientMessage msg, long scheduleDeliveryTime) throws MessagingException
+   {
+      checkClosed();
+
+      doSend(address, msg, scheduleDeliveryTime);
+   }
+
+   // use a special wireformat packet to send management message (on the server-side they are
    // handled by the server session differently from regular Client Message)
    public void sendManagement(final ClientMessage msg) throws MessagingException
    {
@@ -272,7 +287,7 @@
       closed = true;
    }
 
-   private void doSend(final SimpleString address, final ClientMessage msg) throws MessagingException
+   private void doSend(final SimpleString address, final ClientMessage msg, long scheduledDeliveryTime) throws MessagingException
    {
       if (address != null)
       {
@@ -297,8 +312,18 @@
 
       boolean sendBlocking = msg.isDurable() ? blockOnPersistentSend : blockOnNonPersistentSend;
 
-      SessionSendMessage message = new SessionSendMessage(id, msg, sendBlocking);
+      SessionSendMessage message;
+      //check to see if this message need to be scheduled.
+      if(scheduledDeliveryTime <= 0)
+      {
+         message = new SessionSendMessage(id, msg, sendBlocking);
+      }
+      else
+      {
+         message = new SessionScheduledSendMessage(id, msg, sendBlocking, scheduledDeliveryTime);
+      }
 
+
       if (sendBlocking)
       {
          channel.sendBlocking(message);

Modified: trunk/src/main/org/jboss/messaging/core/paging/PageMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PageMessage.java	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/paging/PageMessage.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -24,6 +24,7 @@
 
 import org.jboss.messaging.core.journal.EncodingSupport;
 import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.util.TypedProperties;
 
 /**
  * 
@@ -39,4 +40,6 @@
 
    long getTransactionID();
 
+   TypedProperties getProperties();
+
 }

Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -22,14 +22,14 @@
 
 package org.jboss.messaging.core.paging;
 
-import java.util.Collection;
-
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.MessagingComponent;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.util.SimpleString;
 
+import java.util.Collection;
+
 /**
  * 
  * <p>Look at the <a href="http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging">WIKI</a> for more information.</p>
@@ -52,6 +52,7 @@
  * 
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  *
  */
 public interface PagingManager extends MessagingComponent
@@ -91,8 +92,22 @@
     * @param message
     * @return false if destination is not on page mode
     */
+   boolean pageScheduled(ServerMessage message, long scheduledDeliveryTime) throws Exception;
+
+   /**
+    * Page, only if destination is in page mode.
+    * @param message
+    * @return false if destination is not on page mode
+    */
    boolean page(ServerMessage message, long transactionId) throws Exception;
 
+   /**
+    * Page a message for scheduled delivery within a transaction, only if destination is in page mode.
+    * @param message
+    * @return false if destination is not on page mode
+    */
+   boolean pageScheduled(ServerMessage message, long transactionId, long scheduledDeliveryTime) throws Exception;
+
    /**
     * Point to inform/restoring Transactions used when the messages were added into paging
     * */

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -27,12 +27,14 @@
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.impl.ServerMessageImpl;
 import org.jboss.messaging.util.DataConstants;
+import org.jboss.messaging.util.TypedProperties;
 
 /**
  * 
  * This class is used to encapsulate ServerMessage and TransactionID on Paging
  * 
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  *
  */
 public class PageMessageImpl implements PageMessage
@@ -52,15 +54,19 @@
 
    private long transactionID = -1;
 
+   private final TypedProperties properties;
+
    public PageMessageImpl(final ServerMessage message, final long transactionID)
    {
       this.message = message;
       this.transactionID = transactionID;
+      properties = new TypedProperties();
    }
 
    public PageMessageImpl(final ServerMessage message)
    {
       this.message = message;
+      properties = new TypedProperties();
    }
 
    public PageMessageImpl()
@@ -68,6 +74,19 @@
       this(new ServerMessageImpl());
    }
 
+   public PageMessageImpl(final ServerMessage message, final TypedProperties properties)
+   {
+      this.message = message;
+      this.properties = properties;
+   }
+
+   public PageMessageImpl(final ServerMessage message, final TypedProperties properties, final long transactionID)
+   {
+      this.message = message;
+      this.transactionID = transactionID;
+      this.properties = properties;
+   }
+
    public ServerMessage getMessage()
    {
       return message;
@@ -78,24 +97,30 @@
       return transactionID;
    }
 
+   public TypedProperties getProperties()
+   {
+      return properties;
+   }
+
    // EncodingSupport implementation --------------------------------
 
    public void decode(final MessagingBuffer buffer)
    {
       transactionID = buffer.getLong();
       message.decode(buffer);
+      properties.decode(buffer);
    }
 
    public void encode(final MessagingBuffer buffer)
    {
       buffer.putLong(transactionID);
       message.encode(buffer);
+      properties.encode(buffer);
    }
 
    public int getEncodeSize()
    {
-
-      return DataConstants.SIZE_LONG  + message.getEncodeSize();
+      return DataConstants.SIZE_LONG  + message.getEncodeSize() + properties.getEncodeSize();
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -22,15 +22,6 @@
 
 package org.jboss.messaging.core.paging.impl;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.paging.LastPageRecord;
 import org.jboss.messaging.core.paging.PageMessage;
@@ -45,12 +36,23 @@
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.TypedProperties;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  *  <p>Look at the <a href="http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging">WIKI</a> for more information.</p>
  * 
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  *
  */
 public class PagingManagerImpl implements PagingManager
@@ -91,6 +93,8 @@
    // private static final boolean isTrace = log.isTraceEnabled();
    private static final boolean isTrace = true;
 
+   private static final SimpleString SCHEDULED_DELIVERY_PROP = new SimpleString("JBM_SCHEDULED_DELIVERY_PROP");
+
    // This is just a debug tool method.
    // During debugs you could make log.trace as log.info, and change the
    // variable isTrace above
@@ -202,6 +206,7 @@
       HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
 
       final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
+      final List<MessageReference> scheduledRefsToAdd = new ArrayList<MessageReference>();
 
       for (PageMessage msg : data)
       {
@@ -237,12 +242,30 @@
                pageTransactionsToUpdate.add(pageTransactionInfo);
             }
          }
+         Long scheduledDeliveryTime = (Long) msg.getProperties().getProperty(SCHEDULED_DELIVERY_PROP);
+         //if this is a scheduled message we add it to the queue as just that
+         if(scheduledDeliveryTime == null)
+         {
+            refsToAdd.addAll(postOffice.route(msg.getMessage()));
+         }
+         else
+         {
+            List<MessageReference> refs = postOffice.route(msg.getMessage());
+            for (MessageReference ref : refs)
+            {
+               ref.setScheduledDeliveryTime(scheduledDeliveryTime);
+            }
+            scheduledRefsToAdd.addAll(refs);
+         }
 
-         refsToAdd.addAll(postOffice.route(msg.getMessage()));
-
          if (msg.getMessage().getDurableRefCount() != 0)
          {
             storageManager.storeMessageTransactional(depageTransactionID, msg.getMessage());
+            //write the scheduled message record if needed
+            if(scheduledDeliveryTime != null)
+            {
+               storageManager.storeMessageScheduledTransactional(depageTransactionID, msg.getMessage(), scheduledDeliveryTime);
+            }
          }
       }
 
@@ -268,6 +291,10 @@
          ref.getQueue().addLast(ref);
       }
 
+      for (MessageReference ref : scheduledRefsToAdd)
+      {
+         ref.getQueue().addScheduledDelivery(ref);
+      }
       if (globalMode.get())
       {
          return globalSize.get() < maxGlobalSize - WATERMARK_GLOBAL_PAGE && pagingStore.getMaxSizeBytes() <= 0 ||
@@ -313,6 +340,20 @@
       return getPageStore(message.getDestination()).page(new PageMessageImpl(message));
    }
 
+   public boolean pageScheduled(final ServerMessage message, final long scheduledDeliveryTime) throws Exception
+   {
+      TypedProperties properties = new TypedProperties();
+      properties.putLongProperty(SCHEDULED_DELIVERY_PROP, scheduledDeliveryTime);
+      return getPageStore(message.getDestination()).page(new PageMessageImpl(message, properties));
+   }
+
+   public boolean pageScheduled(final ServerMessage message, final long transactionId, final long scheduledDeliveryTime) throws Exception
+   {
+      TypedProperties properties = new TypedProperties(); //carries the delivery time alongside the paged message
+      properties.putLongProperty(SCHEDULED_DELIVERY_PROP, scheduledDeliveryTime);
+      return getPageStore(message.getDestination()).page(new PageMessageImpl(message, properties, transactionId)); //keep the transaction id with the paged message
+   }
+
    public void addTransaction(final PageTransactionInfo pageTransaction)
    {
       transactions.put(pageTransaction.getTransactionID(), pageTransaction);

Modified: trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -22,11 +22,6 @@
 
 package org.jboss.messaging.core.persistence;
 
-import java.util.List;
-import java.util.Map;
-
-import javax.transaction.xa.Xid;
-
 import org.jboss.messaging.core.paging.LastPageRecord;
 import org.jboss.messaging.core.paging.PageTransactionInfo;
 import org.jboss.messaging.core.postoffice.Binding;
@@ -39,11 +34,16 @@
 import org.jboss.messaging.core.transaction.ResourceManager;
 import org.jboss.messaging.util.SimpleString;
 
+import javax.transaction.xa.Xid;
+import java.util.List;
+import java.util.Map;
+
 /**
  * 
  * A StorageManager
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  *
  */
 public interface StorageManager extends MessagingComponent
@@ -58,10 +58,14 @@
 
    void storeDelete(long messageID) throws Exception;
 
+   void storeMessageScheduled(final ServerMessage message, final long scheduledDeliveryTime) throws Exception;
+
    void storeMessageTransactional(long txID, ServerMessage message) throws Exception;
 
    void storeAcknowledgeTransactional(long txID, long queueID, long messageiD) throws Exception;
 
+   void storeMessageScheduledTransactional(final long txID,final ServerMessage message, final long scheduledDeliveryTime) throws Exception;
+
    void storeDeleteMessageTransactional(long txID, long queueID, long messageID) throws Exception;
 
    /** Used to delete non-messaging data (such as PageTransaction and LasPage) */

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -22,17 +22,6 @@
 
 package org.jboss.messaging.core.persistence.impl.journal;
 
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.transaction.xa.Xid;
-
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.filter.impl.FilterImpl;
@@ -70,6 +59,17 @@
 import org.jboss.messaging.util.SimpleString;
 import org.jboss.messaging.util.TimeAndCounterIDGenerator;
 
+import javax.transaction.xa.Xid;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * 
  * A JournalStorageManager
@@ -222,6 +222,11 @@
       messageJournal.appendDeleteRecord(messageID);
    }
 
+   public void storeMessageScheduled(final ServerMessage message, final long scheduledDeliveryTime) throws Exception
+   { // Appends a SET_SCHEDULED_DELIVERY_TIME update record, keyed by the message ID, whose payload is the scheduled delivery time.
+      messageJournal.appendUpdateRecord(message.getMessageID(), SET_SCHEDULED_DELIVERY_TIME,  new ScheduledDeliveryEncoding(message.getMessageID(), scheduledDeliveryTime));
+   }
+
    // Transactional operations
 
    public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception
@@ -270,6 +275,11 @@
       messageJournal.appendDeleteRecordTransactional(txID, recordID, null);
    }
 
+   public void storeMessageScheduledTransactional(final long txID,final ServerMessage message, final long scheduledDeliveryTime) throws Exception
+   { // Appends a SET_SCHEDULED_DELIVERY_TIME update record for the message ID as part of journal transaction txID.
+      messageJournal.appendUpdateRecordTransactional(txID, message.getMessageID(), SET_SCHEDULED_DELIVERY_TIME,  new ScheduledDeliveryEncoding(message.getMessageID(), scheduledDeliveryTime));
+   }
+
    public void storeDeleteMessageTransactional(final long txID, final long queueID, final long messageID) throws Exception
    {
       messageJournal.appendDeleteRecordTransactional(txID, messageID, new DeleteEncoding(queueID));
@@ -309,7 +319,7 @@
       List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
 
       messageJournal.load(records, preparedTransactions);
-
+      Map<Long, List<MessageReference>> routedRefs = new HashMap<Long, List<MessageReference>>();
       for (RecordInfo record : records)
       {
          byte[] data = record.data;
@@ -335,6 +345,7 @@
                   ref.getQueue().addLast(ref);
                }
 
+               routedRefs.put(record.id, refs);
                break;
             }
             case ACKNOWLEDGE_REF:
@@ -417,7 +428,18 @@
             }
             case SET_SCHEDULED_DELIVERY_TIME:
             {
-               // TODO
+               ScheduledDeliveryEncoding scheduledDeliveryEncoding = new ScheduledDeliveryEncoding(record.id);
+               scheduledDeliveryEncoding.decode(buff);
+               List<MessageReference> refs = routedRefs.get(record.id);
+               //for any references that have already been routed, we need to remove them from the queue and re add them as scheduled
+               for (MessageReference ref : refs)
+               {
+                  ref.getQueue().removeReferenceWithID(record.id);
+                  ref.setScheduledDeliveryTime(scheduledDeliveryEncoding.getScheduledDeliveryTime());
+                  ref.getQueue().addScheduledDelivery(ref);
+               }
+
+               break;
             }
             default:
             {
@@ -633,8 +655,12 @@
 
          List<MessageReference> messages = new ArrayList<MessageReference>();
 
+         List<MessageReference> scheduledMessages = new ArrayList<MessageReference>();
+
          List<MessageReference> messagesToAck = new ArrayList<MessageReference>();
 
+         Map<Long, List<MessageReference>> routedRefs = new HashMap<Long, List<MessageReference>>();
+
          PageTransactionInfoImpl pageTransactionInfo = null;
 
          // first get any sent messages for this tx and recreate
@@ -660,6 +686,8 @@
 
                   messages.addAll(refs);
 
+                  routedRefs.put(record.id, refs);
+
                   break;
                }
                case ACKNOWLEDGE_REF:
@@ -698,6 +726,19 @@
 
                   break;
                }
+               case SET_SCHEDULED_DELIVERY_TIME:
+               {
+                  ScheduledDeliveryEncoding scheduledDeliveryEncoding = new ScheduledDeliveryEncoding(record.id);
+                  scheduledDeliveryEncoding.decode(buff);
+                  List<MessageReference> refs = routedRefs.get(record.id);
+                  //for any references that have already been routed, we need to remove them from the queue and re add them as scheduled
+                  for (MessageReference ref : refs)
+                  {
+                     ref.setScheduledDeliveryTime(scheduledDeliveryEncoding.getScheduledDeliveryTime());
+                     scheduledMessages.add(ref);
+                  }
+                  break;
+               }
                default:
                   log.warn("InternalError: Record type " + recordType +
                            " not recognized. Maybe you're using journal files created on a different version");
@@ -736,7 +777,7 @@
          }
 
          // now we recreate the state of the tx and add to the resource manager
-         tx.replay(messages, messagesToAck, pageTransactionInfo, Transaction.State.PREPARED);
+         tx.replay(messages, scheduledMessages, messagesToAck, pageTransactionInfo, Transaction.State.PREPARED);
 
          resourceManager.putTransaction(xid, tx);
       }
@@ -968,4 +1009,40 @@
          super(queueID);
       }
    }
+   private static class ScheduledDeliveryEncoding implements EncodingSupport
+   { // Journal payload for a scheduled-delivery record: only the delivery time is encoded; the message ID is held in memory only.
+      long messageId; // assigned by both constructors; not read anywhere within this class and not part of the encoded form
+      long scheduledDeliveryTime; // the single long that encode() writes and decode() reads
+      /** Encoding side: carries the delivery time to be written for the given message ID. */
+      private ScheduledDeliveryEncoding(long messageId, long scheduledDeliveryTime)
+      {
+         this.messageId = messageId;
+         this.scheduledDeliveryTime = scheduledDeliveryTime;
+      }
+      /** Decoding side: the delivery time stays at its default (0) until decode(MessagingBuffer) is called. */
+      public ScheduledDeliveryEncoding(long messageId)
+      {
+         this.messageId = messageId;
+      }
+      /** Size in bytes of the encoded form: one long. */
+      public int getEncodeSize()
+      {
+         return 8;
+      }
+      /** Writes the scheduled delivery time to the buffer as a single long. */
+      public void encode(MessagingBuffer buffer)
+      {
+         buffer.putLong(scheduledDeliveryTime);
+      }
+      /** Reads the scheduled delivery time from the buffer as a single long, overwriting the current value. */
+      public void decode(MessagingBuffer buffer)
+      {
+         scheduledDeliveryTime = buffer.getLong();
+      }
+      /** Returns the delivery time supplied at construction or last read by decode. */
+      public long getScheduledDeliveryTime()
+      {
+         return scheduledDeliveryTime;
+      }
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -22,11 +22,6 @@
 
 package org.jboss.messaging.core.persistence.impl.nullpm;
 
-import java.util.List;
-import java.util.Map;
-
-import javax.transaction.xa.Xid;
-
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.paging.LastPageRecord;
 import org.jboss.messaging.core.paging.PageTransactionInfo;
@@ -42,6 +37,10 @@
 import org.jboss.messaging.util.SimpleString;
 import org.jboss.messaging.util.TimeAndCounterIDGenerator;
 
+import javax.transaction.xa.Xid;
+import java.util.List;
+import java.util.Map;
+
 /**
  * 
  * A NullStorageManager
@@ -110,7 +109,11 @@
 	{
 	}
 
-	public void storeDelete(long messageID) throws Exception
+   public void storeMessageScheduled(ServerMessage message, long scheduledDeliveryTime) throws Exception
+   { // No-op: nothing is stored, like the other empty methods visible in this class.
+   }
+
+   public void storeDelete(long messageID) throws Exception
 	{
 	}
 
@@ -127,10 +130,14 @@
 	{
 	}
 
-	public void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception
+   public void storeMessageScheduledTransactional(long txID, ServerMessage message, long scheduledDeliveryTime) throws Exception
    {
    }
 
+   public void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception
+   {
+   }
+
    public void updatePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception
    {
    }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -12,6 +12,21 @@
 
 package org.jboss.messaging.core.remoting.impl;
 
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.FailureListener;
+import org.jboss.messaging.core.remoting.Interceptor;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.ResponseNotifier;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.DuplicablePacket;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
@@ -49,6 +64,7 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REMOVE_DESTINATION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SCHEDULED_SEND;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_START;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
@@ -68,37 +84,6 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.ChannelHandler;
-import org.jboss.messaging.core.remoting.FailureListener;
-import org.jboss.messaging.core.remoting.Interceptor;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.ResponseNotifier;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.DuplicablePacket;
-import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
 import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
@@ -130,6 +115,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionScheduledSendMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
@@ -152,6 +138,21 @@
 import org.jboss.messaging.util.OrderedExecutorFactory;
 import org.jboss.messaging.util.SimpleIDGenerator;
 
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 /**
  * @author <a href="tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -777,6 +778,11 @@
             packet = new SessionBrowserCloseMessage();
             break;
          }
+         case SESS_SCHEDULED_SEND:
+         {
+            packet = new SessionScheduledSendMessage();
+            break;
+         }
          case NULL_RESPONSE:
          {
             packet = new NullResponseMessage();

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -45,7 +45,7 @@
    public static final byte EXCEPTION = 20;
 
    public static final byte NULL_RESPONSE = 21;
-   
+
    public static final byte PACKETS_CONFIRMED = 22;
 
 
@@ -53,7 +53,7 @@
    public static final byte CREATESESSION = 30;
 
    public static final byte CREATESESSION_RESP = 31;
-   
+
    public static final byte REATTACH_SESSION = 32;
 
    public static final byte REATTACH_SESSION_RESP = 33;
@@ -76,7 +76,7 @@
    public static final byte SESS_COMMIT = 47;
 
    public static final byte SESS_ROLLBACK = 48;
-   
+
    public static final byte SESS_QUEUEQUERY = 49;
 
    public static final byte SESS_QUEUEQUERY_RESP = 50;
@@ -135,10 +135,10 @@
 
    public static final byte SESS_XA_GET_TIMEOUT_RESP = 77;
 
-   public static final byte SESS_START = 78; 
+   public static final byte SESS_START = 78;
 
    public static final byte SESS_STOP = 79;
-   
+
    public static final byte SESS_CLOSE = 80;
 
    public static final byte SESS_FLOWTOKEN = 81;
@@ -156,7 +156,9 @@
    public static final byte SESS_RECEIVE_MSG = 87;
 
    public static final byte SESS_MANAGEMENT_SEND = 88;
-   
+
+   public static final byte SESS_SCHEDULED_SEND = 91;
+
    // Static --------------------------------------------------------
 
    public PacketImpl(final byte type)
@@ -217,34 +219,34 @@
    public void decodeBody(final MessagingBuffer buffer)
    {
    }
-   
+
    public boolean isRequiresConfirmations()
    {
       return true;
    }
-   
+
    public boolean isReplicateBlocking()
    {
       return false;
    }
-   
+
    public boolean isWriteAlways()
    {
       return false;
    }
-   
+
    public boolean isReHandleResponseOnFailure()
    {
       return false;
    }
-   
+
    public boolean isDuplicate()
    {
       return false;
    }
-   
+
    public void setDuplicate(final boolean duplicate)
-   {      
+   {
    }
 
    @Override
@@ -265,7 +267,7 @@
 
       return r.type == type && r.channelID == channelID;
    }
-      
+
    // Package protected ---------------------------------------------
 
    protected String getParentString()

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionScheduledSendMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionScheduledSendMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionScheduledSendMessage.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -0,0 +1,61 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+/** A SESS_SCHEDULED_SEND packet: a SessionSendMessage whose encoded body is followed by a scheduled delivery time (one long).
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class SessionScheduledSendMessage extends SessionSendMessage
+{
+   private long scheduledDeliveryTime;
+   /** Builds the packet for sending: the superclass fields plus the delivery time that encodeBody appends. */
+   public SessionScheduledSendMessage(final long producerID, final ClientMessage message, final boolean requiresResponse, final long scheduledDeliveryTime)
+   {
+      super(SESS_SCHEDULED_SEND, producerID, message, requiresResponse);
+      this.scheduledDeliveryTime = scheduledDeliveryTime;
+   }
+   /** No-arg form, instantiated by RemotingConnectionImpl for the SESS_SCHEDULED_SEND packet type; the delivery time is populated only by decodeBody. */
+   public SessionScheduledSendMessage()
+   {
+      super(SESS_SCHEDULED_SEND);  
+   }
+   /** Writes the superclass body first, then the scheduled delivery time as one long. */
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      super.encodeBody(buffer);
+      buffer.putLong(scheduledDeliveryTime);
+   }
+   /** Mirror of encodeBody: reads the superclass body, then the scheduled delivery time. */
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      super.decodeBody(buffer);
+      scheduledDeliveryTime = buffer.getLong();
+   }
+   /** Returns the scheduled delivery time given to the constructor or read by decodeBody. */
+   public long getScheduledDeliveryTime()
+   {
+      return scheduledDeliveryTime;
+   }
+}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -70,6 +70,22 @@
       super(SESS_SEND);
    }
 
+   protected SessionSendMessage(final byte type, final long producerID, final ClientMessage message, final boolean requiresResponse)
+   { // Lets a subclass supply its own packet type; SessionScheduledSendMessage passes SESS_SCHEDULED_SEND.
+      super(type);
+
+      this.producerID = producerID;
+
+      this.clientMessage = message;
+
+      this.requiresResponse = requiresResponse;
+   }
+
+   protected SessionSendMessage(byte type)
+   { // Type-only form for subclasses; SessionScheduledSendMessage's no-arg constructor calls it with SESS_SCHEDULED_SEND.
+      super(type);
+   }
+
    // Public --------------------------------------------------------
 
    public boolean isReHandleResponseOnFailure()

Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -22,11 +22,6 @@
 
 package org.jboss.messaging.core.server;
 
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
@@ -36,7 +31,12 @@
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.util.SimpleString;
 
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 
+
 /**
  * 
  * A Queue
@@ -147,4 +147,5 @@
    MessageReference removeFirst();
    
    boolean consumerFailedOver();   
+   void addScheduledDelivery(MessageReference ref);
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -28,6 +28,7 @@
  * A ServerProducer
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  *
  */
 public interface ServerProducer
@@ -37,6 +38,8 @@
 	void close() throws Exception;
 	
 	void send(ServerMessage msg) throws Exception;
+
+   void sendScheduled(ServerMessage message, long scheduledDeliveryTime) throws Exception;
 	
 	void requestAndSendCredits() throws Exception;
 	

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -23,7 +23,12 @@
 package org.jboss.messaging.core.server;
 
 import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.*;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
 import org.jboss.messaging.core.server.impl.ServerBrowserImpl;
 import org.jboss.messaging.util.SimpleString;
 
@@ -31,10 +36,11 @@
 import java.util.List;
 
 /**
- * 
+ *
  * A ServerSession
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  *
  */
 public interface ServerSession
@@ -59,6 +65,8 @@
 
    void send(ServerMessage msg) throws Exception;
 
+   void sendScheduled(ServerMessage serverMessage, long scheduledDeliveryTime) throws Exception;
+
    void processed(final long consumerID, final long messageID) throws Exception;
 
    void rollback() throws Exception;
@@ -82,7 +90,7 @@
    SessionXAResponseMessage XAStart(Xid xid);
 
    SessionXAResponseMessage XASuspend() throws Exception;
-   
+
    List<Xid> getInDoubtXids() throws Exception;
 
    int getXATimeout();
@@ -137,6 +145,8 @@
 
    void sendProducerMessage(long producerID, ServerMessage message) throws Exception;
 
+   void sendScheduledProducerMessage(long producerID, ServerMessage serverMessage, long scheduledDeliveryTime) throws Exception;
+
    boolean browserHasNextMessage(long browserID) throws Exception;
 
    ServerMessage browserNextMessage(long browserID) throws Exception;

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -12,23 +12,6 @@
 
 package org.jboss.messaging.core.server.impl;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.list.PriorityLinkedList;
 import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
@@ -49,6 +32,23 @@
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
 import org.jboss.messaging.util.SimpleString;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * Implementation of a Queue TODO use Java 5 concurrent queue
  *
@@ -690,6 +690,13 @@
       return ref;
    }
 
+   public void addScheduledDelivery(MessageReference ref)
+   { // Wraps ref in a ScheduledDeliveryRunnable, records it in scheduledRunnables, then schedules it for ref.getScheduledDeliveryTime().
+      ScheduledDeliveryRunnable runner = new ScheduledDeliveryRunnable(ref);
+      scheduledRunnables.add(runner);
+      scheduleDelivery(runner, ref.getScheduledDeliveryTime()); // NOTE(review): the checkAndSchedule removed in this commit skipped this call when backup was set and required scheduledExecutor != null; neither guard is applied here - confirm that is intended
+   }
+
    // Public
    // -----------------------------------------------------------------------------
 
@@ -722,11 +729,6 @@
          sizeBytes.addAndGet(ref.getMessage().getEncodeSize());
       }
 
-      if (checkAndSchedule(ref))
-      {
-         return HandleStatus.HANDLED;
-      }
-
       boolean add = false;
 
       if (direct && !backup)
@@ -784,31 +786,6 @@
       return HandleStatus.HANDLED;
    }
 
-   private boolean checkAndSchedule(final MessageReference ref)
-   {
-      long deliveryTime = ref.getScheduledDeliveryTime();
-
-      if (deliveryTime != 0 && scheduledExecutor != null)
-      {
-         if (trace)
-         {
-            log.trace("Scheduling delivery for " + ref + " to occur at " + deliveryTime);
-         }
-
-         ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
-
-         scheduledRunnables.add(runnable);
-
-         if (!backup)
-         {
-            scheduleDelivery(runnable, deliveryTime);
-         }
-
-         return true;
-      }
-      return false;
-   }
-
    private void scheduleDelivery(final ScheduledDeliveryRunnable runnable, final long deliveryTime)
    {
       long now = System.currentTimeMillis();

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -22,8 +22,6 @@
 
 package org.jboss.messaging.core.server.impl;
 
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.postoffice.FlowController;
 import org.jboss.messaging.core.remoting.Channel;
@@ -34,11 +32,14 @@
 import org.jboss.messaging.core.server.ServerSession;
 import org.jboss.messaging.util.SimpleString;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * 
  * A ServerProducerImpl
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  *
  */
 public class ServerProducerImpl implements ServerProducer
@@ -95,26 +96,20 @@
 	}
 	
 	public void send(final ServerMessage message) throws Exception
-	{		
-		if (this.address != null)
-		{			
-		   //Only do flow control with non anonymous producers
-		   
-			if (flowController != null)
-		   {
-			   int creds = creditsToSend.addAndGet(message.getEncodeSize());
-			   
-			   if (creds >= windowSize)
-			   {
-			      requestAndSendCredits();
-			   }
-			}
-		}
-		
-		session.send(message);  		
+	{
+      doFlowControl(message);
+
+      session.send(message);  		
 	}
-	
-	public void requestAndSendCredits() throws Exception
+
+   public void sendScheduled(final ServerMessage message, final long scheduledDeliveryTime) throws Exception
+   { // Scheduled counterpart of send(): performs the same flow-control accounting, then delegates to session.sendScheduled.
+      doFlowControl(message);
+
+      session.sendScheduled(message, scheduledDeliveryTime);
+   }
+
+   public void requestAndSendCredits() throws Exception
 	{	 
 	   if (!waiting)
 	   {
@@ -140,4 +135,24 @@
 	{
 		return waiting;
 	}
+
+
+
+   private void doFlowControl(final ServerMessage message) throws Exception
+   { // Shared by send() and sendScheduled(): adds the message's encode size to creditsToSend and requests credits once the total reaches windowSize.
+      if (this.address != null)
+      {
+         //Only do flow control with non anonymous producers
+         // (a null address is what marks a producer as anonymous here; a null flowController also skips the accounting)
+         if (flowController != null)
+         {
+            int creds = creditsToSend.addAndGet(message.getEncodeSize());
+            // creds is the running total after this message's size has been added
+            if (creds >= windowSize)
+            {
+               requestAndSendCredits();
+            }
+         }
+      }
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -27,11 +27,22 @@
 import org.jboss.messaging.core.remoting.FailureListener;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
-import org.jboss.messaging.core.remoting.impl.wireformat.*;
+import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
 import org.jboss.messaging.core.security.CheckType;
 import org.jboss.messaging.core.security.SecurityStore;
-import org.jboss.messaging.core.server.*;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.ServerConsumer;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.ServerProducer;
+import org.jboss.messaging.core.server.ServerSession;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.ResourceManager;
@@ -48,7 +59,11 @@
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
@@ -58,6 +73,7 @@
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a> 
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a> 
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  */
 
 public class ServerSessionImpl implements ServerSession, FailureListener, NotificationListener
@@ -298,42 +314,62 @@
    public void send(final ServerMessage msg) throws Exception
    {
       // check the user has write access to this address.
-      try
+      doSecurity(msg);
+
+      if (autoCommitSends)
       {
-         securityStore.check(msg.getDestination(), CheckType.WRITE, this);
-      }
-      catch (MessagingException e)
-      {
-         if (!autoCommitSends)
+         if (!pager.page(msg))
          {
-            tx.markAsRollbackOnly(e);
+            List<MessageReference> refs = postOffice.route(msg);
+
+            if (msg.getDurableRefCount() != 0)
+            {
+               storageManager.storeMessage(msg);
+            }
+
+            for (MessageReference ref : refs)
+            {
+               ref.getQueue().addLast(ref);
+            }
          }
-         throw e;
       }
+      else
+      {
+         tx.addMessage(msg);
+      }
+   }
 
+
+   public void sendScheduled(final ServerMessage msg, final long scheduledDeliveryTime) throws Exception
+   {
+      doSecurity(msg);
+
       if (autoCommitSends)
       {
-         if (!pager.page(msg))
+         if (!pager.pageScheduled(msg, scheduledDeliveryTime))
          {
             List<MessageReference> refs = postOffice.route(msg);
 
             if (msg.getDurableRefCount() != 0)
             {
                storageManager.storeMessage(msg);
+               storageManager.storeMessageScheduled(msg, scheduledDeliveryTime);
             }
 
             for (MessageReference ref : refs)
             {
-               ref.getQueue().addLast(ref);
+               ref.setScheduledDeliveryTime(scheduledDeliveryTime);
+               ref.getQueue().addScheduledDelivery(ref);
             }
          }
       }
       else
       {
-         tx.addMessage(msg);
+         tx.addScheduledMessage(msg, scheduledDeliveryTime);
       }
    }
 
+
    public void processed(final long consumerID, final long messageID) throws Exception
    {
       MessageReference ref = consumers.get(consumerID).waitForReference(messageID);
@@ -367,7 +403,7 @@
          }
       }
    }
-   
+
    public void rollback() throws Exception
    {
       rollback(true);
@@ -463,7 +499,7 @@
 
       return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
    }
-   
+
    public SessionXAResponseMessage XAEnd(final Xid xid, final boolean failed) throws Exception
    {
       if (tx != null && tx.getXid().equals(xid))
@@ -504,7 +540,7 @@
 
       return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
    }
-   
+
    public SessionXAResponseMessage XAForget(final Xid xid)
    {
       // Do nothing since we don't support heuristic commits / rollback from the
@@ -907,7 +943,7 @@
 
       return response;
    }
-   
+
 //   public SessionCreateConsumerResponseMessage recreateConsumer(final SimpleString queueName,
 //                                                              final SimpleString filterString,
 //                                                              int windowSize,
@@ -1014,7 +1050,7 @@
 
    /**
     * Create a producer for the specified address
-    * 
+    *
     * @param address The address to produce too
     * @param windowSize - the producer window size to use for flow control. Specify -1 to disable flow control
     *           completely The actual window size used may be less than the specified window size if it is overridden by
@@ -1062,7 +1098,7 @@
       }
       return new SessionCreateProducerResponseMessage(initialCredits, maxRateToUse, groupId);
    }
-   
+
 //   public SessionCreateProducerResponseMessage recreateProducer(final SimpleString address,
 //                                                              final int windowSize,
 //                                                              final int maxRate,
@@ -1078,7 +1114,7 @@
 //      }
 //
 //      final int windowToUse = flowController == null ? -1 : windowSize;
-//      
+//
 //      // Get some initial credits to send to the producer - we try for
 //      // windowToUse
 //
@@ -1133,6 +1169,11 @@
       producers.get(producerID).send(message);
    }
 
+   public void sendScheduledProducerMessage(final long producerID, final ServerMessage message, final long scheduledDeliveryTime) throws Exception
+   {
+       producers.get(producerID).sendScheduled(message, scheduledDeliveryTime);  
+   }
+
    public int transferConnection(final RemotingConnection newConnection)
    {
       remotingConnection.removeFailureListener(this);
@@ -1147,11 +1188,11 @@
       remotingConnection = newConnection;
 
       remotingConnection.addFailureListener(this);
-      
+
       int lastReceivedCommandID =  channel.getLastReceivedCommandID();
-     
+
       //TODO resend any dup responses
-      
+
       return lastReceivedCommandID;
    }
 
@@ -1293,4 +1334,20 @@
       queue.referenceAcknowledged(ref);
    }
 
+
+   private void doSecurity(final ServerMessage msg) throws Exception
+   {
+      try
+      {
+         securityStore.check(msg.getDestination(), CheckType.WRITE, this);
+      }
+      catch (MessagingException e)
+      {
+         if (!autoCommitSends)
+         {
+            tx.markAsRollbackOnly(e);
+         }
+         throw e;
+      }
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -12,6 +12,14 @@
 
 package org.jboss.messaging.core.server.impl;
 
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ADD_DESTINATION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_CLOSE;
@@ -33,6 +41,7 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REMOVE_DESTINATION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SCHEDULED_SEND;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_START;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
@@ -48,19 +57,6 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
-
-import java.util.List;
-
-import javax.transaction.xa.Xid;
-
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.ChannelHandler;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowseMessage;
@@ -80,6 +76,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionScheduledSendMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
@@ -97,11 +94,15 @@
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.ServerSession;
 
+import javax.transaction.xa.Xid;
+import java.util.List;
+
 /**
  * A ServerSessionPacketHandler
- * 
+ *
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  */
 public class ServerSessionPacketHandler implements ChannelHandler
 {
@@ -134,12 +135,12 @@
    {
       byte type = packet.getType();
 
-      if (type == SESS_SEND)
+      if (type == SESS_SEND || type == SESS_SCHEDULED_SEND)
       {
          SessionSendMessage send = (SessionSendMessage)packet;
 
          ServerMessage msg = send.getServerMessage();
-         
+
          if (msg.getMessageID() == 0L)
          {
             // must generate message id here, so we know they are in sync
@@ -148,13 +149,13 @@
             send.getServerMessage().setMessageID(id);
          }
       }
-      
+
       Packet response = null;
 
       try
       {
-         channel.replicatePacket(packet);         
-         
+         channel.replicatePacket(packet);
+
          switch (type)
          {
             case SESS_CREATECONSUMER:
@@ -227,7 +228,7 @@
             }
             case SESS_ROLLBACK:
             {
-               session.rollback();    
+               session.rollback();
                //Rollback response is handled in the rollback() method
                break;
             }
@@ -369,6 +370,16 @@
                }
                break;
             }
+            case SESS_SCHEDULED_SEND:
+            {
+               SessionScheduledSendMessage message = (SessionScheduledSendMessage)packet;
+               session.sendScheduledProducerMessage(message.getProducerID(), message.getServerMessage(), message.getScheduledDeliveryTime());
+               if (message.isRequiresResponse())
+               {
+                  response = new NullResponseMessage();
+               }
+               break;
+            }
             case SESS_BROWSER_HASNEXTMESSAGE:
             {
                SessionBrowserHasNextMessageMessage message = (SessionBrowserHasNextMessageMessage)packet;

Modified: trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -36,6 +36,7 @@
  * A JBoss Messaging internal transaction
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  */
 public interface Transaction
 {
@@ -67,8 +68,10 @@
 
    void markAsRollbackOnly(MessagingException messagingException);
 
-   void replay(List<MessageReference> messages, List<MessageReference> acknowledgements, PageTransactionInfo pageTransaction, State prepared) throws Exception;
+   void replay(List<MessageReference> messages, List<MessageReference> scheduledMessages, List<MessageReference> acknowledgements, PageTransactionInfo pageTransaction, State prepared) throws Exception;
 
+   void addScheduledMessage(ServerMessage msg, long scheduledDeliveryTime) throws Exception;
+
    static enum State
    {
       ACTIVE, PREPARED, COMMITTED, ROLLEDBACK, SUSPENDED, ROLLBACK_ONLY;

Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -28,12 +28,18 @@
 import org.jboss.messaging.util.SimpleString;
 
 import javax.transaction.xa.Xid;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 
 /**
  * A TransactionImpl
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  */
 public class TransactionImpl implements Transaction
 {
@@ -51,6 +57,10 @@
 
    private final List<ServerMessage> pagedMessages = new ArrayList<ServerMessage>();
 
+   private final Map<ServerMessage, Long> scheduledPagedMessages = new HashMap<ServerMessage, Long>();
+
+   private final Map<MessageReference, Long> scheduledReferences = new HashMap<MessageReference, Long>();
+
    private PageTransactionInfo pageTransaction;
 
    private final Xid xid;
@@ -148,6 +158,32 @@
       }
    }
 
+   public void addScheduledMessage(final ServerMessage message, long scheduledDeliveryTime) throws Exception
+   {
+      if (state != State.ACTIVE)
+      {
+         throw new IllegalStateException("Transaction is in invalid state " + state);
+      }
+
+      if (pagingManager.isPaging(message.getDestination()))
+      {
+         scheduledPagedMessages.put(message, scheduledDeliveryTime);
+      }
+      else
+      {
+         List<MessageReference> refs = route(message);
+
+         if (message.getDurableRefCount() != 0)
+         {
+            storageManager.storeMessageScheduledTransactional(id, message, scheduledDeliveryTime);
+         }
+         for (MessageReference ref : refs)
+         {
+            scheduledReferences.put(ref, scheduledDeliveryTime);
+         }
+      }
+   }
+
    public void addAcknowledgement(final MessageReference acknowledgement) throws Exception
    {
       if (state != State.ACTIVE)
@@ -259,7 +295,16 @@
 
       for (MessageReference ref : refsToAdd)
       {
-         ref.getQueue().addLast(ref);
+         Long scheduled = scheduledReferences.get(ref);
+         if(scheduled == null)
+         {
+            ref.getQueue().addLast(ref);
+         }
+         else
+         {
+            ref.setScheduledDeliveryTime(scheduled);
+            ref.getQueue().addScheduledDelivery(ref);
+         }
       }
 
       // If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
@@ -403,12 +448,17 @@
    }
 
    public void replay(List<MessageReference> messages,
+                      List<MessageReference> scheduledMessages,
                       List<MessageReference> acknowledgements,
                       PageTransactionInfo pageTransaction,
                       State prepared) throws Exception
    {
       containsPersistent = true;
       refsToAdd.addAll(messages);
+      for (MessageReference scheduledMessage : scheduledMessages)
+      {
+         this.scheduledReferences.put(scheduledMessage, scheduledMessage.getScheduledDeliveryTime());
+      }
       this.acknowledgements.addAll(acknowledgements);
       this.pageTransaction = pageTransaction;
 
@@ -428,7 +478,7 @@
    // Private
    // -------------------------------------------------------------------
 
-   private void route(final ServerMessage message) throws Exception
+   private List<MessageReference> route(final ServerMessage message) throws Exception
    {
       List<MessageReference> refs = postOffice.route(message);
 
@@ -440,6 +490,7 @@
 
          containsPersistent = true;
       }
+      return refs;
    }
 
    private void pageMessages() throws Exception
@@ -481,6 +532,37 @@
          }
       }
 
+      for (ServerMessage message : scheduledPagedMessages.keySet())
+      {
+         long scheduledDeliveryTime = scheduledPagedMessages.get(message);
+         // http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging
+         // Explained under Transaction On Paging. (This is the item B)
+         if (pagingManager.pageScheduled(message, id, scheduledDeliveryTime))
+         {
+            if (message.isDurable())
+            {
+               // We only create pageTransactions if using persistent messages
+               pageTransaction.increment();
+               pagingPersistent = true;
+               pagedDestinationsToSync.add(message.getDestination());
+            }
+         }
+         else
+         {
+            // This could happen when the PageStore left the pageState
+            List<MessageReference> refs = route(message);
+
+            if (message.getDurableRefCount() != 0)
+            {
+               storageManager.storeMessageScheduledTransactional(id, message, scheduledDeliveryTime);
+            }
+            for (MessageReference ref : refs)
+            {
+               scheduledReferences.put(ref, scheduledDeliveryTime);
+            }
+         }
+      }
+
       if (pagingPersistent)
       {
          containsPersistent = true;
@@ -499,5 +581,9 @@
       acknowledgements.clear();
 
       pagedMessages.clear();
+
+      scheduledPagedMessages.clear();
+
+      scheduledReferences.clear();
    }
 }

Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -23,9 +23,20 @@
 import org.jboss.messaging.jms.JBossDestination;
 import org.jboss.messaging.util.SimpleString;
 
-import javax.jms.*;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
 
 /**
  * Implementation of a JMS Message JMS Messages only live on the client side - the server only deals with MessageImpl
@@ -66,6 +77,8 @@
    // Used when bridging a message
    public static final String JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST = "JBM_BRIDGE_MSG_ID_LIST";
 
+   public static final String JBM_SCHEDULED_DELIVERY_TIME = "JBM_SCHEDULED_DELIVERY_TIME";
+
    public static final byte TYPE = 0;
 
    // Static --------------------------------------------------------
@@ -162,6 +175,8 @@
    // Cache it
    private String jmsType;
 
+   private long scheduledDeliveryTime = 0;
+
    // Constructors --------------------------------------------------
    /**
     * constructors for test purposes only
@@ -363,6 +378,17 @@
       return jmsCorrelationID;
    }
 
+   public long getScheduledDeliveryTime()
+   {
+      return scheduledDeliveryTime;
+   }
+
+   public void setScheduledDeliveryTime(long scheduledDeliveryTime)
+   {
+      message.putLongProperty(new SimpleString(JBM_SCHEDULED_DELIVERY_TIME), scheduledDeliveryTime);
+      this.scheduledDeliveryTime = scheduledDeliveryTime;
+   }
+
    public Destination getJMSReplyTo() throws JMSException
    {
       if (replyTo == null)
@@ -806,6 +832,10 @@
    {
       Long l = new Long(value);
       checkProperty(name, l);
+      if(JBM_SCHEDULED_DELIVERY_TIME.equals(name))
+      {
+         scheduledDeliveryTime = l;
+      }
       message.putLongProperty(new SimpleString(name), value);
    }
 

Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossMessageProducer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossMessageProducer.java	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossMessageProducer.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -22,7 +22,14 @@
 
 package org.jboss.messaging.jms.client;
 
-import java.util.concurrent.atomic.AtomicLong;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.jms.JBossDestination;
+import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.UUIDGenerator;
 
 import javax.jms.BytesMessage;
 import javax.jms.DeliveryMode;
@@ -40,16 +47,8 @@
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 import javax.jms.TopicPublisher;
+import java.util.concurrent.atomic.AtomicLong;
 
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.jms.JBossDestination;
-import org.jboss.messaging.util.SimpleString;
-import org.jboss.messaging.util.UUIDGenerator;
-
 /**
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -448,10 +447,18 @@
       {
          coreMessage.putStringProperty(JBossConnection.CONNECTION_ID_PROPERTY_NAME, connID);
       }
-      
+
       try
       {      	
-      	producer.send(address, coreMessage);      		      	
+      	//check to see if this message needs to be scheduled
+         if(jbm.getScheduledDeliveryTime() > 0)
+         {
+            producer.send(address, coreMessage, jbm.getScheduledDeliveryTime());
+         }
+         else
+         {
+            producer.send(address, coreMessage);
+         }
       }
       catch (MessagingException e)
       {

Added: trunk/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -0,0 +1,230 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.timing.core.client;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.core.transaction.impl.XidImpl;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.SimpleString;
+import org.jboss.util.id.GUID;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.io.File;
+import java.util.Calendar;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ScheduledMessageTest extends UnitTestCase
+{
+   private static final String ACCEPTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory";
+
+   private static final String CONNECTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
+
+   private String journalDir = System.getProperty("java.io.tmpdir", "/tmp") + "/ScheduledMessageRecoveryTest/journal";
+
+   private String bindingsDir = System.getProperty("java.io.tmpdir", "/tmp") + "/ScheduledMessageRecoveryTest/bindings";
+
+   private String pageDir = System.getProperty("java.io.tmpdir", "/tmp") + "/ScheduledMessageRecoveryTest/page";
+
+   private SimpleString atestq = new SimpleString("ascheduledtestq");
+
+   private MessagingService messagingService;
+
+   private ConfigurationImpl configuration;
+
+   protected void setUp() throws Exception
+   {
+      File file = new File(journalDir);
+      File file2 = new File(bindingsDir);
+      File file3 = new File(pageDir);
+      deleteDirectory(file);
+      file.mkdirs();
+      deleteDirectory(file2);
+      file2.mkdirs();
+      deleteDirectory(file3);
+      file3.mkdirs();
+      configuration = new ConfigurationImpl();
+      configuration.setSecurityEnabled(false);
+      configuration.setJournalMinFiles(2);
+      configuration.setPagingDirectory(pageDir);
+   }
+
+   protected void tearDown() throws Exception
+   {
+      if (messagingService != null)
+      {
+         try
+         {
+            messagingService.stop();
+         }
+         catch (Exception e)
+         {
+            // best effort: a failure while stopping must not prevent the directory cleanup below
+         }
+      }
+      deleteDirectory(new File(journalDir)); // File.delete() silently fails on a non-empty directory; reuse the helper setUp() uses
+      deleteDirectory(new File(bindingsDir));
+      deleteDirectory(new File(pageDir));
+   }
+
+   public void testRecoveredMessageDeliveredCorrectly() throws Exception
+   {
+      // Sends a durable scheduled message, restarts the server, and checks it is not received before its scheduled time.
+      TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      //start the server
+      messagingService.start();
+      //then we create a client as normal
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+      ClientSession session = sessionFactory.createSession(false, true, false, false);
+      session.createQueue(atestq, atestq, null, true, true);
+      ClientProducer producer = session.createProducer(atestq);
+      ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
+                                                          System.currentTimeMillis(), (byte) 1);
+      message.getBody().putString("testINVMCoreClient");
+      message.getBody().flip();
+      message.setDurable(true);
+      Calendar cal = Calendar.getInstance();
+      cal.add(Calendar.SECOND, 10); // add(), not roll(): roll() wraps the seconds field without advancing the minute
+      producer.send(message, cal.getTimeInMillis());
+
+      producer.close();
+      session.close();
+      messagingService.stop();
+      messagingService = null;
+      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      messagingService.start();
+
+      sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+
+      session = sessionFactory.createSession(false, true, true, false);
+
+      ClientConsumer consumer = session.createConsumer(atestq);
+
+      session.start();
+
+      ClientMessage message2 = consumer.receive(10000);
+      assertTrue(System.currentTimeMillis() >= cal.getTimeInMillis());
+      assertEquals("testINVMCoreClient", message2.getBody().getString());
+
+      message2.processed();
+      session.close();
+   }
+
+   public void testRecoveredTxMessageDeliveredCorrectly() throws Exception
+   {
+      Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+      TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      //start the server
+      messagingService.start();
+      //then we create a client as normal
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+      ClientSession session = sessionFactory.createSession(true, false, false, false);
+      session.createQueue(atestq, atestq, null, true, true);
+      session.start(xid,  XAResource.TMNOFLAGS);
+      ClientProducer producer = session.createProducer(atestq);
+      ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
+                                                          System.currentTimeMillis(), (byte) 1);
+      message.getBody().putString("testINVMCoreClient");
+      message.getBody().flip();
+      message.setDurable(true);
+      Calendar cal = Calendar.getInstance();
+      cal.add(Calendar.SECOND, 10); // add(), not roll(): roll() wraps the seconds field without advancing the minute
+      producer.send(message, cal.getTimeInMillis());
+      session.end(xid, XAResource.TMSUCCESS);
+      session.prepare(xid);
+      producer.close();
+      session.close();
+      messagingService.stop();
+      messagingService = null;
+      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      messagingService.start();
+      // The XA transaction was only prepared before the restart; it is committed below against the restarted server.
+      sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+
+      session = sessionFactory.createSession(true, false, false, false);
+      session.commit(xid, true);
+      ClientConsumer consumer = session.createConsumer(atestq);
+
+      session.start();
+
+      ClientMessage message2 = consumer.receive(10000);
+      assertTrue(System.currentTimeMillis() >= cal.getTimeInMillis());
+      assertEquals("testINVMCoreClient", message2.getBody().getString());
+
+      message2.processed();
+      session.close();
+   }
+
+   public void testPagedMessageDeliveredCorrectly() throws Exception
+   {
+      // Sends a scheduled message with the global paging limit set to 0 (presumably forcing paging - confirm) and checks it is not received early.
+      TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+      configuration.setPagingMaxGlobalSizeBytes(0);
+      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      //start the server
+      messagingService.start();
+      //then we create a client as normal
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+      ClientSession session = sessionFactory.createSession(false, true, false, false);
+      session.createQueue(atestq, atestq, null, true, true);
+      ClientProducer producer = session.createProducer(atestq);
+      ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
+                                                          System.currentTimeMillis(), (byte) 1);
+      message.getBody().putString("testINVMCoreClient");
+      message.getBody().flip();
+      message.setDurable(true);
+      Calendar cal = Calendar.getInstance();
+      cal.add(Calendar.SECOND, 10); // add(), not roll(): roll() wraps the seconds field without advancing the minute
+      producer.send(message, cal.getTimeInMillis());
+
+      producer.close();
+
+      
+      ClientConsumer consumer = session.createConsumer(atestq);
+
+      session.start();
+
+      ClientMessage message2 = consumer.receive(10000);
+      assertTrue(System.currentTimeMillis() >= cal.getTimeInMillis());
+      assertEquals("testINVMCoreClient", message2.getBody().getString());
+
+      message2.processed();
+      session.close();
+   }
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -22,22 +22,31 @@
 
 package org.jboss.messaging.tests.unit.core.paging.impl;
 
-import java.util.ArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-
 import org.easymock.EasyMock;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.paging.LastPageRecord;
+import org.jboss.messaging.core.paging.PageMessage;
 import org.jboss.messaging.core.paging.PagingStore;
 import org.jboss.messaging.core.paging.PagingStoreFactory;
+import org.jboss.messaging.core.paging.impl.PageMessageImpl;
 import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
 import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.HierarchicalObjectRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.tests.util.UnitTestCase;
 import org.jboss.messaging.util.SimpleString;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+
 public class PageManagerImplTest extends UnitTestCase
 {
 
@@ -187,6 +196,75 @@
 
    }
 
+   /** Expects onDepage() to store the message, commit, and append the routed reference via Queue.addLast(). */
+   public void testOnDepage() throws Exception
+   {
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+      MessageReference ref = EasyMock.createStrictMock(MessageReference.class);
+      refs.add(ref);
+      Queue queue = EasyMock.createStrictMock(Queue.class);
+      HierarchicalRepository<QueueSettings> queueSettings = new HierarchicalObjectRepository<QueueSettings>();
+      queueSettings.setDefault(new QueueSettings());
+      PostOffice po = EasyMock.createStrictMock(PostOffice.class);
+      PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
+      PagingStore store = EasyMock.createNiceMock(PagingStore.class);
+      StorageManager storageManager = EasyMock.createStrictMock(StorageManager.class);
+      PagingManagerImpl manager = new PagingManagerImpl(spi, storageManager, queueSettings, -1);
+      manager.setPostOffice(po);
+      ServerMessage message = EasyMock.createStrictMock(ServerMessage.class);
+      // Record the calls onDepage() is expected to make; strict mocks also check call order per mock.
+      EasyMock.expect(storageManager.generateUniqueID()).andReturn(1L);
+      EasyMock.expect(po.route(message)).andReturn(refs);
+      EasyMock.expect(message.getDurableRefCount()).andReturn(1);
+      storageManager.storeLastPage(EasyMock.anyLong(), (LastPageRecord) EasyMock.anyObject());
+      storageManager.storeMessageTransactional(EasyMock.anyLong(), (ServerMessage) EasyMock.anyObject());
+      storageManager.commit(EasyMock.anyLong());
+      EasyMock.expect(ref.getQueue()).andReturn(queue);
+      EasyMock.expect(queue.addLast(ref)).andReturn(null);
+      EasyMock.replay(spi, store, message, storageManager, po, ref, queue);
+      SimpleString queueName = new SimpleString("aq");
+      PageMessageImpl pageMessage = new PageMessageImpl(message);
+      // Depage the single message, then confirm every recorded expectation was met.
+      manager.onDepage(0, queueName, store, new PageMessage[] {pageMessage} );
+      EasyMock.verify(spi, store, message, storageManager, po, ref, queue);
+   }
+
+   /** Expects onDepage() to apply the scheduled-delivery property: scheduled store plus Queue.addScheduledDelivery(). */
+   public void testOnDepageScheduledMessage() throws Exception
+   {
+      final long deliveryTime = System.currentTimeMillis() + 10000;
+      MessageReference reference = EasyMock.createStrictMock(MessageReference.class);
+      Queue targetQueue = EasyMock.createStrictMock(Queue.class);
+      PostOffice postOffice = EasyMock.createStrictMock(PostOffice.class);
+      StorageManager storage = EasyMock.createStrictMock(StorageManager.class);
+      ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+      PagingStoreFactory storeFactory = EasyMock.createMock(PagingStoreFactory.class);
+      PagingStore pagingStore = EasyMock.createNiceMock(PagingStore.class);
+      List<MessageReference> routedRefs = new ArrayList<MessageReference>();
+      routedRefs.add(reference);
+      HierarchicalRepository<QueueSettings> settingsRepository = new HierarchicalObjectRepository<QueueSettings>();
+      settingsRepository.setDefault(new QueueSettings());
+      PagingManagerImpl pagingManager = new PagingManagerImpl(storeFactory, storage, settingsRepository, -1);
+      pagingManager.setPostOffice(postOffice);
+      // Expected interactions, recorded in the same sequence as before (order matters per strict mock).
+      EasyMock.expect(storage.generateUniqueID()).andReturn(1L);
+      EasyMock.expect(postOffice.route(serverMessage)).andReturn(routedRefs);
+      EasyMock.expect(serverMessage.getDurableRefCount()).andReturn(1);
+      reference.setScheduledDeliveryTime(deliveryTime);
+      storage.storeLastPage(EasyMock.anyLong(), (LastPageRecord) EasyMock.anyObject());
+      storage.storeMessageTransactional(EasyMock.anyLong(), (ServerMessage) EasyMock.anyObject());
+      storage.storeMessageScheduledTransactional(EasyMock.anyLong(), (ServerMessage) EasyMock.anyObject(), EasyMock.eq(deliveryTime));
+      storage.commit(EasyMock.anyLong());
+      EasyMock.expect(reference.getQueue()).andReturn(targetQueue);
+      targetQueue.addScheduledDelivery(reference);
+      EasyMock.replay(storeFactory, pagingStore, serverMessage, storage, postOffice, reference, targetQueue);
+      PageMessageImpl scheduledPage = new PageMessageImpl(serverMessage);
+      scheduledPage.getProperties().putLongProperty(new SimpleString("JBM_SCHEDULED_DELIVERY_PROP"), deliveryTime);
+      // Depage the message that carries the scheduled-delivery property, then verify all mocks.
+      pagingManager.onDepage(0, new SimpleString("aq"), pagingStore, new PageMessage[] {scheduledPage} );
+      EasyMock.verify(storeFactory, pagingStore, serverMessage, storage, postOffice, reference, targetQueue);
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -22,10 +22,6 @@
 
 package org.jboss.messaging.tests.unit.core.paging.impl;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.paging.Page;
 import org.jboss.messaging.core.paging.PageMessage;
@@ -37,6 +33,10 @@
 import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
 import org.jboss.messaging.util.SimpleString;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * 
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
@@ -113,6 +113,45 @@
 
    }
 
+   /**
+    * Pages a single message that carries a long property, then starts a second store
+    * on the same file factory and checks the page count it reports.
+    * NOTE(review): the property value is never read back after paging - confirm intent.
+    */
+   public void testStoreWithProperty() throws Exception
+   {
+      SequentialFileFactory factory = new FakeSequentialFileFactory();
+
+      PagingStore storeImpl = new PagingStoreImpl(null, factory, destinationTestName, new QueueSettings(), executor);
+
+      storeImpl.start();
+
+      assertEquals(0, storeImpl.getNumberOfPages());
+
+      storeImpl.startPaging();
+
+      assertEquals(1, storeImpl.getNumberOfPages());
+
+      ByteBuffer buffer = createRandomBuffer(0, 10);
+      SimpleString destination = new SimpleString("test");
+
+      PageMessageImpl msg = createMessage(destination, buffer);
+      msg.getProperties().putLongProperty(new SimpleString("test-property"), 12345L);
+      assertTrue(storeImpl.isPaging());
+
+      assertTrue(storeImpl.page(msg));
+
+      assertEquals(1, storeImpl.getNumberOfPages());
+
+      storeImpl.sync();
+
+      // Start a second store instance on the same factory and check the page count it reports.
+      storeImpl = new PagingStoreImpl(null, factory, destinationTestName, new QueueSettings(), executor);
+      storeImpl.start();
+
+      assertEquals(2, storeImpl.getNumberOfPages());
+   }
+
    public void testDepageOnCurrentPage() throws Exception
    {
       SequentialFileFactory factory = new FakeSequentialFileFactory();

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2008-10-06 14:27:34 UTC (rev 5074)
@@ -23,13 +23,24 @@
 package org.jboss.messaging.tests.unit.core.server.impl;
 
 import org.easymock.EasyMock;
-import static org.easymock.EasyMock.*;
+import static org.easymock.EasyMock.anyLong;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.isA;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.*;
+import org.jboss.messaging.core.server.Consumer;
+import org.jboss.messaging.core.server.DistributionPolicy;
+import org.jboss.messaging.core.server.HandleStatus;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.impl.QueueImpl;
 import org.jboss.messaging.core.server.impl.RoundRobinDistributionPolicy;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
@@ -741,7 +752,7 @@
 
          refs.add(ref);
 
-         queue.addLast(ref);
+         queue.addScheduledDelivery(ref);
       }
 
 




More information about the jboss-cvs-commits mailing list