[hornetq-commits] JBoss hornetq SVN: r8783 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Sat Jan 9 13:26:20 EST 2010


Author: timfox
Date: 2010-01-09 13:26:20 -0500 (Sat, 09 Jan 2010)
New Revision: 8783

Modified:
   trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-263

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-01-08 16:38:13 UTC (rev 8782)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-01-09 18:26:20 UTC (rev 8783)
@@ -88,8 +88,6 @@
 
    private final List<MessageHandler> handlers = new ArrayList<MessageHandler>();
 
-   private final ConcurrentSet<MessageReference> expiringMessageReferences = new ConcurrentHashSet<MessageReference>();
-
    private final ScheduledDeliveryHandler scheduledDeliveryHandler;
 
    private boolean direct;
@@ -432,7 +430,7 @@
       }
    }
 
-   public synchronized MessageReference removeReferenceWithID(final long id) throws Exception
+   public MessageReference removeReferenceWithID(final long id) throws Exception
    {
       Iterator<MessageReference> iterator = messageReferences.iterator();
 
@@ -448,8 +446,6 @@
 
             removed = ref;
 
-            removeExpiringReference(removed);
-
             break;
          }
       }
@@ -481,7 +477,7 @@
       return ref;
    }
 
-   public synchronized MessageReference getReference(final long id)
+   public MessageReference getReference(final long id)
    {
       Iterator<MessageReference> iterator = messageReferences.iterator();
 
@@ -633,44 +629,40 @@
    {
       int count = 0;
 
-      synchronized (this)
-      {
+      Transaction tx = new TransactionImpl(storageManager);
 
-         Transaction tx = new TransactionImpl(storageManager);
+      Iterator<MessageReference> iter = messageReferences.iterator();
 
-         Iterator<MessageReference> iter = messageReferences.iterator();
+      while (iter.hasNext())
+      {
+         MessageReference ref = iter.next();
 
-         while (iter.hasNext())
+         if (filter == null || filter.match(ref.getMessage()))
          {
-            MessageReference ref = iter.next();
-
-            if (filter == null || filter.match(ref.getMessage()))
-            {
-               deliveringCount.incrementAndGet();
-               acknowledge(tx, ref);
-               iter.remove();
-               count++;
-            }
+            deliveringCount.incrementAndGet();
+            acknowledge(tx, ref);
+            iter.remove();
+            count++;
          }
+      }
 
-         List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
-         for (MessageReference messageReference : cancelled)
+      List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
+      for (MessageReference messageReference : cancelled)
+      {
+         if (filter == null || filter.match(messageReference.getMessage()))
          {
-            if (filter == null || filter.match(messageReference.getMessage()))
-            {
-               deliveringCount.incrementAndGet();
-               acknowledge(tx, messageReference);
-               count++;
-            }
+            deliveringCount.incrementAndGet();
+            acknowledge(tx, messageReference);
+            count++;
          }
-
-         tx.commit();
       }
 
+      tx.commit();
+
       return count;
    }
 
-   public synchronized boolean deleteReference(final long messageID) throws Exception
+   public boolean deleteReference(final long messageID) throws Exception
    {
       boolean deleted = false;
 
@@ -696,7 +688,7 @@
       return deleted;
    }
 
-   public synchronized boolean expireReference(final long messageID) throws Exception
+   public boolean expireReference(final long messageID) throws Exception
    {
       Iterator<MessageReference> iter = messageReferences.iterator();
 
@@ -714,7 +706,7 @@
       return false;
    }
 
-   public synchronized int expireReferences(final Filter filter) throws Exception
+   public int expireReferences(final Filter filter) throws Exception
    {
       Transaction tx = new TransactionImpl(storageManager);
 
@@ -738,18 +730,23 @@
       return count;
    }
 
-   public synchronized void expireReferences() throws Exception
+   public void expireReferences() throws Exception
    {
-      for (MessageReference expiringMessageReference : expiringMessageReferences)
+      Iterator<MessageReference> iter = messageReferences.iterator();
+
+      while (iter.hasNext())
       {
-         if (expiringMessageReference.getMessage().isExpired())
+         MessageReference ref = iter.next();
+         if (ref.getMessage().isExpired())
          {
-            expireReference(expiringMessageReference.getMessage().getMessageID());
+            deliveringCount.incrementAndGet();
+            expire(ref);
+            iter.remove();
          }
       }
    }
 
-   public synchronized boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception
+   public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception
    {
       Iterator<MessageReference> iter = messageReferences.iterator();
 
@@ -767,7 +764,7 @@
       return false;
    }
 
-   public synchronized boolean moveReference(final long messageID, final SimpleString toAddress) throws Exception
+   public boolean moveReference(final long messageID, final SimpleString toAddress) throws Exception
    {
       Iterator<MessageReference> iter = messageReferences.iterator();
 
@@ -785,7 +782,7 @@
       return false;
    }
 
-   public synchronized int moveReferences(final Filter filter, final SimpleString toAddress) throws Exception
+   public int moveReferences(final Filter filter, final SimpleString toAddress) throws Exception
    {
       Transaction tx = new TransactionImpl(storageManager);
 
@@ -821,7 +818,7 @@
       return count;
    }
 
-   public synchronized boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception
+   public boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception
    {
       Iterator<MessageReference> iter = messageReferences.iterator();
 
@@ -1165,13 +1162,6 @@
                      busyCount++;
 
                      handler.reset();
-
-                     // if (groupID != null )
-                     // {
-                     // // group id being set seems to make delivery stop
-                     // // FIXME !!! why??
-                     // break;
-                     // }
                   }
                   else if (status == HandleStatus.NO_MATCH)
                   {
@@ -1214,6 +1204,11 @@
          return false;
       }
 
+      if (checkExpired(reference))
+      {
+         return true;
+      }
+
       int startPos = pos;
       int busyCount = 0;
       boolean setPromptDelivery = false;
@@ -1223,49 +1218,46 @@
 
          Consumer consumer = handler.getConsumer();
 
-         if (!checkExpired(reference))
+         SimpleString groupID = reference.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
+
+         boolean tryHandle = true;
+
+         if (groupID != null)
          {
-            SimpleString groupID = reference.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
+            Consumer groupConsumer = groups.putIfAbsent(groupID, consumer);
 
-            boolean tryHandle = true;
+            if (groupConsumer != null && groupConsumer != consumer)
+            {
+               tryHandle = false;
+            }
+         }
 
-            if (groupID != null)
+         if (tryHandle)
+         {
+            HandleStatus status = handle(reference, consumer);
+
+            if (status == HandleStatus.HANDLED)
             {
-               Consumer groupConsumer = groups.putIfAbsent(groupID, consumer);
+               return true;
+            }
+            else if (status == HandleStatus.BUSY)
+            {
+               busyCount++;
 
-               if (groupConsumer != null && groupConsumer != consumer)
+               if (groupID != null)
                {
-                  tryHandle = false;
+                  return false;
                }
             }
-
-            if (tryHandle)
+            else if (status == HandleStatus.NO_MATCH)
             {
-               HandleStatus status = handle(reference, consumer);
-
-               if (status == HandleStatus.HANDLED)
+               // if consumer filter reject the message make sure it won't be assigned the message group
+               if (groupID != null)
                {
-                  return true;
+                  groups.remove(groupID);
                }
-               else if (status == HandleStatus.BUSY)
-               {
-                  busyCount++;
 
-                  if (groupID != null)
-                  {
-                     return false;
-                  }
-               }
-               else if (status == HandleStatus.NO_MATCH)
-               {
-                  // if consumer filter reject the message make sure it won't be assigned the message group
-                  if (groupID != null)
-                  {
-                     groups.remove(groupID);
-                  }
-
-                  setPromptDelivery = true;
-               }
+               setPromptDelivery = true;
             }
          }
 
@@ -1320,11 +1312,6 @@
 
       if (add)
       {
-         if (ref.getMessage().getExpiration() != 0)
-         {
-            expiringMessageReferences.addIfAbsent(ref);
-         }
-
          if (first)
          {
             messageReferences.addFirst(ref, ref.getMessage().getPriority());
@@ -1377,14 +1364,6 @@
       return status;
    }
 
-   private void removeExpiringReference(final MessageReference ref)
-   {
-      if (ref.getMessage().getExpiration() > 0)
-      {
-         expiringMessageReferences.remove(ref);
-      }
-   }
-
    private void postAcknowledge(final MessageReference ref) throws Exception
    {
       final ServerMessage message = ref.getMessage();
@@ -1420,8 +1399,6 @@
          }
       }
 
-      queue.removeExpiringReference(ref);
-
       queue.deliveringCount.decrementAndGet();
 
       message.decrementRefCount(ref);

Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-01-08 16:38:13 UTC (rev 8782)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-01-09 18:26:20 UTC (rev 8783)
@@ -28,6 +28,7 @@
 import org.hornetq.api.core.client.ClientProducer;
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.MessageHandler;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.impl.TestSupportPageStore;
@@ -821,7 +822,106 @@
       }
 
    }
+   
+   public void testDropMessagesExpiring() throws Exception
+   {
+      clearData();
 
+      Configuration config = createDefaultConfig();
+
+      HashMap<String, AddressSettings> settings = new HashMap<String, AddressSettings>();
+
+      AddressSettings set = new AddressSettings();
+      set.setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP);
+
+      settings.put(PagingTest.ADDRESS.toString(), set);
+
+      HornetQServer server = createServer(true, config, 1024,  1024 * 1024, settings);
+
+      server.start();
+
+      final int numberOfMessages = 30000;
+
+      try
+      {
+         ClientSessionFactory sf = createInVMFactory();
+         
+         sf.setAckBatchSize(0);
+
+         ClientSession session = sf.createSession();
+
+         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         ClientMessage message = null;
+         
+         class MyHandler implements MessageHandler
+         {
+            int count;
+
+            public void onMessage(ClientMessage message)
+            {
+               try
+               {
+                  Thread.sleep(1);
+               }
+               catch (Exception e)
+               {
+                  
+               }
+               
+               count++;
+               
+               if (count % 1000 == 0)
+               {
+                  log.info("received " + count);
+               }
+               
+               try
+               {
+                  message.acknowledge();
+               }
+               catch (Exception e)
+               {
+                  e.printStackTrace();
+               }
+            }           
+         }
+         
+         ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+
+         session.start();
+         
+         consumer.setMessageHandler(new MyHandler());
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            byte[] body = new byte[1024];
+
+            message = session.createMessage(false);
+            message.getBodyBuffer().writeBytes(body);
+            
+            message.setExpiration(System.currentTimeMillis() + 100);
+
+            producer.send(message);
+         }
+         
+         session.close();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
    private void internalTestPageMultipleDestinations(final boolean transacted) throws Exception
    {
       Configuration config = createDefaultConfig();



More information about the hornetq-commits mailing list