[jboss-cvs] JBoss Messaging SVN: r5666 - in trunk: src/main/org/jboss/messaging/core/persistence/impl/journal and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jan 20 06:22:15 EST 2009
Author: timfox
Date: 2009-01-20 06:22:15 -0500 (Tue, 20 Jan 2009)
New Revision: 5666
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/queue/PredefinedQueueTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/jboss/messaging/core/server/Bindable.java
trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java
trunk/src/main/org/jboss/messaging/core/server/impl/DivertImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/divert/PersistentDivertTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java
Log:
more tweaks, fixes etc
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-20 09:04:09 UTC (rev 5665)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-20 11:22:15 UTC (rev 5666)
@@ -223,6 +223,7 @@
public boolean addSize(final long size) throws Exception
{
+ //log.info("Adding size " + size);
final long maxSize = getMaxSizeBytes();
final long pageSize = getPageSizeBytes();
@@ -790,7 +791,7 @@
message = pagedMessage.getMessage(storageManager);
final long transactionIdDuringPaging = pagedMessage.getTransactionID();
-
+
if (transactionIdDuringPaging >= 0)
{
final PageTransactionInfo pageTransactionInfo = pagingManager.getTransaction(transactionIdDuringPaging);
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-01-20 09:04:09 UTC (rev 5665)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-01-20 11:22:15 UTC (rev 5666)
@@ -649,6 +649,11 @@
MessageReference ref = queue.reroute(record.message, null);
ref.setDeliveryCount(record.deliveryCount);
+
+ if (scheduledDeliveryTime != 0)
+ {
+ record.message.removeProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
+ }
}
}
@@ -751,6 +756,7 @@
throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
}
+ //TODO - this involves a scan - we should find a quicker way of doing it
MessageReference removed = queue.removeReferenceWithID(messageID);
referencesToAck.add(removed);
Modified: trunk/src/main/org/jboss/messaging/core/server/Bindable.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Bindable.java 2009-01-20 09:04:09 UTC (rev 5665)
+++ trunk/src/main/org/jboss/messaging/core/server/Bindable.java 2009-01-20 11:22:15 UTC (rev 5666)
@@ -42,7 +42,7 @@
SimpleString getRoutingName();
- boolean accept(ServerMessage message);
+ boolean accept(ServerMessage message) throws Exception;
boolean isExclusive();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java 2009-01-20 09:04:09 UTC (rev 5665)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java 2009-01-20 11:22:15 UTC (rev 5666)
@@ -39,6 +39,10 @@
MessageReference createReference(Queue queue);
+ int incrementRefCount();
+
+ int incrementDurableRefCount();
+
int decrementDurableRefCount();
int decrementRefCount();
@@ -50,5 +54,7 @@
void setStored();
boolean isStored();
+
+ int getRefCount();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/DivertImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/DivertImpl.java 2009-01-20 09:04:09 UTC (rev 5665)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/DivertImpl.java 2009-01-20 11:22:15 UTC (rev 5666)
@@ -53,11 +53,11 @@
private final SimpleString uniqueName;
private final SimpleString routingName;
-
+
private final boolean exclusive;
-
+
private final Filter filter;
-
+
private final Transformer transformer;
public DivertImpl(final SimpleString forwardAddress,
@@ -69,28 +69,40 @@
final PostOffice postOffice)
{
this.forwardAddress = forwardAddress;
-
+
this.uniqueName = uniqueName;
-
+
this.routingName = routingName;
-
+
this.exclusive = exclusive;
-
+
this.filter = filter;
-
+
this.transformer = transformer;
this.postOffice = postOffice;
}
- public void route(ServerMessage message, final Transaction tx) throws Exception
+ public boolean accept(final ServerMessage message) throws Exception
{
+ if (filter != null && !filter.match(message))
+ {
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ public void route(ServerMessage message, final Transaction tx) throws Exception
+ {
SimpleString originalDestination = message.getDestination();
message.setDestination(forwardAddress);
message.putStringProperty(HDR_ORIGINAL_DESTINATION, originalDestination);
-
+
if (transformer != null)
{
message = transformer.transform(message);
@@ -99,18 +111,6 @@
postOffice.route(message, tx);
}
- public boolean accept(final ServerMessage message)
- {
- if (filter != null && !filter.match(message))
- {
- return false;
- }
- else
- {
- return true;
- }
- }
-
public SimpleString getRoutingName()
{
return routingName;
@@ -120,11 +120,9 @@
{
return uniqueName;
}
-
+
public boolean isExclusive()
{
return exclusive;
}
-
-
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-01-20 09:04:09 UTC (rev 5665)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-01-20 11:22:15 UTC (rev 5666)
@@ -808,6 +808,8 @@
Queue queue = queueFactory.createQueue(-1, name, filter, config.isDurable(), false);
QueueBinding queueBinding = new QueueBindingImpl(new SimpleString(config.getAddress()), queue);
+
+ binding = queueBinding;
postOffice.addBinding(binding);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-20 09:04:09 UTC (rev 5665)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-20 11:22:15 UTC (rev 5666)
@@ -156,7 +156,7 @@
// Bindable implementation -------------------------------------------------------------------------------------
- public boolean accept(final ServerMessage message)
+ public boolean accept(final ServerMessage message) throws Exception
{
if (filter != null && !filter.match(message))
{
@@ -164,6 +164,22 @@
}
else
{
+ int count = message.incrementRefCount();
+
+ if (count == 1)
+ {
+ PagingStore store = pagingManager.getPageStore(message.getDestination());
+
+ store.addSize(message.getMemoryEstimate());
+ }
+
+ boolean durableRef = message.isDurable() && durable;
+
+ if (durableRef)
+ {
+ message.incrementDurableRefCount();
+ }
+
return true;
}
}
@@ -194,8 +210,10 @@
// If durable, must be persisted before anything is routed
MessageReference ref = message.createReference(this);
- addSizeToPaging(ref);
+ PagingStore store = pagingManager.getPageStore(message.getDestination());
+ store.addSize(ref.getMemoryEstimate());
+
Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
if (scheduledDeliveryTime != null)
@@ -210,6 +228,8 @@
if (!message.isStored())
{
storageManager.storeMessage(message);
+
+ message.setStored();
}
storageManager.storeReference(ref.getQueue().getPersistenceID(), message.getMessageID());
@@ -228,7 +248,9 @@
{
if (!message.isStored())
{
- storageManager.storeMessageTransactional(tx.getID(), message);
+ storageManager.storeMessageTransactional(tx.getID(), message);
+
+ message.setStored();
}
tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
@@ -243,8 +265,6 @@
getRefsOperation(tx).addRef(ref);
}
-
- message.setStored();
}
public MessageReference reroute(final ServerMessage message, final Transaction tx) throws Exception
@@ -255,15 +275,31 @@
MessageReference ref = message.createReference(this);
+ int count = message.incrementRefCount();
+
+ PagingStore store = pagingManager.getPageStore(message.getDestination());
+
+ if (count == 1)
+ {
+ store.addSize(message.getMemoryEstimate());
+ }
+
+ store.addSize(ref.getMemoryEstimate());
+
+ boolean durableRef = message.isDurable() && durable;
+
+ if (durableRef)
+ {
+ message.incrementDurableRefCount();
+ }
+
Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
if (scheduledDeliveryTime != null)
{
ref.setScheduledDeliveryTime(scheduledDeliveryTime);
}
-
- addSizeToPaging(ref);
-
+
if (tx == null)
{
addLast(ref);
@@ -863,20 +899,6 @@
// Private
// ------------------------------------------------------------------------------
- private void addSizeToPaging(final MessageReference ref) throws Exception
- {
- ServerMessage message = ref.getMessage();
-
- PagingStore store = pagingManager.getPageStore(message.getDestination());
-
- if (!message.isStored())
- {
- store.addSize(message.getMemoryEstimate());
- }
-
- store.addSize(ref.getMemoryEstimate());
- }
-
private void move(final SimpleString toAddress, final MessageReference ref) throws Exception
{
move(toAddress, ref, false);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java 2009-01-20 09:04:09 UTC (rev 5665)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java 2009-01-20 11:22:15 UTC (rev 5666)
@@ -24,6 +24,7 @@
import java.util.concurrent.atomic.AtomicInteger;
+import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.server.MessageReference;
@@ -41,6 +42,8 @@
*/
public class ServerMessageImpl extends MessageImpl implements ServerMessage
{
+ private static final Logger log = Logger.getLogger(ServerMessageImpl.class);
+
private final AtomicInteger durableRefCount = new AtomicInteger(0);
/** Global reference counts for paging control */
@@ -50,7 +53,6 @@
//We cache this
private volatile int memoryEstimate = -1;
-
/*
* Constructor for when reading from network
@@ -93,13 +95,13 @@
public MessageReference createReference(final Queue queue)
{
MessageReference ref = new MessageReferenceImpl(this, queue);
-
- if (durable && queue.isDurable())
- {
- durableRefCount.incrementAndGet();
- }
-
- refCount.incrementAndGet();
+//
+// if (durable && queue.isDurable())
+// {
+// durableRefCount.incrementAndGet();
+// }
+//
+// refCount.incrementAndGet();
return ref;
}
@@ -113,6 +115,16 @@
{
stored = true;
}
+
+ public int incrementRefCount()
+ {
+ return refCount.incrementAndGet();
+ }
+
+ public int incrementDurableRefCount()
+ {
+ return durableRefCount.incrementAndGet();
+ }
public int decrementDurableRefCount()
{
@@ -124,6 +136,11 @@
return refCount.decrementAndGet();
}
+ public int getRefCount()
+ {
+ return refCount.get();
+ }
+
public int getMemoryEstimate()
{
if (memoryEstimate == -1)
@@ -139,7 +156,11 @@
public ServerMessage copy()
{
- return new ServerMessageImpl(this);
+ ServerMessage m = new ServerMessageImpl(this);
+
+ log.info("created copy, new ref count is " + m.getRefCount());
+
+ return m;
}
@Override
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/divert/PersistentDivertTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/divert/PersistentDivertTest.java 2009-01-20 09:04:09 UTC (rev 5665)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/divert/PersistentDivertTest.java 2009-01-20 11:22:15 UTC (rev 5666)
@@ -190,185 +190,187 @@
messagingService.stop();
}
-// public void testPersistentDivertRestartBeforeConsume() throws Exception
-// {
-// Configuration conf = createDefaultConfig();
-//
-// conf.setClustered(true);
-//
-// final String testAddress = "testAddress";
-//
-// final String forwardAddress1 = "forwardAddress1";
-//
-// final String forwardAddress2 = "forwardAddress2";
-//
-// final String forwardAddress3 = "forwardAddress3";
-//
-// DivertConfiguration divertConf1 = new DivertConfiguration("divert1", "divert1", testAddress, forwardAddress1, false, null, null);
-//
-// DivertConfiguration divertConf2 = new DivertConfiguration("divert2", "divert2", testAddress, forwardAddress2, false, null, null);
-//
-// DivertConfiguration divertConf3 = new DivertConfiguration("divert3", "divert3", testAddress, forwardAddress3, false, null, null);
-//
-// List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>();
-//
-// divertConfs.add(divertConf1);
-// divertConfs.add(divertConf2);
-// divertConfs.add(divertConf3);
-//
-// conf.setDivertConfigurations(divertConfs);
-//
-// MessagingService messagingService = MessagingServiceImpl.newMessagingService(conf);
-//
-// messagingService.start();
-//
-// ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
-//
-// sf.setBlockOnPersistentSend(true);
-//
-// ClientSession session = sf.createSession(false, true, true);
-//
-// final SimpleString queueName1 = new SimpleString("queue1");
-//
-// final SimpleString queueName2 = new SimpleString("queue2");
-//
-// final SimpleString queueName3 = new SimpleString("queue3");
-//
-// final SimpleString queueName4 = new SimpleString("queue4");
-//
-// session.createQueue(new SimpleString(forwardAddress1), queueName1, null, true, false);
-//
-// session.createQueue(new SimpleString(forwardAddress2), queueName2, null, true, false);
-//
-// session.createQueue(new SimpleString(forwardAddress3), queueName3, null, true, false);
-//
-// session.createQueue(new SimpleString(testAddress), queueName4, null, true, false);
-//
-// ClientProducer producer = session.createProducer(new SimpleString(testAddress));
-//
-// final int numMessages = 10;
-//
-// final SimpleString propKey = new SimpleString("testkey");
-//
-// for (int i = 0; i < numMessages; i++)
-// {
-// ClientMessage message = session.createClientMessage(true);
-//
-// message.putIntProperty(propKey, i);
-//
-// producer.send(message);
-// }
-//
-// session.close();
-//
-// sf.close();
-//
-// messagingService.stop();
-//
-// messagingService.start();
-//
-// sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
-//
-// sf.setBlockOnPersistentSend(true);
-//
-// session = sf.createSession(false, true, true);
-//
-// ClientConsumer consumer1 = session.createConsumer(queueName1);
-//
-// ClientConsumer consumer2 = session.createConsumer(queueName2);
-//
-// ClientConsumer consumer3 = session.createConsumer(queueName3);
-//
-// ClientConsumer consumer4 = session.createConsumer(queueName4);
-//
-// for (int i = 0; i < numMessages; i++)
-// {
-// ClientMessage message = consumer1.receive(200);
-//
-// assertNotNull(message);
-//
-// assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-//
-// message.acknowledge();
-// }
-//
-// assertNull(consumer1.receive(200));
-//
-// for (int i = 0; i < numMessages; i++)
-// {
-// ClientMessage message = consumer2.receive(200);
-//
-// assertNotNull(message);
-//
-// assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-//
-// message.acknowledge();
-// }
-//
-// assertNull(consumer2.receive(200));
-//
-// for (int i = 0; i < numMessages; i++)
-// {
-// ClientMessage message = consumer3.receive(200);
-//
-// assertNotNull(message);
-//
-// assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-//
-// message.acknowledge();
-// }
-//
-// assertNull(consumer3.receive(200));
-//
-// for (int i = 0; i < numMessages; i++)
-// {
-// ClientMessage message = consumer4.receive(200);
-//
-// assertNotNull(message);
-//
-// assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-//
-// message.acknowledge();
-// }
-//
-// assertNull(consumer4.receive(200));
-//
-// session.close();
-//
-// sf.close();
-//
-// messagingService.stop();
-//
-// messagingService.start();
-//
-// sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
-//
-// sf.setBlockOnPersistentSend(true);
-//
-// session = sf.createSession(false, true, true);
-//
-// consumer1 = session.createConsumer(queueName1);
-//
-// consumer2 = session.createConsumer(queueName2);
-//
-// consumer3 = session.createConsumer(queueName3);
-//
-// consumer4 = session.createConsumer(queueName4);
-//
-// assertNull(consumer1.receive(200));
-//
-// assertNull(consumer2.receive(200));
-//
-// assertNull(consumer3.receive(200));
-//
-// assertNull(consumer4.receive(200));
-//
-// session.close();
-//
-// sf.close();
-//
-// messagingService.stop();
-// }
+ public void testPersistentDivertRestartBeforeConsume() throws Exception
+ {
+ Configuration conf = createDefaultConfig();
+
+ conf.setClustered(true);
+
+ final String testAddress = "testAddress";
+
+ final String forwardAddress1 = "forwardAddress1";
+
+ final String forwardAddress2 = "forwardAddress2";
+
+ final String forwardAddress3 = "forwardAddress3";
+
+ DivertConfiguration divertConf1 = new DivertConfiguration("divert1", "divert1", testAddress, forwardAddress1, false, null, null);
+
+ DivertConfiguration divertConf2 = new DivertConfiguration("divert2", "divert2", testAddress, forwardAddress2, false, null, null);
+
+ DivertConfiguration divertConf3 = new DivertConfiguration("divert3", "divert3", testAddress, forwardAddress3, false, null, null);
+
+ List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>();
+
+ divertConfs.add(divertConf1);
+ divertConfs.add(divertConf2);
+ divertConfs.add(divertConf3);
+
+ conf.setDivertConfigurations(divertConfs);
+
+ MessagingService messagingService = MessagingServiceImpl.newMessagingService(conf);
+
+ messagingService.start();
+
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ sf.setBlockOnPersistentSend(true);
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ final SimpleString queueName1 = new SimpleString("queue1");
+
+ final SimpleString queueName2 = new SimpleString("queue2");
+
+ final SimpleString queueName3 = new SimpleString("queue3");
+
+ final SimpleString queueName4 = new SimpleString("queue4");
+
+ session.createQueue(new SimpleString(forwardAddress1), queueName1, null, true, false);
+
+ session.createQueue(new SimpleString(forwardAddress2), queueName2, null, true, false);
+
+ session.createQueue(new SimpleString(forwardAddress3), queueName3, null, true, false);
+
+ session.createQueue(new SimpleString(testAddress), queueName4, null, true, false);
+
+ ClientProducer producer = session.createProducer(new SimpleString(testAddress));
+
+ final int numMessages = 10;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(true);
+
+ message.putIntProperty(propKey, i);
+
+ producer.send(message);
+ }
+
+ session.close();
+
+ sf.close();
+
+ messagingService.stop();
+
+ messagingService.start();
+
+ sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ sf.setBlockOnPersistentSend(true);
+
+ session = sf.createSession(false, true, true);
+
+ session.start();
+
+ ClientConsumer consumer1 = session.createConsumer(queueName1);
+
+ ClientConsumer consumer2 = session.createConsumer(queueName2);
+
+ ClientConsumer consumer3 = session.createConsumer(queueName3);
+
+ ClientConsumer consumer4 = session.createConsumer(queueName4);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(200);
+
+ assertNotNull(message);
+
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+
+ message.acknowledge();
+ }
+
+ assertNull(consumer1.receive(200));
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer2.receive(200);
+
+ assertNotNull(message);
+
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+
+ message.acknowledge();
+ }
+
+ assertNull(consumer2.receive(200));
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer3.receive(200);
+
+ assertNotNull(message);
+
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+
+ message.acknowledge();
+ }
+
+ assertNull(consumer3.receive(200));
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer4.receive(200);
+
+ assertNotNull(message);
+
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+
+ message.acknowledge();
+ }
+
+ assertNull(consumer4.receive(200));
+
+ session.close();
+
+ sf.close();
+
+ messagingService.stop();
+
+ messagingService.start();
+
+ sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ sf.setBlockOnPersistentSend(true);
+
+ session = sf.createSession(false, true, true);
+
+ consumer1 = session.createConsumer(queueName1);
+
+ consumer2 = session.createConsumer(queueName2);
+
+ consumer3 = session.createConsumer(queueName3);
+
+ consumer4 = session.createConsumer(queueName4);
+
+ assertNull(consumer1.receive(200));
+
+ assertNull(consumer2.receive(200));
+
+ assertNull(consumer3.receive(200));
+
+ assertNull(consumer4.receive(200));
+
+ session.close();
+
+ sf.close();
+
+ messagingService.stop();
+ }
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java 2009-01-20 09:04:09 UTC (rev 5665)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java 2009-01-20 11:22:15 UTC (rev 5666)
@@ -32,6 +32,7 @@
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.server.MessagingService;
import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -43,6 +44,8 @@
*/
public class ExpiryAddressTest extends UnitTestCase
{
+ private static final Logger log = Logger.getLogger(ExpiryAddressTest.class);
+
private MessagingService messagingService;
private ClientSession clientSession;
@@ -119,6 +122,7 @@
assertNotNull(m);
+ log.info("acking");
m.acknowledge();
assertEquals(m.getBody().getString(), "heyho!");
@@ -131,6 +135,7 @@
assertNotNull(m);
+ log.info("acking");
m.acknowledge();
assertEquals(m.getBody().getString(), "heyho!");
Added: trunk/tests/src/org/jboss/messaging/tests/integration/queue/PredefinedQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/queue/PredefinedQueueTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/queue/PredefinedQueueTest.java 2009-01-20 11:22:15 UTC (rev 5666)
@@ -0,0 +1,485 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.integration.queue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.QueueConfiguration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.postoffice.Bindings;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ *
+ * A PredefinedQueueTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 19 Jan 2009 15:44:52
+ *
+ *
+ */
+public class PredefinedQueueTest extends ServiceTestBase
+{
+ private static final Logger log = Logger.getLogger(PredefinedQueueTest.class);
+
+ public void testFailOnCreatePredefinedQueues() throws Exception
+ {
+ Configuration conf = createDefaultConfig();
+
+ final String testAddress = "testAddress";
+
+ final String queueName1 = "queue1";
+
+ final String queueName2 = "queue2";
+
+ final String queueName3 = "queue3";
+
+ QueueConfiguration queue1 = new QueueConfiguration(testAddress, queueName1, null, true);
+
+ QueueConfiguration queue2 = new QueueConfiguration(testAddress, queueName2, null, true);
+
+ QueueConfiguration queue3 = new QueueConfiguration(testAddress, queueName3, null, true);
+
+ List<QueueConfiguration> queueConfs = new ArrayList<QueueConfiguration>();
+
+ queueConfs.add(queue1);
+ queueConfs.add(queue2);
+ queueConfs.add(queue3);
+
+ conf.setQueueConfigurations(queueConfs);
+
+ MessagingService messagingService = MessagingServiceImpl.newNullStorageMessagingService(conf);
+
+ messagingService.start();
+
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ try
+ {
+ session.createQueue(testAddress, queueName1, null, false, false);
+
+ fail("Should throw exception");
+ }
+ catch (MessagingException me)
+ {
+ assertEquals(MessagingException.QUEUE_EXISTS, me.getCode());
+ }
+ try
+ {
+ session.createQueue(testAddress, queueName2, null, false, false);
+
+ fail("Should throw exception");
+ }
+ catch (MessagingException me)
+ {
+ assertEquals(MessagingException.QUEUE_EXISTS, me.getCode());
+ }
+ try
+ {
+ session.createQueue(testAddress, queueName3, null, false, false);
+
+ fail("Should throw exception");
+ }
+ catch (MessagingException me)
+ {
+ assertEquals(MessagingException.QUEUE_EXISTS, me.getCode());
+ }
+
+ session.close();
+
+ sf.close();
+
+ messagingService.stop();
+ }
+
+ public void testDeploySameNames() throws Exception
+ {
+ Configuration conf = createDefaultConfig();
+
+ final String testAddress = "testAddress";
+
+ final String queueName1 = "queue1";
+
+ final String queueName2 = "queue2";
+
+ QueueConfiguration queue1 = new QueueConfiguration(testAddress, queueName1, null, true);
+
+ QueueConfiguration queue2 = new QueueConfiguration(testAddress, queueName1, null, true);
+
+ QueueConfiguration queue3 = new QueueConfiguration(testAddress, queueName2, null, true);
+
+ List<QueueConfiguration> queueConfs = new ArrayList<QueueConfiguration>();
+
+ queueConfs.add(queue1);
+ queueConfs.add(queue2);
+ queueConfs.add(queue3);
+
+ conf.setQueueConfigurations(queueConfs);
+
+ MessagingService messagingService = MessagingServiceImpl.newNullStorageMessagingService(conf);
+
+ messagingService.start();
+
+ Bindings bindings = messagingService.getServer().getPostOffice().getBindingsForAddress(new SimpleString(testAddress));
+
+ assertEquals(2, bindings.getBindings().size());
+
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.start();
+
+ ClientProducer producer = session.createProducer(new SimpleString(testAddress));
+
+ ClientConsumer consumer1 = session.createConsumer(queueName1);
+
+ ClientConsumer consumer2 = session.createConsumer(queueName2);
+
+ final int numMessages = 10;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(false);
+
+ message.putIntProperty(propKey, i);
+
+ producer.send(message);
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(200);
+ assertNotNull(message);
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+ message.acknowledge();
+
+ message = consumer2.receive(200);
+ assertNotNull(message);
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+ message.acknowledge();
+ }
+
+ assertNull(consumer1.receive(200));
+ assertNull(consumer2.receive(200));
+
+ session.close();
+
+ sf.close();
+
+ messagingService.stop();
+ }
+
+ public void testDeployPreexistingQueues() throws Exception // Creates three queues through a client session, stops the server, lists the same queues in the configuration, restarts, and checks that every queue delivers every message in send order.
+ {
+ Configuration conf = createDefaultConfig();
+
+ final String testAddress = "testAddress";
+
+ final String queueName1 = "queue1";
+
+ final String queueName2 = "queue2";
+
+ final String queueName3 = "queue3";
+ // First run: the service is built from conf before this test sets any queue configurations on it.
+ MessagingService messagingService = MessagingServiceImpl.newMessagingService(conf);
+
+ messagingService.start();
+
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ ClientSession session = sf.createSession(false, true, true);
+ // Create the three queues on the same address with no filter (null); the trailing (true, false) flags are presumably durable/temporary - TODO confirm against ClientSession.createQueue.
+ session.createQueue(testAddress, queueName1, null, true, false);
+
+ session.createQueue(testAddress, queueName2, null, true, false);
+
+ session.createQueue(testAddress, queueName3, null, true, false);
+
+ session.close();
+
+ sf.close();
+ // End of the first run: the server is stopped before the same queues are declared in the configuration.
+ messagingService.stop();
+ // Declare the same address/queue-name pairs as queue configurations (null filter; the last argument true is presumably "durable" - TODO confirm).
+ QueueConfiguration queue1 = new QueueConfiguration(testAddress, queueName1, null, true);
+
+ QueueConfiguration queue2 = new QueueConfiguration(testAddress, queueName2, null, true);
+
+ QueueConfiguration queue3 = new QueueConfiguration(testAddress, queueName3, null, true);
+
+ List<QueueConfiguration> queueConfs = new ArrayList<QueueConfiguration>();
+
+ queueConfs.add(queue1);
+ queueConfs.add(queue2);
+ queueConfs.add(queue3);
+
+ conf.setQueueConfigurations(queueConfs);
+ // Second run: the same service instance is restarted. NOTE(review): this relies on the service reading the updated conf object on start - verify.
+ messagingService.start();
+
+ sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ session = sf.createSession(false, true, true);
+ // The session is started before the consumers are created; presumably required for message delivery to them - confirm.
+ session.start();
+
+ ClientProducer producer = session.createProducer(new SimpleString(testAddress));
+
+ ClientConsumer consumer1 = session.createConsumer(queueName1);
+
+ ClientConsumer consumer2 = session.createConsumer(queueName2);
+
+ ClientConsumer consumer3 = session.createConsumer(queueName3);
+
+ final int numMessages = 10;
+
+ final SimpleString propKey = new SimpleString("testkey");
+ // Send numMessages messages to the shared address, each carrying its loop index under propKey.
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(false);
+
+ message.putIntProperty(propKey, i);
+
+ producer.send(message);
+ }
+ // For each index i, all three consumers must receive a message whose propKey value is i (receive(200) is presumably a 200 ms timeout), and each message is acknowledged.
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(200);
+ assertNotNull(message);
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+ message.acknowledge();
+
+ message = consumer2.receive(200);
+ assertNotNull(message);
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+ message.acknowledge();
+
+ message = consumer3.receive(200);
+ assertNotNull(message);
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+ message.acknowledge();
+ }
+ // After numMessages messages per consumer, no further message may arrive on any of the three queues.
+ assertNull(consumer1.receive(200));
+ assertNull(consumer2.receive(200));
+ assertNull(consumer3.receive(200));
+ // Close the client side first, then stop the server.
+ session.close();
+
+ sf.close();
+
+ messagingService.stop();
+ }
+
+ public void testDurableNonDurable() throws Exception // Sends to an address with two configured queues that differ only in the last QueueConfiguration flag (false vs true), restarts the server, and expects only queue2 to still deliver the message.
+ {
+ Configuration conf = createDefaultConfig();
+
+ final String testAddress = "testAddress";
+
+ final String queueName1 = "queue1";
+
+ final String queueName2 = "queue2";
+ // queue1 is built with a final argument of false and queue2 with true; given the test name this is presumably the "durable" flag - TODO confirm against QueueConfiguration.
+ QueueConfiguration queue1 = new QueueConfiguration(testAddress, queueName1, null, false);
+
+ QueueConfiguration queue2 = new QueueConfiguration(testAddress, queueName2, null, true);
+
+ List<QueueConfiguration> queueConfs = new ArrayList<QueueConfiguration>();
+
+ queueConfs.add(queue1);
+ queueConfs.add(queue2);
+
+ conf.setQueueConfigurations(queueConfs);
+ // Both queues are in the configuration before the service is created from it.
+ MessagingService messagingService = MessagingServiceImpl.newMessagingService(conf);
+
+ messagingService.start();
+
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ ClientProducer producer = session.createProducer(new SimpleString(testAddress));
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ final int numMessages = 1;
+
+ log.info("sending messages");
+ // Send numMessages messages, each carrying its loop index under propKey; nothing is consumed before the restart.
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(true);
+ // createClientMessage(true): presumably marks the message as durable - TODO confirm.
+ message.putIntProperty(propKey, i);
+
+ producer.send(message);
+ }
+
+ session.close();
+
+ log.info("stopping");
+
+ sf.close();
+ // Restart the same service instance; the assertions below check what each queue delivers after the restart.
+ messagingService.stop();
+
+ messagingService.start();
+
+ sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ session = sf.createSession(false, true, true);
+
+ session.start();
+
+ ClientConsumer consumer1 = session.createConsumer(queueName1);
+
+ ClientConsumer consumer2 = session.createConsumer(queueName2);
+ // queue1 (flag false) is expected to deliver nothing after the restart.
+ ClientMessage message = consumer1.receive(200);
+
+ assertNull(message);
+ // queue2 (flag true) is expected to still deliver every message sent before the restart, in send order.
+ for (int i = 0; i < numMessages; i++)
+ {
+ message = consumer2.receive(200);
+ assertNotNull(message);
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+ message.acknowledge();
+ }
+ // No further message may arrive on either queue.
+ assertNull(consumer1.receive(200));
+ assertNull(consumer2.receive(200));
+ // Close the client side first, then stop the server.
+ session.close();
+
+ sf.close();
+
+ messagingService.stop();
+ }
+
+
+ public void testDeployWithFilter() throws Exception // Deploys one configured queue with the filter cheese='camembert' and checks that messages with cheese=camembert are delivered while messages with cheese=roquefort are not.
+ {
+ Configuration conf = createDefaultConfig();
+
+ final String testAddress = "testAddress";
+
+ final String queueName1 = "queue1";
+
+ final String filter = "cheese='camembert'";
+ // The filter string is passed as the third QueueConfiguration argument, where the other tests in this file pass null.
+ QueueConfiguration queue1 = new QueueConfiguration(testAddress, queueName1, filter, false);
+
+ List<QueueConfiguration> queueConfs = new ArrayList<QueueConfiguration>();
+
+ queueConfs.add(queue1);
+
+ conf.setQueueConfigurations(queueConfs);
+ // This test never restarts the server and uses the null-storage service factory (presumably no persistent storage - confirm).
+ MessagingService messagingService = MessagingServiceImpl.newNullStorageMessagingService(conf);
+
+ messagingService.start();
+
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ ClientProducer producer = session.createProducer(new SimpleString(testAddress));
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ final int numMessages = 1;
+
+ log.info("sending messages");
+ // Phase 1: send messages whose cheese property is camembert, the value named in the queue's filter.
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(true);
+
+ message.putStringProperty(new SimpleString("cheese"), new SimpleString("camembert"));
+
+ message.putIntProperty(propKey, i);
+
+ producer.send(message);
+ }
+ // The session is started only after the first batch has been sent and before the consumer is created.
+ session.start();
+
+ ClientConsumer consumer1 = session.createConsumer(queueName1);
+ // Every camembert message must arrive on queue1 in send order and is acknowledged.
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(200);
+ assertNotNull(message);
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+ message.acknowledge();
+ }
+
+ assertNull(consumer1.receive(200));
+ // Phase 2: send messages whose cheese property is roquefort, which differs from the value named in the filter.
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(true);
+
+ message.putStringProperty(new SimpleString("cheese"), new SimpleString("roquefort"));
+
+ message.putIntProperty(propKey, i);
+
+ producer.send(message);
+ }
+ // None of the roquefort messages may reach the consumer on the filtered queue.
+ assertNull(consumer1.receive(200));
+
+ session.close();
+
+ sf.close();
+
+ messagingService.stop();
+ }
+
+
+}
More information about the jboss-cvs-commits
mailing list