JBoss hornetq SVN: r9813 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-25 20:08:05 -0400 (Mon, 25 Oct 2010)
New Revision: 9813
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Clear depage logic
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-25 19:35:10 UTC (rev 9812)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-26 00:08:05 UTC (rev 9813)
@@ -93,4 +93,14 @@
void addSize(int size);
void executeRunnableWhenMemoryAvailable(Runnable runnable);
+
+ /** This method will hold any producer, but it waits for operations to finish before locking (write lock) */
+ void lock();
+
+ /**
+ *
+ * Call this method using the same thread used by the last call of {@link PagingStore#lock()}
+ *
+ */
+ void unlock();
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-25 19:35:10 UTC (rev 9812)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-26 00:08:05 UTC (rev 9813)
@@ -79,4 +79,10 @@
void redeliver(PagePosition position);
void printDebug();
+
+ /**
+ * @param minPage
+ * @return
+ */
+ boolean isComplete(long minPage);
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-25 19:35:10 UTC (rev 9812)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-26 00:08:05 UTC (rev 9813)
@@ -271,6 +271,15 @@
processACK(position);
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursor#isComplete(long)
+ */
+ public boolean isComplete(long page)
+ {
+ PageCursorInfo info = consumedPages.get(page);
+ return info != null && info.isDone();
+ }
+
/**
* All the data associated with the cursor should go away here
*/
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-25 19:35:10 UTC (rev 9812)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-26 00:08:05 UTC (rev 9813)
@@ -14,7 +14,9 @@
package org.hornetq.core.paging.cursor.impl;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
@@ -66,7 +68,7 @@
private final Executor executor;
- private SoftValueHashMap<Long, PageCache> softCache = new SoftValueHashMap<Long, PageCache>();
+ private Map<Long, PageCache> softCache = new SoftValueHashMap<Long, PageCache>();
private ConcurrentMap<Long, PageCursor> activeCursors = new ConcurrentHashMap<Long, PageCursor>();
@@ -85,7 +87,7 @@
this.storageManager = storageManager;
this.executorFactory = executorFactory;
this.executor = executorFactory.getExecutor();
- }
+ }
// Public --------------------------------------------------------
@@ -93,16 +95,21 @@
{
return pagingStore;
}
-
- public PageCursor createPersistentCursor(long cursorID, Filter filter)
+
+ public synchronized PageCursor createPersistentCursor(long cursorID, Filter filter)
{
PageCursor activeCursor = activeCursors.get(cursorID);
if (activeCursor != null)
{
throw new IllegalStateException("Cursor " + cursorID + " had already been created");
}
-
- activeCursor = new PageCursorImpl(this, pagingStore, storageManager, executorFactory.getExecutor(), filter, cursorID);
+
+ activeCursor = new PageCursorImpl(this,
+ pagingStore,
+ storageManager,
+ executorFactory.getExecutor(),
+ filter,
+ cursorID);
activeCursors.put(cursorID, activeCursor);
return activeCursor;
}
@@ -110,7 +117,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursorProvider#createCursor()
*/
- public PageCursor getPersistentCursor(long cursorID)
+ public synchronized PageCursor getPersistentCursor(long cursorID)
{
return activeCursors.get(cursorID);
}
@@ -118,9 +125,14 @@
/**
* this will create a non-persistent cursor
*/
- public PageCursor createNonPersistentCursor(Filter filter)
+ public synchronized PageCursor createNonPersistentCursor(Filter filter)
{
- PageCursor cursor = new PageCursorImpl(this, pagingStore, storageManager, executorFactory.getExecutor(), filter, 0);
+ PageCursor cursor = new PageCursorImpl(this,
+ pagingStore,
+ storageManager,
+ executorFactory.getExecutor(),
+ filter,
+ 0);
nonPersistentCursors.add(cursor);
return cursor;
}
@@ -230,14 +242,20 @@
return cache;
}
- public synchronized void addPageCache(PageCache cache)
+ public void addPageCache(PageCache cache)
{
- softCache.put(cache.getPageId(), cache);
+ synchronized (softCache)
+ {
+ softCache.put(cache.getPageId(), cache);
+ }
}
- public synchronized int getCacheSize()
+ public int getCacheSize()
{
- return softCache.size();
+ synchronized (softCache)
+ {
+ return softCache.size();
+ }
}
public void processReload() throws Exception
@@ -302,24 +320,70 @@
private void cleanup()
{
- long minPage = getMinPageInUse();
+ ArrayList<Page> depagedPages = new ArrayList<Page>();
- try
+ pagingStore.lock();
+
+ synchronized (this)
{
- for (long i = pagingStore.getFirstPage(); i < minPage; i++)
+ try
{
- Page page = pagingStore.depage();
- if (page != null)
+ ArrayList<PageCursor> cursorList = new ArrayList<PageCursor>();
+ cursorList.addAll(activeCursors.values());
+ cursorList.addAll(nonPersistentCursors);
+
+ long minPage = checkMinPage(cursorList);
+
+ if (minPage == pagingStore.getCurrentWritingPage())
{
- System.out.println("Deleting " + page);
- page.delete();
+ boolean complete = true;
+
+ for (PageCursor cursor: cursorList)
+ {
+ if (!cursor.isComplete(minPage))
+ {
+ complete = false;
+ break;
+ }
+ }
+
+ if (complete)
+ {
+ System.out.println("Depaging complete now. We can leave page state at this point!");
+ // move every cursor away from the main page, clearing every cursor's old pages while only keeping a bookmark for the next page in case it happens again
+ }
}
+
+ for (long i = pagingStore.getFirstPage(); i < minPage; i++)
+ {
+ Page page = pagingStore.depage();
+ depagedPages.add(page);
+ }
}
+ catch (Exception ex)
+ {
+ log.warn("Couldn't complete cleanup on paging", ex);
+ return;
+ }
+ finally
+ {
+ pagingStore.unlock();
+ }
}
+
+ try
+ {
+ for (Page depagedPage : depagedPages)
+ {
+ depagedPage.delete();
+ }
+ }
catch (Exception ex)
{
log.warn("Couldn't complete cleanup on paging", ex);
+ return;
}
+
}
public void printDebug()
@@ -343,15 +407,11 @@
// Private -------------------------------------------------------
- private long getMinPageInUse()
+ /**
+ * This method must be called while holding this provider's lock (as cleanup() does) because we want it to be atomic with the cursors being used
+ */
+ private long checkMinPage(List<PageCursor> cursorList)
{
- ArrayList<PageCursor> cursorList = new ArrayList<PageCursor>();
- synchronized (this)
- {
- cursorList.addAll(activeCursors.values());
- cursorList.addAll(nonPersistentCursors);
- }
-
if (cursorList.size() == 0)
{
return 0l;
@@ -378,7 +438,7 @@
{
boolean needToRead = false;
PageCache cache = null;
- synchronized (this)
+ synchronized (softCache)
{
if (pageId > pagingStore.getCurrentWritingPage())
{
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-25 19:35:10 UTC (rev 9812)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-26 00:08:05 UTC (rev 9813)
@@ -217,6 +217,16 @@
// PagingStore implementation ------------------------------------
+ public void lock()
+ {
+ writeLock.lock();
+ }
+
+ public void unlock()
+ {
+ writeLock.unlock();
+ }
+
public PageCursorProvider getCursorProvier()
{
return cursorProvider;
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-25 19:35:10 UTC (rev 9812)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-26 00:08:05 UTC (rev 9813)
@@ -272,6 +272,7 @@
for (int i = 0; i < 1000; i++)
{
+ System.out.println("Reading Msg : " + i);
Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
assertNotNull(msg);
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
13 years, 6 months
JBoss hornetq SVN: r9812 - in trunk: hornetq-rest/docbook/reference/en and 6 other directories.
by do-not-reply@jboss.org
Author: bill.burke(a)jboss.com
Date: 2010-10-25 15:35:10 -0400 (Mon, 25 Oct 2010)
New Revision: 9812
Added:
trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/SelectorTest.java
Modified:
trunk/docs/user-manual/en/configuration-index.xml
trunk/docs/user-manual/en/embedding-hornetq.xml
trunk/hornetq-rest/docbook/reference/en/master.xml
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/AcknowledgedQueueConsumer.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/QueueConsumer.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumer.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/xml/PushRegistration.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/AcknowledgedSubscriptionResource.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/FileTopicPushStore.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionResource.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/HttpMessageHelper.java
Log:
fix docs, merge REST from beta2 release
Modified: trunk/docs/user-manual/en/configuration-index.xml
===================================================================
--- trunk/docs/user-manual/en/configuration-index.xml 2010-10-25 15:49:13 UTC (rev 9811)
+++ trunk/docs/user-manual/en/configuration-index.xml 2010-10-25 19:35:10 UTC (rev 9812)
@@ -1026,7 +1026,7 @@
<entry>generic</entry>
</row>
<row>
- <entry id="configuration.connection-factory.signature">
+ <entry id="configuration.connection-factory.signature.xa">
<link linkend="using-jms.configure.factory.types">connection-factory.xa</link>
</entry>
<entry>Boolean</entry>
Modified: trunk/docs/user-manual/en/embedding-hornetq.xml
===================================================================
--- trunk/docs/user-manual/en/embedding-hornetq.xml 2010-10-25 15:49:13 UTC (rev 9811)
+++ trunk/docs/user-manual/en/embedding-hornetq.xml 2010-10-25 19:35:10 UTC (rev 9812)
@@ -46,19 +46,8 @@
two different helper classes for this depending on whether you're using the
HornetQ Core API or JMS.</para>
-<<<<<<< .mine
<section>
<title>Core API Only</title>
-=======
-config.setAcceptorConfigurations(transports);</programlisting>
- <para>You need to instantiate and start HornetQ server. The class <literal
- >org.hornetq.api.core.server.HornetQ</literal> has a few static methods for creating
- servers with common configurations.</para>
- <programlisting>import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.HornetQServers;
-
->>>>>>> .r9629
-
<para>For instantiating a core HornetQ Server only, the steps are pretty
simple. The example requires that you have defined a configuration file
<literal>hornetq-configuration.xml</literal> in your
@@ -66,38 +55,9 @@
...
-<<<<<<< .mine
EmbeddedHornetQ embedded = new EmbeddedHornetQ();
embedded.start();
-=======
-HornetQServer server = HornetQServers.newHornetQServer(config);
->>>>>>> .r9629
-<<<<<<< .mine
-// Assuming you defined an "in-vm" acceptor within your hornetq-configuration.xml file
-=======
-server.start();</programlisting>
- <para>You also have the option of instantiating <literal>HornetQServerImpl</literal>
- directly:</para>
- <programlisting>HornetQServer server = new HornetQServerImpl(config);
-server.start();</programlisting>
- </section>
- <section>
- <title>Dependency Frameworks</title>
- <para>You may also choose to use a dependency injection framework such as <trademark>JBoss
- Micro Container</trademark> or <trademark>Spring Framework</trademark>.</para>
- <para>HornetQ standalone uses JBoss Micro Container as the injection framework. <literal
- >HornetQBootstrapServer</literal> and <literal>hornetq-beans.xml</literal> which are
- part of the HornetQ distribution provide a very complete implementation of what's needed
- to bootstrap the server using JBoss Micro Container. </para>
- <para>When using JBoss Micro Container, you need to provide an XML file declaring the
- <literal>HornetQServer</literal> and <literal>Configuration</literal> object, you
- can also inject a security manager and a MBean server if you want, but those are
- optional.</para>
- <para>A very basic XML Bean declaration for the JBoss Micro Container would be:</para>
- <programlisting><?xml version="1.0" encoding="UTF-8"?>
->>>>>>> .r9629
-
ClientSessionFactory nettyFactory = HornetQClient.createClientSessionFactory(
new TransportConfiguration(
InVMConnectorFactory.class.getName()));
Modified: trunk/hornetq-rest/docbook/reference/en/master.xml
===================================================================
--- trunk/hornetq-rest/docbook/reference/en/master.xml 2010-10-25 15:49:13 UTC (rev 9811)
+++ trunk/hornetq-rest/docbook/reference/en/master.xml 2010-10-25 19:35:10 UTC (rev 9812)
@@ -870,6 +870,16 @@
the server. Only usable on topics.</para>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term>selector</term>
+
+ <listitem>
+ <para>This is an optional JMS selector string. The HornetQ REST
+ interface adds HTTP headers to the JMS message for REST produced
+ messages. HTTP headers are prefixed with "http_" and every '-'
+ character is converted to a '$'.</para>
+ </listitem>
+ </varlistentry>
</variablelist>
<sect1>
@@ -1480,6 +1490,10 @@
<programlisting><push-registration>
<durable>false</durable>
+ <selector><![CDATA[
+ SomeAttribute > 1
+ ]]>
+ </selector>
<link rel="push" href="http://somewhere.com" type="application/json" method="PUT"/>
</push-registration>
</programlisting>
@@ -1493,6 +1507,10 @@
<literal>queue-push-store-dir</literal> config variable defined in
Chapter 2. (<literal>topic-push-store-dir</literal> for topics).</para>
+ <para>The <literal>selector</literal> element is optional and defines a
+ JMS message selector. You should enclose it within CDATA blocks as some
+ of the selector characters are illegal XML.</para>
+
<para>The <literal>link</literal> element specifies the basis of the
interaction. The <literal>href</literal> attribute contains the URL you
want to interact with. It is the only required attribute. The
@@ -1562,11 +1580,16 @@
<title>The Topic Push Subscription XML</title>
<para>The push XML for a topic is the same except the root element is
- push-topic-registration. The rest of the document is the same. Here's an
+ push-topic-registration. (Also remember the <literal>selector</literal>
+ element is optional). The rest of the document is the same. Here's an
example of a template registration:</para>
<programlisting><push-topic-registration>
<durable>true</durable>
+ <selector><![CDATA[
+ SomeAttribute > 1
+ ]]>
+ </selector>
<link rel="template" href="http://somewhere.com/resources/{id}/messages" method="POST"/>
</push-topic-registration></programlisting>
</sect1>
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/AcknowledgedQueueConsumer.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/AcknowledgedQueueConsumer.java 2010-10-25 15:49:13 UTC (rev 9811)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/AcknowledgedQueueConsumer.java 2010-10-25 19:35:10 UTC (rev 9812)
@@ -31,10 +31,10 @@
protected String startup = Long.toString(System.currentTimeMillis());
protected volatile Acknowledgement ack;
- public AcknowledgedQueueConsumer(ClientSessionFactory factory, String destination, String id, DestinationServiceManager serviceManager)
+ public AcknowledgedQueueConsumer(ClientSessionFactory factory, String destination, String id, DestinationServiceManager serviceManager, String selector)
throws HornetQException
{
- super(factory, destination, id, serviceManager);
+ super(factory, destination, id, serviceManager, selector);
autoAck = false;
}
@@ -187,9 +187,7 @@
try
{
- session = factory.createSession();
- consumer = session.createConsumer(destination);
- session.start();
+ createSession();
}
catch (Exception e)
{
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java 2010-10-25 15:49:13 UTC (rev 9811)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java 2010-10-25 19:35:10 UTC (rev 9812)
@@ -36,6 +36,9 @@
protected int consumerTimeoutSeconds;
protected DestinationServiceManager serviceManager;
+ protected static final int ACKNOWLEDGED = 0x01;
+ protected static final int SELECTOR_SET = 0x02;
+
public DestinationServiceManager getServiceManager()
{
return serviceManager;
@@ -78,6 +81,7 @@
private Object timeoutLock = new Object();
+ @Override
public void testTimeout(String target)
{
synchronized (timeoutLock)
@@ -107,32 +111,41 @@
@POST
public Response createSubscription(@FormParam("autoAck") @DefaultValue("true") boolean autoAck,
+ @FormParam("selector") String selector,
@Context UriInfo uriInfo)
{
try
{
QueueConsumer consumer = null;
+ int attributes = 0;
+ if (selector != null)
+ {
+ attributes = attributes | SELECTOR_SET;
+ }
+
if (autoAck)
{
- consumer = createConsumer();
+ consumer = createConsumer(selector);
}
else
{
- consumer = createAcknowledgedConsumer();
+ attributes |= ACKNOWLEDGED;
+ consumer = createAcknowledgedConsumer(selector);
}
+ String attributesSegment = "attributes-" + attributes;
UriBuilder location = uriInfo.getAbsolutePathBuilder();
- if (autoAck) location.path("auto-ack");
- else location.path("acknowledged");
+ location.path(attributesSegment);
location.path(consumer.getId());
Response.ResponseBuilder builder = Response.created(location.build());
+
if (autoAck)
{
- QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/auto-ack/" + consumer.getId(), "-1");
+ QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/" + attributesSegment +"/" + consumer.getId(), "-1");
}
else
{
- AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManager.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/acknowledged/" + consumer.getId(), "-1");
+ AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManager.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/" + attributesSegment +"/" + consumer.getId(), "-1");
}
return builder.build();
@@ -147,11 +160,11 @@
}
}
- public QueueConsumer createConsumer()
+ public QueueConsumer createConsumer(String selector)
throws HornetQException
{
String genId = sessionCounter.getAndIncrement() + "-queue-" + destination + "-" + startup;
- QueueConsumer consumer = new QueueConsumer(sessionFactory, destination, genId, serviceManager);
+ QueueConsumer consumer = new QueueConsumer(sessionFactory, destination, genId, serviceManager, selector);
synchronized (timeoutLock)
{
queueConsumers.put(genId, consumer);
@@ -160,11 +173,11 @@
return consumer;
}
- public QueueConsumer createAcknowledgedConsumer()
+ public QueueConsumer createAcknowledgedConsumer(String selector)
throws HornetQException
{
String genId = sessionCounter.getAndIncrement() + "-queue-" + destination + "-" + startup;
- QueueConsumer consumer = new AcknowledgedQueueConsumer(sessionFactory, destination, genId, serviceManager);
+ QueueConsumer consumer = new AcknowledgedQueueConsumer(sessionFactory, destination, genId, serviceManager, selector);
synchronized (timeoutLock)
{
queueConsumers.put(genId, consumer);
@@ -173,85 +186,81 @@
return consumer;
}
- @Path("auto-ack/{consumer-id}")
+ @Path("attributes-{attributes}/{consumer-id}")
@GET
- public Response getConsumer(@PathParam("consumer-id") String consumerId,
+ public Response getConsumer(@PathParam("attributes") int attributes,
+ @PathParam("consumer-id") String consumerId,
@Context UriInfo uriInfo) throws Exception
{
- return headConsumer(consumerId, uriInfo);
+ return headConsumer(attributes, consumerId, uriInfo);
}
- @Path("auto-ack/{consumer-id}")
+ @Path("attributes-{attributes}/{consumer-id}")
@HEAD
- public Response headConsumer(@PathParam("consumer-id") String consumerId,
+ public Response headConsumer(@PathParam("attributes") int attributes,
+ @PathParam("consumer-id") String consumerId,
@Context UriInfo uriInfo) throws Exception
{
- QueueConsumer consumer = findConsumer(consumerId);
+ QueueConsumer consumer = findConsumer(attributes, consumerId, uriInfo);
Response.ResponseBuilder builder = Response.noContent();
// we synchronize just in case a failed request is still processing
synchronized (consumer)
{
- QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/acknowledged/" + consumer.getId(), Long.toString(consumer.getConsumeIndex()));
+ if ( (attributes & ACKNOWLEDGED) > 0)
+ {
+ AcknowledgedQueueConsumer ackedConsumer = (AcknowledgedQueueConsumer)consumer;
+ Acknowledgement ack = ackedConsumer.getAck();
+ if (ack == null || ack.wasSet())
+ {
+ AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManager.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/attributes-" + attributes + "/" + consumer.getId(), Long.toString(consumer.getConsumeIndex()));
+ }
+ else
+ {
+ ackedConsumer.setAcknowledgementLink(builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/attributes-" + attributes + "/" + consumer.getId());
+ }
+
+ }
+ else
+ {
+ QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/attributes-" + attributes + "/" + consumer.getId(), Long.toString(consumer.getConsumeIndex()));
+ }
}
return builder.build();
}
- @Path("auto-ack/{consumer-id}")
+ @Path("attributes-{attributes}/{consumer-id}")
public QueueConsumer findConsumer(
- @PathParam("consumer-id") String consumerId) throws Exception
+ @PathParam("attributes") int attributes,
+ @PathParam("consumer-id") String consumerId,
+ @Context UriInfo uriInfo) throws Exception
{
QueueConsumer consumer = queueConsumers.get(consumerId);
if (consumer == null)
{
- QueueConsumer tmp = new QueueConsumer(sessionFactory, destination, consumerId, serviceManager);
- consumer = addConsumerToMap(consumerId, tmp);
- }
- return consumer;
- }
+ if ( (attributes & SELECTOR_SET) > 0)
+ {
- @Path("acknowledged/{consumer-id}")
- @GET
- public Response getAcknowledgedConsumer(@PathParam("consumer-id") String consumerId,
- @Context UriInfo uriInfo) throws Exception
- {
- return headAcknowledgedConsumer(consumerId, uriInfo);
- }
+ Response.ResponseBuilder builder = Response.status(Response.Status.GONE)
+ .entity("Cannot reconnect to selector-based consumer. You must recreate the consumer session.")
+ .type("text/plain");
+ UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
+ uriBuilder.path(uriInfo.getMatchedURIs().get(1));
+ serviceManager.getLinkStrategy().setLinkHeader(builder, "pull-consumers", "pull-consumers", uriBuilder.build().toString(), null);
+ throw new WebApplicationException(builder.build());
+
+ }
+ if ( (attributes & ACKNOWLEDGED) > 0)
+ {
+ QueueConsumer tmp = new AcknowledgedQueueConsumer(sessionFactory, destination, consumerId, serviceManager, null);
+ consumer = addConsumerToMap(consumerId, tmp);
- @Path("acknowledged/{consumer-id}")
- @HEAD
- public Response headAcknowledgedConsumer(@PathParam("consumer-id") String consumerId,
- @Context UriInfo uriInfo) throws Exception
- {
- AcknowledgedQueueConsumer consumer = (AcknowledgedQueueConsumer) findAcknowledgedConsumer(consumerId);
- Response.ResponseBuilder builder = Response.ok();
- // we synchronize just in case a failed request is still processing
- synchronized (consumer)
- {
- Acknowledgement ack = consumer.getAck();
- if (ack == null || ack.wasSet())
- {
- AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManager.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/acknowledged/" + consumer.getId(), Long.toString(consumer.getConsumeIndex()));
}
else
{
- consumer.setAcknowledgementLink(builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/acknowledged/" + consumer.getId());
+ QueueConsumer tmp = new QueueConsumer(sessionFactory, destination, consumerId, serviceManager, null);
+ consumer = addConsumerToMap(consumerId, tmp);
}
}
- return builder.build();
- }
-
-
- @Path("acknowledged/{consumer-id}")
- public QueueConsumer findAcknowledgedConsumer(
- @PathParam("consumer-id") String consumerId) throws Exception
- {
- QueueConsumer consumer = queueConsumers.get(consumerId);
- if (consumer == null)
- {
- QueueConsumer tmp = new AcknowledgedQueueConsumer(sessionFactory, destination, consumerId, serviceManager);
- ;
- consumer = addConsumerToMap(consumerId, tmp);
- }
return consumer;
}
@@ -275,16 +284,8 @@
}
- @Path("acknowledged/{consumer-id}")
+ @Path("attributes-{attributes}/{consumer-id}")
@DELETE
- public void closeAcknowledgedSession(
- @PathParam("consumer-id") String consumerId)
- {
- closeSession(consumerId);
- }
-
- @Path("auto-ack/{consumer-id}")
- @DELETE
public void closeSession(
@PathParam("consumer-id") String consumerId)
{
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java 2010-10-25 15:49:13 UTC (rev 9811)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java 2010-10-25 19:35:10 UTC (rev 9812)
@@ -1,12 +1,12 @@
package org.hornetq.rest.queue;
import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Message;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.rest.util.HttpMessageHelper;
+import org.hornetq.api.core.Message;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/QueueConsumer.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/QueueConsumer.java 2010-10-25 15:49:13 UTC (rev 9811)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/QueueConsumer.java 2010-10-25 19:35:10 UTC (rev 9812)
@@ -5,6 +5,7 @@
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.jms.client.SelectorTranslator;
import org.hornetq.rest.util.Constants;
import org.hornetq.rest.util.LinkStrategy;
@@ -37,6 +38,7 @@
protected long lastPing = System.currentTimeMillis();
protected DestinationServiceManager serviceManager;
protected boolean autoAck = true;
+ protected String selector;
/**
* token used to create consume-next links
@@ -70,14 +72,15 @@
lastPing = System.currentTimeMillis();
}
- public QueueConsumer(ClientSessionFactory factory, String destination, String id, DestinationServiceManager serviceManager) throws HornetQException
+ public QueueConsumer(ClientSessionFactory factory, String destination, String id, DestinationServiceManager serviceManager, String selector) throws HornetQException
{
this.factory = factory;
this.destination = destination;
this.id = id;
this.serviceManager = serviceManager;
+ this.selector = selector;
- createSession(factory, destination);
+ createSession();
}
public String getId()
@@ -191,11 +194,18 @@
}
}
- protected void createSession(ClientSessionFactory factory, String destination)
+ protected void createSession()
throws HornetQException
{
session = factory.createSession(true, true);
- consumer = session.createConsumer(destination);
+ if (selector == null)
+ {
+ consumer = session.createConsumer(destination);
+ }
+ else
+ {
+ consumer = session.createConsumer(destination, SelectorTranslator.convertToHornetQFilterString(selector));
+ }
session.start();
}
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumer.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumer.java 2010-10-25 15:49:13 UTC (rev 9811)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumer.java 2010-10-25 19:35:10 UTC (rev 9812)
@@ -7,6 +7,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.core.logging.Logger;
+import org.hornetq.jms.client.SelectorTranslator;
import org.hornetq.rest.queue.push.xml.PushRegistration;
/**
@@ -68,7 +69,14 @@
strategy.start();
session = factory.createSession(false, false);
- consumer = session.createConsumer(destination);
+ if (registration.getSelector() != null)
+ {
+ consumer = session.createConsumer(destination, SelectorTranslator.convertToHornetQFilterString(registration.getSelector()));
+ }
+ else
+ {
+ consumer = session.createConsumer(destination);
+ }
consumer.setMessageHandler(this);
session.start();
log.info("Push consumer started for: " + registration.getTarget());
@@ -100,6 +108,7 @@
}
}
+ @Override
public void onMessage(ClientMessage clientMessage)
{
if (strategy.push(clientMessage) == false)
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/xml/PushRegistration.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/xml/PushRegistration.java 2010-10-25 15:49:13 UTC (rev 9811)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/xml/PushRegistration.java 2010-10-25 19:35:10 UTC (rev 9812)
@@ -18,7 +18,7 @@
*/
@XmlRootElement(name = "push-registration")
@XmlAccessorType(XmlAccessType.PROPERTY)
-@XmlType(propOrder = {"destination", "durable", "target", "authenticationMechanism", "headers"})
+@XmlType(propOrder = {"destination", "durable", "selector", "target", "authenticationMechanism", "headers"})
public class PushRegistration implements Serializable
{
private String id;
@@ -28,6 +28,7 @@
private List<XmlHttpHeader> headers = new ArrayList<XmlHttpHeader>();
private String destination;
private Object loadedFrom;
+ private String selector;
@XmlTransient
public Object getLoadedFrom()
@@ -73,6 +74,16 @@
this.durable = durable;
}
+ public String getSelector()
+ {
+ return selector;
+ }
+
+ public void setSelector(String selector)
+ {
+ this.selector = selector;
+ }
+
@XmlElementRef
public XmlLink getTarget()
{
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/AcknowledgedSubscriptionResource.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/AcknowledgedSubscriptionResource.java 2010-10-25 15:49:13 UTC (rev 9811)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/AcknowledgedSubscriptionResource.java 2010-10-25 19:35:10 UTC (rev 9812)
@@ -13,10 +13,10 @@
{
private boolean durable;
- public AcknowledgedSubscriptionResource(ClientSessionFactory factory, String destination, String id, DestinationServiceManager serviceManager)
+ public AcknowledgedSubscriptionResource(ClientSessionFactory factory, String destination, String id, DestinationServiceManager serviceManager, String selector)
throws HornetQException
{
- super(factory, destination, id, serviceManager);
+ super(factory, destination, id, serviceManager, selector);
}
public boolean isDurable()
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/FileTopicPushStore.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/FileTopicPushStore.java 2010-10-25 15:49:13 UTC (rev 9811)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/FileTopicPushStore.java 2010-10-25 19:35:10 UTC (rev 9812)
@@ -18,6 +18,7 @@
super(dirname);
}
+ @Override
public synchronized List<PushTopicRegistration> getByTopic(String topic)
{
List<PushTopicRegistration> list = new ArrayList<PushTopicRegistration>();
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionResource.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionResource.java 2010-10-25 15:49:13 UTC (rev 9811)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionResource.java 2010-10-25 19:35:10 UTC (rev 9812)
@@ -13,10 +13,10 @@
{
boolean durable;
- public SubscriptionResource(ClientSessionFactory factory, String destination, String id, DestinationServiceManager serviceManager)
+ public SubscriptionResource(ClientSessionFactory factory, String destination, String id, DestinationServiceManager serviceManager, String selector)
throws HornetQException
{
- super(factory, destination, id, serviceManager);
+ super(factory, destination, id, serviceManager, selector);
}
public boolean isDurable()
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java 2010-10-25 15:49:13 UTC (rev 9811)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java 2010-10-25 19:35:10 UTC (rev 9812)
@@ -84,6 +84,7 @@
private Object timeoutLock = new Object();
+ @Override
public void testTimeout(String target)
{
synchronized (timeoutLock)
@@ -127,6 +128,7 @@
public Response createSubscription(@FormParam("durable") @DefaultValue("false") boolean durable,
@FormParam("autoAck") @DefaultValue("true") boolean autoAck,
@FormParam("name") String subscriptionName,
+ @FormParam("selector") String selector,
@Context UriInfo uriInfo)
{
if (subscriptionName != null)
@@ -185,7 +187,7 @@
session.createTemporaryQueue(destination, subscriptionName);
}
}
- QueueConsumer consumer = createConsumer(durable, autoAck, subscriptionName);
+ QueueConsumer consumer = createConsumer(durable, autoAck, subscriptionName, selector);
queueConsumers.put(consumer.getId(), consumer);
serviceManager.getTimeoutTask().add(this, consumer.getId());
@@ -225,19 +227,19 @@
}
}
- protected QueueConsumer createConsumer(boolean durable, boolean autoAck, String subscriptionName)
+ protected QueueConsumer createConsumer(boolean durable, boolean autoAck, String subscriptionName, String selector)
throws HornetQException
{
QueueConsumer consumer;
if (autoAck)
{
- SubscriptionResource subscription = new SubscriptionResource(sessionFactory, subscriptionName, subscriptionName, serviceManager);
+ SubscriptionResource subscription = new SubscriptionResource(sessionFactory, subscriptionName, subscriptionName, serviceManager, selector);
subscription.setDurable(durable);
consumer = subscription;
}
else
{
- AcknowledgedSubscriptionResource subscription = new AcknowledgedSubscriptionResource(sessionFactory, subscriptionName, subscriptionName, serviceManager);
+ AcknowledgedSubscriptionResource subscription = new AcknowledgedSubscriptionResource(sessionFactory, subscriptionName, subscriptionName, serviceManager, selector);
subscription.setDurable(durable);
consumer = subscription;
}
@@ -375,7 +377,7 @@
QueueConsumer tmp = null;
try
{
- tmp = createConsumer(true, autoAck, subscriptionId);
+ tmp = createConsumer(true, autoAck, subscriptionId, null);
}
catch (HornetQException e)
{
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/HttpMessageHelper.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/HttpMessageHelper.java 2010-10-25 15:49:13 UTC (rev 9811)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/HttpMessageHelper.java 2010-10-25 19:35:10 UTC (rev 9812)
@@ -81,17 +81,20 @@
int size = message.getBodySize();
if (size > 0)
{
- byte[] body = new byte[size];
- message.getBodyBuffer().readBytes(body);
Boolean aBoolean = message.getBooleanProperty(POSTED_AS_HTTP_MESSAGE);
if (aBoolean != null && aBoolean.booleanValue())
{
+ byte[] body = new byte[size];
+ message.getBodyBuffer().readBytes(body);
//System.out.println("Building Message from HTTP message");
request.body(contentType, body);
}
else
{
// assume posted as a JMS or HornetQ object message
+ size = message.getBodyBuffer().readInt();
+ byte[] body = new byte[size];
+ message.getBodyBuffer().readBytes(body);
ByteArrayInputStream bais = new ByteArrayInputStream(body);
Object obj = null;
try
Added: trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/SelectorTest.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/SelectorTest.java (rev 0)
+++ trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/SelectorTest.java 2010-10-25 19:35:10 UTC (rev 9812)
@@ -0,0 +1,304 @@
+package org.hornetq.rest.test;
+
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQDestination;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
+import org.hornetq.rest.HttpHeaderProperty;
+import org.hornetq.rest.queue.push.xml.XmlLink;
+import org.hornetq.rest.topic.PushTopicRegistration;
+import org.hornetq.rest.topic.TopicDeployment;
+import org.jboss.resteasy.client.ClientRequest;
+import org.jboss.resteasy.client.ClientResponse;
+import org.jboss.resteasy.spi.Link;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+
+import static org.jboss.resteasy.test.TestPortProvider.*;
+
+/**
+ * @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
+ * @version $Revision: 1 $
+ */
+public class SelectorTest extends MessageTestBase
+{
+ public static ConnectionFactory connectionFactory;
+ public static String topicName = HornetQDestination.createQueueAddressFromName("testTopic").toString();
+
+ @BeforeClass
+ public static void setup() throws Exception
+ {
+ connectionFactory = new HornetQJMSConnectionFactory(manager.getQueueManager().getSessionFactory());
+ System.out.println("Queue name: " + topicName);
+ TopicDeployment deployment = new TopicDeployment();
+ deployment.setDuplicatesAllowed(true);
+ deployment.setDurableSend(false);
+ deployment.setName(topicName);
+ manager.getTopicManager().deploy(deployment);
+ }
+
+ @XmlRootElement
+ public static class Order implements Serializable
+ {
+ private String name;
+ private String amount;
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public void setName(String name)
+ {
+ this.name = name;
+ }
+
+ public String getAmount()
+ {
+ return amount;
+ }
+
+ public void setAmount(String amount)
+ {
+ this.amount = amount;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Order order = (Order) o;
+
+ if (!amount.equals(order.amount)) return false;
+ if (!name.equals(order.name)) return false;
+
+ return true;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Order{" +
+ "name='" + name + '\'' +
+ '}';
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = name.hashCode();
+ result = 31 * result + amount.hashCode();
+ return result;
+ }
+ }
+
+ public static Destination createDestination(String dest)
+ {
+ HornetQDestination destination = (HornetQDestination) HornetQDestination.fromAddress(dest);
+ System.out.println("SimpleAddress: " + destination.getSimpleAddress());
+ return destination;
+ }
+
+ public static void publish(String dest, Serializable object, String contentType, String tag) throws Exception
+ {
+ Connection conn = connectionFactory.createConnection();
+ try
+ {
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = createDestination(dest);
+ MessageProducer producer = session.createProducer(destination);
+ ObjectMessage message = session.createObjectMessage();
+
+ if (contentType != null)
+ {
+ message.setStringProperty(HttpHeaderProperty.CONTENT_TYPE, contentType);
+ }
+ if (tag != null)
+ {
+ message.setStringProperty("MyTag", tag);
+ }
+ message.setObject(object);
+
+ producer.send(message);
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+
+ @Path("/push")
+ public static class PushReceiver
+ {
+ public static Order oneOrder;
+ public static Order twoOrder;
+
+ @POST
+ @Path("one")
+ public void one(Order order)
+ {
+ oneOrder = order;
+ }
+
+ @POST
+ @Path("two")
+ public void two(Order order)
+ {
+ twoOrder = order;
+ }
+
+
+ }
+
+ @Test
+ public void testPush() throws Exception
+ {
+ server.getJaxrsServer().getDeployment().getRegistry().addPerRequestResource(PushReceiver.class);
+ ClientRequest request = new ClientRequest(generateURL("/topics/" + topicName));
+
+ ClientResponse response = request.head();
+ Assert.assertEquals(200, response.getStatus());
+ Link consumers = MessageTestBase.getLinkByTitle(manager.getQueueManager().getLinkStrategy(), response, "push-subscriptions");
+ System.out.println("push: " + consumers);
+
+ PushTopicRegistration oneReg = new PushTopicRegistration();
+ oneReg.setDurable(false);
+ XmlLink target = new XmlLink();
+ target.setMethod("post");
+ target.setHref(generateURL("/push/one"));
+ target.setType("application/xml");
+ oneReg.setTarget(target);
+ oneReg.setSelector("MyTag = '1'");
+ response = consumers.request().body("application/xml", oneReg).post();
+ Link oneSubscription = response.getLocation();
+
+ PushTopicRegistration twoReg = new PushTopicRegistration();
+ twoReg.setDurable(false);
+ target = new XmlLink();
+ target.setMethod("post");
+ target.setHref(generateURL("/push/two"));
+ target.setType("application/xml");
+ twoReg.setTarget(target);
+ twoReg.setSelector("MyTag = '2'");
+ response = consumers.request().body("application/xml", twoReg).post();
+ Link twoSubscription = response.getLocation();
+
+ Order order = new Order();
+ order.setName("1");
+ order.setAmount("$5.00");
+ publish(topicName, order, null, "1");
+ Thread.sleep(200);
+ Assert.assertEquals(order, PushReceiver.oneOrder);
+
+ order.setName("2");
+ publish(topicName, order, null, "2");
+ Thread.sleep(200);
+ Assert.assertEquals(order, PushReceiver.twoOrder);
+
+ order.setName("3");
+ publish(topicName, order, null, "2");
+ Thread.sleep(200);
+ Assert.assertEquals(order, PushReceiver.twoOrder);
+
+ order.setName("4");
+ publish(topicName, order, null, "1");
+ Thread.sleep(200);
+ Assert.assertEquals(order, PushReceiver.oneOrder);
+
+ order.setName("5");
+ publish(topicName, order, null, "1");
+ Thread.sleep(200);
+ Assert.assertEquals(order, PushReceiver.oneOrder);
+
+ order.setName("6");
+ publish(topicName, order, null, "1");
+ Thread.sleep(200);
+ Assert.assertEquals(order, PushReceiver.oneOrder);
+
+ oneSubscription.request().delete();
+ twoSubscription.request().delete();
+
+
+ }
+
+
+ @Test
+ public void testPull() throws Exception
+ {
+ ClientRequest request = new ClientRequest(generateURL("/topics/" + topicName));
+
+ ClientResponse response = request.head();
+ Assert.assertEquals(200, response.getStatus());
+ Link consumers = MessageTestBase.getLinkByTitle(manager.getQueueManager().getLinkStrategy(), response, "pull-subscriptions");
+ System.out.println("pull: " + consumers);
+ response = consumers.request().formParameter("autoAck", "true")
+ .formParameter("selector", "MyTag = '1'").post();
+
+ Link consumeOne = MessageTestBase.getLinkByTitle(manager.getQueueManager().getLinkStrategy(), response, "consume-next");
+ System.out.println("consumeOne: " + consumeOne);
+ response = consumers.request().formParameter("autoAck", "true")
+ .formParameter("selector", "MyTag = '2'").post();
+ Link consumeTwo = MessageTestBase.getLinkByTitle(manager.getQueueManager().getLinkStrategy(), response, "consume-next");
+ System.out.println("consumeTwo: " + consumeTwo);
+
+
+ // test that Accept header is used to set content-type
+ {
+ Order order = new Order();
+ order.setName("1");
+ order.setAmount("$5.00");
+ publish(topicName, order, null, "1");
+ order.setName("2");
+ publish(topicName, order, null, "2");
+ order.setName("3");
+ publish(topicName, order, null, "2");
+ order.setName("4");
+ publish(topicName, order, null, "1");
+ order.setName("5");
+ publish(topicName, order, null, "1");
+ order.setName("6");
+ publish(topicName, order, null, "1");
+
+ {
+ order.setName("1");
+ consumeOne = consumeOrder(order, consumeOne);
+ order.setName("2");
+ consumeTwo = consumeOrder(order, consumeTwo);
+ order.setName("3");
+ consumeTwo = consumeOrder(order, consumeTwo);
+ order.setName("4");
+ consumeOne = consumeOrder(order, consumeOne);
+ order.setName("5");
+ consumeOne = consumeOrder(order, consumeOne);
+ order.setName("6");
+ consumeOne = consumeOrder(order, consumeOne);
+ }
+ }
+ }
+
+ private Link consumeOrder(Order order, Link consumeNext)
+ throws Exception
+ {
+ ClientResponse res = consumeNext.request().header("Accept-Wait", "4").accept("application/xml").post(String.class);
+ Assert.assertEquals(200, res.getStatus());
+ Assert.assertEquals("application/xml", res.getHeaders().getFirst("Content-Type").toString().toLowerCase());
+ Order order2 = (Order) res.getEntity(Order.class);
+ Assert.assertEquals(order, order2);
+ consumeNext = MessageTestBase.getLinkByTitle(manager.getQueueManager().getLinkStrategy(), res, "consume-next");
+ Assert.assertNotNull(consumeNext);
+ return consumeNext;
+ }
+}
\ No newline at end of file
13 years, 6 months
JBoss hornetq SVN: r9811 - trunk/src/main/org/hornetq/core/protocol/stomp.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-25 11:49:13 -0400 (Mon, 25 Oct 2010)
New Revision: 9811
Modified:
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
Log:
oops
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-10-25 15:48:41 UTC (rev 9810)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-10-25 15:49:13 UTC (rev 9811)
@@ -584,7 +584,6 @@
if (sm != null)
{
sm.validateUser(login, passcode);
- server.getSecurityManager().validateUser(login, passcode);
}
connection.setLogin(login);
13 years, 6 months
JBoss hornetq SVN: r9810 - trunk/src/main/org/hornetq/core/protocol/stomp.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-25 11:48:41 -0400 (Mon, 25 Oct 2010)
New Revision: 9810
Modified:
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
Log:
Avoiding NPE because of security manager
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-10-22 19:40:19 UTC (rev 9809)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-10-25 15:48:41 UTC (rev 9810)
@@ -39,6 +39,7 @@
import org.hornetq.spi.core.protocol.ProtocolManager;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.remoting.Connection;
+import org.hornetq.spi.core.security.HornetQSecurityManager;
import org.hornetq.utils.UUIDGenerator;
/**
@@ -577,7 +578,14 @@
String clientID = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
- server.getSecurityManager().validateUser(login, passcode);
+ HornetQSecurityManager sm = server.getSecurityManager();
+
+ // The sm will be null case security is not enabled...
+ if (sm != null)
+ {
+ sm.validateUser(login, passcode);
+ server.getSecurityManager().validateUser(login, passcode);
+ }
connection.setLogin(login);
connection.setPasscode(passcode);
13 years, 6 months
JBoss hornetq SVN: r9809 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor/impl and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-22 15:40:19 -0400 (Fri, 22 Oct 2010)
New Revision: 9809
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Adding filter on cursor
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-22 01:08:55 UTC (rev 9808)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-22 19:40:19 UTC (rev 9809)
@@ -14,6 +14,7 @@
package org.hornetq.core.paging.cursor;
import org.hornetq.api.core.Pair;
+import org.hornetq.core.filter.Filter;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingStore;
@@ -50,11 +51,13 @@
*/
PageCursor getPersistentCursor(long queueId);
+ PageCursor createPersistentCursor(long queueId, Filter filter);
+
/**
* Create a non persistent cursor, usually associated with browsing
* @return
*/
- PageCursor createNonPersistentCursor();
+ PageCursor createNonPersistentCursor(Filter filter);
Pair<PagePosition, PagedMessage> getNext(PageCursor cursor, PagePosition pos) throws Exception;
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-22 01:08:55 UTC (rev 9808)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-22 19:40:19 UTC (rev 9809)
@@ -28,6 +28,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.Pair;
+import org.hornetq.core.filter.Filter;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagedMessage;
@@ -70,6 +71,8 @@
private final StorageManager store;
private final long cursorId;
+
+ private final Filter filter;
private final PagingStore pageStore;
@@ -96,6 +99,7 @@
final PagingStore pageStore,
final StorageManager store,
final Executor executor,
+ final Filter filter,
final long cursorId)
{
this.pageStore = pageStore;
@@ -103,6 +107,7 @@
this.cursorProvider = cursorProvider;
this.cursorId = cursorId;
this.executor = executor;
+ this.filter = filter;
}
// Public --------------------------------------------------------
@@ -472,8 +477,14 @@
protected boolean match(final ServerMessage message)
{
- // To be used with expressions
- return true;
+ if (filter == null)
+ {
+ return true;
+ }
+ else
+ {
+ return filter.match(message);
+ }
}
// Private -------------------------------------------------------
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-22 01:08:55 UTC (rev 9808)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-22 19:40:19 UTC (rev 9809)
@@ -19,6 +19,7 @@
import java.util.concurrent.Executor;
import org.hornetq.api.core.Pair;
+import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PageTransactionInfo;
@@ -92,32 +93,34 @@
{
return pagingStore;
}
+
+ public PageCursor createPersistentCursor(long cursorID, Filter filter)
+ {
+ PageCursor activeCursor = activeCursors.get(cursorID);
+ if (activeCursor != null)
+ {
+ throw new IllegalStateException("Cursor " + cursorID + " had already been created");
+ }
+
+ activeCursor = new PageCursorImpl(this, pagingStore, storageManager, executorFactory.getExecutor(), filter, cursorID);
+ activeCursors.put(cursorID, activeCursor);
+ return activeCursor;
+ }
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursorProvider#createCursor()
*/
public PageCursor getPersistentCursor(long cursorID)
{
- PageCursor activeCursor = activeCursors.get(cursorID);
- if (activeCursor == null)
- {
- activeCursor = new PageCursorImpl(this, pagingStore, storageManager, executorFactory.getExecutor(), cursorID);
- PageCursor previousValue = activeCursors.putIfAbsent(cursorID, activeCursor);
- if (previousValue != null)
- {
- activeCursor = previousValue;
- }
- }
-
- return activeCursor;
+ return activeCursors.get(cursorID);
}
/**
* this will create a non-persistent cursor
*/
- public PageCursor createNonPersistentCursor()
+ public PageCursor createNonPersistentCursor(Filter filter)
{
- PageCursor cursor = new PageCursorImpl(this, pagingStore, storageManager, executorFactory.getExecutor(), 0);
+ PageCursor cursor = new PageCursorImpl(this, pagingStore, storageManager, executorFactory.getExecutor(), filter, 0);
nonPersistentCursors.add(cursor);
return cursor;
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-22 01:08:55 UTC (rev 9808)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-22 19:40:19 UTC (rev 9809)
@@ -21,8 +21,8 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+ import java.util.Map.Entry;
import java.util.Set;
-import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -61,7 +61,6 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.impl.HornetQServerControlImpl;
import org.hornetq.core.paging.PagingManager;
-import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.GroupingInfo;
@@ -1214,6 +1213,8 @@
managementService.registerAddress(queueBindingInfo.getAddress());
managementService.registerQueue(queue, queueBindingInfo.getAddress(), storageManager);
+
+ pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvier().createPersistentCursor(queue.getID(), filter);
}
for (GroupingInfo groupingInfo : groupingInfos)
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-22 01:08:55 UTC (rev 9808)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-22 19:40:19 UTC (rev 9809)
@@ -23,6 +23,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.filter.Filter;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.cursor.PageCache;
@@ -63,7 +64,7 @@
private SimpleString ADDRESS = new SimpleString("test-add");
private HornetQServer server;
-
+
private Queue queue;
private static final int PAGE_MAX = -1;
@@ -86,7 +87,9 @@
System.out.println("NumberOfPages = " + numberOfPages);
- PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager(), server.getExecutorFactory());
+ PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS),
+ server.getStorageManager(),
+ server.getExecutorFactory());
for (int i = 0; i < numberOfPages; i++)
{
@@ -94,15 +97,14 @@
System.out.println("Page " + i + " had " + cache.getNumberOfMessages() + " messages");
}
-
+
forceGC();
-
+
assertTrue(cursorProvider.getCacheSize() < numberOfPages);
-
+
System.out.println("Cache size = " + cursorProvider.getCacheSize());
}
-
public void testSimpleCursor() throws Exception
{
@@ -111,11 +113,11 @@
int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
System.out.println("NumberOfPages = " + numberOfPages);
-
+
PageCursor cursor = createNonPersistentCursor();
-
+
Pair<PagePosition, PagedMessage> msg;
-
+
int key = 0;
while ((msg = cursor.moveNext()) != null)
{
@@ -123,20 +125,104 @@
cursor.ack(msg.a);
}
assertEquals(NUM_MESSAGES, key);
-
-
+
forceGC();
-
+
assertTrue(lookupCursorProvider().getCacheSize() < numberOfPages);
-
+
server.stop();
createServer();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
+ public void testSimpleCursorWithFilter() throws Exception
+ {
+ final int NUM_MESSAGES = 100;
+ int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+ System.out.println("NumberOfPages = " + numberOfPages);
+
+ PageCursor cursorEven = createNonPersistentCursor(new Filter()
+ {
+
+ public boolean match(ServerMessage message)
+ {
+ Boolean property = message.getBooleanProperty("even");
+ if (property == null)
+ {
+ return false;
+ }
+ else
+ {
+ return property.booleanValue();
+ }
+ }
+
+ public SimpleString getFilterString()
+ {
+ return new SimpleString("even=true");
+ }
+
+ });
+
+ PageCursor cursorOdd = createNonPersistentCursor(new Filter()
+ {
+
+ public boolean match(ServerMessage message)
+ {
+ Boolean property = message.getBooleanProperty("even");
+ if (property == null)
+ {
+ return false;
+ }
+ else
+ {
+ return !property.booleanValue();
+ }
+ }
+
+ public SimpleString getFilterString()
+ {
+ return new SimpleString("even=true");
+ }
+
+ });
+
+ Pair<PagePosition, PagedMessage> msg;
+
+ int key = 0;
+ while ((msg = cursorEven.moveNext()) != null)
+ {
+ assertEquals(key, msg.b.getMessage().getIntProperty("key").intValue());
+ assertTrue(msg.b.getMessage().getBooleanProperty("even").booleanValue());
+ key += 2;
+ cursorEven.ack(msg.a);
+ }
+ assertEquals(NUM_MESSAGES, key);
+
+ key = 1;
+ while ((msg = cursorOdd.moveNext()) != null)
+ {
+ assertEquals(key, msg.b.getMessage().getIntProperty("key").intValue());
+ assertFalse(msg.b.getMessage().getBooleanProperty("even").booleanValue());
+ key += 2;
+ cursorOdd.ack(msg.a);
+ }
+ assertEquals(NUM_MESSAGES + 1, key);
+
+ forceGC();
+
+ assertTrue(lookupCursorProvider().getCacheSize() < numberOfPages);
+
+ server.stop();
+ createServer();
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+ }
+
public void testReadNextPage() throws Exception
{
@@ -147,196 +233,226 @@
System.out.println("NumberOfPages = " + numberOfPages);
PageCursorProvider cursorProvider = lookupCursorProvider();
-
- PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(2,0));
-
+
+ PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(2, 0));
+
assertNull(cache);
}
-
-
public void testRestart() throws Exception
{
final int NUM_MESSAGES = 1000;
int numberOfPages = addMessages(NUM_MESSAGES, 100 * 1024);
-
+
System.out.println("Number of pages = " + numberOfPages);
-
+
PageCursorProvider cursorProvider = lookupCursorProvider();
-
-
- PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-
- PageCache firstPage = cursorProvider.getPageCache(new PagePositionImpl(server.getPagingManager().getPageStore(ADDRESS).getFirstPage(), 0));
+ // TODO: We should be using getPersisentCursor here but I can't change the method here until createQueue is not
+ // creating the cursor also
+ // need to change this after some integration
+ // PageCursor cursor =
+ // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
+ PageCursor cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .createPersistentCursor(queue.getID(), null);
+
+ PageCache firstPage = cursorProvider.getPageCache(new PagePositionImpl(server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getFirstPage(), 0));
+
int firstPageSize = firstPage.getNumberOfMessages();
-
+
firstPage = null;
-
+
System.out.println("Cursor: " + cursor);
cursorProvider.printDebug();
- for (int i = 0 ; i < 1000 ; i++)
+ for (int i = 0; i < 1000; i++)
{
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
assertNotNull(msg);
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
-
+
if (i < firstPageSize)
{
cursor.ack(msg.a);
}
}
cursorProvider.printDebug();
-
+
// needs to clear the context since we are using the same thread over two distinct servers
// otherwise we will get the old executor on the factory
OperationContextImpl.clearContext();
-
+
server.stop();
-
+
server.start();
-
- cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-
+
+ cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .getPersistentCursor(queue.getID());
+
for (int i = firstPageSize; i < NUM_MESSAGES; i++)
{
System.out.println("Received " + i);
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
assertNotNull(msg);
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
-
+
cursor.ack(msg.a);
-
+
OperationContextImpl.getContext(null).waitCompletion();
-
+
}
- OperationContextImpl.getContext(null).waitCompletion();
+ OperationContextImpl.getContext(null).waitCompletion();
((PageCursorImpl)cursor).printDebug();
-
+
server.stop();
createServer();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
-
+
public void testRestartWithHoleOnAck() throws Exception
{
final int NUM_MESSAGES = 1000;
int numberOfPages = addMessages(NUM_MESSAGES, 10 * 1024);
-
+
System.out.println("Number of pages = " + numberOfPages);
-
+
PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
System.out.println("cursorProvider = " + cursorProvider);
-
- PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-
+
+ // TODO: We should be using getPersistentCursor here but I can't change the method here until createQueue is not
+ // creating the cursor also
+ // need to change this after some integration
+ // PageCursor cursor =
+ // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
+ PageCursor cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .createPersistentCursor(queue.getID(), null);
+
System.out.println("Cursor: " + cursor);
- for (int i = 0 ; i < 100 ; i++)
+ for (int i = 0; i < 100; i++)
{
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
if (i < 10 || i > 20)
{
cursor.ack(msg.a);
}
}
-
+
server.stop();
-
+
OperationContextImpl.clearContext();
-
+
server.start();
-
- cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-
+
+ cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .getPersistentCursor(queue.getID());
+
for (int i = 10; i <= 20; i++)
{
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
cursor.ack(msg.a);
}
-
-
+
for (int i = 100; i < NUM_MESSAGES; i++)
{
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
cursor.ack(msg.a);
}
-
+
server.stop();
createServer();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
-
-
+
public void testRestartWithHoleOnAckAndTransaction() throws Exception
{
final int NUM_MESSAGES = 1000;
int numberOfPages = addMessages(NUM_MESSAGES, 10 * 1024);
-
+
System.out.println("Number of pages = " + numberOfPages);
-
+
PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
System.out.println("cursorProvider = " + cursorProvider);
-
- PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-
+
+ // TODO: We should be using getPersistentCursor here but I can't change the method here until createQueue is not
+ // creating the cursor also
+ // need to change this after some integration
+ // PageCursor cursor =
+ // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
+ PageCursor cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .createPersistentCursor(queue.getID(), null);
+
System.out.println("Cursor: " + cursor);
-
+
Transaction tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
- for (int i = 0 ; i < 100 ; i++)
+ for (int i = 0; i < 100; i++)
{
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
if (i < 10 || i > 20)
{
cursor.ackTx(tx, msg.a);
}
}
-
+
tx.commit();
-
+
server.stop();
-
+
OperationContextImpl.clearContext();
-
+
server.start();
-
- cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
+ cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .getPersistentCursor(queue.getID());
+
tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
-
+
for (int i = 10; i <= 20; i++)
{
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
- cursor.ackTx(tx,msg.a);
+ cursor.ackTx(tx, msg.a);
}
-
+
for (int i = 100; i < NUM_MESSAGES; i++)
{
- Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
- cursor.ackTx(tx,msg.a);
+ cursor.ackTx(tx, msg.a);
}
-
+
tx.commit();
-
+
server.stop();
createServer();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
+
}
-
+
public void testConsumeLivePage() throws Exception
{
PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
@@ -344,117 +460,131 @@
pageStore.startPaging();
final int NUM_MESSAGES = 100;
-
+
final int messageSize = 1024 * 1024;
-
-
+
PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
System.out.println("cursorProvider = " + cursorProvider);
-
- PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-
+
+ // TODO: We should be using getPersistentCursor here but I can't change the method here until createQueue is not
+ // creating the cursor also
+ // need to change this after some integration
+ // PageCursor cursor =
+ // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
+ PageCursor cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .createPersistentCursor(queue.getID(), null);
+
System.out.println("Cursor: " + cursor);
-
for (int i = 0; i < NUM_MESSAGES; i++)
{
- if (i % 100 == 0) System.out.println("Paged " + i);
-
+ if (i % 100 == 0)
+ System.out.println("Paged " + i);
+
HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
msg.putIntProperty("key", i);
-
+
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
Assert.assertTrue(pageStore.page(msg));
-
+
Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
-
+
assertNotNull(readMessage);
-
+
assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
-
+
assertNull(cursor.moveNext());
}
-
+
server.stop();
-
+
OperationContextImpl.clearContext();
-
+
createServer();
-
+
pageStore = lookupPageStore(ADDRESS);
-
- cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-
+
+ cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .getPersistentCursor(queue.getID());
+
for (int i = 0; i < NUM_MESSAGES * 2; i++)
{
- if (i % 100 == 0) System.out.println("Paged " + i);
+ if (i % 100 == 0)
+ System.out.println("Paged " + i);
if (i >= NUM_MESSAGES)
{
-
+
HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
-
+
ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
msg.putIntProperty("key", i);
-
+
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
-
+
Assert.assertTrue(pageStore.page(msg));
}
-
+
Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
-
+
assertNotNull(readMessage);
-
+
assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
}
server.stop();
-
+
OperationContextImpl.clearContext();
-
+
createServer();
-
+
pageStore = lookupPageStore(ADDRESS);
-
- cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
-
+
+ cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .getPersistentCursor(queue.getID());
+
for (int i = 0; i < NUM_MESSAGES * 3; i++)
{
- if (i % 100 == 0) System.out.println("Paged " + i);
+ if (i % 100 == 0)
+ System.out.println("Paged " + i);
if (i >= NUM_MESSAGES * 2)
{
-
+
HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
-
+
ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
msg.putIntProperty("key", i);
-
+
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
-
+
Assert.assertTrue(pageStore.page(msg));
}
-
+
Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
-
+
assertNotNull(readMessage);
-
+
cursor.ack(readMessage.a);
-
+
assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
}
server.stop();
createServer();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
+
}
-
-
+
public void testPrepareScenarios() throws Exception
{
PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
@@ -462,40 +592,46 @@
pageStore.startPaging();
final int NUM_MESSAGES = 100;
-
+
final int messageSize = 100 * 1024;
-
-
+
PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
System.out.println("cursorProvider = " + cursorProvider);
-
- PageCursor cursor = pageStore.getCursorProvier().getPersistentCursor(queue.getID());
-
+
+ // TODO: We should be using getPersistentCursor here but I can't change the method here until createQueue is not
+ // creating the cursor also
+ // need to change this after some integration
+ // PageCursor cursor =
+ // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
+ PageCursor cursor = this.server.getPagingManager()
+ .getPageStore(ADDRESS)
+ .getCursorProvier()
+ .createPersistentCursor(queue.getID(), null);
+
System.out.println("Cursor: " + cursor);
-
+
StorageManager storage = this.server.getStorageManager();
-
+
PageTransactionInfoImpl pgtxRollback = new PageTransactionInfoImpl(storage.generateUniqueID());
PageTransactionInfoImpl pgtxForgotten = new PageTransactionInfoImpl(storage.generateUniqueID());
PageTransactionInfoImpl pgtxCommit = new PageTransactionInfoImpl(storage.generateUniqueID());
-
+
System.out.println("Forgetting tx " + pgtxForgotten.getTransactionID());
-
+
this.server.getPagingManager().addTransaction(pgtxRollback);
this.server.getPagingManager().addTransaction(pgtxCommit);
-
+
pgMessages(storage, pageStore, pgtxRollback, 0, NUM_MESSAGES, messageSize);
pageStore.forceAnotherPage();
pgMessages(storage, pageStore, pgtxForgotten, 100, NUM_MESSAGES, messageSize);
pageStore.forceAnotherPage();
pgMessages(storage, pageStore, pgtxCommit, 200, NUM_MESSAGES, messageSize);
pageStore.forceAnotherPage();
-
+
addMessages(300, NUM_MESSAGES, messageSize);
-
+
System.out.println("Number of pages - " + pageStore.getNumberOfPages());
-
// First consume what's already there without any tx as nothing was committed
for (int i = 300; i < 400; i++)
{
@@ -506,10 +642,10 @@
}
assertNull(cursor.moveNext());
-
+
cursor.printDebug();
pgtxRollback.rollback();
-
+
this.server.getPagingManager().removeTransaction(pgtxRollback.getTransactionID());
pgtxCommit.commit();
@@ -521,16 +657,15 @@
assertEquals(i, pos.b.getMessage().getIntProperty("key").intValue());
cursor.ack(pos.a);
}
-
+
assertNull(cursor.moveNext());
-
+
server.stop();
createServer();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
-
+
}
-
+
public void testCloseNonPersistentConsumer() throws Exception
{
@@ -541,12 +676,12 @@
System.out.println("NumberOfPages = " + numberOfPages);
PageCursorProvider cursorProvider = lookupCursorProvider();
-
- PageCursor cursor = cursorProvider.createNonPersistentCursor();
- PageCursorImpl cursor2 = (PageCursorImpl)cursorProvider.createNonPersistentCursor();
-
+
+ PageCursor cursor = cursorProvider.createNonPersistentCursor(null);
+ PageCursorImpl cursor2 = (PageCursorImpl)cursorProvider.createNonPersistentCursor(null);
+
Pair<PagePosition, PagedMessage> msg;
-
+
int key = 0;
while ((msg = cursor.moveNext()) != null)
{
@@ -554,18 +689,17 @@
cursor.ack(msg.a);
}
assertEquals(NUM_MESSAGES, key);
-
-
+
forceGC();
-
+
assertTrue(cursorProvider.getCacheSize() < numberOfPages);
-
- for (int i = 0 ; i < 10; i++)
+
+ for (int i = 0; i < 10; i++)
{
msg = cursor2.moveNext();
assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
}
-
+
assertSame(cursor2.getProvider(), cursorProvider);
cursor2.close();
@@ -575,13 +709,12 @@
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
-
-
+
public void testLeavePageStateAndRestart() throws Exception
{
- // Validate the cursor are working fine when all the pages are gone, and then paging being restarted
+ // Validate the cursor are working fine when all the pages are gone, and then paging being restarted
}
-
+
public void testFirstMessageInTheMiddle() throws Exception
{
@@ -592,20 +725,20 @@
System.out.println("NumberOfPages = " + numberOfPages);
PageCursorProvider cursorProvider = lookupCursorProvider();
-
+
PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
- PageCursor cursor = cursorProvider.createNonPersistentCursor();
- PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages()/2);
+ PageCursor cursor = cursorProvider.createNonPersistentCursor(null);
+ PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() / 2);
cursor.bookmark(startingPos);
PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
msg.initMessage(server.getStorageManager());
int key = msg.getMessage().getIntProperty("key").intValue();
-
+
msg = null;
-
+
cache = null;
-
+
Pair<PagePosition, PagedMessage> msgCursor = null;
while ((msgCursor = cursor.moveNext()) != null)
{
@@ -613,18 +746,16 @@
cursor.ack(msgCursor.a);
}
assertEquals(NUM_MESSAGES, key);
-
-
+
forceGC();
-
+
assertTrue(cursorProvider.getCacheSize() < numberOfPages);
-
+
server.stop();
createServer();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
-
-
+
public void testFirstMessageInTheMiddlePersistent() throws Exception
{
@@ -635,35 +766,39 @@
System.out.println("NumberOfPages = " + numberOfPages);
PageCursorProvider cursorProvider = lookupCursorProvider();
-
+
PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
- PageCursor cursor = cursorProvider.getPersistentCursor(queue.getID());
- PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages()/2);
+ // TODO: We should be using getPersistentCursor here but I can't change the method here until createQueue is not
+ // creating the cursor also
+ // need to change this after some integration
+ // PageCursor cursor =
+ // this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
+ PageCursor cursor = cursorProvider.createPersistentCursor(queue.getID(), null);
+ PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() / 2);
cursor.bookmark(startingPos);
PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
msg.initMessage(server.getStorageManager());
int initialKey = msg.getMessage().getIntProperty("key").intValue();
int key = initialKey;
-
+
msg = null;
-
+
cache = null;
-
+
Pair<PagePosition, PagedMessage> msgCursor = null;
while ((msgCursor = cursor.moveNext()) != null)
{
assertEquals(key++, msgCursor.b.getMessage().getIntProperty("key").intValue());
}
assertEquals(NUM_MESSAGES, key);
-
-
+
server.stop();
-
+
OperationContextImpl.clearContext();
-
+
createServer();
-
+
cursorProvider = lookupCursorProvider();
cursor = cursorProvider.getPersistentCursor(queue.getID());
key = initialKey;
@@ -672,18 +807,17 @@
assertEquals(key++, msgCursor.b.getMessage().getIntProperty("key").intValue());
cursor.ack(msgCursor.a);
}
-
-
+
forceGC();
-
+
assertTrue(cursorProvider.getCacheSize() < numberOfPages);
-
+
server.stop();
createServer();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
-
+
private int addMessages(final int numMessages, final int messageSize) throws Exception
{
return addMessages(0, numMessages, messageSize);
@@ -702,17 +836,20 @@
for (int i = start; i < start + numMessages; i++)
{
- if (i % 100 == 0) System.out.println("Paged " + i);
+ if (i % 100 == 0)
+ System.out.println("Paged " + i);
HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
msg.putIntProperty("key", i);
-
+ // to be used on tests that are validating filters
+ msg.putBooleanProperty("even", i % 2 == 0);
+
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
Assert.assertTrue(pageStore.page(msg));
}
-
+
return pageStore.getNumberOfPages();
}
@@ -734,53 +871,56 @@
super.setUp();
OperationContextImpl.clearContext();
System.out.println("Tmp:" + getTemporaryDir());
-
+
createServer();
-
- //createQueue(ADDRESS.toString(), ADDRESS.toString());
}
-
/**
* @throws Exception
*/
private void createServer() throws Exception
{
OperationContextImpl.clearContext();
-
+
Configuration config = createDefaultConfig();
-
+
config.setJournalSyncNonTransactional(true);
- server = createServer(true,
- config,
- PAGE_SIZE,
- PAGE_MAX,
- new HashMap<String, AddressSettings>());
+ server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
server.start();
-
+
try
{
queue = server.createQueue(ADDRESS, ADDRESS, null, true, false);
}
catch (Exception ignored)
{
- }
+ }
}
+
/**
* @return
* @throws Exception
*/
private PageCursor createNonPersistentCursor() throws Exception
{
- return lookupCursorProvider().createNonPersistentCursor();
+ return lookupCursorProvider().createNonPersistentCursor(null);
}
/**
* @return
* @throws Exception
*/
+ private PageCursor createNonPersistentCursor(Filter filter) throws Exception
+ {
+ return lookupCursorProvider().createNonPersistentCursor(filter);
+ }
+
+ /**
+ * @return
+ * @throws Exception
+ */
private PageCursorProvider lookupCursorProvider() throws Exception
{
return lookupPageStore(ADDRESS).getCursorProvier();
@@ -803,8 +943,8 @@
final int messageSize) throws Exception
{
List<ServerMessage> messages = new ArrayList<ServerMessage>();
-
- for (int i = start ; i < start + NUM_MESSAGES; i++)
+
+ for (int i = start; i < start + NUM_MESSAGES; i++)
{
HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
ServerMessage msg = new ServerMessageImpl(storage.generateUniqueID(), buffer.writerIndex());
@@ -812,10 +952,9 @@
msg.putIntProperty("key", i);
messages.add(msg);
}
-
+
pageStore.page(messages, pgParameter.getTransactionID());
}
-
protected void tearDown() throws Exception
{
13 years, 6 months
JBoss hornetq SVN: r9808 - in branches/Branch_New_Paging: tests/src/org/hornetq/tests/integration/paging and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-21 21:08:55 -0400 (Thu, 21 Oct 2010)
New Revision: 9808
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
few tests
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-21 21:02:05 UTC (rev 9807)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-22 01:08:55 UTC (rev 9808)
@@ -111,23 +111,23 @@
{
return cursorProvider;
}
-
+
public void bookmark(PagePosition position) throws Exception
{
if (lastPosition != null)
{
throw new RuntimeException("Bookmark can only be done at the time of the cursor's creation");
}
-
+
lastPosition = position;
-
+
PageCursorInfo cursorInfo = getPageInfo(position);
-
+
if (position.getMessageNr() > 0)
{
cursorInfo.confirmed.addAndGet(position.getMessageNr());
}
-
+
ack(position);
}
@@ -358,7 +358,7 @@
PageCursorImpl.trace("********** processing reload!!!!!!!");
}
Collections.sort(recoveredACK);
-
+
boolean first = true;
PagePosition previousPos = null;
@@ -667,7 +667,7 @@
@Override
public String toString()
{
- return "PageCursorInfo::PageID=" + pageId + " numberOfMessage = " + numberOfMessages;
+ return "PageCursorInfo::PageID=" + pageId + " numberOfMessage = " + numberOfMessages + ", confirmed = " + confirmed;
}
public PageCursorInfo(final long pageId, final int numberOfMessages, final PageCache cache)
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-21 21:02:05 UTC (rev 9807)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-22 01:08:55 UTC (rev 9808)
@@ -321,6 +321,7 @@
public void printDebug()
{
+ System.out.println("Debug information for PageCursorProviderImpl:");
for (PageCache cache : softCache.values())
{
System.out.println("Cache " + cache);
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-21 21:02:05 UTC (rev 9807)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-22 01:08:55 UTC (rev 9808)
@@ -111,10 +111,8 @@
int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
System.out.println("NumberOfPages = " + numberOfPages);
-
- PageCursorProviderImpl cursorProvider = (PageCursorProviderImpl)createNonPersistentCursor();
- PageCursor cursor = cursorProvider.createNonPersistentCursor();
+ PageCursor cursor = createNonPersistentCursor();
Pair<PagePosition, PagedMessage> msg;
@@ -129,7 +127,11 @@
forceGC();
- assertTrue(cursorProvider.getCacheSize() < numberOfPages);
+ assertTrue(lookupCursorProvider().getCacheSize() < numberOfPages);
+
+ server.stop();
+ createServer();
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
@@ -214,7 +216,10 @@
OperationContextImpl.getContext(null).waitCompletion();
((PageCursorImpl)cursor).printDebug();
-
+ server.stop();
+ createServer();
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
}
public void testRestartWithHoleOnAck() throws Exception
@@ -265,6 +270,10 @@
cursor.ack(msg.a);
}
+ server.stop();
+ createServer();
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
}
@@ -322,6 +331,10 @@
tx.commit();
+ server.stop();
+ createServer();
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
}
public void testConsumeLivePage() throws Exception
@@ -435,7 +448,10 @@
assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
}
-
+ server.stop();
+ createServer();
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
}
@@ -447,7 +463,7 @@
final int NUM_MESSAGES = 100;
- final int messageSize = 10 * 1024;
+ final int messageSize = 100 * 1024;
PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
@@ -476,7 +492,8 @@
pageStore.forceAnotherPage();
addMessages(300, NUM_MESSAGES, messageSize);
- pageStore.forceAnotherPage();
+
+ System.out.println("Number of pages - " + pageStore.getNumberOfPages());
// First consume what's already there without any tx as nothing was committed
@@ -490,9 +507,12 @@
assertNull(cursor.moveNext());
+ cursor.printDebug();
pgtxRollback.rollback();
+
this.server.getPagingManager().removeTransaction(pgtxRollback.getTransactionID());
pgtxCommit.commit();
+
// Second:after pgtxCommit was done
for (int i = 200; i < 300; i++)
{
@@ -502,16 +522,15 @@
cursor.ack(pos.a);
}
+ assertNull(cursor.moveNext());
+ server.stop();
+ createServer();
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
+
+
}
-
-
- public void testCleanupScenarios() throws Exception
- {
- // Validate the pages are being cleared (with multiple cursors)
- }
-
public void testCloseNonPersistentConsumer() throws Exception
{
@@ -550,8 +569,10 @@
assertSame(cursor2.getProvider(), cursorProvider);
cursor2.close();
-
+
server.stop();
+ createServer();
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
@@ -560,12 +581,7 @@
{
// Validate the cursor are working fine when all the pages are gone, and then paging being restarted
}
-
- public void testRedeliveryWithCleanup() throws Exception
- {
-
- }
-
+
public void testFirstMessageInTheMiddle() throws Exception
{
@@ -604,14 +620,8 @@
assertTrue(cursorProvider.getCacheSize() < numberOfPages);
server.stop();
-
- server.start();
-
- Thread.sleep(1000);
+ createServer();
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
-
-
-
}
@@ -668,12 +678,8 @@
assertTrue(cursorProvider.getCacheSize() < numberOfPages);
- // This is to make sure all the pending files will be deleted
server.stop();
- OperationContextImpl.clearContext();
-
createServer();
-
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
@@ -740,6 +746,8 @@
*/
private void createServer() throws Exception
{
+ OperationContextImpl.clearContext();
+
Configuration config = createDefaultConfig();
config.setJournalSyncNonTransactional(true);
13 years, 6 months
JBoss hornetq SVN: r9807 - in branches/Branch_New_Paging: tests/src/org/hornetq/tests/integration/paging and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-21 17:02:05 -0400 (Thu, 21 Oct 2010)
New Revision: 9807
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Live cursors update
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java 2010-10-21 14:45:43 UTC (rev 9806)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java 2010-10-21 21:02:05 UTC (rev 9807)
@@ -105,8 +105,9 @@
{
ArrayList<PagedMessage> messages = new ArrayList<PagedMessage>();
+ size.set((int)file.size());
// Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467
- ByteBuffer buffer2 = ByteBuffer.allocateDirect((int)file.size());
+ ByteBuffer buffer2 = ByteBuffer.allocateDirect(size.get());
file.position(0);
file.read(buffer2);
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-21 14:45:43 UTC (rev 9806)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-21 21:02:05 UTC (rev 9807)
@@ -20,9 +20,7 @@
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -472,9 +470,32 @@
firstPageId = fileId;
}
}
-
- if (numberOfPages != 0)
+
+ if (currentPageId != 0)
{
+ currentPage = createPage(currentPageId);
+ currentPage.open();
+
+ List<PagedMessage> messages = currentPage.read();
+
+ LivePageCache pageCache = new LivePageCacheImpl(currentPage);
+
+ for (PagedMessage msg : messages)
+ {
+ msg.initMessage(storageManager);
+ pageCache.addLiveMessage(msg);
+ }
+
+ currentPage.setLiveCache(pageCache);
+
+ currentPageSize.set(currentPage.getSize());
+
+ cursorProvider.addPageCache(pageCache);
+ }
+
+ if (currentPage != null)
+ {
+
startPaging();
}
}
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-21 14:45:43 UTC (rev 9806)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-21 21:02:05 UTC (rev 9807)
@@ -134,16 +134,7 @@
}
- /**
- * @return
- * @throws Exception
- */
- private PageCursor createNonPersistentCursor() throws Exception
- {
- return lookupCursorProvider().createNonPersistentCursor();
- }
-
public void testReadNextPage() throws Exception
{
@@ -160,15 +151,6 @@
assertNull(cache);
}
-
- /**
- * @return
- * @throws Exception
- */
- private PageCursorProvider lookupCursorProvider() throws Exception
- {
- return lookupPageStore(ADDRESS).getCursorProvier();
- }
public void testRestart() throws Exception
@@ -348,7 +330,7 @@
pageStore.startPaging();
- final int NUM_MESSAGES = 1000;
+ final int NUM_MESSAGES = 100;
final int messageSize = 1024 * 1024;
@@ -378,13 +360,82 @@
assertNotNull(readMessage);
- cursor.ack(readMessage.a);
-
assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
assertNull(cursor.moveNext());
}
+ server.stop();
+
+ OperationContextImpl.clearContext();
+
+ createServer();
+
+ pageStore = lookupPageStore(ADDRESS);
+
+ cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
+
+ for (int i = 0; i < NUM_MESSAGES * 2; i++)
+ {
+ if (i % 100 == 0) System.out.println("Paged " + i);
+
+ if (i >= NUM_MESSAGES)
+ {
+
+ HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
+
+ ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
+ msg.putIntProperty("key", i);
+
+ msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+
+ Assert.assertTrue(pageStore.page(msg));
+ }
+
+ Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
+
+ assertNotNull(readMessage);
+
+ assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
+ }
+
+ server.stop();
+
+ OperationContextImpl.clearContext();
+
+ createServer();
+
+ pageStore = lookupPageStore(ADDRESS);
+
+ cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
+
+ for (int i = 0; i < NUM_MESSAGES * 3; i++)
+ {
+ if (i % 100 == 0) System.out.println("Paged " + i);
+
+ if (i >= NUM_MESSAGES * 2)
+ {
+
+ HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
+
+ ServerMessage msg = new ServerMessageImpl(i, buffer.writerIndex());
+ msg.putIntProperty("key", i);
+
+ msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+
+ Assert.assertTrue(pageStore.page(msg));
+ }
+
+ Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
+
+ assertNotNull(readMessage);
+
+ cursor.ack(readMessage.a);
+
+ assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
+ }
+
+
}
@@ -455,36 +506,6 @@
}
- /**
- * @param storage
- * @param pageStore
- * @param pgParameter
- * @param start
- * @param NUM_MESSAGES
- * @param messageSize
- * @throws Exception
- */
- private void pgMessages(StorageManager storage,
- PagingStoreImpl pageStore,
- PageTransactionInfo pgParameter,
- int start,
- final int NUM_MESSAGES,
- final int messageSize) throws Exception
- {
- List<ServerMessage> messages = new ArrayList<ServerMessage>();
-
- for (int i = start ; i < start + NUM_MESSAGES; i++)
- {
- HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
- ServerMessage msg = new ServerMessageImpl(storage.generateUniqueID(), buffer.writerIndex());
- msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
- msg.putIntProperty("key", i);
- messages.add(msg);
- }
-
- pageStore.page(messages, pgParameter.getTransactionID());
- }
-
public void testCleanupScenarios() throws Exception
{
// Validate the pages are being cleared (with multiple cursors)
@@ -587,7 +608,7 @@
server.start();
Thread.sleep(1000);
- assertEquals(2, lookupPageStore(ADDRESS).getNumberOfPages());
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
@@ -631,7 +652,7 @@
OperationContextImpl.clearContext();
- server.start();
+ createServer();
cursorProvider = lookupCursorProvider();
cursor = cursorProvider.getPersistentCursor(queue.getID());
@@ -649,11 +670,11 @@
// This is to make sure all the pending files will be deleted
server.stop();
+ OperationContextImpl.clearContext();
+
+ createServer();
- server.start();
-
- // TODO: this should be exact 2
- assertTrue(lookupPageStore(ADDRESS).getNumberOfPages() <= 3);
+ assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
@@ -708,6 +729,17 @@
OperationContextImpl.clearContext();
System.out.println("Tmp:" + getTemporaryDir());
+ createServer();
+
+ //createQueue(ADDRESS.toString(), ADDRESS.toString());
+ }
+
+
+ /**
+ * @throws Exception
+ */
+ private void createServer() throws Exception
+ {
Configuration config = createDefaultConfig();
config.setJournalSyncNonTransactional(true);
@@ -720,11 +752,63 @@
server.start();
- queue = server.createQueue(ADDRESS, ADDRESS, null, true, false);
+ try
+ {
+ queue = server.createQueue(ADDRESS, ADDRESS, null, true, false);
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+ /**
+ * @return
+ * @throws Exception
+ */
+ private PageCursor createNonPersistentCursor() throws Exception
+ {
+ return lookupCursorProvider().createNonPersistentCursor();
+ }
- //createQueue(ADDRESS.toString(), ADDRESS.toString());
+ /**
+ * @return
+ * @throws Exception
+ */
+ private PageCursorProvider lookupCursorProvider() throws Exception
+ {
+ return lookupPageStore(ADDRESS).getCursorProvier();
}
+ /**
+ * @param storage
+ * @param pageStore
+ * @param pgParameter
+ * @param start
+ * @param NUM_MESSAGES
+ * @param messageSize
+ * @throws Exception
+ */
+ private void pgMessages(StorageManager storage,
+ PagingStoreImpl pageStore,
+ PageTransactionInfo pgParameter,
+ int start,
+ final int NUM_MESSAGES,
+ final int messageSize) throws Exception
+ {
+ List<ServerMessage> messages = new ArrayList<ServerMessage>();
+
+ for (int i = start ; i < start + NUM_MESSAGES; i++)
+ {
+ HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
+ ServerMessage msg = new ServerMessageImpl(storage.generateUniqueID(), buffer.writerIndex());
+ msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+ msg.putIntProperty("key", i);
+ messages.add(msg);
+ }
+
+ pageStore.page(messages, pgParameter.getTransactionID());
+ }
+
+
protected void tearDown() throws Exception
{
server.stop();
13 years, 6 months
JBoss hornetq SVN: r9806 - trunk.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-21 10:45:43 -0400 (Thu, 21 Oct 2010)
New Revision: 9806
Modified:
trunk/build-hornetq.properties
trunk/build-hornetq.xml
Log:
applying utf-8 patch
Modified: trunk/build-hornetq.properties
===================================================================
--- trunk/build-hornetq.properties 2010-10-21 14:40:43 UTC (rev 9805)
+++ trunk/build-hornetq.properties 2010-10-21 14:45:43 UTC (rev 9806)
@@ -9,6 +9,7 @@
javac.include.ant.runtime=false
javac.include.java.runtime=true
javac.fail.onerror=true
+javac.encoding=utf-8
# JUnit properties
junit.showoutput=true
Modified: trunk/build-hornetq.xml
===================================================================
--- trunk/build-hornetq.xml 2010-10-21 14:40:43 UTC (rev 9805)
+++ trunk/build-hornetq.xml 2010-10-21 14:45:43 UTC (rev 9806)
@@ -449,6 +449,7 @@
deprecation="${javac.deprecation}"
includeAntRuntime="${javac.include.ant.runtime}"
includeJavaRuntime="${javac.include.java.runtime}"
+ encoding="${javac.encoding}"
failonerror="${javac.fail.onerror}">
<src>
<pathelement path="${build.src.dir}"/>
@@ -493,6 +494,7 @@
deprecation="${javac.deprecation}"
includeAntRuntime="${javac.include.ant.runtime}"
includeJavaRuntime="${javac.include.java.runtime}"
+ encoding="${javac.encoding}"
failonerror="${javac.fail.onerror}">
<src>
<pathelement path="${build.src.dir}"/>
@@ -531,6 +533,7 @@
deprecation="${javac.deprecation}"
includeAntRuntime="${javac.include.ant.runtime}"
includeJavaRuntime="${javac.include.java.runtime}"
+ encoding="${javac.encoding}"
failonerror="${javac.fail.onerror}">
<src>
<pathelement path="${src.main.dir}"/>
@@ -551,6 +554,7 @@
deprecation="${javac.deprecation}"
includeAntRuntime="${javac.include.ant.runtime}"
includeJavaRuntime="${javac.include.java.runtime}"
+ encoding="${javac.encoding}"
failonerror="${javac.fail.onerror}">
<src>
<pathelement path="${src.main.dir}"/>
@@ -571,6 +575,7 @@
deprecation="${javac.deprecation}"
includeAntRuntime="${javac.include.ant.runtime}"
includeJavaRuntime="${javac.include.java.runtime}"
+ encoding="${javac.encoding}"
failonerror="${javac.fail.onerror}">
<src>
<pathelement path="${src.main.dir}"/>
@@ -591,6 +596,7 @@
deprecation="${javac.deprecation}"
includeAntRuntime="${javac.include.ant.runtime}"
includeJavaRuntime="${javac.include.java.runtime}"
+ encoding="${javac.encoding}"
failonerror="${javac.fail.onerror}">
<src>
<pathelement path="${src.main.dir}"/>
@@ -611,6 +617,7 @@
deprecation="${javac.deprecation}"
includeAntRuntime="${javac.include.ant.runtime}"
includeJavaRuntime="${javac.include.java.runtime}"
+ encoding="${javac.encoding}"
failonerror="${javac.fail.onerror}">
<src>
<pathelement path="${src.main.dir}"/>
@@ -632,6 +639,7 @@
deprecation="${javac.deprecation}"
includeAntRuntime="${javac.include.ant.runtime}"
includeJavaRuntime="${javac.include.java.runtime}"
+ encoding="${javac.encoding}"
failonerror="${javac.fail.onerror}">
<src>
<pathelement path="${src.main.dir}"/>
@@ -652,6 +660,7 @@
deprecation="${javac.deprecation}"
includeAntRuntime="${javac.include.ant.runtime}"
includeJavaRuntime="${javac.include.java.runtime}"
+ encoding="${javac.encoding}"
failonerror="${javac.fail.onerror}">
<src>
<pathelement path="${src.main.dir}"/>
@@ -672,6 +681,7 @@
deprecation="${javac.deprecation}"
includeAntRuntime="${javac.include.ant.runtime}"
includeJavaRuntime="${javac.include.java.runtime}"
+ encoding="${javac.encoding}"
failonerror="${javac.fail.onerror}">
<src>
<pathelement path="${src.main.dir}"/>
@@ -692,6 +702,7 @@
deprecation="${javac.deprecation}"
includeAntRuntime="${javac.include.ant.runtime}"
includeJavaRuntime="${javac.include.java.runtime}"
+ encoding="${javac.encoding}"
failonerror="${javac.fail.onerror}">
<src>
<pathelement path="${build.src.dir}"/>
@@ -1586,6 +1597,7 @@
includeAntRuntime="${javac.include.ant.runtime}"
includeJavaRuntime="${javac.include.java.runtime}"
failonerror="${javac.fail.onerror}"
+ encoding="${javac.encoding}"
srcdir="${test.src.dir}"
destdir="${test.classes.dir}">
<classpath refid="test.compilation.classpath"/>
@@ -1604,6 +1616,7 @@
includeAntRuntime="true"
includeJavaRuntime="${javac.include.java.runtime}"
failonerror="${javac.fail.onerror}"
+ encoding="${javac.encoding}"
srcdir="${test.jms.src.dir}"
destdir="${test.jms.classes.dir}">
<classpath refid="jms.test.compilation.classpath"/>
@@ -1622,6 +1635,7 @@
includeAntRuntime="true"
includeJavaRuntime="${javac.include.java.runtime}"
failonerror="${javac.fail.onerror}"
+ encoding="${javac.encoding}"
srcdir="${test.joram.src.dir}"
destdir="${test.joram.classes.dir}">
<classpath refid="joram.test.compilation.classpath"/>
13 years, 6 months
JBoss hornetq SVN: r9805 - in branches/hornetq-416: src/main/org/hornetq/core/server/impl and 2 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-10-21 10:40:43 -0400 (Thu, 21 Oct 2010)
New Revision: 9805
Added:
branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSSessionInfo.java
Modified:
branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSConnectionInfo.java
branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSServerControl.java
branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
Log:
some test
Modified: branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSConnectionInfo.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSConnectionInfo.java 2010-10-20 22:45:48 UTC (rev 9804)
+++ branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSConnectionInfo.java 2010-10-21 14:40:43 UTC (rev 9805)
@@ -36,9 +36,10 @@
private final long creationTime;
- // TODO
- // user name
- // client ID
+ private final String clientID;
+
+ private final String username;
+
// Static --------------------------------------------------------
@@ -49,9 +50,14 @@
for (int i = 0; i < array.length(); i++)
{
JSONObject obj = array.getJSONObject(i);
+ String cid = obj.isNull("clientID") ? null : obj.getString("clientID");
+ String uname = obj.isNull("principal") ? null : obj.getString("principal");
+
JMSConnectionInfo info = new JMSConnectionInfo(obj.getString("connectionID"),
obj.getString("clientAddress"),
- obj.getLong("creationTime"));
+ obj.getLong("creationTime"),
+ cid,
+ uname);
infos[i] = info;
}
return infos;
@@ -61,11 +67,15 @@
private JMSConnectionInfo(final String connectionID,
final String clientAddress,
- final long creationTime)
+ final long creationTime,
+ final String clientID,
+ final String username)
{
this.connectionID = connectionID;
this.clientAddress = clientAddress;
this.creationTime = creationTime;
+ this.clientID = clientID;
+ this.username = username;
}
// Public --------------------------------------------------------
@@ -85,6 +95,16 @@
return creationTime;
}
+ public String getClientID()
+ {
+ return clientID;
+ }
+
+ public String getUsername()
+ {
+ return username;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSServerControl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSServerControl.java 2010-10-20 22:45:48 UTC (rev 9804)
+++ branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSServerControl.java 2010-10-21 14:40:43 UTC (rev 9805)
@@ -270,4 +270,10 @@
*/
@Operation(desc = "Gets the sessions creation time", impact = MBeanOperationInfo.INFO)
String getSessionCreationTime(@Parameter(desc = "session name", name = "sessionID") String sessionID) throws Exception;
+
+ /**
+ * Lists all the sessions IDs for the specified connection ID.
+ */
+ @Operation(desc = "List the sessions for the given connectionID", impact = MBeanOperationInfo.INFO)
+ String listSessionsAsJSON(@Parameter(desc = "a connection ID", name = "connectionID") String connectionID) throws Exception;
}
Added: branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSSessionInfo.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSSessionInfo.java (rev 0)
+++ branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSSessionInfo.java 2010-10-21 14:40:43 UTC (rev 9805)
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.api.jms.management;
+
+import org.hornetq.utils.json.JSONArray;
+import org.hornetq.utils.json.JSONException;
+import org.hornetq.utils.json.JSONObject;
+
+/**
+ * A JMSSessionInfo
+ *
+ * @author howard
+ *
+ *
+ */
+public class JMSSessionInfo
+{
+ private final String sessionID;
+
+ private final long creationTime;
+
+ public JMSSessionInfo(String sessionID, long creationTime)
+ {
+ this.sessionID = sessionID;
+ this.creationTime = creationTime;
+ }
+
+ public static JMSSessionInfo[] from(final String jsonString) throws JSONException
+ {
+ JSONArray array = new JSONArray(jsonString);
+ JMSSessionInfo[] infos = new JMSSessionInfo[array.length()];
+ for (int i = 0; i < array.length(); i++)
+ {
+ JSONObject obj = array.getJSONObject(i);
+
+ JMSSessionInfo info = new JMSSessionInfo(obj.getString("sessionID"),
+ obj.getLong("creationTime"));
+ infos[i] = info;
+ }
+ return infos;
+ }
+
+ public String getSessionID()
+ {
+ return sessionID;
+ }
+
+ public long getCreationTime()
+ {
+ return creationTime;
+ }
+}
Modified: branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-10-20 22:45:48 UTC (rev 9804)
+++ branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-10-21 14:40:43 UTC (rev 9805)
@@ -1229,7 +1229,12 @@
public String getLastSentMessageID(String address)
{
- return targetAddressInfos.get(SimpleString.toSimpleString(address)).toString();
+ UUID id = targetAddressInfos.get(SimpleString.toSimpleString(address));
+ if (id != null)
+ {
+ return id.toString();
+ }
+ return null;
}
public long getCreationTime()
Modified: branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-10-20 22:45:48 UTC (rev 9804)
+++ branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-10-21 14:40:43 UTC (rev 9805)
@@ -987,4 +987,30 @@
}
return null;
}
+
+ public String listSessionsAsJSON(final String connectionID) throws Exception
+ {
+ checkStarted();
+
+ clearIO();
+
+ JSONArray array = new JSONArray();
+ try
+ {
+ List<ServerSession> sessions = server.getHornetQServer().getSessions(connectionID);
+ for (ServerSession sess : sessions)
+ {
+ JSONObject obj = new JSONObject();
+ obj.put("sessionID", sess.getName());
+ obj.put("creationTime", sess.getCreationTime());
+ array.put(obj);
+ }
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ return array.toString();
+ }
+
}
Modified: branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
===================================================================
--- branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java 2010-10-20 22:45:48 UTC (rev 9804)
+++ branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java 2010-10-21 14:40:43 UTC (rev 9805)
@@ -22,10 +22,12 @@
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
import javax.jms.Topic;
import junit.framework.Assert;
@@ -35,6 +37,7 @@
import org.hornetq.api.jms.management.JMSConnectionInfo;
import org.hornetq.api.jms.management.JMSConsumerInfo;
import org.hornetq.api.jms.management.JMSServerControl;
+import org.hornetq.api.jms.management.JMSSessionInfo;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
@@ -348,6 +351,101 @@
}
}
}
+
+ //https://jira.jboss.org/browse/HORNETQ-416
+ public void testProducerInfo() throws Exception
+ {
+ String queueName = RandomUtil.randomString();
+
+ System.out.println("queueName is: " + queueName);
+
+ try
+ {
+ startHornetQServer(NettyAcceptorFactory.class.getName());
+ serverManager.createQueue(false, queueName, null, true, queueName);
+ Queue queue = HornetQJMSClient.createQueue(queueName);
+
+ JMSServerControl control = createManagementControl();
+
+ long startTime = System.currentTimeMillis();
+
+ ConnectionFactory cf1 = JMSUtil.createFactory(NettyConnectorFactory.class.getName(),
+ JMSServerControl2Test.CONNECTION_TTL,
+ JMSServerControl2Test.PING_PERIOD);
+ Connection connection = cf1.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = session.createProducer(queue);
+
+ for (int i = 0; i < 10; i++)
+ {
+ TextMessage msg = session.createTextMessage("mymessage-" + i);
+ producer.send(msg);
+ }
+
+ connection.start();
+
+ // create a regular message consumer
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ TextMessage receivedMsg = null;
+ for (int i = 0; i < 10; i++)
+ {
+ receivedMsg = (TextMessage)consumer.receive(3000);
+ System.out.println("receiveMsg: " + receivedMsg);
+ }
+
+ String lastMsgID = receivedMsg.getJMSMessageID();
+ System.out.println("Last mid: " + lastMsgID);
+
+ String jsonStr = control.listConnectionsAsJSON();
+ JMSConnectionInfo[] infos = JMSConnectionInfo.from(jsonStr);
+
+ JMSConnectionInfo connInfo = infos[0];
+
+ String sessionsStr = control.listSessionsAsJSON(connInfo.getConnectionID());
+ JMSSessionInfo[] sessInfos = JMSSessionInfo.from(sessionsStr);
+
+ assertTrue(sessInfos.length > 0);
+ boolean lastMsgFound = false;
+ for (JMSSessionInfo sInfo : sessInfos)
+ {
+ System.out.println("Session name: " + sInfo.getSessionID());
+ assertNotNull(sInfo.getSessionID());
+ long createTime = sInfo.getCreationTime();
+ assertTrue(startTime <= createTime && createTime <= System.currentTimeMillis());
+ String lastID = control.getLastSentMessageID(sInfo.getSessionID(), "jms.queue." + queueName);
+ if (lastID != null)
+ {
+ assertEquals(lastMsgID, lastID);
+ lastMsgFound = true;
+ }
+ }
+ assertTrue(lastMsgFound);
+
+ consumer.close();
+
+ connection.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ throw e;
+ }
+ finally
+ {
+ if (serverManager != null)
+ {
+ serverManager.destroyQueue(queueName);
+ serverManager.stop();
+ }
+
+ if (server != null)
+ {
+ server.stop();
+ }
+ }
+ }
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -460,6 +558,8 @@
assertNotNull(info.getConnectionID());
assertNotNull(info.getClientAddress());
assertTrue(startTime <= info.getCreationTime() && info.getCreationTime() <= System.currentTimeMillis());
+ assertNull(info.getClientID());
+ assertNull(info.getUsername());
}
connection.close();
@@ -470,9 +570,22 @@
waitForConnectionIDs(0, control);
+ Connection connection3 = cf2.createConnection("guest", "guest");
+ connection3.setClientID("MyClient");
+
jsonStr = control.listConnectionsAsJSON();
assertNotNull(jsonStr);
+
infos = JMSConnectionInfo.from(jsonStr);
+ JMSConnectionInfo info = infos[0];
+ assertEquals("MyClient", info.getClientID());
+ assertEquals("guest", info.getUsername());
+
+ connection3.close();
+
+ jsonStr = control.listConnectionsAsJSON();
+ assertNotNull(jsonStr);
+ infos = JMSConnectionInfo.from(jsonStr);
assertEquals(0, infos.length);
}
finally
Modified: branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
===================================================================
--- branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2010-10-20 22:45:48 UTC (rev 9804)
+++ branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2010-10-21 14:40:43 UTC (rev 9805)
@@ -298,6 +298,12 @@
return null;
}
+ public String listSessionsAsJSON(String connectionID) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
};
}
// Public --------------------------------------------------------
13 years, 6 months
JBoss hornetq SVN: r9804 - in branches/Branch_New_Paging: tests/src/org/hornetq/tests/integration/paging and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-20 18:45:48 -0400 (Wed, 20 Oct 2010)
New Revision: 9804
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
more on bookmarking cursors
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-20 21:09:02 UTC (rev 9803)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-20 22:45:48 UTC (rev 9804)
@@ -358,11 +358,21 @@
PageCursorImpl.trace("********** processing reload!!!!!!!");
}
Collections.sort(recoveredACK);
+
+ boolean first = true;
PagePosition previousPos = null;
for (PagePosition pos : recoveredACK)
{
PageCursorInfo positions = getPageInfo(pos);
+ if (first)
+ {
+ first = false;
+ if (pos.getMessageNr() > 0)
+ {
+ positions.confirmed.addAndGet(pos.getMessageNr());
+ }
+ }
positions.addACK(pos);
@@ -556,7 +566,7 @@
for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
{
PageCursorInfo info = entry.getValue();
- if (info.isDone() && !info.isPendingDelete())
+ if (info.isDone() && !info.isPendingDelete() && lastAckedPosition != null)
{
if (entry.getKey() == lastAckedPosition.getPageNr())
{
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-20 21:09:02 UTC (rev 9803)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-20 22:45:48 UTC (rev 9804)
@@ -130,7 +130,7 @@
while (true)
{
- Pair<PagePosition, PagedMessage> retPos = internalAfter(cursorPos);
+ Pair<PagePosition, PagedMessage> retPos = internalGetNext(cursorPos);
if (retPos == null)
{
@@ -165,7 +165,7 @@
}
}
- private Pair<PagePosition, PagedMessage> internalAfter(final PagePosition pos)
+ private Pair<PagePosition, PagedMessage> internalGetNext(final PagePosition pos)
{
PagePosition retPos = pos.nextMessage();
@@ -308,6 +308,7 @@
Page page = pagingStore.depage();
if (page != null)
{
+ System.out.println("Deleting " + page);
page.delete();
}
}
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-20 21:09:02 UTC (rev 9803)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-20 22:45:48 UTC (rev 9804)
@@ -583,9 +583,80 @@
assertTrue(cursorProvider.getCacheSize() < numberOfPages);
server.stop();
+
+ server.start();
+
+ Thread.sleep(1000);
+ assertEquals(2, lookupPageStore(ADDRESS).getNumberOfPages());
+
+
}
+
+ public void testFirstMessageInTheMiddlePersistent() throws Exception
+ {
+
+ final int NUM_MESSAGES = 100;
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+ System.out.println("NumberOfPages = " + numberOfPages);
+
+ PageCursorProvider cursorProvider = lookupCursorProvider();
+
+ PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
+
+ PageCursor cursor = cursorProvider.getPersistentCursor(queue.getID());
+ PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages()/2);
+ cursor.bookmark(startingPos);
+ PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
+ msg.initMessage(server.getStorageManager());
+ int initialKey = msg.getMessage().getIntProperty("key").intValue();
+ int key = initialKey;
+
+ msg = null;
+
+ cache = null;
+
+ Pair<PagePosition, PagedMessage> msgCursor = null;
+ while ((msgCursor = cursor.moveNext()) != null)
+ {
+ assertEquals(key++, msgCursor.b.getMessage().getIntProperty("key").intValue());
+ }
+ assertEquals(NUM_MESSAGES, key);
+
+
+ server.stop();
+
+ OperationContextImpl.clearContext();
+
+ server.start();
+
+ cursorProvider = lookupCursorProvider();
+ cursor = cursorProvider.getPersistentCursor(queue.getID());
+ key = initialKey;
+ while ((msgCursor = cursor.moveNext()) != null)
+ {
+ assertEquals(key++, msgCursor.b.getMessage().getIntProperty("key").intValue());
+ cursor.ack(msgCursor.a);
+ }
+
+
+ forceGC();
+
+ assertTrue(cursorProvider.getCacheSize() < numberOfPages);
+
+ // This is to make sure all the pending files will be deleted
+ server.stop();
+
+ server.start();
+
+ // TODO: this should be exact 2
+ assertTrue(lookupPageStore(ADDRESS).getNumberOfPages() <= 3);
+
+ }
+
private int addMessages(final int numMessages, final int messageSize) throws Exception
{
return addMessages(0, numMessages, messageSize);
13 years, 6 months