[jboss-cvs] JBoss Messaging SVN: r5666 - in trunk: src/main/org/jboss/messaging/core/persistence/impl/journal and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jan 20 06:22:15 EST 2009


Author: timfox
Date: 2009-01-20 06:22:15 -0500 (Tue, 20 Jan 2009)
New Revision: 5666

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/queue/PredefinedQueueTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/jboss/messaging/core/server/Bindable.java
   trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java
   trunk/src/main/org/jboss/messaging/core/server/impl/DivertImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/divert/PersistentDivertTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java
Log:
more tweaks, fixes etc

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-01-20 09:04:09 UTC (rev 5665)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-01-20 11:22:15 UTC (rev 5666)
@@ -223,6 +223,7 @@
 
    public boolean addSize(final long size) throws Exception
    {
+      //log.info("Adding size " + size);
       final long maxSize = getMaxSizeBytes();
 
       final long pageSize = getPageSizeBytes();
@@ -790,7 +791,7 @@
          message = pagedMessage.getMessage(storageManager);
 
          final long transactionIdDuringPaging = pagedMessage.getTransactionID();
-         
+
          if (transactionIdDuringPaging >= 0)
          {
             final PageTransactionInfo pageTransactionInfo = pagingManager.getTransaction(transactionIdDuringPaging);

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-01-20 09:04:09 UTC (rev 5665)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-01-20 11:22:15 UTC (rev 5666)
@@ -649,6 +649,11 @@
             MessageReference ref = queue.reroute(record.message, null);
             
             ref.setDeliveryCount(record.deliveryCount);
+            
+            if (scheduledDeliveryTime != 0)
+            {
+               record.message.removeProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
+            }
          }
       }
 
@@ -751,6 +756,7 @@
                      throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
                   }
 
+                  //TODO - this involves a scan - we should find a quicker qay of doing it
                   MessageReference removed = queue.removeReferenceWithID(messageID);
 
                   referencesToAck.add(removed);

Modified: trunk/src/main/org/jboss/messaging/core/server/Bindable.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Bindable.java	2009-01-20 09:04:09 UTC (rev 5665)
+++ trunk/src/main/org/jboss/messaging/core/server/Bindable.java	2009-01-20 11:22:15 UTC (rev 5666)
@@ -42,7 +42,7 @@
    
    SimpleString getRoutingName();
    
-   boolean accept(ServerMessage message);
+   boolean accept(ServerMessage message) throws Exception;
    
    boolean isExclusive();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java	2009-01-20 09:04:09 UTC (rev 5665)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java	2009-01-20 11:22:15 UTC (rev 5666)
@@ -39,6 +39,10 @@
 
    MessageReference createReference(Queue queue);
 
+   int incrementRefCount();
+   
+   int incrementDurableRefCount();
+   
    int decrementDurableRefCount();
    
    int decrementRefCount();
@@ -50,5 +54,7 @@
    void setStored();
    
    boolean isStored();
+   
+   int getRefCount();
 }
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/DivertImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/DivertImpl.java	2009-01-20 09:04:09 UTC (rev 5665)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/DivertImpl.java	2009-01-20 11:22:15 UTC (rev 5666)
@@ -53,11 +53,11 @@
    private final SimpleString uniqueName;
 
    private final SimpleString routingName;
-   
+
    private final boolean exclusive;
-   
+
    private final Filter filter;
-   
+
    private final Transformer transformer;
 
    public DivertImpl(final SimpleString forwardAddress,
@@ -69,28 +69,40 @@
                      final PostOffice postOffice)
    {
       this.forwardAddress = forwardAddress;
-      
+
       this.uniqueName = uniqueName;
-      
+
       this.routingName = routingName;
-      
+
       this.exclusive = exclusive;
-      
+
       this.filter = filter;
-      
+
       this.transformer = transformer;
 
       this.postOffice = postOffice;
    }
 
-   public void route(ServerMessage message, final Transaction tx) throws Exception
+   public boolean accept(final ServerMessage message) throws Exception
    {
+      if (filter != null && !filter.match(message))
+      {
+         return false;
+      }
+      else
+      {
+         return true;
+      }
+   }
+
+   public void route(ServerMessage message, final Transaction tx) throws Exception
+   {      
       SimpleString originalDestination = message.getDestination();
 
       message.setDestination(forwardAddress);
 
       message.putStringProperty(HDR_ORIGINAL_DESTINATION, originalDestination);
-      
+
       if (transformer != null)
       {
          message = transformer.transform(message);
@@ -99,18 +111,6 @@
       postOffice.route(message, tx);
    }
 
-   public boolean accept(final ServerMessage message)
-   {
-      if (filter != null && !filter.match(message))
-      {
-         return false;
-      }
-      else
-      {
-         return true;
-      }
-   }
-
    public SimpleString getRoutingName()
    {
       return routingName;
@@ -120,11 +120,9 @@
    {
       return uniqueName;
    }
-   
+
    public boolean isExclusive()
    {
       return exclusive;
    }
-
-
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-01-20 09:04:09 UTC (rev 5665)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-01-20 11:22:15 UTC (rev 5666)
@@ -808,6 +808,8 @@
             Queue queue = queueFactory.createQueue(-1, name, filter, config.isDurable(), false);
 
             QueueBinding queueBinding = new QueueBindingImpl(new SimpleString(config.getAddress()), queue);
+            
+            binding = queueBinding;
 
             postOffice.addBinding(binding);
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-01-20 09:04:09 UTC (rev 5665)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-01-20 11:22:15 UTC (rev 5666)
@@ -156,7 +156,7 @@
 
    // Bindable implementation -------------------------------------------------------------------------------------
 
-   public boolean accept(final ServerMessage message)
+   public boolean accept(final ServerMessage message) throws Exception
    {
       if (filter != null && !filter.match(message))
       {
@@ -164,6 +164,22 @@
       }
       else
       {
+         int count = message.incrementRefCount();
+                                     
+         if (count == 1)
+         {
+            PagingStore store = pagingManager.getPageStore(message.getDestination());
+            
+            store.addSize(message.getMemoryEstimate());
+         }
+       
+         boolean durableRef = message.isDurable() && durable;
+         
+         if (durableRef)
+         {
+            message.incrementDurableRefCount();
+         }
+         
          return true;
       }
    }
@@ -194,8 +210,10 @@
       // If durable, must be persisted before anything is routed
       MessageReference ref = message.createReference(this);
       
-      addSizeToPaging(ref);
+      PagingStore store = pagingManager.getPageStore(message.getDestination());
       
+      store.addSize(ref.getMemoryEstimate());
+      
       Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
       
       if (scheduledDeliveryTime != null)
@@ -210,6 +228,8 @@
             if (!message.isStored())
             {
                storageManager.storeMessage(message);
+               
+               message.setStored();
             }
             
             storageManager.storeReference(ref.getQueue().getPersistenceID(), message.getMessageID());
@@ -228,7 +248,9 @@
          {
             if (!message.isStored())
             {                  
-               storageManager.storeMessageTransactional(tx.getID(), message);                                               
+               storageManager.storeMessageTransactional(tx.getID(), message);   
+               
+               message.setStored();
             }
             
             tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);        
@@ -243,8 +265,6 @@
 
          getRefsOperation(tx).addRef(ref);
       }
-
-      message.setStored();
    }
    
    public MessageReference reroute(final ServerMessage message, final Transaction tx) throws Exception
@@ -255,15 +275,31 @@
    
       MessageReference ref = message.createReference(this);
       
+      int count = message.incrementRefCount();
+      
+      PagingStore store = pagingManager.getPageStore(message.getDestination());
+      
+      if (count == 1)
+      {
+         store.addSize(message.getMemoryEstimate());
+      }
+
+      store.addSize(ref.getMemoryEstimate());
+      
+      boolean durableRef = message.isDurable() && durable;
+      
+      if (durableRef)
+      {
+         message.incrementDurableRefCount();
+      }
+      
       Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
       
       if (scheduledDeliveryTime != null)
       {
          ref.setScheduledDeliveryTime(scheduledDeliveryTime);
       }
-      
-      addSizeToPaging(ref);
-      
+                       
       if (tx == null)
       {
          addLast(ref);
@@ -863,20 +899,6 @@
    // Private
    // ------------------------------------------------------------------------------
 
-   private void addSizeToPaging(final MessageReference ref) throws Exception
-   {
-      ServerMessage message = ref.getMessage();
-      
-      PagingStore store = pagingManager.getPageStore(message.getDestination());
-      
-      if (!message.isStored())
-      {
-         store.addSize(message.getMemoryEstimate());
-      }
-
-      store.addSize(ref.getMemoryEstimate());
-   }
-     
    private void move(final SimpleString toAddress, final MessageReference ref) throws Exception
    {
       move(toAddress, ref, false);

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java	2009-01-20 09:04:09 UTC (rev 5665)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java	2009-01-20 11:22:15 UTC (rev 5666)
@@ -24,6 +24,7 @@
 
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.message.impl.MessageImpl;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.core.server.MessageReference;
@@ -41,6 +42,8 @@
  */
 public class ServerMessageImpl extends MessageImpl implements ServerMessage
 {
+   private static final Logger log = Logger.getLogger(ServerMessageImpl.class);
+   
    private final AtomicInteger durableRefCount = new AtomicInteger(0);
 
    /** Global reference counts for paging control */
@@ -50,7 +53,6 @@
    
    //We cache this
    private volatile int memoryEstimate = -1;
-
    
    /*
     * Constructor for when reading from network
@@ -93,13 +95,13 @@
    public MessageReference createReference(final Queue queue)
    {
       MessageReference ref = new MessageReferenceImpl(this, queue);
-
-      if (durable && queue.isDurable())
-      {
-         durableRefCount.incrementAndGet();
-      }
-      
-      refCount.incrementAndGet();
+//
+//      if (durable && queue.isDurable())
+//      {
+//         durableRefCount.incrementAndGet();
+//      }
+//      
+//      refCount.incrementAndGet();
     
       return ref;
    }
@@ -113,6 +115,16 @@
    {
       stored = true;
    }
+   
+   public int incrementRefCount()
+   {
+      return refCount.incrementAndGet();
+   }
+   
+   public int incrementDurableRefCount()
+   {
+      return durableRefCount.incrementAndGet();
+   }
 
    public int decrementDurableRefCount()
    {
@@ -124,6 +136,11 @@
       return refCount.decrementAndGet();
    }
    
+   public int getRefCount()
+   {
+      return refCount.get();
+   }
+   
    public int getMemoryEstimate()
    {
       if (memoryEstimate == -1)
@@ -139,7 +156,11 @@
 
    public ServerMessage copy()
    {
-      return new ServerMessageImpl(this);
+      ServerMessage m = new ServerMessageImpl(this);
+      
+      log.info("created copy, new ref count is " + m.getRefCount());
+      
+      return m;
    }
 
    @Override

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/divert/PersistentDivertTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/divert/PersistentDivertTest.java	2009-01-20 09:04:09 UTC (rev 5665)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/divert/PersistentDivertTest.java	2009-01-20 11:22:15 UTC (rev 5666)
@@ -190,185 +190,187 @@
       messagingService.stop();
    }
    
-//   public void testPersistentDivertRestartBeforeConsume() throws Exception
-//   {
-//      Configuration conf = createDefaultConfig();
-//      
-//      conf.setClustered(true);
-//      
-//      final String testAddress = "testAddress";
-//      
-//      final String forwardAddress1 = "forwardAddress1";
-//      
-//      final String forwardAddress2 = "forwardAddress2";
-//      
-//      final String forwardAddress3 = "forwardAddress3";
-//      
-//      DivertConfiguration divertConf1 = new DivertConfiguration("divert1", "divert1", testAddress, forwardAddress1, false, null, null);
-//      
-//      DivertConfiguration divertConf2 = new DivertConfiguration("divert2", "divert2", testAddress, forwardAddress2, false, null, null);
-//      
-//      DivertConfiguration divertConf3 = new DivertConfiguration("divert3", "divert3", testAddress, forwardAddress3, false, null, null);
-//      
-//      List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>();
-//      
-//      divertConfs.add(divertConf1);
-//      divertConfs.add(divertConf2);
-//      divertConfs.add(divertConf3);
-//      
-//      conf.setDivertConfigurations(divertConfs);
-//      
-//      MessagingService messagingService = MessagingServiceImpl.newMessagingService(conf);
-//           
-//      messagingService.start();
-//      
-//      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
-//      
-//      sf.setBlockOnPersistentSend(true);
-//
-//      ClientSession session = sf.createSession(false, true, true);
-//      
-//      final SimpleString queueName1 = new SimpleString("queue1");
-//      
-//      final SimpleString queueName2 = new SimpleString("queue2");
-//      
-//      final SimpleString queueName3 = new SimpleString("queue3");
-//      
-//      final SimpleString queueName4 = new SimpleString("queue4");
-//      
-//      session.createQueue(new SimpleString(forwardAddress1), queueName1, null, true, false);
-//      
-//      session.createQueue(new SimpleString(forwardAddress2), queueName2, null, true, false);
-//      
-//      session.createQueue(new SimpleString(forwardAddress3), queueName3, null, true, false);
-//      
-//      session.createQueue(new SimpleString(testAddress), queueName4, null, true, false);
-//
-//      ClientProducer producer = session.createProducer(new SimpleString(testAddress));
-//           
-//      final int numMessages = 10;
-//      
-//      final SimpleString propKey = new SimpleString("testkey");
-//      
-//      for (int i = 0; i < numMessages; i++)
-//      {
-//         ClientMessage message = session.createClientMessage(true);
-//         
-//         message.putIntProperty(propKey, i);
-//         
-//         producer.send(message);
-//      }
-//      
-//      session.close();
-//      
-//      sf.close();
-//      
-//      messagingService.stop();
-//      
-//      messagingService.start();
-//      
-//      sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
-//      
-//      sf.setBlockOnPersistentSend(true);
-//
-//      session = sf.createSession(false, true, true);
-//      
-//      ClientConsumer consumer1 = session.createConsumer(queueName1);
-//      
-//      ClientConsumer consumer2 = session.createConsumer(queueName2);
-//      
-//      ClientConsumer consumer3 = session.createConsumer(queueName3);
-//      
-//      ClientConsumer consumer4 = session.createConsumer(queueName4);
-//      
-//      for (int i = 0; i < numMessages; i++)
-//      {
-//         ClientMessage message = consumer1.receive(200);
-//         
-//         assertNotNull(message);
-//         
-//         assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-//         
-//         message.acknowledge();
-//      }
-//      
-//      assertNull(consumer1.receive(200));
-//      
-//      for (int i = 0; i < numMessages; i++)
-//      {
-//         ClientMessage message = consumer2.receive(200);
-//         
-//         assertNotNull(message);
-//         
-//         assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-//         
-//         message.acknowledge();
-//      }
-//      
-//      assertNull(consumer2.receive(200));
-//      
-//      for (int i = 0; i < numMessages; i++)
-//      {
-//         ClientMessage message = consumer3.receive(200);
-//         
-//         assertNotNull(message);
-//         
-//         assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-//         
-//         message.acknowledge();
-//      }
-//      
-//      assertNull(consumer3.receive(200));
-//      
-//      for (int i = 0; i < numMessages; i++)
-//      {
-//         ClientMessage message = consumer4.receive(200);
-//         
-//         assertNotNull(message);
-//         
-//         assertEquals((Integer)i, (Integer)message.getProperty(propKey));
-//         
-//         message.acknowledge();
-//      }
-//      
-//      assertNull(consumer4.receive(200));
-//                 
-//      session.close();
-//      
-//      sf.close();
-//      
-//      messagingService.stop();
-//      
-//      messagingService.start();
-//      
-//      sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
-//      
-//      sf.setBlockOnPersistentSend(true);
-//
-//      session = sf.createSession(false, true, true);
-//      
-//      consumer1 = session.createConsumer(queueName1);
-//      
-//      consumer2 = session.createConsumer(queueName2);
-//      
-//      consumer3 = session.createConsumer(queueName3);
-//      
-//      consumer4 = session.createConsumer(queueName4);
-//            
-//      assertNull(consumer1.receive(200));
-//      
-//      assertNull(consumer2.receive(200));
-//      
-//      assertNull(consumer3.receive(200));
-//      
-//      assertNull(consumer4.receive(200));
-//      
-//      session.close();
-//      
-//      sf.close();
-//      
-//      messagingService.stop();
-//   }
+   public void testPersistentDivertRestartBeforeConsume() throws Exception
+   {
+      Configuration conf = createDefaultConfig();
+      
+      conf.setClustered(true);
+      
+      final String testAddress = "testAddress";
+      
+      final String forwardAddress1 = "forwardAddress1";
+      
+      final String forwardAddress2 = "forwardAddress2";
+      
+      final String forwardAddress3 = "forwardAddress3";
+      
+      DivertConfiguration divertConf1 = new DivertConfiguration("divert1", "divert1", testAddress, forwardAddress1, false, null, null);
+      
+      DivertConfiguration divertConf2 = new DivertConfiguration("divert2", "divert2", testAddress, forwardAddress2, false, null, null);
+      
+      DivertConfiguration divertConf3 = new DivertConfiguration("divert3", "divert3", testAddress, forwardAddress3, false, null, null);
+      
+      List<DivertConfiguration> divertConfs = new ArrayList<DivertConfiguration>();
+      
+      divertConfs.add(divertConf1);
+      divertConfs.add(divertConf2);
+      divertConfs.add(divertConf3);
+      
+      conf.setDivertConfigurations(divertConfs);
+      
+      MessagingService messagingService = MessagingServiceImpl.newMessagingService(conf);
+           
+      messagingService.start();
+      
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+      
+      sf.setBlockOnPersistentSend(true);
+
+      ClientSession session = sf.createSession(false, true, true);
+      
+      final SimpleString queueName1 = new SimpleString("queue1");
+      
+      final SimpleString queueName2 = new SimpleString("queue2");
+      
+      final SimpleString queueName3 = new SimpleString("queue3");
+      
+      final SimpleString queueName4 = new SimpleString("queue4");
+      
+      session.createQueue(new SimpleString(forwardAddress1), queueName1, null, true, false);
+      
+      session.createQueue(new SimpleString(forwardAddress2), queueName2, null, true, false);
+      
+      session.createQueue(new SimpleString(forwardAddress3), queueName3, null, true, false);
+      
+      session.createQueue(new SimpleString(testAddress), queueName4, null, true, false);
+
+      ClientProducer producer = session.createProducer(new SimpleString(testAddress));
+           
+      final int numMessages = 10;
+      
+      final SimpleString propKey = new SimpleString("testkey");
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(true);
+         
+         message.putIntProperty(propKey, i);
+         
+         producer.send(message);
+      }
+      
+      session.close();
+      
+      sf.close();
+      
+      messagingService.stop();
+      
+      messagingService.start();
+      
+      sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+      
+      sf.setBlockOnPersistentSend(true);
+
+      session = sf.createSession(false, true, true);
+      
+      session.start();
+      
+      ClientConsumer consumer1 = session.createConsumer(queueName1);
+      
+      ClientConsumer consumer2 = session.createConsumer(queueName2);
+      
+      ClientConsumer consumer3 = session.createConsumer(queueName3);
+      
+      ClientConsumer consumer4 = session.createConsumer(queueName4);
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = consumer1.receive(200);
+         
+         assertNotNull(message);
+         
+         assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+         
+         message.acknowledge();
+      }
+      
+      assertNull(consumer1.receive(200));
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = consumer2.receive(200);
+         
+         assertNotNull(message);
+         
+         assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+         
+         message.acknowledge();
+      }
+      
+      assertNull(consumer2.receive(200));
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = consumer3.receive(200);
+         
+         assertNotNull(message);
+         
+         assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+         
+         message.acknowledge();
+      }
+      
+      assertNull(consumer3.receive(200));
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = consumer4.receive(200);
+         
+         assertNotNull(message);
+         
+         assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+         
+         message.acknowledge();
+      }
+      
+      assertNull(consumer4.receive(200));
+                 
+      session.close();
+      
+      sf.close();
+      
+      messagingService.stop();
+      
+      messagingService.start();
+      
+      sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+      
+      sf.setBlockOnPersistentSend(true);
+
+      session = sf.createSession(false, true, true);
+      
+      consumer1 = session.createConsumer(queueName1);
+      
+      consumer2 = session.createConsumer(queueName2);
+      
+      consumer3 = session.createConsumer(queueName3);
+      
+      consumer4 = session.createConsumer(queueName4);
+            
+      assertNull(consumer1.receive(200));
+      
+      assertNull(consumer2.receive(200));
+      
+      assertNull(consumer3.receive(200));
+      
+      assertNull(consumer4.receive(200));
+      
+      session.close();
+      
+      sf.close();
+      
+      messagingService.stop();
+   }
    
 
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java	2009-01-20 09:04:09 UTC (rev 5665)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java	2009-01-20 11:22:15 UTC (rev 5666)
@@ -32,6 +32,7 @@
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.server.MessagingService;
 import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -43,6 +44,8 @@
  */
 public class ExpiryAddressTest extends UnitTestCase
 {
+   private static final Logger log = Logger.getLogger(ExpiryAddressTest.class);
+
    private MessagingService messagingService;
 
    private ClientSession clientSession;
@@ -119,6 +122,7 @@
       
       assertNotNull(m);
       
+      log.info("acking");
       m.acknowledge();
       
       assertEquals(m.getBody().getString(), "heyho!");
@@ -131,6 +135,7 @@
       
       assertNotNull(m);
       
+      log.info("acking");
       m.acknowledge();
       
       assertEquals(m.getBody().getString(), "heyho!");

Added: trunk/tests/src/org/jboss/messaging/tests/integration/queue/PredefinedQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/queue/PredefinedQueueTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/queue/PredefinedQueueTest.java	2009-01-20 11:22:15 UTC (rev 5666)
@@ -0,0 +1,485 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.integration.queue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.QueueConfiguration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.postoffice.Bindings;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * 
+ * A PredefinedQueueTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 19 Jan 2009 15:44:52
+ *
+ *
+ */
+public class PredefinedQueueTest extends ServiceTestBase
+{
+   private static final Logger log = Logger.getLogger(PredefinedQueueTest.class);
+
+   public void testFailOnCreatePredefinedQueues() throws Exception
+   {
+      Configuration conf = createDefaultConfig();
+      
+      final String testAddress = "testAddress";
+      
+      final String queueName1 = "queue1";
+      
+      final String queueName2 = "queue2";
+      
+      final String queueName3 = "queue3";
+      
+      QueueConfiguration queue1 = new QueueConfiguration(testAddress, queueName1, null, true);
+      
+      QueueConfiguration queue2 = new QueueConfiguration(testAddress, queueName2, null, true);
+      
+      QueueConfiguration queue3 = new QueueConfiguration(testAddress, queueName3, null, true);
+      
+      List<QueueConfiguration> queueConfs = new ArrayList<QueueConfiguration>();
+      
+      queueConfs.add(queue1);
+      queueConfs.add(queue2);
+      queueConfs.add(queue3);
+      
+      conf.setQueueConfigurations(queueConfs);
+      
+      MessagingService messagingService = MessagingServiceImpl.newNullStorageMessagingService(conf);
+           
+      messagingService.start();
+      
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      ClientSession session = sf.createSession(false, true, true);
+      
+      try
+      {
+         session.createQueue(testAddress, queueName1, null, false, false);
+         
+         fail("Should throw exception");
+      }
+      catch (MessagingException me)
+      {
+         assertEquals(MessagingException.QUEUE_EXISTS, me.getCode());
+      }
+      try
+      {
+         session.createQueue(testAddress, queueName2, null, false, false);
+         
+         fail("Should throw exception");
+      }
+      catch (MessagingException me)
+      {
+         assertEquals(MessagingException.QUEUE_EXISTS, me.getCode());
+      }
+      try
+      {
+         session.createQueue(testAddress, queueName3, null, false, false);
+         
+         fail("Should throw exception");
+      }
+      catch (MessagingException me)
+      {
+         assertEquals(MessagingException.QUEUE_EXISTS, me.getCode());
+      }
+            
+      session.close();
+      
+      sf.close();
+      
+      messagingService.stop();
+   }
+   
+   public void testDeploySameNames() throws Exception
+   {
+      Configuration conf = createDefaultConfig();
+      
+      final String testAddress = "testAddress";
+      
+      final String queueName1 = "queue1";
+      
+      final String queueName2 = "queue2";
+      
+      QueueConfiguration queue1 = new QueueConfiguration(testAddress, queueName1, null, true);
+      
+      QueueConfiguration queue2 = new QueueConfiguration(testAddress, queueName1, null, true);
+      
+      QueueConfiguration queue3 = new QueueConfiguration(testAddress, queueName2, null, true);
+      
+      List<QueueConfiguration> queueConfs = new ArrayList<QueueConfiguration>();
+      
+      queueConfs.add(queue1);
+      queueConfs.add(queue2);
+      queueConfs.add(queue3);
+      
+      conf.setQueueConfigurations(queueConfs);
+      
+      MessagingService messagingService = MessagingServiceImpl.newNullStorageMessagingService(conf);
+           
+      messagingService.start();
+      
+      Bindings bindings = messagingService.getServer().getPostOffice().getBindingsForAddress(new SimpleString(testAddress));
+      
+      assertEquals(2, bindings.getBindings().size());
+      
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      ClientSession session = sf.createSession(false, true, true);
+            
+      session.start();
+      
+      ClientProducer producer = session.createProducer(new SimpleString(testAddress));
+
+      ClientConsumer consumer1 = session.createConsumer(queueName1);
+      
+      ClientConsumer consumer2 = session.createConsumer(queueName2);
+      
+      final int numMessages = 10;
+      
+      final SimpleString propKey = new SimpleString("testkey");
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(false);
+         
+         message.putIntProperty(propKey, i);
+         
+         producer.send(message);
+      }
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = consumer1.receive(200);         
+         assertNotNull(message);         
+         assertEquals((Integer)i, (Integer)message.getProperty(propKey));         
+         message.acknowledge();
+         
+         message = consumer2.receive(200);         
+         assertNotNull(message);         
+         assertEquals((Integer)i, (Integer)message.getProperty(propKey));         
+         message.acknowledge();
+      }
+      
+      assertNull(consumer1.receive(200));
+      assertNull(consumer2.receive(200));
+
+      session.close();
+      
+      sf.close();
+      
+      messagingService.stop();
+   }
+   
+   public void testDeployPreexistingQueues() throws Exception
+   {
+      Configuration conf = createDefaultConfig();
+      
+      final String testAddress = "testAddress";
+      
+      final String queueName1 = "queue1";
+      
+      final String queueName2 = "queue2";
+      
+      final String queueName3 = "queue3";
+                 
+      MessagingService messagingService = MessagingServiceImpl.newMessagingService(conf);
+           
+      messagingService.start();
+      
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      ClientSession session = sf.createSession(false, true, true);
+      
+      session.createQueue(testAddress, queueName1, null, true, false);
+        
+      session.createQueue(testAddress, queueName2, null, true, false);
+         
+      session.createQueue(testAddress, queueName3, null, true, false);
+      
+      session.close();
+      
+      sf.close();
+      
+      messagingService.stop();
+      
+      QueueConfiguration queue1 = new QueueConfiguration(testAddress, queueName1, null, true);
+      
+      QueueConfiguration queue2 = new QueueConfiguration(testAddress, queueName2, null, true);
+      
+      QueueConfiguration queue3 = new QueueConfiguration(testAddress, queueName3, null, true);
+      
+      List<QueueConfiguration> queueConfs = new ArrayList<QueueConfiguration>();
+      
+      queueConfs.add(queue1);
+      queueConfs.add(queue2);
+      queueConfs.add(queue3);
+      
+      conf.setQueueConfigurations(queueConfs);
+      
+      messagingService.start();
+      
+      sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      session = sf.createSession(false, true, true);
+      
+      session.start();
+      
+      ClientProducer producer = session.createProducer(new SimpleString(testAddress));
+
+      ClientConsumer consumer1 = session.createConsumer(queueName1);
+      
+      ClientConsumer consumer2 = session.createConsumer(queueName2);
+      
+      ClientConsumer consumer3 = session.createConsumer(queueName3);
+      
+      final int numMessages = 10;
+      
+      final SimpleString propKey = new SimpleString("testkey");
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(false);
+         
+         message.putIntProperty(propKey, i);
+         
+         producer.send(message);
+      }
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = consumer1.receive(200);         
+         assertNotNull(message);         
+         assertEquals((Integer)i, (Integer)message.getProperty(propKey));         
+         message.acknowledge();
+         
+         message = consumer2.receive(200);         
+         assertNotNull(message);         
+         assertEquals((Integer)i, (Integer)message.getProperty(propKey));         
+         message.acknowledge();
+         
+         message = consumer3.receive(200);         
+         assertNotNull(message);         
+         assertEquals((Integer)i, (Integer)message.getProperty(propKey));         
+         message.acknowledge();
+      }
+      
+      assertNull(consumer1.receive(200));
+      assertNull(consumer2.receive(200));
+      assertNull(consumer3.receive(200));
+      
+      session.close();
+      
+      sf.close();
+      
+      messagingService.stop();
+   }
+   
+   public void testDurableNonDurable() throws Exception
+   {
+      Configuration conf = createDefaultConfig();
+      
+      final String testAddress = "testAddress";
+      
+      final String queueName1 = "queue1";
+      
+      final String queueName2 = "queue2";
+      
+      QueueConfiguration queue1 = new QueueConfiguration(testAddress, queueName1, null, false);
+      
+      QueueConfiguration queue2 = new QueueConfiguration(testAddress, queueName2, null, true);
+      
+      List<QueueConfiguration> queueConfs = new ArrayList<QueueConfiguration>();
+      
+      queueConfs.add(queue1);
+      queueConfs.add(queue2);
+      
+      conf.setQueueConfigurations(queueConfs);
+      
+      MessagingService messagingService = MessagingServiceImpl.newMessagingService(conf);
+           
+      messagingService.start();
+      
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      ClientSession session = sf.createSession(false, true, true);
+      
+      ClientProducer producer = session.createProducer(new SimpleString(testAddress));
+      
+      final SimpleString propKey = new SimpleString("testkey");
+      
+      final int numMessages = 1;
+            
+      log.info("sending messages");
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(true);
+         
+         message.putIntProperty(propKey, i);
+         
+         producer.send(message);
+      }
+      
+      session.close();
+      
+      log.info("stopping");
+      
+      sf.close();
+      
+      messagingService.stop();
+      
+      messagingService.start();
+      
+      sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      session = sf.createSession(false, true, true);
+      
+      session.start();
+
+      ClientConsumer consumer1 = session.createConsumer(queueName1);
+      
+      ClientConsumer consumer2 = session.createConsumer(queueName2);
+      
+      ClientMessage message = consumer1.receive(200);  
+      
+      assertNull(message);
+            
+      for (int i = 0; i < numMessages; i++)
+      {
+         message = consumer2.receive(200);         
+         assertNotNull(message);         
+         assertEquals((Integer)i, (Integer)message.getProperty(propKey));         
+         message.acknowledge();
+      }
+      
+      assertNull(consumer1.receive(200));
+      assertNull(consumer2.receive(200));
+
+      session.close();
+      
+      sf.close();
+      
+      messagingService.stop();
+   }
+   
+   
+   public void testDeployWithFilter() throws Exception
+   {
+      Configuration conf = createDefaultConfig();
+      
+      final String testAddress = "testAddress";
+      
+      final String queueName1 = "queue1";
+      
+      final String filter = "cheese='camembert'";
+      
+      QueueConfiguration queue1 = new QueueConfiguration(testAddress, queueName1, filter, false);
+      
+      List<QueueConfiguration> queueConfs = new ArrayList<QueueConfiguration>();
+      
+      queueConfs.add(queue1);
+
+      conf.setQueueConfigurations(queueConfs);
+      
+      MessagingService messagingService = MessagingServiceImpl.newNullStorageMessagingService(conf);
+           
+      messagingService.start();
+      
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      ClientSession session = sf.createSession(false, true, true);
+      
+      ClientProducer producer = session.createProducer(new SimpleString(testAddress));
+      
+      final SimpleString propKey = new SimpleString("testkey");
+      
+      final int numMessages = 1;
+            
+      log.info("sending messages");
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(true);
+         
+         message.putStringProperty(new SimpleString("cheese"), new SimpleString("camembert"));
+         
+         message.putIntProperty(propKey, i);
+         
+         producer.send(message);
+      }
+            
+      session.start();
+
+      ClientConsumer consumer1 = session.createConsumer(queueName1);
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = consumer1.receive(200);         
+         assertNotNull(message);         
+         assertEquals((Integer)i, (Integer)message.getProperty(propKey));         
+         message.acknowledge();
+      }
+      
+      assertNull(consumer1.receive(200));
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(true);
+         
+         message.putStringProperty(new SimpleString("cheese"), new SimpleString("roquefort"));
+         
+         message.putIntProperty(propKey, i);
+         
+         producer.send(message);
+      }
+      
+      assertNull(consumer1.receive(200));
+            
+      session.close();
+      
+      sf.close();
+      
+      messagingService.stop();
+   }
+  
+   
+}




More information about the jboss-cvs-commits mailing list