Author: clebert.suconic(a)jboss.com
Date: 2010-10-29 19:28:22 -0400 (Fri, 29 Oct 2010)
New Revision: 9821
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java
Log:
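changes:
  * ClientConsumerImpl.receive(long timeout): a browse-only consumer now logs a warning, ignores the timeout and calls receive(0, true); other consumers still call receive(timeout, false)
  * PageCursor: removed moveNext() from the interface
  * PageCursorImpl: added the inner class LocalIterator; moveNext now takes the PagePosition to read from; the lazy initialisation of lastPosition moved into getLastPosition(); redeliveries switched from ConcurrentLinkedQueue to org.hornetq.utils.LinkedList (LinkedListImpl)
  * BrowserTest: added testBrowse2 (browsing through the core ClientConsumer API); reduced testBrowse to sending and browsing a single message; removed the "test_counter > 30" selector-browser check from a later test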
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-10-29
23:25:39 UTC (rev 9820)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-10-29
23:28:22 UTC (rev 9821)
@@ -321,7 +321,15 @@
public ClientMessage receive(final long timeout) throws HornetQException
{
- return receive(timeout, false);
+ if (isBrowseOnly())
+ {
+ log.warn("receive timeout is not effective on browsing, ignoring
timeout");
+ return receive(0, true);
+ }
+ else
+ {
+ return receive(timeout, false);
+ }
}
public ClientMessage receive() throws HornetQException
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-29
23:25:39 UTC (rev 9820)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-29
23:28:22 UTC (rev 9821)
@@ -47,8 +47,6 @@
void disableAutoCleanup();
void enableAutoCleanup();
-
- Pair<PagePosition, PagedMessage> moveNext() throws Exception;
void ack(PagePosition position) throws Exception;
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
---
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-29
23:25:39 UTC (rev 9820)
+++
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-29
23:28:22 UTC (rev 9821)
@@ -17,6 +17,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
@@ -44,6 +45,8 @@
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.Future;
+import org.hornetq.utils.LinkedListImpl;
+import org.hornetq.utils.LinkedListIterator;
/**
* A PageCursorImpl
@@ -91,7 +94,7 @@
private final SortedMap<Long, PageCursorInfo> consumedPages =
Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
// We only store the position for redeliveries. They will be read from the SoftCache
again during delivery.
- private final ConcurrentLinkedQueue<PagePosition> redeliveries = new
ConcurrentLinkedQueue<PagePosition>();
+ private final org.hornetq.utils.LinkedList<PagePosition> redeliveries = new
LinkedListImpl<PagePosition>();
// Static --------------------------------------------------------
@@ -147,11 +150,80 @@
ack(position);
}
+
+
+ class LocalIterator implements LinkedListIterator<Pair<PagePosition,
PagedMessage>>
+ {
+ PagePosition position = getLastPosition();
+
+ PagePosition lastOperation = null;
+
+ LinkedListIterator<PagePosition> redeliveryIterator =
redeliveries.iterator();
+ boolean isredelivery = false;
+
+
+ public void repeat()
+ {
+ if (isredelivery)
+ {
+ redeliveryIterator.repeat();
+ }
+ else
+ {
+ if (lastOperation == null)
+ {
+ position = getLastPosition();
+ }
+ else
+ {
+ position = lastOperation;
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see java.util.Iterator#next()
+ */
+ public Pair<PagePosition, PagedMessage> next()
+ {
+ try
+ {
+ Pair<PagePosition, PagedMessage> nextPos = moveNext(position);
+ lastOperation = position;
+ position = nextPos.a;
+ return nextPos;
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ public boolean hasNext()
+ {
+ return true;
+ }
+
+ /* (non-Javadoc)
+ * @see java.util.Iterator#remove()
+ */
+ public void remove()
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.utils.LinkedListIterator#close()
+ */
+ public void close()
+ {
+ }
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
*/
- public synchronized Pair<PagePosition, PagedMessage> moveNext() throws
Exception
+ public synchronized Pair<PagePosition, PagedMessage> moveNext(PagePosition
posision) throws Exception
{
PagePosition redeliveryPos = null;
@@ -161,20 +233,13 @@
return new Pair<PagePosition, PagedMessage>(redeliveryPos,
cursorProvider.getMessage(redeliveryPos));
}
- if (lastPosition == null)
- {
- // it will start at the first available page
- long firstPage = pageStore.getFirstPage();
- lastPosition = new PagePositionImpl(firstPage, -1);
- }
-
boolean match = false;
Pair<PagePosition, PagedMessage> message = null;
do
{
- message = cursorProvider.getNext(this, lastPosition);
+ message = cursorProvider.getNext(this, posision);
if (message != null)
{
@@ -194,6 +259,21 @@
return message;
}
+ /**
+ *
+ */
+ private PagePosition getLastPosition()
+ {
+ if (lastPosition == null)
+ {
+ // it will start at the first available page
+ long firstPage = pageStore.getFirstPage();
+ lastPosition = new PagePositionImpl(firstPage, -1);
+ }
+
+ return lastPosition;
+ }
+
/* (non-Javadoc)
* @see
org.hornetq.core.paging.cursor.PageCursor#confirm(org.hornetq.core.paging.cursor.PagePosition)
*/
@@ -251,7 +331,7 @@
*/
public synchronized void redeliver(final PagePosition position)
{
- redeliveries.add(position);
+ redeliveries.addTail(position);
}
/**
Modified:
branches/Branch_New_Paging/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java
===================================================================
---
branches/Branch_New_Paging/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java 2010-10-29
23:25:39 UTC (rev 9820)
+++
branches/Branch_New_Paging/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java 2010-10-29
23:28:22 UTC (rev 9821)
@@ -26,6 +26,12 @@
import javax.jms.Session;
import javax.jms.TextMessage;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.MessageHandler;
+import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.tests.util.ProxyAssertSupport;
/**
@@ -109,8 +115,8 @@
}
}
}
-
- public void testBrowse() throws Exception
+
+ public void testBrowse2() throws Exception
{
Connection conn = null;
@@ -121,59 +127,75 @@
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer =
session.createProducer(HornetQServerTestCase.queue1);
+
+ HornetQConnectionFactory cf = (HornetQConnectionFactory)
getConnectionFactory();
- final int numMessages = 10;
+ ClientSession coreSession = cf.getCoreFactory().createSession(true, true);
- for (int i = 0; i < numMessages; i++)
+ coreSession.start();
+
+ ClientConsumer browser =
coreSession.createConsumer("jms.queue.Queue1", true);
+
+ conn.start();
+
+ Message m = session.createMessage();
+ m.setIntProperty("cnt", 0);
+ producer.send(m);
+
+
+ assertNotNull(browser.receive(5000));
+
+ Thread.sleep(5000);
+
+ coreSession.close();
+
+
+ System.out.println("Draining destination...");
+ drainDestination(getConnectionFactory(), queue1);
+
+ }
+ finally
+ {
+ if (conn != null)
{
- Message m = session.createMessage();
- m.setIntProperty("cnt", i);
- producer.send(m);
+ conn.close();
}
+ }
+ }
+ public void testBrowse() throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = getConnectionFactory().createConnection();
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer =
session.createProducer(HornetQServerTestCase.queue1);
+
QueueBrowser browser = session.createBrowser(HornetQServerTestCase.queue1);
ProxyAssertSupport.assertEquals(browser.getQueue(),
HornetQServerTestCase.queue1);
ProxyAssertSupport.assertNull(browser.getMessageSelector());
- Enumeration en = browser.getEnumeration();
+ Enumeration<Message> en =
(Enumeration<Message>)browser.getEnumeration();
- int count = 0;
- while (en.hasMoreElements())
- {
- en.nextElement();
- count++;
- }
-
- ProxyAssertSupport.assertEquals(numMessages, count);
-
- MessageConsumer mc = session.createConsumer(HornetQServerTestCase.queue1);
-
conn.start();
- for (int i = 0; i < numMessages; i++)
- {
- Message m = mc.receive();
- ProxyAssertSupport.assertNotNull(m);
- }
-
- browser = session.createBrowser(HornetQServerTestCase.queue1);
- en = browser.getEnumeration();
-
- log.info("browsing");
-
- count = 0;
- while (en.hasMoreElements())
- {
- Message mess = (Message)en.nextElement();
- log.info("message:" + mess);
- count++;
- }
-
- log.trace("Received " + count + " messages");
-
- ProxyAssertSupport.assertEquals(0, count);
+ Message m = session.createMessage();
+ m.setIntProperty("cnt", 0);
+ producer.send(m);
+ Message m2 = en.nextElement();
+
+ assertNotNull(m2);
+
+
+ System.out.println("Draining destination...");
+ drainDestination(getConnectionFactory(), queue1);
+
}
finally
{
@@ -204,19 +226,6 @@
m.setIntProperty("test_counter", i + 1);
producer.send(m);
}
-
- QueueBrowser browser = session.createBrowser(HornetQServerTestCase.queue1,
"test_counter > 30");
-
- Enumeration en = browser.getEnumeration();
- int count = 0;
- while (en.hasMoreElements())
- {
- Message m = (Message)en.nextElement();
- int testCounter = m.getIntProperty("test_counter");
- ProxyAssertSupport.assertTrue(testCounter > 30);
- count++;
- }
- ProxyAssertSupport.assertEquals(70, count);
}
finally
{