[jboss-cvs] JBoss Messaging SVN: r5325 - in trunk: src/main/org/jboss/messaging/core/client and 24 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Nov 10 09:50:54 EST 2008
Author: timfox
Date: 2008-11-10 09:50:54 -0500 (Mon, 10 Nov 2008)
New Revision: 5325
Removed:
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionScheduledSendMessage.java
Modified:
trunk/examples/messaging/src/org/jboss/messaging/example/ScheduledMessageExample.java
trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
trunk/src/main/org/jboss/messaging/core/paging/PageMessage.java
trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
trunk/src/main/org/jboss/messaging/core/server/JournalType.java
trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java
trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributionPolicy.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java
trunk/src/main/org/jboss/messaging/jms/client/JBossMessageProducer.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ScheduledDeliveryTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverScheduledMessageTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java
trunk/tests/src/org/jboss/messaging/tests/timing/core/server/impl/QueueImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/GroupingRoundRobinDistributionPolicyTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossMessageTest.java
Log:
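Scheduled delivery now uses the standard message send together with a message header (MessageImpl.HDR_SCHEDULED_DELIVERY_TIME); the dedicated SessionScheduledSendMessage packet and the ClientProducer.send(..., scheduleDeliveryTime) overloads are removed.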
Modified: trunk/examples/messaging/src/org/jboss/messaging/example/ScheduledMessageExample.java
===================================================================
--- trunk/examples/messaging/src/org/jboss/messaging/example/ScheduledMessageExample.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/examples/messaging/src/org/jboss/messaging/example/ScheduledMessageExample.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -34,6 +34,7 @@
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.jms.client.JBossTextMessage;
import org.jboss.messaging.util.SimpleString;
@@ -61,7 +62,8 @@
log.info("current time " + df.format(cal.getTime()));
cal.add(Calendar.SECOND, 5);
log.info("message scheduled for " + df.format(cal.getTime()));
- clientProducer.send(message, cal.getTimeInMillis());
+ message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, cal.getTimeInMillis());
+ clientProducer.send(message);
ClientConsumer clientConsumer = clientSession.createConsumer(queue);
clientSession.start();
ClientMessage msg = clientConsumer.receive(7000);
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -37,10 +37,6 @@
void send(SimpleString address, ClientMessage message) throws MessagingException;
- void send(final ClientMessage msg, long scheduleDeliveryTime) throws MessagingException;
-
- void send(final SimpleString address, final ClientMessage msg, long scheduleDeliveryTime) throws MessagingException;
-
void registerAcknowledgementHandler(AcknowledgementHandler handler);
void unregisterAcknowledgementHandler(AcknowledgementHandler handler);
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -19,7 +19,6 @@
import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionScheduledSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
import org.jboss.messaging.util.SimpleString;
import org.jboss.messaging.util.TokenBucketLimiter;
@@ -103,30 +102,16 @@
{
checkClosed();
- doSend(null, msg, 0);
+ doSend(null, msg);
}
public void send(final SimpleString address, final ClientMessage msg) throws MessagingException
{
checkClosed();
- doSend(address, msg, 0);
+ doSend(address, msg);
}
- public void send(final ClientMessage msg, long scheduleDeliveryTime) throws MessagingException
- {
- checkClosed();
-
- doSend(null, msg, scheduleDeliveryTime);
- }
-
- public void send(final SimpleString address, final ClientMessage msg, long scheduleDeliveryTime) throws MessagingException
- {
- checkClosed();
-
- doSend(address, msg, scheduleDeliveryTime);
- }
-
public void registerAcknowledgementHandler(final AcknowledgementHandler handler)
{
// TODO
@@ -206,7 +191,7 @@
closed = true;
}
- private void doSend(final SimpleString address, final ClientMessage msg, long scheduledDeliveryTime) throws MessagingException
+ private void doSend(final SimpleString address, final ClientMessage msg) throws MessagingException
{
if (address != null)
{
@@ -226,22 +211,12 @@
if (autoGroupId != null)
{
- msg.putStringProperty(MessageImpl.GROUP_ID, autoGroupId);
+ msg.putStringProperty(MessageImpl.HDR_GROUP_ID, autoGroupId);
}
boolean sendBlocking = msg.isDurable() ? blockOnPersistentSend : blockOnNonPersistentSend;
- SessionSendMessage message;
-
- // check to see if this message need to be scheduled.
- if (scheduledDeliveryTime <= 0)
- {
- message = new SessionSendMessage(id, msg, sendBlocking);
- }
- else
- {
- message = new SessionScheduledSendMessage(id, msg, sendBlocking, scheduledDeliveryTime);
- }
+ SessionSendMessage message = new SessionSendMessage(id, msg, sendBlocking);
if (sendBlocking)
{
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -322,7 +322,7 @@
String s = getString(e, "journal-type", journalType.toString());
- if (s == null || (!s.equals(JournalType.NIO.toString()) && !s.equals(JournalType.ASYNCIO.toString()) && !s.equals(JournalType.JDBC.toString())))
+ if (s == null || (!s.equals(JournalType.NIO.toString()) && !s.equals(JournalType.ASYNCIO.toString())))
{
throw new IllegalArgumentException("Invalid journal type " + s);
}
@@ -335,11 +335,7 @@
{
journalType = JournalType.ASYNCIO;
}
- else if (s.equals(JournalType.JDBC.toString()))
- {
- journalType = JournalType.JDBC;
- }
-
+
journalSyncTransactional = getBoolean(e, "journal-sync-transactional", journalSyncTransactional);
journalSyncNonTransactional = getBoolean(e, "journal-sync-non-transactional", journalSyncNonTransactional);
Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -62,7 +62,9 @@
public static final SimpleString HDR_ORIG_MESSAGE_ID = new SimpleString("JBM_ORIG_MESSAGE_ID");
- public static final SimpleString GROUP_ID = new SimpleString("JBMGroupID");
+ public static final SimpleString HDR_GROUP_ID = new SimpleString("JBM_GROUP_ID");
+
+ public static final SimpleString HDR_SCHEDULED_DELIVERY_TIME = new SimpleString("JBM_SCHED_DELIVERY");
// Attributes ----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/paging/PageMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PageMessage.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/src/main/org/jboss/messaging/core/paging/PageMessage.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -24,7 +24,6 @@
import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.util.TypedProperties;
/**
*
@@ -39,7 +38,4 @@
ServerMessage getMessage();
long getTransactionID();
-
- TypedProperties getProperties();
-
}
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -92,22 +92,8 @@
* @param message
* @return false if destination is not on page mode
*/
- boolean pageScheduled(ServerMessage message, long scheduledDeliveryTime) throws Exception;
-
- /**
- * Page, only if destination is in page mode.
- * @param message
- * @return false if destination is not on page mode
- */
boolean page(ServerMessage message, long transactionId) throws Exception;
- /**
- * Page, only if destination is in page mode.
- * @param message
- * @return false if destination is not on page mode
- */
- boolean pageScheduled(ServerMessage message, long transactionId, long scheduledDeliveryTime) throws Exception;
-
/**
* Point to inform/restoring Transactions used when the messages were added into paging
* */
@@ -139,5 +125,4 @@
* @throws Exception
* */
void clearLastPageRecord(LastPageRecord lastRecord) throws Exception;
-
}
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -27,7 +27,6 @@
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
import org.jboss.messaging.util.DataConstants;
-import org.jboss.messaging.util.TypedProperties;
/**
*
@@ -54,19 +53,15 @@
private long transactionID = -1;
- private final TypedProperties properties;
-
public PageMessageImpl(final ServerMessage message, final long transactionID)
{
this.message = message;
this.transactionID = transactionID;
- properties = new TypedProperties();
}
public PageMessageImpl(final ServerMessage message)
{
this.message = message;
- properties = new TypedProperties();
}
public PageMessageImpl()
@@ -74,19 +69,6 @@
this(new ServerMessageImpl());
}
- public PageMessageImpl(final ServerMessage message, final TypedProperties properties)
- {
- this.message = message;
- this.properties = properties;
- }
-
- public PageMessageImpl(final ServerMessage message, final TypedProperties properties, final long transactionID)
- {
- this.message = message;
- this.transactionID = transactionID;
- this.properties = properties;
- }
-
public ServerMessage getMessage()
{
return message;
@@ -97,30 +79,23 @@
return transactionID;
}
- public TypedProperties getProperties()
- {
- return properties;
- }
-
// EncodingSupport implementation --------------------------------
public void decode(final MessagingBuffer buffer)
{
transactionID = buffer.getLong();
message.decode(buffer);
- properties.decode(buffer);
}
public void encode(final MessagingBuffer buffer)
{
buffer.putLong(transactionID);
message.encode(buffer);
- properties.encode(buffer);
}
public int getEncodeSize()
{
- return DataConstants.SIZE_LONG + message.getEncodeSize() + properties.getEncodeSize();
+ return DataConstants.SIZE_LONG + message.getEncodeSize();
}
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -45,7 +45,6 @@
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.util.SimpleString;
-import org.jboss.messaging.util.TypedProperties;
/**
* <p>Look at the <a href="http://wiki.jboss.org/wiki/JBossMessaging2Paging">WIKI</a> for more information.</p>
@@ -93,8 +92,6 @@
// private static final boolean isTrace = log.isTraceEnabled();
private static final boolean isTrace = true;
- private static final SimpleString SCHEDULED_DELIVERY_PROP = new SimpleString("JBM_SCHEDULED_DELIVERY_PROP");
-
// This is just a debug tool method.
// During debugs you could make log.trace as log.info, and change the
// variable isTrace above
@@ -204,7 +201,6 @@
HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
- final List<MessageReference> scheduledRefsToAdd = new ArrayList<MessageReference>();
for (PageMessage msg : data)
{
@@ -240,25 +236,8 @@
pageTransactionsToUpdate.add(pageTransactionInfo);
}
}
- Long scheduledDeliveryTime = (Long) msg.getProperties().getProperty(SCHEDULED_DELIVERY_PROP);
- //if this is a scheduled message we add it to the queue as just that
- if(scheduledDeliveryTime == null)
- {
- refsToAdd.addAll(postOffice.route(msg.getMessage()));
- }
- else
- {
- List<MessageReference> refs = postOffice.route(msg.getMessage());
- for (MessageReference ref : refs)
- {
- ref.setScheduledDeliveryTime(scheduledDeliveryTime);
- if(ref.getQueue().isDurable())
- {
- storageManager.storeMessageReferenceScheduledTransactional(depageTransactionID, ref.getQueue().getPersistenceID(), msg.getMessage().getMessageID(), scheduledDeliveryTime);
- }
- }
- scheduledRefsToAdd.addAll(refs);
- }
+
+ refsToAdd.addAll(postOffice.route(msg.getMessage()));
if (msg.getMessage().getDurableRefCount() != 0)
{
@@ -272,7 +251,7 @@
{
// http://wiki.jboss.org/wiki/JBossMessaging2Paging
// numberOfReads==numberOfWrites -> We delete the record
- storageManager.storeDeleteTransactional(depageTransactionID, pageWithTransaction.getRecordID());
+ storageManager.storeDeletePageTransaction(depageTransactionID, pageWithTransaction.getRecordID());
transactions.remove(pageWithTransaction.getTransactionID());
}
else
@@ -288,10 +267,6 @@
ref.getQueue().addLast(ref);
}
- for (MessageReference ref : scheduledRefsToAdd)
- {
- ref.getQueue().addLast(ref);
- }
if (globalMode.get())
{
return globalSize.get() < maxGlobalSize - WATERMARK_GLOBAL_PAGE && pagingStore.getMaxSizeBytes() <= 0 ||
@@ -337,20 +312,6 @@
return getPageStore(message.getDestination()).page(new PageMessageImpl(message));
}
- public boolean pageScheduled(final ServerMessage message, final long scheduledDeliveryTime) throws Exception
- {
- TypedProperties properties = new TypedProperties();
- properties.putLongProperty(SCHEDULED_DELIVERY_PROP, scheduledDeliveryTime);
- return getPageStore(message.getDestination()).page(new PageMessageImpl(message, properties));
- }
-
- public boolean pageScheduled(final ServerMessage message, final long transactionId, final long scheduledDeliveryTime) throws Exception
- {
- TypedProperties properties = new TypedProperties();
- properties.putLongProperty(SCHEDULED_DELIVERY_PROP, scheduledDeliveryTime);
- return getPageStore(message.getDestination()).page(new PageMessageImpl(message, properties));
- }
-
public void addTransaction(final PageTransactionInfo pageTransaction)
{
transactions.put(pageTransaction.getTransactionID(), pageTransaction);
Modified: trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -53,24 +53,27 @@
long generateUniqueID();
+
void storeMessage(ServerMessage message) throws Exception;
void storeAcknowledge(long queueID, long messageID) throws Exception;
+
+ void updateDeliveryCount(MessageReference ref) throws Exception;
+
+ void updateScheduledDeliveryTime(MessageReference ref) throws Exception;
void storeDelete(long messageID) throws Exception;
-
- void storeMessageReferenceScheduled(final long queueID, final long messageID, final long scheduledDeliveryTime) throws Exception;
-
+
+
+
void storeMessageTransactional(long txID, ServerMessage message) throws Exception;
void storeAcknowledgeTransactional(long txID, long queueID, long messageiD) throws Exception;
+
+ void updateScheduledDeliveryTimeTransactional(long txID, MessageReference ref) throws Exception;
- void storeMessageReferenceScheduledTransactional(final long txID, final long queueID, final long messageID, final long scheduledDeliveryTime) throws Exception;
-
void storeDeleteMessageTransactional(long txID, long queueID, long messageID) throws Exception;
- /** Used to delete non-messaging data (such as PageTransaction and LasPage) */
- void storeDeleteTransactional(long txID, long recordID) throws Exception;
void prepare(long txID, Xid xid) throws Exception;
@@ -78,12 +81,15 @@
void rollback(long txID) throws Exception;
+
void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception;
void storeLastPage(long txID, LastPageRecord pageTransaction) throws Exception;
+
+ void storeDeletePageTransaction(long txID, long recordID) throws Exception;
- void updateDeliveryCount(MessageReference ref) throws Exception;
-
+
+
void loadMessages(PostOffice postOffice, Map<Long, Queue> queues, ResourceManager resourceManager) throws Exception;
// Bindings related operations
@@ -97,5 +103,4 @@
boolean deleteDestination(SimpleString destination) throws Exception;
void loadBindings(QueueFactory queueFactory, List<Binding> bindings, List<SimpleString> destinations) throws Exception;
-
}
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -110,9 +110,9 @@
public static final byte LAST_PAGE = 35;
- public static final byte SET_SCHEDULED_DELIVERY_TIME = 44;
+ public static final byte SET_SCHEDULED_DELIVERY_TIME = 36;
- //This will produce a unique id **for this node only**
+ // This will produce a unique id **for this node only**
private final IDGenerator idGenerator = new TimeAndCounterIDGenerator();
private final AtomicLong bindingIDSequence = new AtomicLong(0);
@@ -175,11 +175,9 @@
log.info("NIO Journal selected");
journalFF = new NIOSequentialFileFactory(bindingsDir);
}
- else if (config.getJournalType() == JournalType.JDBC)
+ else
{
- log.info("JDBC Journal selected");
- // Sanity check only... this is previously tested
- throw new IllegalArgumentException("JDBC Journal is not supported yet");
+ throw new IllegalArgumentException("Unsupported journal type " + config.getJournalType());
}
messageJournal = new JournalImpl(config.getJournalFileSize(),
@@ -222,10 +220,12 @@
messageJournal.appendDeleteRecord(messageID);
}
- public void storeMessageReferenceScheduled(final long queueID, final long messageID, final long scheduledDeliveryTime) throws Exception
+ public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
{
- ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(scheduledDeliveryTime, queueID);
- messageJournal.appendUpdateRecord(messageID, SET_SCHEDULED_DELIVERY_TIME, encoding);
+ ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(),
+ ref.getQueue().getPersistenceID());
+
+ messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), SET_SCHEDULED_DELIVERY_TIME, encoding);
}
// Transactional operations
@@ -271,15 +271,20 @@
messageJournal.appendUpdateRecordTransactional(txID, messageID, ACKNOWLEDGE_REF, new ACKEncoding(queueID));
}
- public void storeDeleteTransactional(final long txID, final long recordID) throws Exception
+ public void storeDeletePageTransaction(final long txID, final long recordID) throws Exception
{
messageJournal.appendDeleteRecordTransactional(txID, recordID, null);
}
- public void storeMessageReferenceScheduledTransactional(final long txID, final long queueID, final long messageID, final long scheduledDeliveryTime) throws Exception
+ public void updateScheduledDeliveryTimeTransactional(final long txID, final MessageReference ref) throws Exception
{
- ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(scheduledDeliveryTime, queueID);
- messageJournal.appendUpdateRecordTransactional(txID, messageID, SET_SCHEDULED_DELIVERY_TIME, encoding);
+ ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(),
+ ref.getQueue().getPersistenceID());
+
+ messageJournal.appendUpdateRecordTransactional(txID,
+ ref.getMessage().getMessageID(),
+ SET_SCHEDULED_DELIVERY_TIME,
+ encoding);
}
public void storeDeleteMessageTransactional(final long txID, final long queueID, final long messageID) throws Exception
@@ -321,6 +326,7 @@
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
messageJournal.load(records, preparedTransactions);
+
for (RecordInfo record : records)
{
byte[] data = record.data;
@@ -436,16 +442,16 @@
Queue queue = queues.get(encoding.queueID);
- if (queue == null)
- {
- throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
- }
- //remove the reference and then add it back in with the scheduled time set.
- MessageReference removed = queue.removeReferenceWithID(messageID);
+ if (queue == null)
+ {
+ throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
+ }
+ // remove the reference and then add it back in with the scheduled time set.
+ MessageReference removed = queue.removeReferenceWithID(messageID);
- removed.setScheduledDeliveryTime(encoding.scheduledDeliveryTime);
+ removed.setScheduledDeliveryTime(encoding.scheduledDeliveryTime);
- queue.addLast(removed);
+ queue.addLast(removed);
break;
}
@@ -661,13 +667,10 @@
Transaction tx = new TransactionImpl(preparedTransaction.id, xid, this, postOffice);
- List<MessageReference> messages = new ArrayList<MessageReference>();
+ List<MessageReference> references = new ArrayList<MessageReference>();
- List<MessageReference> scheduledMessages = new ArrayList<MessageReference>();
+ List<MessageReference> referencesToAck = new ArrayList<MessageReference>();
- List<MessageReference> messagesToAck = new ArrayList<MessageReference>();
-
-
PageTransactionInfoImpl pageTransactionInfo = null;
// first get any sent messages for this tx and recreate
@@ -691,7 +694,7 @@
List<MessageReference> refs = postOffice.route(message);
- messages.addAll(refs);
+ references.addAll(refs);
break;
}
@@ -712,7 +715,7 @@
MessageReference removed = queue.removeReferenceWithID(messageID);
- messagesToAck.add(removed);
+ referencesToAck.add(removed);
if (removed == null)
{
@@ -746,21 +749,21 @@
throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
}
- for (MessageReference ref : messages)
+ for (MessageReference ref : references)
{
- if(ref.getQueue().getPersistenceID() == encoding.queueID &&
- ref.getMessage().getMessageID() == messageID)
+ if (ref.getQueue().getPersistenceID() == encoding.queueID && ref.getMessage().getMessageID() == messageID)
{
ref.setScheduledDeliveryTime(encoding.scheduledDeliveryTime);
- scheduledMessages.add(ref);
}
}
break;
}
default:
+ {
log.warn("InternalError: Record type " + recordType +
" not recognized. Maybe you're using journal files created on a different version");
+ }
}
}
@@ -787,7 +790,7 @@
MessageReference removed = queue.removeReferenceWithID(messageID);
- messagesToAck.add(removed);
+ referencesToAck.add(removed);
if (removed == null)
{
@@ -796,7 +799,7 @@
}
// now we recreate the state of the tx and add to the resource manager
- tx.replay(messages, scheduledMessages, messagesToAck, pageTransactionInfo, Transaction.State.PREPARED);
+ tx.replay(references, referencesToAck, pageTransactionInfo);
resourceManager.putTransaction(xid, tx);
}
@@ -1028,6 +1031,7 @@
super(queueID);
}
}
+
private static class ScheduledDeliveryEncoding extends QueueEncoding
{
long scheduledDeliveryTime;
@@ -1040,7 +1044,6 @@
public ScheduledDeliveryEncoding()
{
-
}
public int getEncodeSize()
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -116,7 +116,7 @@
{
}
- public void storeDeleteTransactional(long txID, long messageID)
+ public void storeDeletePageTransaction(long txID, long messageID)
throws Exception
{
}
@@ -129,7 +129,11 @@
{
}
- public void storeMessageReferenceScheduledTransactional(long txID, long queueID, long messageID, long scheduledDeliveryTime) throws Exception
+ public void updateScheduledDeliveryTime(MessageReference ref) throws Exception
+ {
+ }
+
+ public void updateScheduledDeliveryTimeTransactional(long txID, MessageReference ref) throws Exception
{
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -48,7 +48,6 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REMOVE_DESTINATION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REPLICATE_DELIVERY;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SCHEDULED_SEND;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_START;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
@@ -126,7 +125,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionScheduledSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
@@ -785,11 +783,6 @@
packet = new SessionProducerCloseMessage();
break;
}
- case SESS_SCHEDULED_SEND:
- {
- packet = new SessionScheduledSendMessage();
- break;
- }
case NULL_RESPONSE:
{
packet = new NullResponseMessage();
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -148,12 +148,10 @@
public static final byte SESS_MANAGEMENT_SEND = 80;
- public static final byte SESS_SCHEDULED_SEND = 81;
+ public static final byte SESS_FAILOVER_COMPLETE = 81;
- public static final byte SESS_FAILOVER_COMPLETE = 82;
+ public static final byte SESS_REPLICATE_DELIVERY = 82;
- public static final byte SESS_REPLICATE_DELIVERY = 83;
-
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionScheduledSendMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionScheduledSendMessage.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionScheduledSendMessage.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -1,61 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public class SessionScheduledSendMessage extends SessionSendMessage
-{
- private long scheduledDeliveryTime;
-
- public SessionScheduledSendMessage(final long producerID, final ClientMessage message, final boolean requiresResponse, final long scheduledDeliveryTime)
- {
- super(SESS_SCHEDULED_SEND, producerID, message, requiresResponse);
- this.scheduledDeliveryTime = scheduledDeliveryTime;
- }
-
- public SessionScheduledSendMessage()
- {
- super(SESS_SCHEDULED_SEND);
- }
-
- public void encodeBody(final MessagingBuffer buffer)
- {
- super.encodeBody(buffer);
- buffer.putLong(scheduledDeliveryTime);
- }
-
- public void decodeBody(final MessagingBuffer buffer)
- {
- super.decodeBody(buffer);
- scheduledDeliveryTime = buffer.getLong();
- }
-
- public long getScheduledDeliveryTime()
- {
- return scheduledDeliveryTime;
- }
-}
Modified: trunk/src/main/org/jboss/messaging/core/server/JournalType.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/JournalType.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/src/main/org/jboss/messaging/core/server/JournalType.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -31,5 +31,5 @@
*/
public enum JournalType
{
- NIO, ASYNCIO, JDBC;
+ NIO, ASYNCIO;
}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerProducer.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -38,6 +38,4 @@
void close() throws Exception;
void send(ServerMessage msg) throws Exception;
-
- void sendScheduled(ServerMessage message, long scheduledDeliveryTime) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -39,7 +39,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionScheduledSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
@@ -78,8 +77,6 @@
void send(ServerMessage msg) throws Exception;
- void sendScheduled(ServerMessage serverMessage, long scheduledDeliveryTime) throws Exception;
-
void handleAcknowledge(final SessionAcknowledgeMessage packet);
void handleExpired(final SessionExpiredMessage packet);
@@ -140,8 +137,6 @@
void handleSendProducerMessage(SessionSendMessage packet);
- void handleSendScheduledProducerMessage(SessionScheduledSendMessage packet);
-
void handleManagementMessage(SessionSendManagementMessage packet);
void handleFailedOver(Packet packet);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributionPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributionPolicy.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/GroupingRoundRobinDistributionPolicy.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -54,7 +54,7 @@
public HandleStatus distribute(MessageReference reference)
{
- final SimpleString groupId = (SimpleString) reference.getMessage().getProperty(MessageImpl.GROUP_ID);
+ final SimpleString groupId = (SimpleString) reference.getMessage().getProperty(MessageImpl.HDR_GROUP_ID);
if (groupId != null)
{
boolean bound;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -124,13 +124,13 @@
return queue;
}
- public boolean cancel(final StorageManager persistenceManager,
+ public boolean cancel(final StorageManager storageManager,
final PostOffice postOffice,
final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
{
if (message.isDurable() && queue.isDurable())
{
- persistenceManager.updateDeliveryCount(this);
+ storageManager.updateDeliveryCount(this);
}
QueueSettings queueSettings = queueSettingsRepository.getMatch(queue.getName().toString());
@@ -139,7 +139,7 @@
if (maxDeliveries > 0 && deliveryCount >= maxDeliveries)
{
log.warn("Message has reached maximum delivery attempts, sending it to DLQ");
- sendToDLQ(persistenceManager, postOffice, queueSettingsRepository);
+ sendToDLQ(storageManager, postOffice, queueSettingsRepository);
return false;
}
@@ -150,7 +150,8 @@
if (redeliveryDelay > 0)
{
scheduledDeliveryTime = System.currentTimeMillis() + redeliveryDelay;
- persistenceManager.storeMessageReferenceScheduled(queue.getPersistenceID(), message.getMessageID(), scheduledDeliveryTime);
+
+ storageManager.updateScheduledDeliveryTime(this);
}
queue.referenceCancelled();
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -67,10 +67,5 @@
public void send(final ServerMessage message) throws Exception
{
session.send(message);
- }
-
- public void sendScheduled(final ServerMessage message, final long scheduledDeliveryTime) throws Exception
- {
- session.sendScheduled(message, scheduledDeliveryTime);
- }
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -35,6 +35,7 @@
import org.jboss.messaging.core.filter.impl.FilterImpl;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.management.ManagementService;
+import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
@@ -66,7 +67,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionScheduledSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
@@ -300,6 +300,8 @@
{
// check the user has write access to this address.
doSecurity(msg);
+
+ Long scheduledDeliveryTime = (Long)msg.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
if (autoCommitSends)
{
@@ -309,54 +311,31 @@
if (msg.getDurableRefCount() != 0)
{
- storageManager.storeMessage(msg);
+ storageManager.storeMessage(msg);
}
for (MessageReference ref : refs)
{
- ref.getQueue().addLast(ref);
- }
- }
- }
- else
- {
- tx.addMessage(msg);
- }
- }
-
- public void sendScheduled(final ServerMessage msg, final long scheduledDeliveryTime) throws Exception
- {
- doSecurity(msg);
-
- if (autoCommitSends)
- {
- if (!pager.pageScheduled(msg, scheduledDeliveryTime))
- {
- List<MessageReference> refs = postOffice.route(msg);
-
- if (msg.getDurableRefCount() != 0)
- {
- storageManager.storeMessage(msg);
- }
-
- for (MessageReference ref : refs)
- {
- if (ref.getQueue().isDurable())
+ if (scheduledDeliveryTime != null)
{
- storageManager.storeMessageReferenceScheduled(ref.getQueue().getPersistenceID(),
- msg.getMessageID(),
- scheduledDeliveryTime);
+ ref.setScheduledDeliveryTime(scheduledDeliveryTime.longValue());
+
+ if (ref.getMessage().isDurable() && ref.getQueue().isDurable())
+ {
+ storageManager.updateScheduledDeliveryTime(ref);
+ }
}
- ref.setScheduledDeliveryTime(scheduledDeliveryTime);
+
ref.getQueue().addLast(ref);
}
}
}
else
{
- tx.addScheduledMessage(msg, scheduledDeliveryTime);
+ tx.addMessage(msg);
}
}
+
public void doHandleCreateConsumer(final SessionCreateConsumerMessage packet)
{
@@ -2401,91 +2380,7 @@
}
}
- public void handleSendScheduledProducerMessage(final SessionScheduledSendMessage packet)
- {
- ServerMessage msg = packet.getServerMessage();
-
- final SendLock lock;
-
- if (channel.getReplicatingChannel() != null)
- {
- lock = postOffice.getAddressLock(msg.getDestination());
-
- lock.beforeSend();
- }
- else
- {
- lock = null;
- }
-
- if (msg.getMessageID() == 0L)
- {
- // must generate message id here, so we know they are in sync on live and backup
- long id = storageManager.generateUniqueID();
-
- msg.setMessageID(id);
- }
-
- DelayedResult result = channel.replicatePacket(packet);
-
- //With a send we must make sure it is replicated to backup before being processed on live
- //or can end up with delivery being processed on backup before original send
-
- if (result == null)
- {
- doSendScheduled(packet);
- }
- else
- {
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doSendScheduled(packet);
-
- lock.afterSend();
- }
- });
- }
- }
-
- private void doSendScheduled(final SessionScheduledSendMessage packet)
- {
- Packet response = null;
-
- try
- {
- producers.get(packet.getProducerID()).sendScheduled(packet.getServerMessage(), packet.getScheduledDeliveryTime());
-
- if (packet.isRequiresResponse())
- {
- response = new NullResponseMessage();
- }
- }
- catch (Exception e)
- {
- log.error("Failed to send scheduled message", e);
- if (packet.isRequiresResponse())
- {
- if (e instanceof MessagingException)
- {
- response = new MessagingExceptionMessage((MessagingException)e);
- }
- else
- {
- response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
- }
- }
- }
-
- channel.confirm(packet);
-
- if (response != null)
- {
- channel.send(response);
- }
- }
-
+
public void handleManagementMessage(final SessionSendManagementMessage packet)
{
ServerMessage msg = packet.getServerMessage();
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -32,7 +32,6 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REMOVE_DESTINATION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REPLICATE_DELIVERY;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SCHEDULED_SEND;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_START;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
@@ -53,7 +52,6 @@
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
@@ -68,7 +66,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionScheduledSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
@@ -302,12 +299,6 @@
session.handleSendProducerMessage(message);
break;
}
- case SESS_SCHEDULED_SEND:
- {
- SessionScheduledSendMessage message = (SessionScheduledSendMessage)packet;
- session.handleSendScheduledProducerMessage(message);
- break;
- }
case SESS_MANAGEMENT_SEND:
{
SessionSendManagementMessage message = (SessionSendManagementMessage)packet;
Modified: trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -18,7 +18,7 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.transaction;
@@ -50,7 +50,7 @@
void addMessage(ServerMessage message) throws Exception;
void addAcknowledgement(MessageReference acknowledgement) throws Exception;
-
+
int getAcknowledgementsCount();
long getID();
@@ -69,10 +69,10 @@
void markAsRollbackOnly(MessagingException messagingException);
- void replay(List<MessageReference> messages, List<MessageReference> scheduledMessages, List<MessageReference> acknowledgements, PageTransactionInfo pageTransaction, State prepared) throws Exception;
+ void replay(List<MessageReference> messages,
+ List<MessageReference> acknowledgements,
+ PageTransactionInfo pageTransaction) throws Exception;
- void addScheduledMessage(ServerMessage msg, long scheduledDeliveryTime) throws Exception;
-
List<MessageReference> timeout() throws Exception;
long getCreateTime();
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -13,17 +13,16 @@
package org.jboss.messaging.core.transaction.impl;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
-import java.util.Collections;
import javax.transaction.xa.Xid;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.paging.PageTransactionInfo;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
@@ -59,10 +58,6 @@
private final List<ServerMessage> pagedMessages = new ArrayList<ServerMessage>();
- private final Map<ServerMessage, Long> scheduledPagedMessages = new HashMap<ServerMessage, Long>();
-
- private final Map<MessageReference, Long> scheduledReferences = new HashMap<MessageReference, Long>();
-
private PageTransactionInfo pageTransaction;
private final Xid xid;
@@ -91,12 +86,12 @@
}
else
{
- this.pagingManager = postOffice.getPagingManager();
+ pagingManager = postOffice.getPagingManager();
}
- this.xid = null;
+ xid = null;
- this.id = storageManager.generateUniqueID();
+ id = storageManager.generateUniqueID();
createTime = System.currentTimeMillis();
}
@@ -113,12 +108,12 @@
}
else
{
- this.pagingManager = postOffice.getPagingManager();
+ pagingManager = postOffice.getPagingManager();
}
this.xid = xid;
- this.id = storageManager.generateUniqueID();
+ id = storageManager.generateUniqueID();
createTime = System.currentTimeMillis();
}
@@ -139,7 +134,7 @@
}
else
{
- this.pagingManager = postOffice.getPagingManager();
+ pagingManager = postOffice.getPagingManager();
}
createTime = System.currentTimeMillis();
@@ -170,35 +165,6 @@
}
}
- public void addScheduledMessage(final ServerMessage message, long scheduledDeliveryTime) throws Exception
- {
- if (state != State.ACTIVE)
- {
- throw new IllegalStateException("Transaction is in invalid state " + state);
- }
-
- if (pagingManager.isPaging(message.getDestination()))
- {
- scheduledPagedMessages.put(message, scheduledDeliveryTime);
- }
- else
- {
- List<MessageReference> refs = route(message);
-
- for (MessageReference ref : refs)
- {
- scheduledReferences.put(ref, scheduledDeliveryTime);
- if (ref.getQueue().isDurable())
- {
- storageManager.storeMessageReferenceScheduledTransactional(id,
- ref.getQueue().getPersistenceID(),
- message.getMessageID(),
- scheduledDeliveryTime);
- }
- }
- }
- }
-
public List<MessageReference> timeout() throws Exception
{
// we need to synchronize with commit and rollback just in case they get called at the same time
@@ -332,16 +298,7 @@
for (MessageReference ref : refsToAdd)
{
- Long scheduled = scheduledReferences.get(ref);
- if (scheduled == null)
- {
- ref.getQueue().addLast(ref);
- }
- else
- {
- ref.setScheduledDeliveryTime(scheduled);
- ref.getQueue().addLast(ref);
- }
+ ref.getQueue().addLast(ref);
}
// If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
@@ -458,25 +415,19 @@
return containsPersistent;
}
- public void markAsRollbackOnly(MessagingException messagingException)
+ public void markAsRollbackOnly(final MessagingException messagingException)
{
state = State.ROLLBACK_ONLY;
this.messagingException = messagingException;
}
- public void replay(List<MessageReference> messages,
- List<MessageReference> scheduledMessages,
- List<MessageReference> acknowledgements,
- PageTransactionInfo pageTransaction,
- State prepared) throws Exception
+ public void replay(final List<MessageReference> messages,
+ final List<MessageReference> acknowledgements,
+ final PageTransactionInfo pageTransaction) throws Exception
{
containsPersistent = true;
refsToAdd.addAll(messages);
- for (MessageReference scheduledMessage : scheduledMessages)
- {
- this.scheduledReferences.put(scheduledMessage, scheduledMessage.getScheduledDeliveryTime());
- }
this.acknowledgements.addAll(acknowledgements);
this.pageTransaction = pageTransaction;
@@ -485,7 +436,7 @@
pagingManager.addTransaction(this.pageTransaction);
}
- state = prepared;
+ state = State.PREPARED;
}
public void setContainsPersistent(final boolean containsPersistent)
@@ -498,6 +449,8 @@
private List<MessageReference> route(final ServerMessage message) throws Exception
{
+ Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
+
List<MessageReference> refs = postOffice.route(message);
refsToAdd.addAll(refs);
@@ -509,6 +462,19 @@
containsPersistent = true;
}
+ if (scheduledDeliveryTime != null)
+ {
+ for (MessageReference ref : refs)
+ {
+ ref.setScheduledDeliveryTime(scheduledDeliveryTime);
+
+ if (ref.getMessage().isDurable() && ref.getQueue().isDurable())
+ {
+ storageManager.updateScheduledDeliveryTimeTransactional(id, ref);
+ }
+ }
+ }
+
return refs;
}
@@ -522,7 +488,7 @@
{
if (pageTransaction == null)
{
- pageTransaction = new PageTransactionInfoImpl(this.id);
+ pageTransaction = new PageTransactionInfoImpl(id);
// To avoid a race condition where depage happens before the transaction is completed, we need to inform the
// pager that this transaction is being processed
pagingManager.addTransaction(pageTransaction);
@@ -550,37 +516,6 @@
}
}
- for (ServerMessage message : scheduledPagedMessages.keySet())
- {
- long scheduledDeliveryTime = scheduledPagedMessages.get(message);
- // http://wiki.jboss.org/wiki/JBossMessaging2Paging
- // Explained under Transaction On Paging. (This is the item B)
- if (pagingManager.pageScheduled(message, id, scheduledDeliveryTime))
- {
- if (message.isDurable())
- {
- // We only create pageTransactions if using persistent messages
- pageTransaction.increment();
- pagingPersistent = true;
- pagedDestinationsToSync.add(message.getDestination());
- }
- }
- else
- {
- // This could happen when the PageStore left the pageState
- List<MessageReference> refs = route(message);
-
- for (MessageReference ref : refs)
- {
- scheduledReferences.put(ref, scheduledDeliveryTime);
- if (ref.getQueue().isDurable())
- {
- storageManager.storeMessageReferenceScheduledTransactional(id, ref.getQueue().getPersistenceID(), message.getMessageID(), scheduledDeliveryTime);
- }
- }
- }
- }
-
if (pagingPersistent)
{
containsPersistent = true;
@@ -599,9 +534,5 @@
acknowledgements.clear();
pagedMessages.clear();
-
- scheduledPagedMessages.clear();
-
- scheduledReferences.clear();
}
}
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -78,8 +78,6 @@
// Used when bridging a message
public static final String JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST = "JBM_BRIDGE_MSG_ID_LIST";
- public static final String JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME = "JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME";
-
public static final byte TYPE = 0;
// Static --------------------------------------------------------
@@ -176,8 +174,6 @@
// Cache it
private String jmsType;
- private long scheduledDeliveryTime = 0;
-
// Constructors --------------------------------------------------
/**
* constructors for test purposes only
@@ -379,17 +375,6 @@
return jmsCorrelationID;
}
- public long getScheduledDeliveryTime()
- {
- return scheduledDeliveryTime;
- }
-
- public void setScheduledDeliveryTime(long scheduledDeliveryTime)
- {
- message.putLongProperty(new SimpleString(JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME), scheduledDeliveryTime);
- this.scheduledDeliveryTime = scheduledDeliveryTime;
- }
-
public Destination getJMSReplyTo() throws JMSException
{
if (replyTo == null)
@@ -561,7 +546,7 @@
public boolean propertyExists(final String name) throws JMSException
{
return message.containsProperty(new SimpleString(name)) || name.equals(JMSXDELIVERYCOUNT)
- || (JMSXGROUPID.equals(name) && message.containsProperty(MessageImpl.GROUP_ID));
+ || (JMSXGROUPID.equals(name) && message.containsProperty(MessageImpl.HDR_GROUP_ID));
}
public boolean getBooleanProperty(final String name) throws JMSException
@@ -723,7 +708,7 @@
Object value;
if(JMSXGROUPID.equals(name))
{
- value = message.getProperty(MessageImpl.GROUP_ID);
+ value = message.getProperty(MessageImpl.HDR_GROUP_ID);
}
else
{
@@ -833,10 +818,6 @@
{
Long l = new Long(value);
checkProperty(name, l);
- if(JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME.equals(name))
- {
- scheduledDeliveryTime = l;
- }
message.putLongProperty(new SimpleString(name), value);
}
@@ -859,7 +840,7 @@
checkProperty(name, value);
if(JMSXGROUPID.equals(name))
{
- message.putStringProperty(MessageImpl.GROUP_ID, new SimpleString(value));
+ message.putStringProperty(MessageImpl.HDR_GROUP_ID, new SimpleString(value));
}
else
{
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossMessageProducer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossMessageProducer.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossMessageProducer.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -451,15 +451,7 @@
try
{
- //check to see if this message needs to be scheduled
- if(jbm.getScheduledDeliveryTime() > 0)
- {
- producer.send(address, coreMessage, jbm.getScheduledDeliveryTime());
- }
- else
- {
- producer.send(address, coreMessage);
- }
+ producer.send(address, coreMessage);
}
catch (MessagingException e)
{
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ScheduledDeliveryTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ScheduledDeliveryTest.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ScheduledDeliveryTest.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -28,6 +28,7 @@
import javax.jms.Session;
import javax.jms.TextMessage;
+import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.jms.client.JBossMessage;
@@ -90,7 +91,7 @@
long now = System.currentTimeMillis();
TextMessage tm1 = sess.createTextMessage("testScheduled1");
- tm1.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 29000);
+ tm1.setLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME.toString(), now + 29000);
prod.send(tm1);
//First send some non scheduled messages
@@ -110,25 +111,25 @@
//These numbers have to be large with Hudson, since restart can take some time
TextMessage tm5 = sess.createTextMessage("testScheduled5");
- tm5.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 27000);
+ tm5.setLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME.toString(), now + 27000);
prod.send(tm5);
TextMessage tm6 = sess.createTextMessage("testScheduled6");
- tm6.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 26000);
+ tm6.setLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME.toString(), now + 26000);
prod.send(tm6);
TextMessage tm7 = sess.createTextMessage("testScheduled7");
- tm7.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 25000);
+ tm7.setLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME.toString(), now + 25000);
prod.send(tm7);
TextMessage tm8 = sess.createTextMessage("testScheduled8");
- tm8.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 28000);
+ tm8.setLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME.toString(), now + 28000);
prod.send(tm8);
//And one scheduled with a -ve number
TextMessage tm9 = sess.createTextMessage("testScheduled9");
- tm8.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, -3);
+ tm8.setLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME.toString(), -3);
prod.send(tm9);
//Now stop the server and restart it
@@ -417,7 +418,7 @@
long now = System.currentTimeMillis();
TextMessage tm1 = sess.createTextMessage("testScheduled1");
- tm1.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 7000);
+ tm1.setLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME.toString(), now + 7000);
prod.send(tm1);
//First send some non scheduled messages
@@ -435,25 +436,25 @@
//Now send some more scheduled messages
TextMessage tm5 = sess.createTextMessage("testScheduled5");
- tm5.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 5000);
+ tm5.setLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME.toString(), now + 5000);
prod.send(tm5);
TextMessage tm6 = sess.createTextMessage("testScheduled6");
- tm6.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 4000);
+ tm6.setLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME.toString(), now + 4000);
prod.send(tm6);
TextMessage tm7 = sess.createTextMessage("testScheduled7");
- tm7.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 3000);
+ tm7.setLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME.toString(), now + 3000);
prod.send(tm7);
TextMessage tm8 = sess.createTextMessage("testScheduled8");
- tm8.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 6000);
+ tm8.setLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME.toString(), now + 6000);
prod.send(tm8);
//And one scheduled with a -ve number
TextMessage tm9 = sess.createTextMessage("testScheduled9");
- tm8.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, -3);
+ tm8.setLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME.toString(), -3);
prod.send(tm9);
if (tx)
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverScheduledMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverScheduledMessageTest.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverScheduledMessageTest.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -39,6 +39,7 @@
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
@@ -117,7 +118,9 @@
message.putIntProperty(new SimpleString("count"), i);
message.getBody().putString("aardvarks");
message.getBody().flip();
- producer.send(message, now + delay * i);
+ long deliveryTime = now + delay * i;
+ message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, deliveryTime);
+ producer.send(message);
}
ClientConsumer consumer1 = session1.createConsumer(ADDRESS);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -36,6 +36,7 @@
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.server.MessagingService;
import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -172,7 +173,8 @@
ClientMessage message = createMessage(session, "m1");
long time = System.currentTimeMillis();
time += 10000;
- producer.send(message, time);
+ message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time);
+ producer.send(message);
producer.close();
@@ -335,8 +337,10 @@
message.setDurable(true);
long time = System.currentTimeMillis();
time += 10000;
- producer.send(message, time);
+ message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time);
+ producer.send(message);
+ log.info("Recover is " + recover);
if (recover)
{
producer.close();
@@ -386,15 +390,20 @@
ClientMessage m5 = createMessage(session, "m5");
long time = System.currentTimeMillis();
time += 10000;
- producer.send(m1, time);
+ m1.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time);
+ producer.send(m1);
time += 1000;
- producer.send(m2, time);
+ m2.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time);
+ producer.send(m2);
time += 1000;
- producer.send(m3, time);
+ m3.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time);
+ producer.send(m3);
time += 1000;
- producer.send(m4, time);
+ m4.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time);
+ producer.send(m4);
time += 1000;
- producer.send(m5, time);
+ m5.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time);
+ producer.send(m5);
time -= 4000;
if (recover)
{
@@ -467,15 +476,20 @@
ClientMessage m5 = createMessage(session, "m5");
long time = System.currentTimeMillis();
time += 10000;
- producer.send(m1, time);
+ m1.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time);
+ producer.send(m1);
time += 3000;
- producer.send(m2, time);
+ m2.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time);
+ producer.send(m2);
time -= 2000;
- producer.send(m3, time);
+ m3.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time);
+ producer.send(m3);
time += 3000;
- producer.send(m4, time);
+ m4.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time);
+ producer.send(m4);
time -= 2000;
- producer.send(m5, time);
+ m5.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time);
+ producer.send(m5);
time -= 2000;
ClientConsumer consumer = null;
if (recover)
@@ -549,13 +563,16 @@
ClientMessage m5 = createMessage(session, "m5");
long time = System.currentTimeMillis();
time += 10000;
- producer.send(m1, time);
+ m1.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time);
+ producer.send(m1);
producer.send(m2);
time += 1000;
- producer.send(m3, time);
+ m3.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time);
+ producer.send(m3);
producer.send(m4);
time += 1000;
- producer.send(m5, time);
+ m5.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time);
+ producer.send(m5);
time -= 2000;
ClientConsumer consumer = null;
if (recover)
@@ -629,7 +646,8 @@
message.setDurable(true);
Calendar cal = Calendar.getInstance();
cal.roll(Calendar.SECOND, 10);
- producer.send(message, cal.getTimeInMillis());
+ message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, cal.getTimeInMillis());
+ producer.send(message);
session.end(xid, XAResource.TMSUCCESS);
session.prepare(xid);
if (recover)
Modified: trunk/tests/src/org/jboss/messaging/tests/timing/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/server/impl/QueueImplTest.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/server/impl/QueueImplTest.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -296,7 +296,7 @@
{
if (i % 2 == 0)
{
- storageManager.storeDeleteTransactional(1, i);
+ storageManager.storeDeletePageTransaction(1, i);
}
}
@@ -304,7 +304,7 @@
{
if (i % 2 == 0)
{
- storageManager.storeDeleteTransactional(1, i);
+ storageManager.storeDeletePageTransaction(1, i);
}
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -127,8 +127,8 @@
conf.setCreateJournalDir(b);
assertEquals(b, conf.isCreateJournalDir());
- i = randomInt() % 3;
- JournalType journal = i == 0 ? JournalType.ASYNCIO : i == 1 ? JournalType.JDBC : i == 2 ? JournalType.NIO : JournalType.NIO;
+ i = randomInt() % 2;
+ JournalType journal = i == 0 ? JournalType.ASYNCIO : JournalType.NIO;
conf.setJournalType(journal);
assertEquals(journal, conf.getJournalType());
@@ -206,8 +206,8 @@
b = randomBoolean();
conf.setCreateJournalDir(b);
- i = randomInt() % 3;
- JournalType journal = i == 0 ? JournalType.ASYNCIO : i == 1 ? JournalType.JDBC : i == 2 ? JournalType.NIO : JournalType.NIO;
+ i = randomInt() % 2;
+ JournalType journal = i == 0 ? JournalType.ASYNCIO : JournalType.NIO;
conf.setJournalType(journal);
b = randomBoolean();
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -24,21 +24,16 @@
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
import org.easymock.EasyMock;
-import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.paging.LastPageRecord;
import org.jboss.messaging.core.paging.PageMessage;
import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.paging.PagingStoreFactory;
import org.jboss.messaging.core.paging.impl.PageMessageImpl;
import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
-import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerMessage;
@@ -102,46 +97,7 @@
EasyMock.verify(spi, store, message, storageManager, po, ref, queue);
}
- public void testOnDepageScheduledMessage() throws Exception
- {
- long time = System.currentTimeMillis() + 10000;
- List<MessageReference> refs = new ArrayList<MessageReference>();
- MessageReference ref = EasyMock.createStrictMock(MessageReference.class);
- refs.add(ref);
- Queue queue = EasyMock.createStrictMock(Queue.class);
- HierarchicalRepository<QueueSettings> queueSettings = new HierarchicalObjectRepository<QueueSettings>();
- queueSettings.setDefault(new QueueSettings());
- PostOffice po = EasyMock.createStrictMock(PostOffice.class);
- PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
- PagingStore store = EasyMock.createNiceMock(PagingStore.class);
- StorageManager storageManager = EasyMock.createStrictMock(StorageManager.class);
- PagingManagerImpl manager = new PagingManagerImpl(spi, storageManager, queueSettings, -1);
- manager.setPostOffice(po);
- ServerMessage message = EasyMock.createStrictMock(ServerMessage.class);
-
- EasyMock.expect(storageManager.generateUniqueID()).andReturn(1l);
- EasyMock.expect(po.route(message)).andReturn(refs);
- EasyMock.expect(message.getDurableRefCount()).andReturn(1);
- ref.setScheduledDeliveryTime(time);
- storageManager.storeLastPage(EasyMock.anyLong(), (LastPageRecord) EasyMock.anyObject());
- storageManager.storeMessageReferenceScheduledTransactional(EasyMock.anyLong(), EasyMock.anyLong(), EasyMock.anyLong(), EasyMock.eq(time));
- storageManager.storeMessageTransactional(EasyMock.anyLong(), (ServerMessage) EasyMock.anyObject());
- storageManager.commit(EasyMock.anyLong());
- EasyMock.expect(ref.getQueue()).andStubReturn(queue);
- EasyMock.expect(queue.isDurable()).andReturn(true);
- EasyMock.expect(queue.getPersistenceID()).andStubReturn(1);
- EasyMock.expect(message.getMessageID()).andStubReturn(2);
- //storageManager.storeMessageReferenceScheduledTransactional(1,1,2,time);
- EasyMock.expect(queue.addLast(ref)).andReturn(HandleStatus.HANDLED);
- EasyMock.replay(spi, store, message, storageManager, po, ref, queue);
- SimpleString queueName = new SimpleString("aq");
- PageMessageImpl pageMessage = new PageMessageImpl(message);
-
- pageMessage.getProperties().putLongProperty(new SimpleString("JBM_SCHEDULED_DELIVERY_PROP"), time);
- manager.onDepage(0, queueName, store, new PageMessage[] {pageMessage} );
- EasyMock.verify(spi, store, message, storageManager, po, ref, queue);
- }
-
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -113,45 +113,7 @@
}
- public void testStoreWithProperty() throws Exception
- {
- SequentialFileFactory factory = new FakeSequentialFileFactory();
-
- PagingStore storeImpl = new PagingStoreImpl(null, factory, destinationTestName, new QueueSettings(), executor);
-
- storeImpl.start();
-
- assertEquals(0, storeImpl.getNumberOfPages());
-
- storeImpl.startPaging();
-
- assertEquals(1, storeImpl.getNumberOfPages());
-
- List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
-
- ByteBuffer buffer = createRandomBuffer(0, 10);
-
- buffers.add(buffer);
- SimpleString destination = new SimpleString("test");
-
- PageMessageImpl msg = createMessage(destination, buffer);
- msg.getProperties().putLongProperty(new SimpleString("test-property"), 12345l);
- assertTrue(storeImpl.isPaging());
-
- assertTrue(storeImpl.page(msg));
-
- assertEquals(1, storeImpl.getNumberOfPages());
-
- storeImpl.sync();
-
- storeImpl = new PagingStoreImpl(null, factory, destinationTestName, new QueueSettings(), executor);
-
- storeImpl.start();
-
- assertEquals(2, storeImpl.getNumberOfPages());
-
- }
-
+
public void testDepageOnCurrentPage() throws Exception
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -169,7 +169,7 @@
messageJournal.appendDeleteRecordTransactional(txID, messageID, null);
EasyMock.replay(messageJournal, bindingsJournal);
- jsm.storeDeleteTransactional(txID, messageID);
+ jsm.storeDeletePageTransaction(txID, messageID);
EasyMock.verify(messageJournal, bindingsJournal);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/GroupingRoundRobinDistributionPolicyTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/GroupingRoundRobinDistributionPolicyTest.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/GroupingRoundRobinDistributionPolicyTest.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -57,7 +57,7 @@
policy.addConsumer(consumer);
ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
- EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
+ EasyMock.expect(serverMessage.getProperty(MessageImpl.HDR_GROUP_ID)).andStubReturn(new SimpleString("gid1"));
EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
EasyMock.replay(consumer, serverMessage, reference);
@@ -74,7 +74,7 @@
Consumer c3 = EasyMock.createStrictMock(Consumer.class);
ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
- EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
+ EasyMock.expect(serverMessage.getProperty(MessageImpl.HDR_GROUP_ID)).andStubReturn(new SimpleString("gid1"));
DistributionPolicy dp = new RoundRobinDistributionPolicy();
dp.addConsumer(c1);
dp.addConsumer(c2);
@@ -97,7 +97,7 @@
Consumer c3 = EasyMock.createStrictMock(Consumer.class);
ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
- EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
+ EasyMock.expect(serverMessage.getProperty(MessageImpl.HDR_GROUP_ID)).andStubReturn(new SimpleString("gid1"));
DistributionPolicy dp = new RoundRobinDistributionPolicy();
dp.addConsumer(c1);
dp.addConsumer(c2);
@@ -123,7 +123,7 @@
policy.addConsumer(consumer3);
ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
- EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
+ EasyMock.expect(serverMessage.getProperty(MessageImpl.HDR_GROUP_ID)).andStubReturn(new SimpleString("gid1"));
EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
EasyMock.replay(consumer, consumer2, consumer3, serverMessage, reference);
@@ -140,10 +140,10 @@
policy.addConsumer(consumer);
ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
- EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
+ EasyMock.expect(serverMessage.getProperty(MessageImpl.HDR_GROUP_ID)).andStubReturn(new SimpleString("gid1"));
ServerMessage serverMessage2 = EasyMock.createStrictMock(ServerMessage.class);
EasyMock.expect(reference2.getMessage()).andStubReturn(serverMessage2);
- EasyMock.expect(serverMessage2.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid2"));
+ EasyMock.expect(serverMessage2.getProperty(MessageImpl.HDR_GROUP_ID)).andStubReturn(new SimpleString("gid2"));
EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
EasyMock.replay(consumer, serverMessage, serverMessage2, reference, reference2);
@@ -164,10 +164,10 @@
policy.addConsumer(consumer3);
ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
- EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
+ EasyMock.expect(serverMessage.getProperty(MessageImpl.HDR_GROUP_ID)).andStubReturn(new SimpleString("gid1"));
ServerMessage serverMessage2 = EasyMock.createStrictMock(ServerMessage.class);
EasyMock.expect(reference2.getMessage()).andStubReturn(serverMessage2);
- EasyMock.expect(serverMessage2.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid2"));
+ EasyMock.expect(serverMessage2.getProperty(MessageImpl.HDR_GROUP_ID)).andStubReturn(new SimpleString("gid2"));
EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
EasyMock.expect(consumer2.handle(reference2)).andReturn(HandleStatus.HANDLED);
EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
@@ -191,7 +191,7 @@
policy.addConsumer(consumer3);
ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
- EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
+ EasyMock.expect(serverMessage.getProperty(MessageImpl.HDR_GROUP_ID)).andStubReturn(new SimpleString("gid1"));
EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.BUSY);
EasyMock.expect(consumer2.handle(reference)).andReturn(HandleStatus.HANDLED);
EasyMock.replay(consumer, consumer2, consumer3, serverMessage, reference);
@@ -210,7 +210,7 @@
policy.addConsumer(consumer3);
ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
- EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
+ EasyMock.expect(serverMessage.getProperty(MessageImpl.HDR_GROUP_ID)).andStubReturn(new SimpleString("gid1"));
EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.BUSY);
EasyMock.replay(consumer, consumer2, consumer3, serverMessage, reference);
@@ -238,28 +238,28 @@
policy.addConsumer(consumer3);
ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
- EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
+ EasyMock.expect(serverMessage.getProperty(MessageImpl.HDR_GROUP_ID)).andStubReturn(new SimpleString("gid1"));
ServerMessage serverMessage2 = EasyMock.createStrictMock(ServerMessage.class);
EasyMock.expect(reference2.getMessage()).andStubReturn(serverMessage2);
- EasyMock.expect(serverMessage2.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid2"));
+ EasyMock.expect(serverMessage2.getProperty(MessageImpl.HDR_GROUP_ID)).andStubReturn(new SimpleString("gid2"));
ServerMessage serverMessage3 = EasyMock.createStrictMock(ServerMessage.class);
EasyMock.expect(reference3.getMessage()).andStubReturn(serverMessage3);
- EasyMock.expect(serverMessage3.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid3"));
+ EasyMock.expect(serverMessage3.getProperty(MessageImpl.HDR_GROUP_ID)).andStubReturn(new SimpleString("gid3"));
ServerMessage serverMessage4 = EasyMock.createStrictMock(ServerMessage.class);
EasyMock.expect(reference4.getMessage()).andStubReturn(serverMessage4);
- EasyMock.expect(serverMessage4.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid4"));
+ EasyMock.expect(serverMessage4.getProperty(MessageImpl.HDR_GROUP_ID)).andStubReturn(new SimpleString("gid4"));
ServerMessage serverMessage5 = EasyMock.createStrictMock(ServerMessage.class);
EasyMock.expect(reference5.getMessage()).andStubReturn(serverMessage5);
- EasyMock.expect(serverMessage5.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid5"));
+ EasyMock.expect(serverMessage5.getProperty(MessageImpl.HDR_GROUP_ID)).andStubReturn(new SimpleString("gid5"));
ServerMessage serverMessage6 = EasyMock.createStrictMock(ServerMessage.class);
EasyMock.expect(reference6.getMessage()).andStubReturn(serverMessage6);
- EasyMock.expect(serverMessage6.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid6"));
+ EasyMock.expect(serverMessage6.getProperty(MessageImpl.HDR_GROUP_ID)).andStubReturn(new SimpleString("gid6"));
ServerMessage serverMessage7 = EasyMock.createStrictMock(ServerMessage.class);
EasyMock.expect(reference7.getMessage()).andStubReturn(serverMessage7);
- EasyMock.expect(serverMessage7.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid7"));
+ EasyMock.expect(serverMessage7.getProperty(MessageImpl.HDR_GROUP_ID)).andStubReturn(new SimpleString("gid7"));
ServerMessage serverMessage8 = EasyMock.createStrictMock(ServerMessage.class);
EasyMock.expect(reference8.getMessage()).andStubReturn(serverMessage8);
- EasyMock.expect(serverMessage8.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid8"));
+ EasyMock.expect(serverMessage8.getProperty(MessageImpl.HDR_GROUP_ID)).andStubReturn(new SimpleString("gid8"));
ServerMessage serverMessage9 = EasyMock.createStrictMock(ServerMessage.class);
EasyMock.expect(reference9.getMessage()).andStubReturn(serverMessage9);
EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
@@ -271,7 +271,7 @@
EasyMock.expect(consumer.handle(reference7)).andReturn(HandleStatus.HANDLED);
EasyMock.expect(consumer2.handle(reference8)).andReturn(HandleStatus.HANDLED);
EasyMock.expect(consumer3.handle(reference9)).andReturn(HandleStatus.HANDLED);
- EasyMock.expect(serverMessage9.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid9"));
+ EasyMock.expect(serverMessage9.getProperty(MessageImpl.HDR_GROUP_ID)).andStubReturn(new SimpleString("gid9"));
EasyMock.replay(consumer, consumer2, consumer3, serverMessage, serverMessage2, serverMessage3, serverMessage4,
serverMessage5, serverMessage6, serverMessage7, serverMessage8, serverMessage9, reference,
reference2, reference3, reference4, reference5, reference6, reference7, reference8, reference9);
@@ -305,13 +305,13 @@
policy.addConsumer(consumer4);
ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
EasyMock.expect(reference.getMessage()).andStubReturn(serverMessage);
- EasyMock.expect(serverMessage.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid1"));
+ EasyMock.expect(serverMessage.getProperty(MessageImpl.HDR_GROUP_ID)).andStubReturn(new SimpleString("gid1"));
ServerMessage serverMessage2 = EasyMock.createStrictMock(ServerMessage.class);
EasyMock.expect(reference2.getMessage()).andStubReturn(serverMessage2);
- EasyMock.expect(serverMessage2.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid2"));
+ EasyMock.expect(serverMessage2.getProperty(MessageImpl.HDR_GROUP_ID)).andStubReturn(new SimpleString("gid2"));
ServerMessage serverMessage3 = EasyMock.createStrictMock(ServerMessage.class);
EasyMock.expect(reference3.getMessage()).andStubReturn(serverMessage3);
- EasyMock.expect(serverMessage3.getProperty(MessageImpl.GROUP_ID)).andStubReturn(new SimpleString("gid3"));
+ EasyMock.expect(serverMessage3.getProperty(MessageImpl.HDR_GROUP_ID)).andStubReturn(new SimpleString("gid3"));
EasyMock.expect(consumer.handle(reference)).andReturn(HandleStatus.HANDLED);
EasyMock.expect(consumer2.handle(reference2)).andReturn(HandleStatus.HANDLED);
EasyMock.expect(consumer3.handle(reference3)).andReturn(HandleStatus.HANDLED);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossMessageTest.java 2008-11-10 03:45:33 UTC (rev 5324)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossMessageTest.java 2008-11-10 14:50:54 UTC (rev 5325)
@@ -392,13 +392,13 @@
JBossMessage message = new JBossMessage();
message.setStringProperty(JBossMessage.JMSXGROUPID, "testid");
assertTrue(message.propertyExists(JBossMessage.JMSXGROUPID));
- assertEquals(new SimpleString("testid"), message.getCoreMessage().getProperty(MessageImpl.GROUP_ID));
+ assertEquals(new SimpleString("testid"), message.getCoreMessage().getProperty(MessageImpl.HDR_GROUP_ID));
}
public void testGetJMSXGroupIDProperty() throws Exception
{
JBossMessage message = new JBossMessage();
- message.getCoreMessage().putStringProperty(MessageImpl.GROUP_ID, new SimpleString("testid"));
+ message.getCoreMessage().putStringProperty(MessageImpl.HDR_GROUP_ID, new SimpleString("testid"));
assertTrue(message.propertyExists(JBossMessage.JMSXGROUPID));
assertEquals("testid", message.getStringProperty(JBossMessage.JMSXGROUPID));
}
More information about the jboss-cvs-commits
mailing list