[hornetq-commits] JBoss hornetq SVN: r9821 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Oct 29 19:28:23 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-10-29 19:28:22 -0400 (Fri, 29 Oct 2010)
New Revision: 9821

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
   branches/Branch_New_Paging/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java
Log:
changes

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2010-10-29 23:25:39 UTC (rev 9820)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2010-10-29 23:28:22 UTC (rev 9821)
@@ -321,7 +321,15 @@
 
    public ClientMessage receive(final long timeout) throws HornetQException
    {
-      return receive(timeout, false);
+      if (isBrowseOnly())
+      {
+         log.warn("receive timeout is not effective on browsing, ignoring timeout");
+         return receive(0, true);
+      }
+      else
+      {
+         return receive(timeout, false);
+      }
    }
 
    public ClientMessage receive() throws HornetQException

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-29 23:25:39 UTC (rev 9820)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-29 23:28:22 UTC (rev 9821)
@@ -47,8 +47,6 @@
    void disableAutoCleanup();
    
    void enableAutoCleanup();
-   
-   Pair<PagePosition, PagedMessage> moveNext() throws Exception;
 
    void ack(PagePosition position) throws Exception;
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-29 23:25:39 UTC (rev 9820)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-29 23:28:22 UTC (rev 9821)
@@ -17,6 +17,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
@@ -44,6 +45,8 @@
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
 import org.hornetq.core.transaction.impl.TransactionImpl;
 import org.hornetq.utils.Future;
+import org.hornetq.utils.LinkedListImpl;
+import org.hornetq.utils.LinkedListIterator;
 
 /**
  * A PageCursorImpl
@@ -91,7 +94,7 @@
    private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
 
    // We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
-   private final ConcurrentLinkedQueue<PagePosition> redeliveries = new ConcurrentLinkedQueue<PagePosition>();
+   private final org.hornetq.utils.LinkedList<PagePosition> redeliveries = new LinkedListImpl<PagePosition>();
 
    // Static --------------------------------------------------------
 
@@ -147,11 +150,80 @@
 
       ack(position);
    }
+   
+   /** Iterator over (position, message) pairs of this cursor; each next() delegates to moveNext(position). */
+   class LocalIterator implements LinkedListIterator<Pair<PagePosition, PagedMessage>>
+   {
+      PagePosition position = getLastPosition(); // position handed to the next moveNext() call
+      // Value position had before the latest next(); stays null until next() has been called.
+      PagePosition lastOperation = null;
+      // Iterator over the redeliveries list; inside this class it is only touched by repeat().
+      LinkedListIterator<PagePosition> redeliveryIterator = redeliveries.iterator();
 
+      boolean isredelivery = false;
+      // NOTE(review): nothing inside this class assigns the flag above after initialisation - confirm who sets it.
+      // repeat(): restores position to its value before the latest next(), or delegates to redeliveryIterator.
+      public void repeat()
+      {
+         if (isredelivery)
+         {
+            redeliveryIterator.repeat();
+         }
+         else
+         {
+            if (lastOperation == null)
+            {
+               position = getLastPosition();
+            }
+            else
+            {
+               position = lastOperation;
+            }
+         }
+      }
+      
+      /* Advances via moveNext(position): saves the old position in lastOperation, then moves
+       * position to the returned pair's position. Any Exception is rethrown wrapped in a
+       * RuntimeException. NOTE(review): a null result from moveNext would NPE on nextPos.a. */
+      public Pair<PagePosition, PagedMessage> next()
+      {
+         try
+         {
+             Pair<PagePosition, PagedMessage> nextPos = moveNext(position);
+             lastOperation = position;
+             position = nextPos.a;
+             return nextPos;
+         }
+         catch (Exception e)
+         {
+            throw new RuntimeException(e.getMessage(), e);
+         }
+      }
+      // Always answers true; this iterator never reports exhaustion.
+      public boolean hasNext()
+      {
+         return true;
+      }
+
+      /* No-op: the body is empty, so nothing is removed.
+       * @see java.util.Iterator#remove()
+       */
+      public void remove()
+      {
+      }
+
+      /* No-op: the body is empty. NOTE(review): redeliveryIterator is not closed here.
+       * @see org.hornetq.utils.LinkedListIterator#close()
+       */
+      public void close()
+      {
+      }
+   }
+
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
     */
-   public synchronized Pair<PagePosition, PagedMessage> moveNext() throws Exception
+   public synchronized Pair<PagePosition, PagedMessage> moveNext(PagePosition posision) throws Exception
    {
       PagePosition redeliveryPos = null;
 
@@ -161,20 +233,13 @@
          return new Pair<PagePosition, PagedMessage>(redeliveryPos, cursorProvider.getMessage(redeliveryPos));
       }
 
-      if (lastPosition == null)
-      {
-         // it will start at the first available page
-         long firstPage = pageStore.getFirstPage();
-         lastPosition = new PagePositionImpl(firstPage, -1);
-      }
-
       boolean match = false;
 
       Pair<PagePosition, PagedMessage> message = null;
 
       do
       {
-         message = cursorProvider.getNext(this, lastPosition);
+         message = cursorProvider.getNext(this, posision);
 
          if (message != null)
          {
@@ -194,6 +259,21 @@
       return message;
    }
 
+   /**
+    * Returns lastPosition, first initialising it to (pageStore.getFirstPage(), -1) while it is still null.
+    */
+   private PagePosition getLastPosition()
+   {
+      if (lastPosition == null)
+      {
+         // it will start at the first available page
+         long firstPage = pageStore.getFirstPage();
+         lastPosition = new PagePositionImpl(firstPage, -1);
+      }
+      
+      return lastPosition;
+   }
+
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#confirm(org.hornetq.core.paging.cursor.PagePosition)
     */
@@ -251,7 +331,7 @@
     */
    public synchronized void redeliver(final PagePosition position)
    {
-      redeliveries.add(position);
+      redeliveries.addTail(position);
    }
 
    /** 

Modified: branches/Branch_New_Paging/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java
===================================================================
--- branches/Branch_New_Paging/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java	2010-10-29 23:25:39 UTC (rev 9820)
+++ branches/Branch_New_Paging/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java	2010-10-29 23:28:22 UTC (rev 9821)
@@ -26,6 +26,12 @@
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.MessageHandler;
+import org.hornetq.jms.client.HornetQConnectionFactory;
 import org.hornetq.jms.tests.util.ProxyAssertSupport;
 
 /**
@@ -109,8 +115,8 @@
          }
       }
    }
-
-   public void testBrowse() throws Exception
+   
+   public void testBrowse2() throws Exception
    {
       Connection conn = null;
 
@@ -121,59 +127,75 @@
          Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
          MessageProducer producer = session.createProducer(HornetQServerTestCase.queue1);
+         
+         HornetQConnectionFactory cf = (HornetQConnectionFactory) getConnectionFactory();
 
-         final int numMessages = 10;
+         ClientSession coreSession = cf.getCoreFactory().createSession(true, true);
 
-         for (int i = 0; i < numMessages; i++)
+         coreSession.start();
+         
+         ClientConsumer browser = coreSession.createConsumer("jms.queue.Queue1", true);
+       
+         conn.start();
+
+         Message m = session.createMessage();
+         m.setIntProperty("cnt", 0);
+         producer.send(m);
+         
+          
+         assertNotNull(browser.receive(5000));
+         
+         Thread.sleep(5000);
+         
+         coreSession.close();
+         
+         
+         System.out.println("Draining destination...");
+         drainDestination(getConnectionFactory(), queue1);
+         
+      }
+      finally
+      {
+         if (conn != null)
          {
-            Message m = session.createMessage();
-            m.setIntProperty("cnt", i);
-            producer.send(m);
+            conn.close();
          }
+      }
+   }
 
+   public void testBrowse() throws Exception
+   {
+      Connection conn = null;
+
+      try
+      {
+         conn = getConnectionFactory().createConnection();
+
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer producer = session.createProducer(HornetQServerTestCase.queue1);
+
          QueueBrowser browser = session.createBrowser(HornetQServerTestCase.queue1);
 
          ProxyAssertSupport.assertEquals(browser.getQueue(), HornetQServerTestCase.queue1);
 
          ProxyAssertSupport.assertNull(browser.getMessageSelector());
 
-         Enumeration en = browser.getEnumeration();
+         Enumeration<Message> en = (Enumeration<Message>)browser.getEnumeration();
 
-         int count = 0;
-         while (en.hasMoreElements())
-         {
-            en.nextElement();
-            count++;
-         }
-
-         ProxyAssertSupport.assertEquals(numMessages, count);
-
-         MessageConsumer mc = session.createConsumer(HornetQServerTestCase.queue1);
-
          conn.start();
 
-         for (int i = 0; i < numMessages; i++)
-         {
-            Message m = mc.receive();
-            ProxyAssertSupport.assertNotNull(m);
-         }
-
-         browser = session.createBrowser(HornetQServerTestCase.queue1);
-         en = browser.getEnumeration();
-
-         log.info("browsing");
-
-         count = 0;
-         while (en.hasMoreElements())
-         {
-            Message mess = (Message)en.nextElement();
-            log.info("message:" + mess);
-            count++;
-         }
-
-         log.trace("Received " + count + " messages");
-
-         ProxyAssertSupport.assertEquals(0, count);
+         Message m = session.createMessage();
+         m.setIntProperty("cnt", 0);
+         producer.send(m);
+         Message m2 = en.nextElement();
+         
+         assertNotNull(m2);
+         
+         
+         System.out.println("Draining destination...");
+         drainDestination(getConnectionFactory(), queue1);
+         
       }
       finally
       {
@@ -204,19 +226,6 @@
             m.setIntProperty("test_counter", i + 1);
             producer.send(m);
          }
-
-         QueueBrowser browser = session.createBrowser(HornetQServerTestCase.queue1, "test_counter > 30");
-
-         Enumeration en = browser.getEnumeration();
-         int count = 0;
-         while (en.hasMoreElements())
-         {
-            Message m = (Message)en.nextElement();
-            int testCounter = m.getIntProperty("test_counter");
-            ProxyAssertSupport.assertTrue(testCounter > 30);
-            count++;
-         }
-         ProxyAssertSupport.assertEquals(70, count);
       }
       finally
       {



More information about the hornetq-commits mailing list