[jboss-cvs] JBoss Messaging SVN: r5074 - in trunk: examples/jms and 20 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Oct 6 10:27:34 EDT 2008
Author: ataylor
Date: 2008-10-06 10:27:34 -0400 (Mon, 06 Oct 2008)
New Revision: 5074
Added:
trunk/examples/jms/src/org/jboss/jms/example/ScheduledExample.java
trunk/examples/messaging/src/org/jboss/messaging/example/ScheduledMessageExample.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionScheduledSendMessage.java
trunk/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java
Modified:
trunk/build-messaging.xml
trunk/build.xml
trunk/examples/jms/build.xml
trunk/examples/messaging/build.xml
trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/jboss/messaging/core/paging/PageMessage.java
trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
trunk/src/main/org/jboss/messaging/core/server/Queue.java
trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java
trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java
trunk/src/main/org/jboss/messaging/jms/client/JBossMessageProducer.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1203 - re-enabled scheduling
Modified: trunk/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/build-messaging.xml 2008-10-06 14:27:34 UTC (rev 5074)
@@ -968,6 +968,10 @@
<ant dir="${examples.dir}/jms" antfile="build.xml" target="messageGroupingExample"/>
</target>
+ <target name="scheduledExample" depends="client-jar">
+ <ant dir="${examples.dir}/jms" antfile="build.xml" target="scheduledExample"/>
+ </target>
+
<target name="SimpleClient" depends="client-jar">
<ant dir="${examples.dir}/messaging" antfile="build.xml" target="SimpleClient"/>
</target>
@@ -986,8 +990,11 @@
<target name="WildCardClient" depends="jar, client-jar">
<ant dir="${examples.dir}/messaging" antfile="build.xml" target="WildCardClient"/>
+ </target>
+
+ <target name="ScheduledMessageExample" depends="jar, client-jar">
+ <ant dir="${examples.dir}/messaging" antfile="build.xml" target="ScheduledMessageExample"/>
</target>
-
<!-- Performance examples -->
<target name="perfListener" depends="client-jar">
Modified: trunk/build.xml
===================================================================
--- trunk/build.xml 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/build.xml 2008-10-06 14:27:34 UTC (rev 5074)
@@ -194,10 +194,18 @@
<ant antfile="build-messaging.xml" target="wildcardExample"/>
</target>
+ <target name="ScheduledMessageExample" depends="createthirdparty">
+ <ant antfile="build-messaging.xml" target="ScheduledMessageExample"/>
+ </target>
+
<target name="messageGroupingExample" depends="createthirdparty">
<ant antfile="build-messaging.xml" target="messageGroupingExample"/>
</target>
+ <target name="scheduledExample" depends="createthirdparty">
+ <ant antfile="build-messaging.xml" target="scheduledExample"/>
+ </target>
+
<target name="perfListener" depends="createthirdparty">
<ant antfile="build-messaging.xml" target="perfListener"/>
</target>
Modified: trunk/examples/jms/build.xml
===================================================================
--- trunk/examples/jms/build.xml 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/examples/jms/build.xml 2008-10-06 14:27:34 UTC (rev 5074)
@@ -135,6 +135,12 @@
</java>
</target>
+   <!-- Runs org.jboss.jms.example.ScheduledExample in a forked JVM using the runtime classpath -->
+   <target name="scheduledExample" depends="compile" description="-> scheduled delivery example using a queue">
+      <java classname="org.jboss.jms.example.ScheduledExample" fork="true">
+         <classpath refid="runtime.classpath"/>
+      </java>
+   </target>
+
<target name="echo-params">
<echo>
***********************************************************************************
Added: trunk/examples/jms/src/org/jboss/jms/example/ScheduledExample.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/ScheduledExample.java (rev 0)
+++ trunk/examples/jms/src/org/jboss/jms/example/ScheduledExample.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -0,0 +1,95 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.example;
+
+import org.jboss.messaging.core.logging.Logger;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.InitialContext;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+
+/**
+ * JMS example showing scheduled delivery: a text message is sent to
+ * <code>/queue/testQueue</code> with the <code>JBM_SCHEDULED_DELIVERY_TIME</code> property set
+ * a few seconds in the future, and a consumer on the same queue then waits for it to arrive.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ScheduledExample
+{
+   private static final Logger log = Logger.getLogger(ScheduledExample.class);
+
+   /** How far in the future, in seconds, the message is scheduled. */
+   private static final int DELIVERY_DELAY_SECONDS = 5;
+
+   /** How long, in milliseconds, the consumer waits; longer than the delivery delay. */
+   private static final long RECEIVE_TIMEOUT_MILLIS = 7000;
+
+   /**
+    * Sends one scheduled message and waits for it to be delivered.
+    *
+    * @param args not used
+    */
+   public static void main(final String[] args)
+   {
+      // 24-hour clock: "hh" prints 12-hour times with no am/pm marker
+      DateFormat df = new SimpleDateFormat("HH:mm:ss");
+      Connection connection = null;
+      try
+      {
+         //create an initial context, env will be picked up from jndi.properties
+         InitialContext initialContext = new InitialContext();
+         Queue queue = (Queue) initialContext.lookup("/queue/testQueue");
+         ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");
+
+         connection = cf.createConnection();
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(queue);
+         Message message = session.createTextMessage("This is a text message!");
+         Calendar cal = Calendar.getInstance();
+         log.info("current time " + df.format(cal.getTime()));
+         // add(), not roll(): roll() wraps the seconds field without carrying into the minute,
+         // which would schedule the message in the past whenever the current second is >= 55
+         cal.add(Calendar.SECOND, DELIVERY_DELAY_SECONDS);
+         log.info("message scheduled for " + df.format(cal.getTime()));
+         message.setLongProperty("JBM_SCHEDULED_DELIVERY_TIME", cal.getTimeInMillis());
+         log.info("sending message to queue");
+         producer.send(message);
+
+         MessageConsumer messageConsumer = session.createConsumer(queue);
+         connection.start();
+         TextMessage message2 = (TextMessage) messageConsumer.receive(RECEIVE_TIMEOUT_MILLIS);
+         if (message2 == null)
+         {
+            // receive(timeout) returns null when nothing arrives in time
+            log.warn("no message received within " + RECEIVE_TIMEOUT_MILLIS + "ms");
+         }
+         else
+         {
+            log.info("message received at " + df.format(Calendar.getInstance().getTime()));
+            log.info("message = " + message2.getText());
+         }
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+      finally
+      {
+         if (connection != null)
+         {
+            try
+            {
+               connection.close();
+            }
+            catch (JMSException e)
+            {
+               e.printStackTrace();
+            }
+         }
+      }
+   }
+}
Modified: trunk/examples/messaging/build.xml
===================================================================
--- trunk/examples/messaging/build.xml 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/examples/messaging/build.xml 2008-10-06 14:27:34 UTC (rev 5074)
@@ -109,4 +109,10 @@
</java>
</target>
+ <target name="ScheduledMessageExample" depends="compile">
+ <java classname="org.jboss.messaging.example.ScheduledMessageExample" fork="true">
+ <classpath refid="runtime.classpath"/>
+ </java>
+ </target>
+
</project>
\ No newline at end of file
Added: trunk/examples/messaging/src/org/jboss/messaging/example/ScheduledMessageExample.java
===================================================================
--- trunk/examples/messaging/src/org/jboss/messaging/example/ScheduledMessageExample.java (rev 0)
+++ trunk/examples/messaging/src/org/jboss/messaging/example/ScheduledMessageExample.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -0,0 +1,89 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.example;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.util.SimpleString;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ScheduledMessageExample
+{
+ private static final Logger log = Logger.getLogger(ScheduledMessageExample.class);
+ public static void main(final String[] args)
+ {
+ ClientSession clientSession = null;
+ DateFormat df = new SimpleDateFormat("hh:mm:ss");
+ try
+ {
+ ClientSessionFactory sessionFactory =
+ new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory"));
+ clientSession = sessionFactory.createSession(false, true, true, false);
+ SimpleString queue = new SimpleString("queuejms.testQueue");
+ ClientProducer clientProducer = clientSession.createProducer(queue);
+ ClientMessage message = clientSession.createClientMessage(JBossTextMessage.TYPE, false, 0,
+ System.currentTimeMillis(), (byte) 1);
+ message.getBody().putString("Hello!");
+ Calendar cal = Calendar.getInstance();
+ log.info("current time " + df.format(cal.getTime()));
+ cal.roll(Calendar.SECOND, 5);
+ log.info("message scheduled for " + df.format(cal.getTime()));
+ clientProducer.send(message, cal.getTimeInMillis());
+ ClientConsumer clientConsumer = clientSession.createConsumer(queue);
+ clientSession.start();
+ ClientMessage msg = clientConsumer.receive(7000);
+ log.info("message received at " + df.format(Calendar.getInstance().getTime()));
+ msg.processed();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ finally
+ {
+ if (clientSession != null)
+ {
+ try
+ {
+ clientSession.close();
+ }
+ catch (MessagingException ignore)
+ {
+ }
+ }
+ }
+ }
+}
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -36,7 +36,11 @@
void send(ClientMessage message) throws MessagingException;
void send(SimpleString address, ClientMessage message) throws MessagingException;
+
+ void send(final ClientMessage msg, long scheduleDeliveryTime) throws MessagingException;
+ void send(final SimpleString address, final ClientMessage msg, long scheduleDeliveryTime) throws MessagingException;
+
void sendManagement(ClientMessage mngmntMessage) throws MessagingException;
void registerAcknowledgementHandler(AcknowledgementHandler handler);
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -19,6 +19,7 @@
import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionScheduledSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
import org.jboss.messaging.util.SimpleString;
@@ -120,17 +121,31 @@
{
checkClosed();
- doSend(null, msg);
+ doSend(null, msg, 0);
}
public void send(final SimpleString address, final ClientMessage msg) throws MessagingException
{
checkClosed();
- doSend(address, msg);
+ doSend(address, msg, 0);
}
- // use a special wireformat packet to send management message (on the server-side they are
+ public void send(final ClientMessage msg, long scheduleDeliveryTime) throws MessagingException
+ {
+ checkClosed();
+
+ doSend(null, msg, scheduleDeliveryTime);
+ }
+
+ public void send(final SimpleString address, final ClientMessage msg, long scheduleDeliveryTime) throws MessagingException
+ {
+ checkClosed();
+
+ doSend(address, msg, scheduleDeliveryTime);
+ }
+
+ // use a special wireformat packet to send management messages (on the server-side they are
// handled by the server session differently from regular Client Message)
public void sendManagement(final ClientMessage msg) throws MessagingException
{
@@ -272,7 +287,7 @@
closed = true;
}
- private void doSend(final SimpleString address, final ClientMessage msg) throws MessagingException
+ private void doSend(final SimpleString address, final ClientMessage msg, long scheduledDeliveryTime) throws MessagingException
{
if (address != null)
{
@@ -297,8 +312,18 @@
boolean sendBlocking = msg.isDurable() ? blockOnPersistentSend : blockOnNonPersistentSend;
- SessionSendMessage message = new SessionSendMessage(id, msg, sendBlocking);
+ SessionSendMessage message;
+ //check to see if this message needs to be scheduled.
+ if(scheduledDeliveryTime <= 0)
+ {
+ message = new SessionSendMessage(id, msg, sendBlocking);
+ }
+ else
+ {
+ message = new SessionScheduledSendMessage(id, msg, sendBlocking, scheduledDeliveryTime);
+ }
+
if (sendBlocking)
{
channel.sendBlocking(message);
Modified: trunk/src/main/org/jboss/messaging/core/paging/PageMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PageMessage.java 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/paging/PageMessage.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -24,6 +24,7 @@
import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.util.TypedProperties;
/**
*
@@ -39,4 +40,6 @@
long getTransactionID();
+ TypedProperties getProperties();
+
}
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -22,14 +22,14 @@
package org.jboss.messaging.core.paging;
-import java.util.Collection;
-
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.MessagingComponent;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.util.SimpleString;
+import java.util.Collection;
+
/**
*
* <p>Look at the <a href="http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging">WIKI</a> for more information.</p>
@@ -52,6 +52,7 @@
*
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*
*/
public interface PagingManager extends MessagingComponent
@@ -91,8 +92,22 @@
* @param message
* @return false if destination is not on page mode
*/
+ boolean pageScheduled(ServerMessage message, long scheduledDeliveryTime) throws Exception;
+
+ /**
+ * Page, only if destination is in page mode.
+ * @param message
+ * @return false if destination is not on page mode
+ */
boolean page(ServerMessage message, long transactionId) throws Exception;
+ /**
+ * Page a scheduled message, only if destination is in page mode.
+ * @param message
+ * @param transactionId
+ * @param scheduledDeliveryTime
+ * @return false if destination is not on page mode
+ */
+ boolean pageScheduled(ServerMessage message, long transactionId, long scheduledDeliveryTime) throws Exception;
+
/**
* Point to inform/restoring Transactions used when the messages were added into paging
* */
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -27,12 +27,14 @@
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
import org.jboss.messaging.util.DataConstants;
+import org.jboss.messaging.util.TypedProperties;
/**
*
* This class is used to encapsulate ServerMessage and TransactionID on Paging
*
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*
*/
public class PageMessageImpl implements PageMessage
@@ -52,15 +54,19 @@
private long transactionID = -1;
+ private final TypedProperties properties;
+
public PageMessageImpl(final ServerMessage message, final long transactionID)
{
this.message = message;
this.transactionID = transactionID;
+ properties = new TypedProperties();
}
public PageMessageImpl(final ServerMessage message)
{
this.message = message;
+ properties = new TypedProperties();
}
public PageMessageImpl()
@@ -68,6 +74,19 @@
this(new ServerMessageImpl());
}
+ public PageMessageImpl(final ServerMessage message, final TypedProperties properties)
+ {
+ this.message = message;
+ this.properties = properties;
+ }
+
+ public PageMessageImpl(final ServerMessage message, final TypedProperties properties, final long transactionID)
+ {
+ this.message = message;
+ this.transactionID = transactionID;
+ this.properties = properties;
+ }
+
public ServerMessage getMessage()
{
return message;
@@ -78,24 +97,30 @@
return transactionID;
}
+ public TypedProperties getProperties()
+ {
+ return properties;
+ }
+
// EncodingSupport implementation --------------------------------
public void decode(final MessagingBuffer buffer)
{
transactionID = buffer.getLong();
message.decode(buffer);
+ properties.decode(buffer);
}
public void encode(final MessagingBuffer buffer)
{
buffer.putLong(transactionID);
message.encode(buffer);
+ properties.encode(buffer);
}
public int getEncodeSize()
{
-
- return DataConstants.SIZE_LONG + message.getEncodeSize();
+ return DataConstants.SIZE_LONG + message.getEncodeSize() + properties.getEncodeSize();
}
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -22,15 +22,6 @@
package org.jboss.messaging.core.paging.impl;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.paging.LastPageRecord;
import org.jboss.messaging.core.paging.PageMessage;
@@ -45,12 +36,23 @@
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.TypedProperties;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* <p>Look at the <a href="http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging">WIKI</a> for more information.</p>
*
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*
*/
public class PagingManagerImpl implements PagingManager
@@ -91,6 +93,8 @@
// private static final boolean isTrace = log.isTraceEnabled();
private static final boolean isTrace = true;
+ private static final SimpleString SCHEDULED_DELIVERY_PROP = new SimpleString("JBM_SCHEDULED_DELIVERY_PROP");
+
// This is just a debug tool method.
// During debugs you could make log.trace as log.info, and change the
// variable isTrace above
@@ -202,6 +206,7 @@
HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
+ final List<MessageReference> scheduledRefsToAdd = new ArrayList<MessageReference>();
for (PageMessage msg : data)
{
@@ -237,12 +242,30 @@
pageTransactionsToUpdate.add(pageTransactionInfo);
}
}
+ Long scheduledDeliveryTime = (Long) msg.getProperties().getProperty(SCHEDULED_DELIVERY_PROP);
+ //if this is a scheduled message we add it to the queue as just that
+ if(scheduledDeliveryTime == null)
+ {
+ refsToAdd.addAll(postOffice.route(msg.getMessage()));
+ }
+ else
+ {
+ List<MessageReference> refs = postOffice.route(msg.getMessage());
+ for (MessageReference ref : refs)
+ {
+ ref.setScheduledDeliveryTime(scheduledDeliveryTime);
+ }
+ scheduledRefsToAdd.addAll(refs);
+ }
- refsToAdd.addAll(postOffice.route(msg.getMessage()));
-
if (msg.getMessage().getDurableRefCount() != 0)
{
storageManager.storeMessageTransactional(depageTransactionID, msg.getMessage());
+ //write the scheduled message record if needed
+ if(scheduledDeliveryTime != null)
+ {
+ storageManager.storeMessageScheduledTransactional(depageTransactionID, msg.getMessage(), scheduledDeliveryTime);
+ }
}
}
@@ -268,6 +291,10 @@
ref.getQueue().addLast(ref);
}
+ for (MessageReference ref : scheduledRefsToAdd)
+ {
+ ref.getQueue().addScheduledDelivery(ref);
+ }
if (globalMode.get())
{
return globalSize.get() < maxGlobalSize - WATERMARK_GLOBAL_PAGE && pagingStore.getMaxSizeBytes() <= 0 ||
@@ -313,6 +340,20 @@
return getPageStore(message.getDestination()).page(new PageMessageImpl(message));
}
+   /**
+    * Pages a scheduled message outside of a transaction. The delivery time is stored in the
+    * page message's properties under SCHEDULED_DELIVERY_PROP.
+    *
+    * @param message the message to page
+    * @param scheduledDeliveryTime the delivery time to record with the paged message
+    * @return the result of the destination's page store <code>page</code> call
+    */
+   public boolean pageScheduled(final ServerMessage message, final long scheduledDeliveryTime) throws Exception
+   {
+      final TypedProperties scheduleProps = new TypedProperties();
+      scheduleProps.putLongProperty(SCHEDULED_DELIVERY_PROP, scheduledDeliveryTime);
+
+      final PageMessage pageMessage = new PageMessageImpl(message, scheduleProps);
+
+      return getPageStore(message.getDestination()).page(pageMessage);
+   }
+
+   /**
+    * Pages a scheduled message together with a transaction id. The delivery time is stored in
+    * the page message's properties under SCHEDULED_DELIVERY_PROP.
+    *
+    * @param message the message to page
+    * @param transactionId the transaction id to store in the page message
+    * @param scheduledDeliveryTime the delivery time to record with the paged message
+    * @return the result of the destination's page store <code>page</code> call
+    */
+   public boolean pageScheduled(final ServerMessage message, final long transactionId, final long scheduledDeliveryTime) throws Exception
+   {
+      TypedProperties properties = new TypedProperties();
+      properties.putLongProperty(SCHEDULED_DELIVERY_PROP, scheduledDeliveryTime);
+      // pass the transaction id through: the (message, properties) constructor leaves it unset,
+      // so the paged message would no longer be tied to the caller's transaction
+      return getPageStore(message.getDestination()).page(new PageMessageImpl(message, properties, transactionId));
+   }
+
public void addTransaction(final PageTransactionInfo pageTransaction)
{
transactions.put(pageTransaction.getTransactionID(), pageTransaction);
Modified: trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -22,11 +22,6 @@
package org.jboss.messaging.core.persistence;
-import java.util.List;
-import java.util.Map;
-
-import javax.transaction.xa.Xid;
-
import org.jboss.messaging.core.paging.LastPageRecord;
import org.jboss.messaging.core.paging.PageTransactionInfo;
import org.jboss.messaging.core.postoffice.Binding;
@@ -39,11 +34,16 @@
import org.jboss.messaging.core.transaction.ResourceManager;
import org.jboss.messaging.util.SimpleString;
+import javax.transaction.xa.Xid;
+import java.util.List;
+import java.util.Map;
+
/**
*
* A StorageManager
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*
*/
public interface StorageManager extends MessagingComponent
@@ -58,10 +58,14 @@
void storeDelete(long messageID) throws Exception;
+ void storeMessageScheduled(final ServerMessage message, final long scheduledDeliveryTime) throws Exception;
+
void storeMessageTransactional(long txID, ServerMessage message) throws Exception;
void storeAcknowledgeTransactional(long txID, long queueID, long messageiD) throws Exception;
+ void storeMessageScheduledTransactional(final long txID,final ServerMessage message, final long scheduledDeliveryTime) throws Exception;
+
void storeDeleteMessageTransactional(long txID, long queueID, long messageID) throws Exception;
/** Used to delete non-messaging data (such as PageTransaction and LasPage) */
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -22,17 +22,6 @@
package org.jboss.messaging.core.persistence.impl.journal;
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.transaction.xa.Xid;
-
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.filter.impl.FilterImpl;
@@ -70,6 +59,17 @@
import org.jboss.messaging.util.SimpleString;
import org.jboss.messaging.util.TimeAndCounterIDGenerator;
+import javax.transaction.xa.Xid;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
*
* A JournalStorageManager
@@ -222,6 +222,11 @@
messageJournal.appendDeleteRecord(messageID);
}
+   /**
+    * Appends a SET_SCHEDULED_DELIVERY_TIME update record for the given message to the
+    * message journal.
+    *
+    * @param message the message whose id keys the update record
+    * @param scheduledDeliveryTime the delivery time written into the record
+    */
+   public void storeMessageScheduled(final ServerMessage message, final long scheduledDeliveryTime) throws Exception
+   {
+      final long messageID = message.getMessageID();
+      final ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(messageID, scheduledDeliveryTime);
+
+      messageJournal.appendUpdateRecord(messageID, SET_SCHEDULED_DELIVERY_TIME, encoding);
+   }
+
// Transactional operations
public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception
@@ -270,6 +275,11 @@
messageJournal.appendDeleteRecordTransactional(txID, recordID, null);
}
+   /**
+    * Appends a transactional SET_SCHEDULED_DELIVERY_TIME update record for the given message
+    * to the message journal.
+    *
+    * @param txID the id of the journal transaction the record is appended under
+    * @param message the message whose id keys the update record
+    * @param scheduledDeliveryTime the delivery time written into the record
+    */
+   public void storeMessageScheduledTransactional(final long txID,final ServerMessage message, final long scheduledDeliveryTime) throws Exception
+   {
+      final long messageID = message.getMessageID();
+      final ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(messageID, scheduledDeliveryTime);
+
+      messageJournal.appendUpdateRecordTransactional(txID, messageID, SET_SCHEDULED_DELIVERY_TIME, encoding);
+   }
+
public void storeDeleteMessageTransactional(final long txID, final long queueID, final long messageID) throws Exception
{
messageJournal.appendDeleteRecordTransactional(txID, messageID, new DeleteEncoding(queueID));
@@ -309,7 +319,7 @@
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
messageJournal.load(records, preparedTransactions);
-
+ Map<Long, List<MessageReference>> routedRefs = new HashMap<Long, List<MessageReference>>();
for (RecordInfo record : records)
{
byte[] data = record.data;
@@ -335,6 +345,7 @@
ref.getQueue().addLast(ref);
}
+ routedRefs.put(record.id, refs);
break;
}
case ACKNOWLEDGE_REF:
@@ -417,7 +428,18 @@
}
case SET_SCHEDULED_DELIVERY_TIME:
{
- // TODO
+ ScheduledDeliveryEncoding scheduledDeliveryEncoding = new ScheduledDeliveryEncoding(record.id);
+ scheduledDeliveryEncoding.decode(buff);
+ List<MessageReference> refs = routedRefs.get(record.id);
+ //for any references that have already been routed, we need to remove them from the queue and re add them as scheduled
+ for (MessageReference ref : refs)
+ {
+ ref.getQueue().removeReferenceWithID(record.id);
+ ref.setScheduledDeliveryTime(scheduledDeliveryEncoding.getScheduledDeliveryTime());
+ ref.getQueue().addScheduledDelivery(ref);
+ }
+
+ break;
}
default:
{
@@ -633,8 +655,12 @@
List<MessageReference> messages = new ArrayList<MessageReference>();
+ List<MessageReference> scheduledMessages = new ArrayList<MessageReference>();
+
List<MessageReference> messagesToAck = new ArrayList<MessageReference>();
+ Map<Long, List<MessageReference>> routedRefs = new HashMap<Long, List<MessageReference>>();
+
PageTransactionInfoImpl pageTransactionInfo = null;
// first get any sent messages for this tx and recreate
@@ -660,6 +686,8 @@
messages.addAll(refs);
+ routedRefs.put(record.id, refs);
+
break;
}
case ACKNOWLEDGE_REF:
@@ -698,6 +726,19 @@
break;
}
+ case SET_SCHEDULED_DELIVERY_TIME:
+ {
+ ScheduledDeliveryEncoding scheduledDeliveryEncoding = new ScheduledDeliveryEncoding(record.id);
+ scheduledDeliveryEncoding.decode(buff);
+ List<MessageReference> refs = routedRefs.get(record.id);
+ //for any references that have already been routed in this tx, set the scheduled delivery time and collect them so the tx replay receives them as scheduled
+ for (MessageReference ref : refs)
+ {
+ ref.setScheduledDeliveryTime(scheduledDeliveryEncoding.getScheduledDeliveryTime());
+ scheduledMessages.add(ref);
+ }
+ break;
+ }
default:
log.warn("InternalError: Record type " + recordType +
" not recognized. Maybe you're using journal files created on a different version");
@@ -736,7 +777,7 @@
}
// now we recreate the state of the tx and add to the resource manager
- tx.replay(messages, messagesToAck, pageTransactionInfo, Transaction.State.PREPARED);
+ tx.replay(messages, scheduledMessages, messagesToAck, pageTransactionInfo, Transaction.State.PREPARED);
resourceManager.putTransaction(xid, tx);
}
@@ -968,4 +1009,40 @@
super(queueID);
}
}
+   /**
+    * Journal encoding used for SET_SCHEDULED_DELIVERY_TIME update records.
+    *
+    * Only the scheduled delivery time is written to and read from the buffer; the message id
+    * is kept on the instance but is not part of the encoded bytes.
+    */
+   private static class ScheduledDeliveryEncoding implements EncodingSupport
+   {
+      /** Id of the message this record refers to; never encoded. */
+      private final long messageId;
+
+      /** Delivery time carried by the record; filled in by the constructor or by decode(). */
+      private long scheduledDeliveryTime;
+
+      /**
+       * Creates an encoding ready to be written to the journal.
+       *
+       * @param messageId id of the message being scheduled
+       * @param scheduledDeliveryTime the delivery time to encode
+       */
+      private ScheduledDeliveryEncoding(final long messageId, final long scheduledDeliveryTime)
+      {
+         this.messageId = messageId;
+         this.scheduledDeliveryTime = scheduledDeliveryTime;
+      }
+
+      /**
+       * Creates an empty encoding whose delivery time is populated later via decode().
+       *
+       * @param messageId id of the message the record belongs to
+       */
+      public ScheduledDeliveryEncoding(final long messageId)
+      {
+         this.messageId = messageId;
+      }
+
+      /**
+       * @return the encoded size in bytes: one long
+       */
+      public int getEncodeSize()
+      {
+         return 8;
+      }
+
+      /**
+       * Writes the scheduled delivery time to the buffer as a single long.
+       */
+      public void encode(final MessagingBuffer buffer)
+      {
+         buffer.putLong(scheduledDeliveryTime);
+      }
+
+      /**
+       * Reads the scheduled delivery time from the buffer as a single long.
+       */
+      public void decode(final MessagingBuffer buffer)
+      {
+         scheduledDeliveryTime = buffer.getLong();
+      }
+
+      /**
+       * @return the delivery time given at construction or read by the last decode()
+       */
+      public long getScheduledDeliveryTime()
+      {
+         return scheduledDeliveryTime;
+      }
+   }
}
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -22,11 +22,6 @@
package org.jboss.messaging.core.persistence.impl.nullpm;
-import java.util.List;
-import java.util.Map;
-
-import javax.transaction.xa.Xid;
-
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.paging.LastPageRecord;
import org.jboss.messaging.core.paging.PageTransactionInfo;
@@ -42,6 +37,10 @@
import org.jboss.messaging.util.SimpleString;
import org.jboss.messaging.util.TimeAndCounterIDGenerator;
+import javax.transaction.xa.Xid;
+import java.util.List;
+import java.util.Map;
+
/**
*
* A NullStorageManager
@@ -110,7 +109,11 @@
{
}
- public void storeDelete(long messageID) throws Exception
+ public void storeMessageScheduled(ServerMessage message, long scheduledDeliveryTime) throws Exception
+ {
+ }
+
+ public void storeDelete(long messageID) throws Exception
{
}
@@ -127,10 +130,14 @@
{
}
- public void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception
+ public void storeMessageScheduledTransactional(long txID, ServerMessage message, long scheduledDeliveryTime) throws Exception
{
}
+ public void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception
+ {
+ }
+
public void updatePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception
{
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -12,6 +12,21 @@
package org.jboss.messaging.core.remoting.impl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.FailureListener;
+import org.jboss.messaging.core.remoting.Interceptor;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.ResponseNotifier;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.DuplicablePacket;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
@@ -49,6 +64,7 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REMOVE_DESTINATION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SCHEDULED_SEND;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_START;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
@@ -68,37 +84,6 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.ChannelHandler;
-import org.jboss.messaging.core.remoting.FailureListener;
-import org.jboss.messaging.core.remoting.Interceptor;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.ResponseNotifier;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.DuplicablePacket;
-import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
@@ -130,6 +115,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionScheduledSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
@@ -152,6 +138,21 @@
import org.jboss.messaging.util.OrderedExecutorFactory;
import org.jboss.messaging.util.SimpleIDGenerator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
/**
* @author <a href="tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -777,6 +778,11 @@
packet = new SessionBrowserCloseMessage();
break;
}
+ case SESS_SCHEDULED_SEND:
+ {
+ packet = new SessionScheduledSendMessage();
+ break;
+ }
case NULL_RESPONSE:
{
packet = new NullResponseMessage();
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -45,7 +45,7 @@
public static final byte EXCEPTION = 20;
public static final byte NULL_RESPONSE = 21;
-
+
public static final byte PACKETS_CONFIRMED = 22;
@@ -53,7 +53,7 @@
public static final byte CREATESESSION = 30;
public static final byte CREATESESSION_RESP = 31;
-
+
public static final byte REATTACH_SESSION = 32;
public static final byte REATTACH_SESSION_RESP = 33;
@@ -76,7 +76,7 @@
public static final byte SESS_COMMIT = 47;
public static final byte SESS_ROLLBACK = 48;
-
+
public static final byte SESS_QUEUEQUERY = 49;
public static final byte SESS_QUEUEQUERY_RESP = 50;
@@ -135,10 +135,10 @@
public static final byte SESS_XA_GET_TIMEOUT_RESP = 77;
- public static final byte SESS_START = 78;
+ public static final byte SESS_START = 78;
public static final byte SESS_STOP = 79;
-
+
public static final byte SESS_CLOSE = 80;
public static final byte SESS_FLOWTOKEN = 81;
@@ -156,7 +156,9 @@
public static final byte SESS_RECEIVE_MSG = 87;
public static final byte SESS_MANAGEMENT_SEND = 88;
-
+
+ public static final byte SESS_SCHEDULED_SEND = 91;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
@@ -217,34 +219,34 @@
public void decodeBody(final MessagingBuffer buffer)
{
}
-
+
public boolean isRequiresConfirmations()
{
return true;
}
-
+
public boolean isReplicateBlocking()
{
return false;
}
-
+
public boolean isWriteAlways()
{
return false;
}
-
+
public boolean isReHandleResponseOnFailure()
{
return false;
}
-
+
public boolean isDuplicate()
{
return false;
}
-
+
public void setDuplicate(final boolean duplicate)
- {
+ {
}
@Override
@@ -265,7 +267,7 @@
return r.type == type && r.channelID == channelID;
}
-
+
// Package protected ---------------------------------------------
protected String getParentString()
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionScheduledSendMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionScheduledSendMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionScheduledSendMessage.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -0,0 +1,61 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+/**
+ * A send packet (type SESS_SCHEDULED_SEND) that carries a scheduled delivery time in
+ * addition to everything {@link SessionSendMessage} carries. The delivery time is appended
+ * to the body as a single long after the superclass body.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class SessionScheduledSendMessage extends SessionSendMessage
+{
+   /** Delivery time sent with the message; set by the constructor or by decodeBody(). */
+   private long scheduledDeliveryTime;
+
+   /**
+    * Creates a packet ready to be sent.
+    *
+    * @param producerID id of the producer sending the message
+    * @param message the message to send
+    * @param requiresResponse passed through to the superclass constructor
+    * @param scheduledDeliveryTime the delivery time to transmit with the message
+    */
+   public SessionScheduledSendMessage(final long producerID, final ClientMessage message, final boolean requiresResponse, final long scheduledDeliveryTime)
+   {
+      super(SESS_SCHEDULED_SEND, producerID, message, requiresResponse);
+
+      this.scheduledDeliveryTime = scheduledDeliveryTime;
+   }
+
+   /**
+    * Creates an empty packet whose fields are populated by decodeBody().
+    */
+   public SessionScheduledSendMessage()
+   {
+      super(SESS_SCHEDULED_SEND);
+   }
+
+   /**
+    * Encodes the superclass body, then appends the scheduled delivery time.
+    */
+   @Override
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      super.encodeBody(buffer);
+
+      buffer.putLong(scheduledDeliveryTime);
+   }
+
+   /**
+    * Decodes the superclass body, then reads the scheduled delivery time that follows it.
+    */
+   @Override
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      super.decodeBody(buffer);
+
+      scheduledDeliveryTime = buffer.getLong();
+   }
+
+   /**
+    * @return the scheduled delivery time carried by this packet
+    */
+   public long getScheduledDeliveryTime()
+   {
+      return scheduledDeliveryTime;
+   }
+}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -70,6 +70,22 @@
super(SESS_SEND);
}
+   /**
+    * Constructor for subclasses that send under their own packet type.
+    *
+    * @param type the packet type passed to the superclass
+    * @param producerID id of the producer sending the message
+    * @param message the message to send
+    * @param requiresResponse stored on the packet as its requiresResponse flag
+    */
+   protected SessionSendMessage(final byte type, final long producerID, final ClientMessage message, final boolean requiresResponse)
+   {
+      super(type);
+
+      // The three fields are independent; they are simply copied from the arguments.
+      this.requiresResponse = requiresResponse;
+      this.clientMessage = message;
+      this.producerID = producerID;
+   }
+
+   /**
+    * Constructor for subclasses that create an empty packet of their own type, to be
+    * populated later by decoding.
+    *
+    * @param type the packet type passed to the superclass
+    */
+   protected SessionSendMessage(final byte type)
+   {
+      super(type);
+   }
+
// Public --------------------------------------------------------
public boolean isReHandleResponseOnFailure()
Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -22,11 +22,6 @@
package org.jboss.messaging.core.server;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
@@ -36,7 +31,12 @@
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.util.SimpleString;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+
/**
*
* A Queue
@@ -147,4 +147,5 @@
MessageReference removeFirst();
boolean consumerFailedOver();
+ void addScheduledDelivery(MessageReference ref);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -28,6 +28,7 @@
* A ServerProducer
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*
*/
public interface ServerProducer
@@ -37,6 +38,8 @@
void close() throws Exception;
void send(ServerMessage msg) throws Exception;
+
+ void sendScheduled(ServerMessage message, long scheduledDeliveryTime) throws Exception;
void requestAndSendCredits() throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -23,7 +23,12 @@
package org.jboss.messaging.core.server;
import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.*;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
import org.jboss.messaging.core.server.impl.ServerBrowserImpl;
import org.jboss.messaging.util.SimpleString;
@@ -31,10 +36,11 @@
import java.util.List;
/**
- *
+ *
* A ServerSession
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*
*/
public interface ServerSession
@@ -59,6 +65,8 @@
void send(ServerMessage msg) throws Exception;
+ void sendScheduled(ServerMessage serverMessage, long scheduledDeliveryTime) throws Exception;
+
void processed(final long consumerID, final long messageID) throws Exception;
void rollback() throws Exception;
@@ -82,7 +90,7 @@
SessionXAResponseMessage XAStart(Xid xid);
SessionXAResponseMessage XASuspend() throws Exception;
-
+
List<Xid> getInDoubtXids() throws Exception;
int getXATimeout();
@@ -137,6 +145,8 @@
void sendProducerMessage(long producerID, ServerMessage message) throws Exception;
+ void sendScheduledProducerMessage(long producerID, ServerMessage serverMessage, long scheduledDeliveryTime) throws Exception;
+
boolean browserHasNextMessage(long browserID) throws Exception;
ServerMessage browserNextMessage(long browserID) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -12,23 +12,6 @@
package org.jboss.messaging.core.server.impl;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.list.PriorityLinkedList;
import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
@@ -49,6 +32,23 @@
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
import org.jboss.messaging.util.SimpleString;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* Implementation of a Queue TODO use Java 5 concurrent queue
*
@@ -690,6 +690,13 @@
return ref;
}
+   /**
+    * Registers a reference for delivery at its scheduled delivery time.
+    *
+    * A runnable wrapping the reference is always recorded in scheduledRunnables. The timer
+    * itself is only started when this queue is not a backup, matching the checkAndSchedule
+    * logic this method replaces.
+    *
+    * NOTE(review): presumably the recorded runnables of a backup queue are scheduled when
+    * the backup is activated - confirm against the activation code.
+    * NOTE(review): the replaced logic also skipped scheduling when scheduledExecutor was
+    * null; callers of this method are assumed to run with an executor configured - confirm.
+    *
+    * @param ref the reference to deliver at ref.getScheduledDeliveryTime()
+    */
+   public void addScheduledDelivery(final MessageReference ref)
+   {
+      ScheduledDeliveryRunnable runner = new ScheduledDeliveryRunnable(ref);
+
+      scheduledRunnables.add(runner);
+
+      // Do not start delivery timers on a backup queue.
+      if (!backup)
+      {
+         scheduleDelivery(runner, ref.getScheduledDeliveryTime());
+      }
+   }
+
// Public
// -----------------------------------------------------------------------------
@@ -722,11 +729,6 @@
sizeBytes.addAndGet(ref.getMessage().getEncodeSize());
}
- if (checkAndSchedule(ref))
- {
- return HandleStatus.HANDLED;
- }
-
boolean add = false;
if (direct && !backup)
@@ -784,31 +786,6 @@
return HandleStatus.HANDLED;
}
- private boolean checkAndSchedule(final MessageReference ref)
- {
- long deliveryTime = ref.getScheduledDeliveryTime();
-
- if (deliveryTime != 0 && scheduledExecutor != null)
- {
- if (trace)
- {
- log.trace("Scheduling delivery for " + ref + " to occur at " + deliveryTime);
- }
-
- ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
-
- scheduledRunnables.add(runnable);
-
- if (!backup)
- {
- scheduleDelivery(runnable, deliveryTime);
- }
-
- return true;
- }
- return false;
- }
-
private void scheduleDelivery(final ScheduledDeliveryRunnable runnable, final long deliveryTime)
{
long now = System.currentTimeMillis();
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -22,8 +22,6 @@
package org.jboss.messaging.core.server.impl;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.postoffice.FlowController;
import org.jboss.messaging.core.remoting.Channel;
@@ -34,11 +32,14 @@
import org.jboss.messaging.core.server.ServerSession;
import org.jboss.messaging.util.SimpleString;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
*
* A ServerProducerImpl
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*
*/
public class ServerProducerImpl implements ServerProducer
@@ -95,26 +96,20 @@
}
public void send(final ServerMessage message) throws Exception
- {
- if (this.address != null)
- {
- //Only do flow control with non anonymous producers
-
- if (flowController != null)
- {
- int creds = creditsToSend.addAndGet(message.getEncodeSize());
-
- if (creds >= windowSize)
- {
- requestAndSendCredits();
- }
- }
- }
-
- session.send(message);
+ {
+ doFlowControl(message);
+
+ session.send(message);
}
-
- public void requestAndSendCredits() throws Exception
+
+   /**
+    * Sends a message for scheduled delivery: applies producer flow control first, then
+    * hands the message and its delivery time to the session.
+    *
+    * @param message the message to send
+    * @param scheduledDeliveryTime the delivery time passed on to the session
+    * @throws Exception if flow control or the session send fails
+    */
+   public void sendScheduled(final ServerMessage message, final long scheduledDeliveryTime) throws Exception
+   {
+      // Flow control is accounted for before the message reaches the session.
+      doFlowControl(message);
+      session.sendScheduled(message, scheduledDeliveryTime);
+   }
+
+ public void requestAndSendCredits() throws Exception
{
if (!waiting)
{
@@ -140,4 +135,24 @@
{
return waiting;
}
+
+
+
+ private void doFlowControl(final ServerMessage message) throws Exception
+ {
+ if (this.address != null)
+ {
+ //Only do flow control with non anonymous producers
+
+ if (flowController != null)
+ {
+ int creds = creditsToSend.addAndGet(message.getEncodeSize());
+
+ if (creds >= windowSize)
+ {
+ requestAndSendCredits();
+ }
+ }
+ }
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -27,11 +27,22 @@
import org.jboss.messaging.core.remoting.FailureListener;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
-import org.jboss.messaging.core.remoting.impl.wireformat.*;
+import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
import org.jboss.messaging.core.security.CheckType;
import org.jboss.messaging.core.security.SecurityStore;
-import org.jboss.messaging.core.server.*;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.ServerConsumer;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.ServerProducer;
+import org.jboss.messaging.core.server.ServerSession;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.ResourceManager;
@@ -48,7 +59,11 @@
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@@ -58,6 +73,7 @@
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*/
public class ServerSessionImpl implements ServerSession, FailureListener, NotificationListener
@@ -298,42 +314,62 @@
public void send(final ServerMessage msg) throws Exception
{
// check the user has write access to this address.
- try
+ doSecurity(msg);
+
+ if (autoCommitSends)
{
- securityStore.check(msg.getDestination(), CheckType.WRITE, this);
- }
- catch (MessagingException e)
- {
- if (!autoCommitSends)
+ if (!pager.page(msg))
{
- tx.markAsRollbackOnly(e);
+ List<MessageReference> refs = postOffice.route(msg);
+
+ if (msg.getDurableRefCount() != 0)
+ {
+ storageManager.storeMessage(msg);
+ }
+
+ for (MessageReference ref : refs)
+ {
+ ref.getQueue().addLast(ref);
+ }
}
- throw e;
}
+ else
+ {
+ tx.addMessage(msg);
+ }
+ }
+
+ public void sendScheduled(final ServerMessage msg, final long scheduledDeliveryTime) throws Exception
+ {
+ doSecurity(msg);
+
if (autoCommitSends)
{
- if (!pager.page(msg))
+ if (!pager.pageScheduled(msg, scheduledDeliveryTime))
{
List<MessageReference> refs = postOffice.route(msg);
if (msg.getDurableRefCount() != 0)
{
storageManager.storeMessage(msg);
+ storageManager.storeMessageScheduled(msg, scheduledDeliveryTime);
}
for (MessageReference ref : refs)
{
- ref.getQueue().addLast(ref);
+ ref.setScheduledDeliveryTime(scheduledDeliveryTime);
+ ref.getQueue().addScheduledDelivery(ref);
}
}
}
else
{
- tx.addMessage(msg);
+ tx.addScheduledMessage(msg, scheduledDeliveryTime);
}
}
+
public void processed(final long consumerID, final long messageID) throws Exception
{
MessageReference ref = consumers.get(consumerID).waitForReference(messageID);
@@ -367,7 +403,7 @@
}
}
}
-
+
public void rollback() throws Exception
{
rollback(true);
@@ -463,7 +499,7 @@
return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
}
-
+
public SessionXAResponseMessage XAEnd(final Xid xid, final boolean failed) throws Exception
{
if (tx != null && tx.getXid().equals(xid))
@@ -504,7 +540,7 @@
return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
}
-
+
public SessionXAResponseMessage XAForget(final Xid xid)
{
// Do nothing since we don't support heuristic commits / rollback from the
@@ -907,7 +943,7 @@
return response;
}
-
+
// public SessionCreateConsumerResponseMessage recreateConsumer(final SimpleString queueName,
// final SimpleString filterString,
// int windowSize,
@@ -1014,7 +1050,7 @@
/**
* Create a producer for the specified address
- *
+ *
* @param address The address to produce too
* @param windowSize - the producer window size to use for flow control. Specify -1 to disable flow control
* completely The actual window size used may be less than the specified window size if it is overridden by
@@ -1062,7 +1098,7 @@
}
return new SessionCreateProducerResponseMessage(initialCredits, maxRateToUse, groupId);
}
-
+
// public SessionCreateProducerResponseMessage recreateProducer(final SimpleString address,
// final int windowSize,
// final int maxRate,
@@ -1078,7 +1114,7 @@
// }
//
// final int windowToUse = flowController == null ? -1 : windowSize;
-//
+//
// // Get some initial credits to send to the producer - we try for
// // windowToUse
//
@@ -1133,6 +1169,11 @@
producers.get(producerID).send(message);
}
+   /**
+    * Looks up the producer registered under {@code producerID} and asks it to send the
+    * message for scheduled delivery.
+    *
+    * @param producerID id of the producer to send through
+    * @param message the message to send
+    * @param scheduledDeliveryTime the delivery time passed on to the producer
+    * @throws Exception if the producer's send fails
+    */
+   public void sendScheduledProducerMessage(final long producerID, final ServerMessage message, final long scheduledDeliveryTime) throws Exception
+   {
+      final ServerProducer producer = producers.get(producerID);
+
+      producer.sendScheduled(message, scheduledDeliveryTime);
+   }
+
public int transferConnection(final RemotingConnection newConnection)
{
remotingConnection.removeFailureListener(this);
@@ -1147,11 +1188,11 @@
remotingConnection = newConnection;
remotingConnection.addFailureListener(this);
-
+
int lastReceivedCommandID = channel.getLastReceivedCommandID();
-
+
//TODO resend any dup responses
-
+
return lastReceivedCommandID;
}
@@ -1293,4 +1334,20 @@
queue.referenceAcknowledged(ref);
}
+
+ private void doSecurity(final ServerMessage msg) throws Exception
+ {
+ try
+ {
+ securityStore.check(msg.getDestination(), CheckType.WRITE, this);
+ }
+ catch (MessagingException e)
+ {
+ if (!autoCommitSends)
+ {
+ tx.markAsRollbackOnly(e);
+ }
+ throw e;
+ }
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -12,6 +12,14 @@
package org.jboss.messaging.core.server.impl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ADD_DESTINATION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_CLOSE;
@@ -33,6 +41,7 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REMOVE_DESTINATION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SCHEDULED_SEND;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_START;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
@@ -48,19 +57,6 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
-
-import java.util.List;
-
-import javax.transaction.xa.Xid;
-
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.ChannelHandler;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowseMessage;
@@ -80,6 +76,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionScheduledSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
@@ -97,11 +94,15 @@
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.ServerSession;
+import javax.transaction.xa.Xid;
+import java.util.List;
+
/**
* A ServerSessionPacketHandler
- *
+ *
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*/
public class ServerSessionPacketHandler implements ChannelHandler
{
@@ -134,12 +135,12 @@
{
byte type = packet.getType();
- if (type == SESS_SEND)
+ if (type == SESS_SEND || type == SESS_SCHEDULED_SEND)
{
SessionSendMessage send = (SessionSendMessage)packet;
ServerMessage msg = send.getServerMessage();
-
+
if (msg.getMessageID() == 0L)
{
// must generate message id here, so we know they are in sync
@@ -148,13 +149,13 @@
send.getServerMessage().setMessageID(id);
}
}
-
+
Packet response = null;
try
{
- channel.replicatePacket(packet);
-
+ channel.replicatePacket(packet);
+
switch (type)
{
case SESS_CREATECONSUMER:
@@ -227,7 +228,7 @@
}
case SESS_ROLLBACK:
{
- session.rollback();
+ session.rollback();
//Rollback response is handled in the rollback() method
break;
}
@@ -369,6 +370,16 @@
}
break;
}
+ case SESS_SCHEDULED_SEND:
+ {
+ SessionScheduledSendMessage message = (SessionScheduledSendMessage)packet;
+ session.sendScheduledProducerMessage(message.getProducerID(), message.getServerMessage(), message.getScheduledDeliveryTime());
+ if (message.isRequiresResponse())
+ {
+ response = new NullResponseMessage();
+ }
+ break;
+ }
case SESS_BROWSER_HASNEXTMESSAGE:
{
SessionBrowserHasNextMessageMessage message = (SessionBrowserHasNextMessageMessage)packet;
Modified: trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -36,6 +36,7 @@
* A JBoss Messaging internal transaction
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*/
public interface Transaction
{
@@ -67,8 +68,10 @@
void markAsRollbackOnly(MessagingException messagingException);
- void replay(List<MessageReference> messages, List<MessageReference> acknowledgements, PageTransactionInfo pageTransaction, State prepared) throws Exception;
+ void replay(List<MessageReference> messages, List<MessageReference> scheduledMessages, List<MessageReference> acknowledgements, PageTransactionInfo pageTransaction, State prepared) throws Exception;
+ void addScheduledMessage(ServerMessage msg, long scheduledDeliveryTime) throws Exception;
+
static enum State
{
ACTIVE, PREPARED, COMMITTED, ROLLEDBACK, SUSPENDED, ROLLBACK_ONLY;
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -28,12 +28,18 @@
import org.jboss.messaging.util.SimpleString;
import javax.transaction.xa.Xid;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
/**
* A TransactionImpl
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*/
public class TransactionImpl implements Transaction
{
@@ -51,6 +57,10 @@
private final List<ServerMessage> pagedMessages = new ArrayList<ServerMessage>();
+ private final Map<ServerMessage, Long> scheduledPagedMessages = new HashMap<ServerMessage, Long>();
+
+ private final Map<MessageReference, Long> scheduledReferences = new HashMap<MessageReference, Long>();
+
private PageTransactionInfo pageTransaction;
private final Xid xid;
@@ -148,6 +158,32 @@
}
}
+ public void addScheduledMessage(final ServerMessage message, long scheduledDeliveryTime) throws Exception
+ {
+ if (state != State.ACTIVE)
+ {
+ throw new IllegalStateException("Transaction is in invalid state " + state);
+ }
+
+ if (pagingManager.isPaging(message.getDestination()))
+ {
+ scheduledPagedMessages.put(message, scheduledDeliveryTime);
+ }
+ else
+ {
+ List<MessageReference> refs = route(message);
+
+ if (message.getDurableRefCount() != 0)
+ {
+ storageManager.storeMessageScheduledTransactional(id, message, scheduledDeliveryTime);
+ }
+ for (MessageReference ref : refs)
+ {
+ scheduledReferences.put(ref, scheduledDeliveryTime);
+ }
+ }
+ }
+
public void addAcknowledgement(final MessageReference acknowledgement) throws Exception
{
if (state != State.ACTIVE)
@@ -259,7 +295,16 @@
for (MessageReference ref : refsToAdd)
{
- ref.getQueue().addLast(ref);
+ Long scheduled = scheduledReferences.get(ref);
+ if(scheduled == null)
+ {
+ ref.getQueue().addLast(ref);
+ }
+ else
+ {
+ ref.setScheduledDeliveryTime(scheduled);
+ ref.getQueue().addScheduledDelivery(ref);
+ }
}
// If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
@@ -403,12 +448,17 @@
}
public void replay(List<MessageReference> messages,
+ List<MessageReference> scheduledMessages,
List<MessageReference> acknowledgements,
PageTransactionInfo pageTransaction,
State prepared) throws Exception
{
containsPersistent = true;
refsToAdd.addAll(messages);
+ for (MessageReference scheduledMessage : scheduledMessages)
+ {
+ this.scheduledReferences.put(scheduledMessage, scheduledMessage.getScheduledDeliveryTime());
+ }
this.acknowledgements.addAll(acknowledgements);
this.pageTransaction = pageTransaction;
@@ -428,7 +478,7 @@
// Private
// -------------------------------------------------------------------
- private void route(final ServerMessage message) throws Exception
+ private List<MessageReference> route(final ServerMessage message) throws Exception
{
List<MessageReference> refs = postOffice.route(message);
@@ -440,6 +490,7 @@
containsPersistent = true;
}
+ return refs;
}
private void pageMessages() throws Exception
@@ -481,6 +532,37 @@
}
}
+ for (ServerMessage message : scheduledPagedMessages.keySet())
+ {
+ long scheduledDeliveryTime = scheduledPagedMessages.get(message);
+ // http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging
+ // Explained under Transaction On Paging. (This is the item B)
+ if (pagingManager.pageScheduled(message, id, scheduledDeliveryTime))
+ {
+ if (message.isDurable())
+ {
+ // We only create pageTransactions if using persistent messages
+ pageTransaction.increment();
+ pagingPersistent = true;
+ pagedDestinationsToSync.add(message.getDestination());
+ }
+ }
+ else
+ {
+ // This could happen when the PageStore left the pageState
+ List<MessageReference> refs = route(message);
+
+ if (message.getDurableRefCount() != 0)
+ {
+ storageManager.storeMessageScheduledTransactional(id, message, scheduledDeliveryTime);
+ }
+ for (MessageReference ref : refs)
+ {
+ scheduledReferences.put(ref, scheduledDeliveryTime);
+ }
+ }
+ }
+
if (pagingPersistent)
{
containsPersistent = true;
@@ -499,5 +581,9 @@
acknowledgements.clear();
pagedMessages.clear();
+
+ scheduledPagedMessages.clear();
+
+ scheduledReferences.clear();
}
}
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -23,9 +23,20 @@
import org.jboss.messaging.jms.JBossDestination;
import org.jboss.messaging.util.SimpleString;
-import javax.jms.*;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
/**
* Implementation of a JMS Message. JMS Messages only live on the client side - the server only deals with MessageImpl
@@ -66,6 +77,8 @@
// Used when bridging a message
public static final String JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST = "JBM_BRIDGE_MSG_ID_LIST";
+ public static final String JBM_SCHEDULED_DELIVERY_TIME = "JBM_SCHEDULED_DELIVERY_TIME";
+
public static final byte TYPE = 0;
// Static --------------------------------------------------------
@@ -162,6 +175,8 @@
// Cache it
private String jmsType;
+ private long scheduledDeliveryTime = 0;
+
// Constructors --------------------------------------------------
/**
* constructors for test purposes only
@@ -363,6 +378,17 @@
return jmsCorrelationID;
}
+ public long getScheduledDeliveryTime()
+ {
+ return scheduledDeliveryTime;
+ }
+
+ public void setScheduledDeliveryTime(long scheduledDeliveryTime)
+ {
+ message.putLongProperty(new SimpleString(JBM_SCHEDULED_DELIVERY_TIME), scheduledDeliveryTime);
+ this.scheduledDeliveryTime = scheduledDeliveryTime;
+ }
+
public Destination getJMSReplyTo() throws JMSException
{
if (replyTo == null)
@@ -806,6 +832,10 @@
{
Long l = new Long(value);
checkProperty(name, l);
+ if(JBM_SCHEDULED_DELIVERY_TIME.equals(name))
+ {
+ scheduledDeliveryTime = l;
+ }
message.putLongProperty(new SimpleString(name), value);
}
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossMessageProducer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossMessageProducer.java 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossMessageProducer.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -22,7 +22,14 @@
package org.jboss.messaging.jms.client;
-import java.util.concurrent.atomic.AtomicLong;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.jms.JBossDestination;
+import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.UUIDGenerator;
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
@@ -40,16 +47,8 @@
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
+import java.util.concurrent.atomic.AtomicLong;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.jms.JBossDestination;
-import org.jboss.messaging.util.SimpleString;
-import org.jboss.messaging.util.UUIDGenerator;
-
/**
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -448,10 +447,18 @@
{
coreMessage.putStringProperty(JBossConnection.CONNECTION_ID_PROPERTY_NAME, connID);
}
-
+
try
{
- producer.send(address, coreMessage);
+ //check to see if this message needs to be scheduled
+ if(jbm.getScheduledDeliveryTime() > 0)
+ {
+ producer.send(address, coreMessage, jbm.getScheduledDeliveryTime());
+ }
+ else
+ {
+ producer.send(address, coreMessage);
+ }
}
catch (MessagingException e)
{
Added: trunk/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -0,0 +1,230 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.timing.core.client;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.core.transaction.impl.XidImpl;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.SimpleString;
+import org.jboss.util.id.GUID;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.io.File;
+import java.util.Calendar;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ScheduledMessageTest extends UnitTestCase
+{
+ private static final String ACCEPTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory";
+
+ private static final String CONNECTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
+
+ private String journalDir = System.getProperty("java.io.tmpdir", "/tmp") + "/ScheduledMessageRecoveryTest/journal";
+
+ private String bindingsDir = System.getProperty("java.io.tmpdir", "/tmp") + "/ScheduledMessageRecoveryTest/bindings";
+
+ private String pageDir = System.getProperty("java.io.tmpdir", "/tmp") + "/ScheduledMessageRecoveryTest/page";
+
+ private SimpleString atestq = new SimpleString("ascheduledtestq");
+
+ private MessagingService messagingService;
+
+ private ConfigurationImpl configuration;
+
+ protected void setUp() throws Exception
+ {
+ File file = new File(journalDir);
+ File file2 = new File(bindingsDir);
+ File file3 = new File(pageDir);
+ deleteDirectory(file);
+ file.mkdirs();
+ deleteDirectory(file2);
+ file2.mkdirs();
+ deleteDirectory(file3);
+ file3.mkdirs();
+ configuration = new ConfigurationImpl();
+ configuration.setSecurityEnabled(false);
+ configuration.setJournalMinFiles(2);
+ configuration.setPagingDirectory(pageDir);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ if (messagingService != null)
+ {
+ try
+ {
+ messagingService.stop();
+ }
+ catch (Exception e)
+ {
+ //ignore
+ }
+ }
+ new File(journalDir).delete();
+ new File(bindingsDir).delete();
+ new File(pageDir).delete();
+ }
+
+ public void testRecoveredMessageDeliveredCorrectly() throws Exception
+ {
+
+ TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+ configuration.getAcceptorConfigurations().add(transportConfig);
+ messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+ //start the server
+ messagingService.start();
+ //then we create a client as normal
+ ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ ClientSession session = sessionFactory.createSession(false, true, false, false);
+ session.createQueue(atestq, atestq, null, true, true);
+ ClientProducer producer = session.createProducer(atestq);
+ ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
+ System.currentTimeMillis(), (byte) 1);
+ message.getBody().putString("testINVMCoreClient");
+ message.getBody().flip();
+ message.setDurable(true);
+ Calendar cal = Calendar.getInstance();
+ cal.roll(Calendar.SECOND, 10);
+ producer.send(message, cal.getTimeInMillis());
+
+ producer.close();
+ session.close();
+ messagingService.stop();
+ messagingService = null;
+ messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+ messagingService.start();
+
+ sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+
+ session = sessionFactory.createSession(false, true, true, false);
+
+ ClientConsumer consumer = session.createConsumer(atestq);
+
+ session.start();
+
+ ClientMessage message2 = consumer.receive(10000);
+ assertTrue(System.currentTimeMillis() >= cal.getTimeInMillis());
+ assertEquals("testINVMCoreClient", message2.getBody().getString());
+
+ message2.processed();
+ session.close();
+ }
+
+ public void testRecoveredTxMessageDeliveredCorrectly() throws Exception
+ {
+ Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+ TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+ configuration.getAcceptorConfigurations().add(transportConfig);
+ messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+ //start the server
+ messagingService.start();
+ //then we create a client as normal
+ ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ ClientSession session = sessionFactory.createSession(true, false, false, false);
+ session.createQueue(atestq, atestq, null, true, true);
+ session.start(xid, XAResource.TMNOFLAGS);
+ ClientProducer producer = session.createProducer(atestq);
+ ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
+ System.currentTimeMillis(), (byte) 1);
+ message.getBody().putString("testINVMCoreClient");
+ message.getBody().flip();
+ message.setDurable(true);
+ Calendar cal = Calendar.getInstance();
+ cal.roll(Calendar.SECOND, 10);
+ producer.send(message, cal.getTimeInMillis());
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+ producer.close();
+ session.close();
+ messagingService.stop();
+ messagingService = null;
+ messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+ messagingService.start();
+
+ sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+
+ session = sessionFactory.createSession(true, false, false, false);
+ session.commit(xid, true);
+ ClientConsumer consumer = session.createConsumer(atestq);
+
+ session.start();
+
+ ClientMessage message2 = consumer.receive(10000);
+ assertTrue(System.currentTimeMillis() >= cal.getTimeInMillis());
+ assertEquals("testINVMCoreClient", message2.getBody().getString());
+
+ message2.processed();
+ session.close();
+ }
+
+ public void testPagedMessageDeliveredCorrectly() throws Exception
+ {
+
+ TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+ configuration.getAcceptorConfigurations().add(transportConfig);
+ configuration.setPagingMaxGlobalSizeBytes(0);
+ messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+ //start the server
+ messagingService.start();
+ //then we create a client as normal
+ ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ ClientSession session = sessionFactory.createSession(false, true, false, false);
+ session.createQueue(atestq, atestq, null, true, true);
+ ClientProducer producer = session.createProducer(atestq);
+ ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
+ System.currentTimeMillis(), (byte) 1);
+ message.getBody().putString("testINVMCoreClient");
+ message.getBody().flip();
+ message.setDurable(true);
+ Calendar cal = Calendar.getInstance();
+ cal.roll(Calendar.SECOND, 10);
+ producer.send(message, cal.getTimeInMillis());
+
+ producer.close();
+
+
+ ClientConsumer consumer = session.createConsumer(atestq);
+
+ session.start();
+
+ ClientMessage message2 = consumer.receive(10000);
+ assertTrue(System.currentTimeMillis() >= cal.getTimeInMillis());
+ assertEquals("testINVMCoreClient", message2.getBody().getString());
+
+ message2.processed();
+ session.close();
+ }
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -22,22 +22,31 @@
package org.jboss.messaging.tests.unit.core.paging.impl;
-import java.util.ArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-
import org.easymock.EasyMock;
import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.paging.LastPageRecord;
+import org.jboss.messaging.core.paging.PageMessage;
import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.paging.PagingStoreFactory;
+import org.jboss.messaging.core.paging.impl.PageMessageImpl;
import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.HierarchicalObjectRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.tests.util.UnitTestCase;
import org.jboss.messaging.util.SimpleString;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+
public class PageManagerImplTest extends UnitTestCase
{
@@ -187,6 +196,75 @@
}
+ public void testOnDepage() throws Exception
+ {
+ long time = System.currentTimeMillis() + 10000;
+ List<MessageReference> refs = new ArrayList<MessageReference>();
+ MessageReference ref = EasyMock.createStrictMock(MessageReference.class);
+ refs.add(ref);
+ Queue queue = EasyMock.createStrictMock(Queue.class);
+ HierarchicalRepository<QueueSettings> queueSettings = new HierarchicalObjectRepository<QueueSettings>();
+ queueSettings.setDefault(new QueueSettings());
+ PostOffice po = EasyMock.createStrictMock(PostOffice.class);
+ PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
+ PagingStore store = EasyMock.createNiceMock(PagingStore.class);
+ StorageManager storageManager = EasyMock.createStrictMock(StorageManager.class);
+ PagingManagerImpl manager = new PagingManagerImpl(spi, storageManager, queueSettings, -1);
+ manager.setPostOffice(po);
+ ServerMessage message = EasyMock.createStrictMock(ServerMessage.class);
+
+ EasyMock.expect(storageManager.generateUniqueID()).andReturn(1l);
+ EasyMock.expect(po.route(message)).andReturn(refs);
+ EasyMock.expect(message.getDurableRefCount()).andReturn(1);
+ storageManager.storeLastPage(EasyMock.anyLong(), (LastPageRecord) EasyMock.anyObject());
+ storageManager.storeMessageTransactional(EasyMock.anyLong(), (ServerMessage) EasyMock.anyObject());
+ storageManager.commit(EasyMock.anyLong());
+ EasyMock.expect(ref.getQueue()).andReturn(queue);
+ EasyMock.expect(queue.addLast(ref)).andReturn(null);
+ EasyMock.replay(spi, store, message, storageManager, po, ref, queue);
+ SimpleString queueName = new SimpleString("aq");
+ PageMessageImpl pageMessage = new PageMessageImpl(message);
+
+ manager.onDepage(0, queueName, store, new PageMessage[] {pageMessage} );
+ EasyMock.verify(spi, store, message, storageManager, po, ref, queue);
+ }
+
+ public void testOnDepageScheduledMessage() throws Exception
+ {
+ long time = System.currentTimeMillis() + 10000;
+ List<MessageReference> refs = new ArrayList<MessageReference>();
+ MessageReference ref = EasyMock.createStrictMock(MessageReference.class);
+ refs.add(ref);
+ Queue queue = EasyMock.createStrictMock(Queue.class);
+ HierarchicalRepository<QueueSettings> queueSettings = new HierarchicalObjectRepository<QueueSettings>();
+ queueSettings.setDefault(new QueueSettings());
+ PostOffice po = EasyMock.createStrictMock(PostOffice.class);
+ PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
+ PagingStore store = EasyMock.createNiceMock(PagingStore.class);
+ StorageManager storageManager = EasyMock.createStrictMock(StorageManager.class);
+ PagingManagerImpl manager = new PagingManagerImpl(spi, storageManager, queueSettings, -1);
+ manager.setPostOffice(po);
+ ServerMessage message = EasyMock.createStrictMock(ServerMessage.class);
+
+ EasyMock.expect(storageManager.generateUniqueID()).andReturn(1l);
+ EasyMock.expect(po.route(message)).andReturn(refs);
+ EasyMock.expect(message.getDurableRefCount()).andReturn(1);
+ ref.setScheduledDeliveryTime(time);
+ storageManager.storeLastPage(EasyMock.anyLong(), (LastPageRecord) EasyMock.anyObject());
+ storageManager.storeMessageTransactional(EasyMock.anyLong(), (ServerMessage) EasyMock.anyObject());
+ storageManager.storeMessageScheduledTransactional(EasyMock.anyLong(), (ServerMessage) EasyMock.anyObject(), EasyMock.eq(time));
+ storageManager.commit(EasyMock.anyLong());
+ EasyMock.expect(ref.getQueue()).andReturn(queue);
+ queue.addScheduledDelivery(ref);
+ EasyMock.replay(spi, store, message, storageManager, po, ref, queue);
+ SimpleString queueName = new SimpleString("aq");
+ PageMessageImpl pageMessage = new PageMessageImpl(message);
+
+ pageMessage.getProperties().putLongProperty(new SimpleString("JBM_SCHEDULED_DELIVERY_PROP"), time);
+ manager.onDepage(0, queueName, store, new PageMessage[] {pageMessage} );
+ EasyMock.verify(spi, store, message, storageManager, po, ref, queue);
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -22,10 +22,6 @@
package org.jboss.messaging.tests.unit.core.paging.impl;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.paging.Page;
import org.jboss.messaging.core.paging.PageMessage;
@@ -37,6 +33,10 @@
import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
import org.jboss.messaging.util.SimpleString;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
/**
*
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
@@ -113,6 +113,45 @@
}
+ public void testStoreWithProperty() throws Exception
+ { // Pages one message that carries a long property, then opens a second store on the same factory and checks the page count.
+ SequentialFileFactory factory = new FakeSequentialFileFactory();
+
+ PagingStore storeImpl = new PagingStoreImpl(null, factory, destinationTestName, new QueueSettings(), executor);
+
+ storeImpl.start();
+ // A freshly started store is expected to report no pages.
+ assertEquals(0, storeImpl.getNumberOfPages());
+
+ storeImpl.startPaging();
+ // After startPaging() the store is expected to report exactly one page.
+ assertEquals(1, storeImpl.getNumberOfPages());
+
+ List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(); // NOTE(review): only added to below; never read again in this method.
+
+ ByteBuffer buffer = createRandomBuffer(0, 10);
+
+ buffers.add(buffer);
+ SimpleString destination = new SimpleString("test");
+ // Build the message from the buffer and attach a long property before paging it.
+ PageMessageImpl msg = createMessage(destination, buffer);
+ msg.getProperties().putLongProperty(new SimpleString("test-property"), 12345l); // "12345l" is the long literal 12345 (lowercase 'l' suffix).
+ assertTrue(storeImpl.isPaging());
+
+ assertTrue(storeImpl.page(msg));
+ // Paging a single message is expected to leave the page count at one.
+ assertEquals(1, storeImpl.getNumberOfPages());
+
+ storeImpl.sync();
+ // Create a second store instance over the same factory and destination name.
+ storeImpl = new PagingStoreImpl(null, factory, destinationTestName, new QueueSettings(), executor);
+
+ storeImpl.start();
+ // NOTE(review): presumably the new store finds the page already written and adds a fresh one, hence 2 - confirm against PagingStoreImpl.start().
+ assertEquals(2, storeImpl.getNumberOfPages());
+ // NOTE(review): the value stored under "test-property" is never read back or asserted in this method.
+ }
+
public void testDepageOnCurrentPage() throws Exception
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2008-10-06 09:36:10 UTC (rev 5073)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2008-10-06 14:27:34 UTC (rev 5074)
@@ -23,13 +23,24 @@
package org.jboss.messaging.tests.unit.core.server.impl;
import org.easymock.EasyMock;
-import static org.easymock.EasyMock.*;
+import static org.easymock.EasyMock.anyLong;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.isA;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.*;
+import org.jboss.messaging.core.server.Consumer;
+import org.jboss.messaging.core.server.DistributionPolicy;
+import org.jboss.messaging.core.server.HandleStatus;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.QueueImpl;
import org.jboss.messaging.core.server.impl.RoundRobinDistributionPolicy;
import org.jboss.messaging.core.settings.HierarchicalRepository;
@@ -741,7 +752,7 @@
refs.add(ref);
- queue.addLast(ref);
+ queue.addScheduledDelivery(ref);
}
More information about the jboss-cvs-commits
mailing list