[hornetq-commits] JBoss hornetq SVN: r7924 - in trunk: src/main/org/hornetq/core/remoting/impl and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Aug 27 11:22:40 EDT 2009


Author: timfox
Date: 2009-08-27 11:22:39 -0400 (Thu, 27 Aug 2009)
New Revision: 7924

Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
   trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
   trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
   trunk/tests/src/org/hornetq/tests/integration/client/MessageExpirationTest.java
   trunk/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java
   trunk/tests/src/org/hornetq/tests/integration/server/ExpiryRunnerTest.java
Log:
Fixed various issues with pre-ack

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2009-08-27 14:22:20 UTC (rev 7923)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2009-08-27 15:22:39 UTC (rev 7924)
@@ -101,8 +101,6 @@
 
    private boolean stopped = false;
 
-   private final boolean preAcknowledge;
-
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -112,8 +110,7 @@
                              final int ackBatchSize,
                              final TokenBucketLimiter rateLimiter,
                              final Executor executor,
-                             final Channel channel,
-                             final boolean preAcknowledge)
+                             final Channel channel)
    {
       this.id = id;
 
@@ -128,8 +125,6 @@
       this.clientWindowSize = clientWindowSize;
 
       this.ackBatchSize = ackBatchSize;
-
-      this.preAcknowledge = preAcknowledge;
    }
 
    // ClientConsumer implementation
@@ -214,7 +209,8 @@
             if (m != null)
             {
                // if we have already pre acked we cant expire
-               boolean expired = !preAcknowledge && m.isExpired();
+               boolean expired = m.isExpired();
+               
                flowControlBeforeConsumption(m);
 
                if (expired)

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-08-27 14:22:20 UTC (rev 7923)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-08-27 15:22:39 UTC (rev 7924)
@@ -232,7 +232,7 @@
    {
       internalCreateQueue(address, queueName, null, false, false);
    }
-   
+
    public void createQueue(final SimpleString address, final SimpleString queueName, final boolean durable) throws HornetQException
    {
       internalCreateQueue(address, queueName, null, durable, false);
@@ -321,12 +321,12 @@
    {
       return createConsumer(queueName, filterString, consumerWindowSize, consumerMaxRate, false);
    }
-   
+
    public void createQueue(final String address, final String queueName) throws HornetQException
    {
       internalCreateQueue(toSimpleString(address), toSimpleString(queueName), null, false, false);
    }
-   
+
    public ClientConsumer createConsumer(final String queueName, final String filterString) throws HornetQException
    {
       return createConsumer(toSimpleString(queueName), toSimpleString(filterString));
@@ -618,9 +618,13 @@
    {
       checkClosed();
 
-      SessionExpiredMessage message = new SessionExpiredMessage(consumerID, messageID);
+      //We don't send expiries for pre-ack since message will already have been acked on server
+      if (!preAcknowledge)
+      {
+         SessionExpiredMessage message = new SessionExpiredMessage(consumerID, messageID);
 
-      channel.send(message);
+         channel.send(message);
+      }
    }
 
    public void addConsumer(final ClientConsumerInternal consumer)
@@ -798,7 +802,7 @@
    {
       channel.returnBlocking();
    }
-   
+
    public ConnectionManager getConnectionManager()
    {
       return connectionManager;
@@ -1193,8 +1197,7 @@
                                                                                                                 false)
                                                                                   : null,
                                                                executor,
-                                                               channel,
-                                                               preAcknowledge);
+                                                               channel);
 
       addConsumer(consumer);
 
@@ -1204,8 +1207,6 @@
 
       if (windowSize != 0)
       {
-         log.info("Sending " + windowSize + " initial credits");
-         
          channel.send(new SessionConsumerFlowCreditMessage(consumerID, windowSize));
       }
 

Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java	2009-08-27 14:22:20 UTC (rev 7923)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java	2009-08-27 15:22:39 UTC (rev 7924)
@@ -585,12 +585,12 @@
       else
       {
          if (packet.isResponse())
-         {
-            response = packet;
-
+         {            
             confirm(packet);
 
             lock.lock();
+            
+            response = packet;
 
             try
             {

Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2009-08-27 14:22:20 UTC (rev 7923)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2009-08-27 15:22:39 UTC (rev 7924)
@@ -232,16 +232,14 @@
 
       failureCheckThread.close();
       
-     // We need to stop them accepting first so no new connections are accepted after we send the disconnect message
+      // We need to stop them accepting first so no new connections are accepted after we send the disconnect message
       for (Acceptor acceptor : acceptors)
       {
          acceptor.pause();
       }
-
-      log.info("there are " + connections.size() + " connections to close on server close");
+     
       for (ConnectionEntry entry : connections.values())
-      {
-         log.info("sending disconnect message");
+      {       
          entry.connection.getChannel(0, -1, false).sendAndFlush(new PacketImpl(DISCONNECT));
       }
            

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-08-27 14:22:20 UTC (rev 7923)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-08-27 15:22:39 UTC (rev 7924)
@@ -146,6 +146,8 @@
    private final Map<Consumer, Iterator<MessageReference>> iterators = new HashMap<Consumer, Iterator<MessageReference>>();
 
    private ConcurrentMap<SimpleString, Consumer> groups = new ConcurrentHashMap<SimpleString, Consumer>();
+   
+   private final SimpleString expiryAddress;
 
    public QueueImpl(final long persistenceID,
                     final SimpleString address,
@@ -190,6 +192,15 @@
       direct = true;
 
       scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor);
+      
+      if (addressSettingsRepository != null)
+      {
+         expiryAddress = addressSettingsRepository.getMatch(address.toString()).getExpiryAddress();
+      }
+      else
+      {
+         expiryAddress = null;
+      }
    }
    
    // Bindable implementation -------------------------------------------------------------------------------------
@@ -737,35 +748,17 @@
             messageReferences.addFirst(reference, reference.getMessage().getPriority());
          }
       }
-   }
+   }     
 
    public void expire(final MessageReference ref) throws Exception
-   {
-      SimpleString expiryAddress = addressSettingsRepository.getMatch(address.toString()).getExpiryAddress();
-
+   {      
+      log.info("expiring ref " + this.expiryAddress);
       if (expiryAddress != null)
       {
-         Bindings bindingList = postOffice.getBindingsForAddress(expiryAddress);
-
-         if (bindingList.getBindings().isEmpty())
-         {
-            if (log.isDebugEnabled())
-            {
-               log.debug("Message " + ref + " has expired without any binding for expiry address " + expiryAddress + ", dropping it");
-            }
-         }
-         else
-         {
-            move(expiryAddress, ref, true);
-         }
+         move(expiryAddress, ref, true);         
       }
       else
-      {
-         if (log.isDebugEnabled())
-         {
-            log.debug("Message " + ref + " has expired without any expiry address configured for " + name + ", dropping it");
-         }
-
+      {         
          acknowledge(ref);
       }
    }
@@ -1312,6 +1305,7 @@
 
       Iterator<MessageReference> iterator = null;
 
+      //TODO - this needs to be optimised!! Creating too much stuff on an inner loop
       int totalConsumers = distributionPolicy.getConsumerCount();
       Set<Consumer> busyConsumers = new HashSet<Consumer>();
       Set<Consumer> nullReferences = new HashSet<Consumer>();
@@ -1335,6 +1329,7 @@
             else
             {
                reference = null;
+               
                if (consumer.getFilter() != null)
                {
                   // we have iterated on the whole queue for
@@ -1361,6 +1356,32 @@
          else
          {
             nullReferences.remove(consumer);
+            
+            if (reference.getMessage().isExpired())
+            {
+               //We expire messages on the server too
+               if (iterator == null)
+               {
+                  messageReferences.removeFirst();
+               }
+               else
+               {
+                  iterator.remove();
+               }
+               
+               referenceHandled();
+               
+               try
+               {
+                  expire(reference);
+               }
+               catch (Exception e)
+               {
+                  log.error("Failed to expire ref", e);
+               }
+               
+               continue;
+            }
          }
 
          initPagingStore(reference.getMessage().getDestination());
@@ -1629,7 +1650,7 @@
 
       // TODO: We could optimize this by storing the paging-store for the address on the Queue. We would need to know
       // the Address for the Queue
-      PagingStore store = null;
+      PagingStore store;
 
       if (pagingManager != null)
       {
@@ -1637,13 +1658,14 @@
 
          store.addSize(-ref.getMemoryEstimate());
       }
+      else
+      {
+         store = null;
+      }
 
-      if (message.decrementRefCount() == 0)
+      if (message.decrementRefCount() == 0 && store != null)
       {
-         if (store != null)
-         {
-            store.addSize(-ref.getMessage().getMemoryEstimate());
-         }
+         store.addSize(-ref.getMessage().getMemoryEstimate());         
       }
    }
 

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-08-27 14:22:20 UTC (rev 7923)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-08-27 15:22:39 UTC (rev 7924)
@@ -397,7 +397,7 @@
       {
          return null;
       }
-
+            
       // Expiries can come in out of sequence with respect to delivery order
 
       Iterator<MessageReference> iter = deliveringRefs.iterator();
@@ -418,15 +418,6 @@
          }
       }
 
-      if (ref == null)
-      {
-         throw new IllegalStateException("Could not find reference with id " + messageID +
-                                         " backup " +
-                                         messageQueue.isBackup() +
-                                         " closed " +
-                                         closed);
-      }
-
       return ref;
    }
 

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-08-27 14:22:20 UTC (rev 7923)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-08-27 15:22:39 UTC (rev 7924)
@@ -1622,7 +1622,6 @@
       {
          MessageReference ref = consumers.get(packet.getConsumerID()).getExpired(packet.getMessageID());
 
-         // Null implies a browser
          if (ref != null)
          {
             ref.getQueue().expire(ref);

Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnection.java	2009-08-27 14:22:20 UTC (rev 7923)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnection.java	2009-08-27 15:22:39 UTC (rev 7924)
@@ -248,8 +248,8 @@
  *          <p/>
  *          $Id$
  */
-public class HornetQConnection implements Connection, QueueConnection, TopicConnection, XAConnection, XAQueueConnection,
-         XATopicConnection
+public class HornetQConnection implements Connection, QueueConnection, TopicConnection, XAConnection,
+         XAQueueConnection, XATopicConnection
 {
    // Constants ------------------------------------------------------------------------------------
 
@@ -304,18 +304,18 @@
    private final int transactionBatchSize;
 
    private ClientSession initialSession;
-   
+
    private final Exception creationStack;
 
    // Constructors ---------------------------------------------------------------------------------
 
    public HornetQConnection(final String username,
-                          final String password,
-                          final int connectionType,
-                          final String clientID,
-                          final int dupsOKBatchSize,
-                          final int transactionBatchSize,
-                          final ClientSessionFactory sessionFactory)
+                            final String password,
+                            final int connectionType,
+                            final String clientID,
+                            final int dupsOKBatchSize,
+                            final int transactionBatchSize,
+                            final ClientSessionFactory sessionFactory)
    {
       this.username = username;
 
@@ -334,7 +334,7 @@
       this.dupsOKBatchSize = dupsOKBatchSize;
 
       this.transactionBatchSize = transactionBatchSize;
-      
+
       this.creationStack = new Exception();
    }
 
@@ -622,19 +622,18 @@
    {
       if (!closed)
       {
-         log.warn("I'm closing a JMS connection you left open. Please make sure you close all JMS connections explicitly " +
-                  "before letting them go out of scope!");
-         
+         log.warn("I'm closing a JMS connection you left open. Please make sure you close all JMS connections explicitly " + "before letting them go out of scope!");
+
          log.warn("The JMS connection you didn't close was created here:", creationStack);
-         
+
          close();
       }
    }
 
    protected HornetQSession createSessionInternal(final boolean transacted,
-                                                int acknowledgeMode,
-                                                final boolean isXA,
-                                                final int type) throws JMSException
+                                                  int acknowledgeMode,
+                                                  final boolean isXA,
+                                                  final int type) throws JMSException
    {
       if (transacted)
       {
@@ -647,19 +646,43 @@
 
          if (acknowledgeMode == Session.SESSION_TRANSACTED)
          {
-            session = sessionFactory.createSession(username, password, isXA, false, false, false, transactionBatchSize);
+            session = sessionFactory.createSession(username,
+                                                   password,
+                                                   isXA,
+                                                   false,
+                                                   false,
+                                                   sessionFactory.isPreAcknowledge(),
+                                                   transactionBatchSize);
          }
          else if (acknowledgeMode == Session.AUTO_ACKNOWLEDGE)
          {
-            session = sessionFactory.createSession(username, password, isXA, true, true, false, 0);
+            session = sessionFactory.createSession(username,
+                                                   password,
+                                                   isXA,
+                                                   true,
+                                                   true,
+                                                   sessionFactory.isPreAcknowledge(),
+                                                   0);
          }
          else if (acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)
          {
-            session = sessionFactory.createSession(username, password, isXA, true, true, false, dupsOKBatchSize);
+            session = sessionFactory.createSession(username,
+                                                   password,
+                                                   isXA,
+                                                   true,
+                                                   true,
+                                                   sessionFactory.isPreAcknowledge(),
+                                                   dupsOKBatchSize);
          }
          else if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
          {
-            session = sessionFactory.createSession(username, password, isXA, true, false, false, transactionBatchSize);
+            session = sessionFactory.createSession(username,
+                                                   password,
+                                                   isXA,
+                                                   true,
+                                                   false,
+                                                   sessionFactory.isPreAcknowledge(),
+                                                   transactionBatchSize);
          }
          else if (acknowledgeMode == HornetQSession.PRE_ACKNOWLEDGE)
          {
@@ -722,33 +745,33 @@
    private static class JMSFailureListener implements FailureListener
    {
       private WeakReference<HornetQConnection> connectionRef;
-      
+
       JMSFailureListener(final HornetQConnection connection)
       {
          connectionRef = new WeakReference<HornetQConnection>(connection);
       }
-      
+
       public synchronized void connectionFailed(final HornetQException me)
       {
          if (me == null)
          {
             return;
          }
-         
+
          HornetQConnection conn = connectionRef.get();
-         
+
          if (conn != null)
          {
             try
             {
                final ExceptionListener exceptionListener = conn.getExceptionListener();
-               
+
                if (exceptionListener != null)
                {
                   final JMSException je = new JMSException(me.toString());
-      
+
                   je.initCause(me);
-      
+
                   new Thread(new Runnable()
                   {
                      public void run()

Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2009-08-27 14:22:20 UTC (rev 7923)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2009-08-27 15:22:39 UTC (rev 7924)
@@ -212,11 +212,7 @@
          server.start();
 
          ClientSessionFactory sf = createInVMFactory();
-
-         session = sf.createSession(false, false, false);
-
-         session.createQueue(ADDRESS, ADDRESS, true);
-
+         
          SimpleString ADDRESS_DLA = ADDRESS.concat("-dla");
          SimpleString ADDRESS_EXPIRY = ADDRESS.concat("-expiry");
 
@@ -228,6 +224,11 @@
 
          server.getAddressSettingsRepository().addMatch("*", addressSettings);
 
+         session = sf.createSession(false, false, false);
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+
+        
          session.createQueue(ADDRESS_DLA, ADDRESS_DLA, true);
          session.createQueue(ADDRESS_EXPIRY, ADDRESS_EXPIRY, true);
 
@@ -346,21 +347,21 @@
          server = createServer(true);
 
          server.start();
+         
+         AddressSettings addressSettings = new AddressSettings();
 
+         SimpleString ADDRESS_EXPIRY = ADDRESS.concat("-expiry");
+         
+         addressSettings.setExpiryAddress(ADDRESS_EXPIRY);
+
+         server.getAddressSettingsRepository().addMatch("*", addressSettings);
+
          ClientSessionFactory sf = createInVMFactory();
 
          session = sf.createSession(false, false, false);
 
          session.createQueue(ADDRESS, ADDRESS, true);
-
-         SimpleString ADDRESS_EXPIRY = ADDRESS.concat("-expiry");
-
-         AddressSettings addressSettings = new AddressSettings();
-
-         addressSettings.setExpiryAddress(ADDRESS_EXPIRY);
-
-         server.getAddressSettingsRepository().addMatch("*", addressSettings);
-
+                 
          session.createQueue(ADDRESS_EXPIRY, ADDRESS_EXPIRY, true);
 
          ClientProducer producer = session.createProducer(ADDRESS);

Modified: trunk/tests/src/org/hornetq/tests/integration/client/MessageExpirationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/MessageExpirationTest.java	2009-08-27 14:22:20 UTC (rev 7923)
+++ trunk/tests/src/org/hornetq/tests/integration/client/MessageExpirationTest.java	2009-08-27 15:22:39 UTC (rev 7924)
@@ -22,6 +22,7 @@
 import org.hornetq.core.client.ClientSessionFactory;
 import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.tests.util.ServiceTestBase;
 import org.hornetq.utils.SimpleString;
@@ -77,16 +78,71 @@
       consumer.close();
       session.deleteQueue(queue);
    }
+   
+   public void testMessageExpirationOnServer() throws Exception
+   {
+      SimpleString address = randomSimpleString();
+      SimpleString queue = randomSimpleString();
 
+      session.createQueue(address, queue, false);
+
+      ClientProducer producer = session.createProducer(address);
+      ClientConsumer consumer = session.createConsumer(queue);
+      ClientMessage message = session.createClientMessage(false);
+      message.setExpiration(System.currentTimeMillis() + EXPIRATION);
+      producer.send(message);
+
+      Thread.sleep(EXPIRATION * 2);
+           
+      session.start();
+      
+      Thread.sleep(500);
+      
+      assertEquals(0, ((Queue)server.getPostOffice().getBinding(queue).getBindable()).getDeliveringCount());
+      assertEquals(0, ((Queue)server.getPostOffice().getBinding(queue).getBindable()).getMessageCount());
+
+      
+      ClientMessage message2 = consumer.receive(500);
+      assertNull(message2);
+
+      consumer.close();
+      session.deleteQueue(queue);
+   }
+   
+   public void testMessageExpirationOnClient() throws Exception
+   {
+      SimpleString address = randomSimpleString();
+      SimpleString queue = randomSimpleString();
+
+      session.createQueue(address, queue, false);
+
+      ClientProducer producer = session.createProducer(address);
+      ClientMessage message = session.createClientMessage(false);
+      message.setExpiration(System.currentTimeMillis() + EXPIRATION);
+      producer.send(message);
+     
+      session.start();
+      
+      Thread.sleep(EXPIRATION * 2);
+
+      ClientConsumer consumer = session.createConsumer(queue);
+      ClientMessage message2 = consumer.receive(500);
+      assertNull(message2);
+                 
+      assertEquals(0, ((Queue)server.getPostOffice().getBinding(queue).getBindable()).getDeliveringCount());
+      assertEquals(0, ((Queue)server.getPostOffice().getBinding(queue).getBindable()).getMessageCount());
+
+      consumer.close();
+      session.deleteQueue(queue);
+   }
+
    public void testMessageExpiredWithExpiryAddress() throws Exception
    {
       SimpleString address = randomSimpleString();
       SimpleString queue = randomSimpleString();
       final SimpleString expiryAddress = randomSimpleString();
       SimpleString expiryQueue = randomSimpleString();
-
-      session.createQueue(address, queue, false);
-      session.createQueue(expiryAddress, expiryQueue, false);
+      
       server.getAddressSettingsRepository().addMatch(address.toString(), new AddressSettings()
       {
          @Override
@@ -96,6 +152,10 @@
          }
       });
 
+      session.createQueue(address, queue, false);
+      session.createQueue(expiryAddress, expiryQueue, false);
+      
+
       ClientProducer producer = session.createProducer(address);
       ClientMessage message = session.createClientMessage(false);
       message.setExpiration(System.currentTimeMillis() + EXPIRATION);

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java	2009-08-27 14:22:20 UTC (rev 7923)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java	2009-08-27 15:22:39 UTC (rev 7924)
@@ -74,13 +74,42 @@
          Message m = consumer.receive(500);
          assertNotNull(m);
       }
-      // assert that all the messages are there and none have been acked
+
       SimpleString queueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + Q_NAME);
       assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
       assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getMessageCount());
       conn.close();
    }
+   
+   public void testPreCommitAcksSetOnConnectionFactory() throws Exception
+   {
+      ((HornetQConnectionFactory)cf).setPreAcknowledge(true);
+      Connection conn = cf.createConnection();
+      
+      Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+      jBossQueue = new HornetQQueue(Q_NAME);
+      MessageProducer producer = session.createProducer(jBossQueue);
+      MessageConsumer consumer = session.createConsumer(jBossQueue);
+      int noOfMessages = 100;
+      for (int i = 0; i < noOfMessages; i++)
+      {
+         producer.send(session.createTextMessage("m" + i));
+      }
 
+      conn.start();
+      for (int i = 0; i < noOfMessages; i++)
+      {
+         Message m = consumer.receive(500);
+         assertNotNull(m);
+      }
+      
+      //Messages should all have been acked since we set pre ack on the cf
+      SimpleString queueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + Q_NAME);
+      assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
+      assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getMessageCount());
+      conn.close();
+   }
+
    public void testPreCommitAcksWithMessageExpiry() throws Exception
    {
       Connection conn = cf.createConnection();
@@ -95,14 +124,41 @@
          producer.setTimeToLive(1);
          producer.send(textMessage);
       }
+      
+      Thread.sleep(2);
 
       conn.start();
+      Message m = consumer.receive(500);
+      assertNull(m);
+      
+      SimpleString queueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + Q_NAME);
+      assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
+      assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getMessageCount());
+      conn.close();
+   }
+   
+   public void testPreCommitAcksWithMessageExpirySetOnConnectionFactory() throws Exception
+   {
+      ((HornetQConnectionFactory)cf).setPreAcknowledge(true);
+      Connection conn = cf.createConnection();
+      Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+      jBossQueue = new HornetQQueue(Q_NAME);
+      MessageProducer producer = session.createProducer(jBossQueue);
+      MessageConsumer consumer = session.createConsumer(jBossQueue);
+      int noOfMessages = 1000;
       for (int i = 0; i < noOfMessages; i++)
       {
-         Message m = consumer.receive(500);
-         assertNotNull(m);
+         TextMessage textMessage = session.createTextMessage("m" + i);
+         producer.setTimeToLive(1);
+         producer.send(textMessage);
       }
-      // assert that all the messages are there and none have been acked
+      
+      Thread.sleep(2);
+
+      conn.start();
+      Message m = consumer.receive(500);
+      assertNull(m);
+      
       SimpleString queueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + Q_NAME);
       assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
       assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getMessageCount());
@@ -112,7 +168,7 @@
    public void testClearExceptionListener() throws Exception
    {
       Connection conn = cf.createConnection();
-      Session session = conn.createSession(false, HornetQSession.PRE_ACKNOWLEDGE);
+      Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
       jBossQueue = new HornetQQueue(Q_NAME);
       MessageConsumer consumer = session.createConsumer(jBossQueue);
       consumer.setMessageListener(new MessageListener()
@@ -129,7 +185,7 @@
    public void testCantReceiveWhenListenerIsSet() throws Exception
    {
       Connection conn = cf.createConnection();
-      Session session = conn.createSession(false, HornetQSession.PRE_ACKNOWLEDGE);
+      Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
       jBossQueue = new HornetQQueue(Q_NAME);
       MessageConsumer consumer = session.createConsumer(jBossQueue);
       consumer.setMessageListener(new MessageListener()

Modified: trunk/tests/src/org/hornetq/tests/integration/server/ExpiryRunnerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/server/ExpiryRunnerTest.java	2009-08-27 14:22:20 UTC (rev 7923)
+++ trunk/tests/src/org/hornetq/tests/integration/server/ExpiryRunnerTest.java	2009-08-27 15:22:39 UTC (rev 7924)
@@ -67,17 +67,7 @@
       }
       Thread.sleep(1600);
       assertEquals(0, ((Queue)server.getPostOffice().getBinding(qName).getBindable()).getMessageCount());
-      assertEquals(0, ((Queue)server.getPostOffice().getBinding(qName).getBindable()).getDeliveringCount());
-
-      ClientConsumer consumer = clientSession.createConsumer(expiryQueue);
-      clientSession.start();
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage cm = consumer.receive(500);
-         assertNotNull(cm);
-         //assertEquals("m" + i, cm.getBody().getString());
-      }
-      consumer.close();
+      assertEquals(0, ((Queue)server.getPostOffice().getBinding(qName).getBindable()).getDeliveringCount());      
    }
 
    public void testExpireFromMultipleQueues() throws Exception
@@ -102,16 +92,6 @@
       Thread.sleep(1600);
       assertEquals(0, ((Queue)server.getPostOffice().getBinding(qName).getBindable()).getMessageCount());
       assertEquals(0, ((Queue)server.getPostOffice().getBinding(qName).getBindable()).getDeliveringCount());
-
-      ClientConsumer consumer = clientSession.createConsumer(expiryQueue);
-      clientSession.start();
-      for (int i = 0; i < numMessages * 2; i++)
-      {
-         ClientMessage cm = consumer.receive(500);
-         assertNotNull(cm);
-         //assertEquals("m" + i, cm.getBody().getString());
-      }
-      consumer.close();
    }
 
    public void testExpireHalf() throws Exception
@@ -131,16 +111,6 @@
       Thread.sleep(1600);
       assertEquals(numMessages / 2, ((Queue)server.getPostOffice().getBinding(qName).getBindable()).getMessageCount());
       assertEquals(0, ((Queue)server.getPostOffice().getBinding(qName).getBindable()).getDeliveringCount());
-
-      ClientConsumer consumer = clientSession.createConsumer(expiryQueue);
-      clientSession.start();
-      for (int i = 0; i < numMessages; i += 2)
-      {
-         ClientMessage cm = consumer.receive(500);
-         assertNotNull(cm);
-         //assertEquals("m" + i, cm.getBody().getString());
-      }
-      consumer.close();
    }
 
    public void testExpireConsumeHalf() throws Exception
@@ -167,24 +137,16 @@
       Thread.sleep(2100);
       assertEquals(0, ((Queue)server.getPostOffice().getBinding(qName).getBindable()).getMessageCount());
       assertEquals(0, ((Queue)server.getPostOffice().getBinding(qName).getBindable()).getDeliveringCount());
-
-      consumer = clientSession.createConsumer(expiryQueue);
-      clientSession.start();
-      for (int i = 50; i < numMessages; i++)
-      {
-         ClientMessage cm = consumer.receive(500);
-         assertNotNull(cm);
-         //assertEquals("m" + i, cm.getBody().getString());
-      }
-      consumer.close();
    }
 
-   public void testExpireToMultipleQueues() throws Exception
-   {
-      clientSession.createQueue(qName, qName2, null, false);
+   public void testExpireToExpiryQueue() throws Exception
+   {      
       AddressSettings addressSettings = new AddressSettings();
       addressSettings.setExpiryAddress(expiryAddress);
       server.getAddressSettingsRepository().addMatch(qName2.toString(), addressSettings);
+      clientSession.deleteQueue(qName);
+      clientSession.createQueue(qName, qName, null, false);
+      clientSession.createQueue(qName, qName2, null, false);
       ClientProducer producer = clientSession.createProducer(qName);
       int numMessages = 100;
       long expiration = System.currentTimeMillis();
@@ -222,7 +184,8 @@
       CountDownLatch latch = new CountDownLatch(1);
       DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(consumer, latch);
       clientSession.start();
-      new Thread(dummyMessageHandler).start();
+      Thread thr = new Thread(dummyMessageHandler);      
+      thr.start();
       long expiration = System.currentTimeMillis() + 1000;
       int numMessages = 0;
       long sendMessagesUntil = System.currentTimeMillis() + 2000;
@@ -257,6 +220,7 @@
       }
       assertTrue(dummyMessageHandler.payloads.isEmpty());
       consumer.close();
+      thr.join();
    }
 
    public static void main(String[] args) throws Exception



More information about the hornetq-commits mailing list