From do-not-reply at jboss.org Mon Oct 25 15:35:11 2010
Content-Type: multipart/mixed; boundary="===============7188948122459675551=="
MIME-Version: 1.0
From: do-not-reply at jboss.org
To: hornetq-commits at lists.jboss.org
Subject: [hornetq-commits] JBoss hornetq SVN: r9812 - in trunk:
hornetq-rest/docbook/reference/en and 6 other directories.
Date: Mon, 25 Oct 2010 15:35:11 -0400
Message-ID: <201010251935.o9PJZBng027029@svn01.web.mwc.hst.phx2.redhat.com>
--===============7188948122459675551==
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: quoted-printable
Author: bill.burke(a)jboss.com
Date: 2010-10-25 15:35:10 -0400 (Mon, 25 Oct 2010)
New Revision: 9812
Added:
trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/Sele=
ctorTest.java
Modified:
trunk/docs/user-manual/en/configuration-index.xml
trunk/docs/user-manual/en/embedding-hornetq.xml
trunk/hornetq-rest/docbook/reference/en/master.xml
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Ack=
nowledgedQueueConsumer.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Con=
sumersResource.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Pos=
tMessage.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Que=
ueConsumer.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/pus=
h/PushConsumer.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/pus=
h/xml/PushRegistration.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Ack=
nowledgedSubscriptionResource.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Fil=
eTopicPushStore.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Sub=
scriptionResource.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Sub=
scriptionsResource.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/Http=
MessageHelper.java
Log:
fix docs, merge REST from beta2 release
Modified: trunk/docs/user-manual/en/configuration-index.xml
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/docs/user-manual/en/configuration-index.xml 2010-10-25 15:49:13 U=
TC (rev 9811)
+++ trunk/docs/user-manual/en/configuration-index.xml 2010-10-25 19:35:10 U=
TC (rev 9812)
@@ -1026,7 +1026,7 @@
generic
-
+
connection-factory.xa
Boolean
Modified: trunk/docs/user-manual/en/embedding-hornetq.xml
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/docs/user-manual/en/embedding-hornetq.xml 2010-10-25 15:49:13 UTC=
(rev 9811)
+++ trunk/docs/user-manual/en/embedding-hornetq.xml 2010-10-25 19:35:10 UTC=
(rev 9812)
@@ -46,19 +46,8 @@
two different helper classes for this depending on whether your using =
the
HornetQ Core API or JMS.
=
-<<<<<<< .mine
Core API Only
-=3D=3D=3D=3D=3D=3D=3D
-config.setAcceptorConfigurations(transports);
- You need to instantiate and start HornetQ server. The class =
org.hornetq.api.core.server.HornetQ has a few s=
tatic methods for creating
- servers with common configurations.
- import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.HornetQServers;
- =
->>>>>>> .r9629
-
For instantiating a core HornetQ Server only, the steps are pr=
etty
simple. The example requires that you have defined a configuration f=
ile
hornetq-configuration.xml in your
@@ -66,38 +55,9 @@
=
...
=
-<<<<<<< .mine
EmbeddedHornetQ embedded =3D new EmbeddedHornetQ();
embedded.start();
-=3D=3D=3D=3D=3D=3D=3D
-HornetQServer server =3D HornetQServers.newHornetQServer(config);
->>>>>>> .r9629
=
-<<<<<<< .mine
-// Assuming you defined an "in-vm" acceptor within your hornetq-configurat=
ion.xml file
-=3D=3D=3D=3D=3D=3D=3D
-server.start();
- You also have the option of instantiating HornetQSe=
rverImpl
- directly:
- HornetQServer server =3D new HornetQServerImpl(con=
fig);
-server.start();
-
-
- Dependency Frameworks
- You may also choose to use a dependency injection framework =
such as JBoss
- Micro Container or Spring Framework=
.
- HornetQ standalone uses JBoss Micro Container as the injecti=
on framework. HornetQBootstrapServer and hornetq-bea=
ns.xml which are
- part of the HornetQ distribution provide a very complete imple=
mentation of what's needed
- to bootstrap the server using JBoss Micro Container.
- When using JBoss Micro Container, you need to provide an XML=
file declaring the
- HornetQServer and Configuratio=
n object, you
- can also inject a security manager and a MBean server if you w=
ant, but those are
- optional.
- A very basic XML Bean declaration for the JBoss Micro Contai=
ner would be:
- <?xml version=3D"1.0" encoding=3D"UTF-8"?>
->>>>>>> .r9629
-
ClientSessionFactory nettyFactory =3D HornetQClient.createClientSessionFa=
ctory(
new TransportConfiguration(
InVMConnectorFactory.class.getN=
ame()));
Modified: trunk/hornetq-rest/docbook/reference/en/master.xml
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/hornetq-rest/docbook/reference/en/master.xml 2010-10-25 15:49:13 =
UTC (rev 9811)
+++ trunk/hornetq-rest/docbook/reference/en/master.xml 2010-10-25 19:35:10 =
UTC (rev 9812)
@@ -870,6 +870,16 @@
the server. Only usable on topics.
+
+ selector
+
+
+ This is an optional JMS selector string. The HornetQ REST
+ interface adds HTTP headers to the JMS message for REST produced
+ messages. HTTP headers are prefixed with "http_" and every '-'
+ charactor is converted to a '$'.
+
+
=
@@ -1480,6 +1490,10 @@
=
<push-registration>
<durable>false</durable>
+ <selector><![CDATA[ =
+ SomeAttribute > 1 =
+ ]]>
+ </selector>
<link rel=3D"push" href=3D"http://somewhere.com" type=3D"application=
/json" method=3D"PUT"/>
</push-registration>
@@ -1493,6 +1507,10 @@
queue-push-store-dir config variable defined in
Chapter 2. (topic-push-store-dir for topics).
=
+ The selector element is optional and define=
s a
+ JMS message selector. You should enclose it within CDATA blocks as s=
ome
+ of the selector characters are illegal XML.
+
The link element specifies the basis of the
interaction. The href attribute contains the URL =
you
want to interact with. It is the only required attribute. The
@@ -1562,11 +1580,16 @@
The Topic Push Subscription XML
=
The push XML for a topic is the same except the root element is
- push-topic-registration. The rest of the document is the same. Here'=
s an
+ push-topic-registration. (Also remember the selector
+ element is optional). The rest of the document is the same. Here's =
an
example of a template registration:
=
<push-topic-registration>
<durable>true</durable>
+ <selector><![CDATA[ =
+ SomeAttribute > 1 =
+ ]]>
+ </selector>
<link rel=3D"template" href=3D"http://somewhere.com/resources/{id}/m=
essages" method=3D"POST"/>
</push-topic registration>
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/qu=
eue/AcknowledgedQueueConsumer.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Ac=
knowledgedQueueConsumer.java 2010-10-25 15:49:13 UTC (rev 9811)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Ac=
knowledgedQueueConsumer.java 2010-10-25 19:35:10 UTC (rev 9812)
@@ -31,10 +31,10 @@
protected String startup =3D Long.toString(System.currentTimeMillis());
protected volatile Acknowledgement ack;
=
- public AcknowledgedQueueConsumer(ClientSessionFactory factory, String d=
estination, String id, DestinationServiceManager serviceManager)
+ public AcknowledgedQueueConsumer(ClientSessionFactory factory, String d=
estination, String id, DestinationServiceManager serviceManager, String sel=
ector)
throws HornetQException
{
- super(factory, destination, id, serviceManager);
+ super(factory, destination, id, serviceManager, selector);
autoAck =3D false;
}
=
@@ -187,9 +187,7 @@
=
try
{
- session =3D factory.createSession();
- consumer =3D session.createConsumer(destination);
- session.start();
+ createSession();
}
catch (Exception e)
{
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/qu=
eue/ConsumersResource.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Co=
nsumersResource.java 2010-10-25 15:49:13 UTC (rev 9811)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Co=
nsumersResource.java 2010-10-25 19:35:10 UTC (rev 9812)
@@ -36,6 +36,9 @@
protected int consumerTimeoutSeconds;
protected DestinationServiceManager serviceManager;
=
+ protected static final int ACKNOWLEDGED =3D 0x01;
+ protected static final int SELECTOR_SET =3D 0x02;
+
public DestinationServiceManager getServiceManager()
{
return serviceManager;
@@ -78,6 +81,7 @@
=
private Object timeoutLock =3D new Object();
=
+ @Override
public void testTimeout(String target)
{
synchronized (timeoutLock)
@@ -107,32 +111,41 @@
=
@POST
public Response createSubscription(@FormParam("autoAck") @DefaultValue(=
"true") boolean autoAck,
+ @FormParam("selector") String select=
or,
@Context UriInfo uriInfo)
{
try
{
QueueConsumer consumer =3D null;
+ int attributes =3D 0;
+ if (selector !=3D null)
+ {
+ attributes =3D attributes | SELECTOR_SET;
+ }
+ =
if (autoAck)
{
- consumer =3D createConsumer();
+ consumer =3D createConsumer(selector);
}
else
{
- consumer =3D createAcknowledgedConsumer();
+ attributes |=3D ACKNOWLEDGED;
+ consumer =3D createAcknowledgedConsumer(selector);
}
=
+ String attributesSegment =3D "attributes-" + attributes;
UriBuilder location =3D uriInfo.getAbsolutePathBuilder();
- if (autoAck) location.path("auto-ack");
- else location.path("acknowledged");
+ location.path(attributesSegment);
location.path(consumer.getId());
Response.ResponseBuilder builder =3D Response.created(location.bu=
ild());
+
if (autoAck)
{
- QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrateg=
y(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/auto-ack/" + con=
sumer.getId(), "-1");
+ QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrateg=
y(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/" + attributesSe=
gment +"/" + consumer.getId(), "-1");
}
else
{
- AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManage=
r.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/=
acknowledged/" + consumer.getId(), "-1");
+ AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManage=
r.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/=
" + attributesSegment +"/" + consumer.getId(), "-1");
=
}
return builder.build();
@@ -147,11 +160,11 @@
}
}
=
- public QueueConsumer createConsumer()
+ public QueueConsumer createConsumer(String selector)
throws HornetQException
{
String genId =3D sessionCounter.getAndIncrement() + "-queue-" + dest=
ination + "-" + startup;
- QueueConsumer consumer =3D new QueueConsumer(sessionFactory, destina=
tion, genId, serviceManager);
+ QueueConsumer consumer =3D new QueueConsumer(sessionFactory, destina=
tion, genId, serviceManager, selector);
synchronized (timeoutLock)
{
queueConsumers.put(genId, consumer);
@@ -160,11 +173,11 @@
return consumer;
}
=
- public QueueConsumer createAcknowledgedConsumer()
+ public QueueConsumer createAcknowledgedConsumer(String selector)
throws HornetQException
{
String genId =3D sessionCounter.getAndIncrement() + "-queue-" + dest=
ination + "-" + startup;
- QueueConsumer consumer =3D new AcknowledgedQueueConsumer(sessionFact=
ory, destination, genId, serviceManager);
+ QueueConsumer consumer =3D new AcknowledgedQueueConsumer(sessionFact=
ory, destination, genId, serviceManager, selector);
synchronized (timeoutLock)
{
queueConsumers.put(genId, consumer);
@@ -173,85 +186,81 @@
return consumer;
}
=
- @Path("auto-ack/{consumer-id}")
+ @Path("attributes-{attributes}/{consumer-id}")
@GET
- public Response getConsumer(@PathParam("consumer-id") String consumerId,
+ public Response getConsumer(@PathParam("attributes") int attributes,
+ @PathParam("consumer-id") String consumerId,
@Context UriInfo uriInfo) throws Exception
{
- return headConsumer(consumerId, uriInfo);
+ return headConsumer(attributes, consumerId, uriInfo);
}
=
- @Path("auto-ack/{consumer-id}")
+ @Path("attributes-{attributes}/{consumer-id}")
@HEAD
- public Response headConsumer(@PathParam("consumer-id") String consumerI=
d,
+ public Response headConsumer(@PathParam("attributes") int attributes,
+ @PathParam("consumer-id") String consumerI=
d,
@Context UriInfo uriInfo) throws Exception
{
- QueueConsumer consumer =3D findConsumer(consumerId);
+ QueueConsumer consumer =3D findConsumer(attributes, consumerId, uriI=
nfo);
Response.ResponseBuilder builder =3D Response.noContent();
// we synchronize just in case a failed request is still processing
synchronized (consumer)
{
- QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrategy()=
, builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/acknowledged/" + co=
nsumer.getId(), Long.toString(consumer.getConsumeIndex()));
+ if ( (attributes & ACKNOWLEDGED) > 0)
+ {
+ AcknowledgedQueueConsumer ackedConsumer =3D (AcknowledgedQueue=
Consumer)consumer;
+ Acknowledgement ack =3D ackedConsumer.getAck();
+ if (ack =3D=3D null || ack.wasSet())
+ {
+ AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceMan=
ager.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) +=
"/attributes-" + attributes + "/" + consumer.getId(), Long.toString(consum=
er.getConsumeIndex()));
+ }
+ else
+ {
+ ackedConsumer.setAcknowledgementLink(builder, uriInfo, uriI=
nfo.getMatchedURIs().get(1) + "/attributes-" + attributes + "/" + consumer.=
getId());
+ }
+
+ }
+ else
+ {
+ QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrateg=
y(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/attributes-" + a=
ttributes + "/" + consumer.getId(), Long.toString(consumer.getConsumeIndex(=
)));
+ }
}
return builder.build();
}
=
- @Path("auto-ack/{consumer-id}")
+ @Path("attributes-{attributes}/{consumer-id}")
public QueueConsumer findConsumer(
- @PathParam("consumer-id") String consumerId) throws Exception
+ @PathParam("attributes") int attributes,
+ @PathParam("consumer-id") String consumerId,
+ @Context UriInfo uriInfo) throws Exception
{
QueueConsumer consumer =3D queueConsumers.get(consumerId);
if (consumer =3D=3D null)
{
- QueueConsumer tmp =3D new QueueConsumer(sessionFactory, destinati=
on, consumerId, serviceManager);
- consumer =3D addConsumerToMap(consumerId, tmp);
- }
- return consumer;
- }
+ if ( (attributes & SELECTOR_SET) > 0)
+ {
=
- @Path("acknowledged/{consumer-id}")
- @GET
- public Response getAcknowledgedConsumer(@PathParam("consumer-id") Strin=
g consumerId,
- @Context UriInfo uriInfo) throw=
s Exception
- {
- return headAcknowledgedConsumer(consumerId, uriInfo);
- }
+ Response.ResponseBuilder builder =3D Response.status(Response.=
Status.GONE)
+ .entity("Cannot reconnect to selector-based consumer. =
You must recreate the consumer session.")
+ .type("text/plain");
+ UriBuilder uriBuilder =3D uriInfo.getBaseUriBuilder();
+ uriBuilder.path(uriInfo.getMatchedURIs().get(1));
+ serviceManager.getLinkStrategy().setLinkHeader(builder, "pull-=
consumers", "pull-consumers", uriBuilder.build().toString(), null);
+ throw new WebApplicationException(builder.build());
+ =
+ }
+ if ( (attributes & ACKNOWLEDGED) > 0)
+ {
+ QueueConsumer tmp =3D new AcknowledgedQueueConsumer(sessionFac=
tory, destination, consumerId, serviceManager, null);
+ consumer =3D addConsumerToMap(consumerId, tmp);
=
- @Path("acknowledged/{consumer-id}")
- @HEAD
- public Response headAcknowledgedConsumer(@PathParam("consumer-id") Stri=
ng consumerId,
- @Context UriInfo uriInfo) thro=
ws Exception
- {
- AcknowledgedQueueConsumer consumer =3D (AcknowledgedQueueConsumer) f=
indAcknowledgedConsumer(consumerId);
- Response.ResponseBuilder builder =3D Response.ok();
- // we synchronize just in case a failed request is still processing
- synchronized (consumer)
- {
- Acknowledgement ack =3D consumer.getAck();
- if (ack =3D=3D null || ack.wasSet())
- {
- AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManage=
r.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/=
acknowledged/" + consumer.getId(), Long.toString(consumer.getConsumeIndex()=
));
}
else
{
- consumer.setAcknowledgementLink(builder, uriInfo, uriInfo.getM=
atchedURIs().get(1) + "/acknowledged/" + consumer.getId());
+ QueueConsumer tmp =3D new QueueConsumer(sessionFactory, destin=
ation, consumerId, serviceManager, null);
+ consumer =3D addConsumerToMap(consumerId, tmp);
}
}
- return builder.build();
- }
-
-
- @Path("acknowledged/{consumer-id}")
- public QueueConsumer findAcknowledgedConsumer(
- @PathParam("consumer-id") String consumerId) throws Exception
- {
- QueueConsumer consumer =3D queueConsumers.get(consumerId);
- if (consumer =3D=3D null)
- {
- QueueConsumer tmp =3D new AcknowledgedQueueConsumer(sessionFactor=
y, destination, consumerId, serviceManager);
- ;
- consumer =3D addConsumerToMap(consumerId, tmp);
- }
return consumer;
}
=
@@ -275,16 +284,8 @@
}
=
=
- @Path("acknowledged/{consumer-id}")
+ @Path("attributes-{attributes}/{consumer-id}")
@DELETE
- public void closeAcknowledgedSession(
- @PathParam("consumer-id") String consumerId)
- {
- closeSession(consumerId);
- }
-
- @Path("auto-ack/{consumer-id}")
- @DELETE
public void closeSession(
@PathParam("consumer-id") String consumerId)
{
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/qu=
eue/PostMessage.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Po=
stMessage.java 2010-10-25 15:49:13 UTC (rev 9811)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Po=
stMessage.java 2010-10-25 19:35:10 UTC (rev 9812)
@@ -1,12 +1,12 @@
package org.hornetq.rest.queue;
=
import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Message;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.rest.util.HttpMessageHelper;
+import org.hornetq.api.core.Message;
=
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/qu=
eue/QueueConsumer.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Qu=
eueConsumer.java 2010-10-25 15:49:13 UTC (rev 9811)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Qu=
eueConsumer.java 2010-10-25 19:35:10 UTC (rev 9812)
@@ -5,6 +5,7 @@
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.jms.client.SelectorTranslator;
import org.hornetq.rest.util.Constants;
import org.hornetq.rest.util.LinkStrategy;
=
@@ -37,6 +38,7 @@
protected long lastPing =3D System.currentTimeMillis();
protected DestinationServiceManager serviceManager;
protected boolean autoAck =3D true;
+ protected String selector;
=
/**
* token used to create consume-next links
@@ -70,14 +72,15 @@
lastPing =3D System.currentTimeMillis();
}
=
- public QueueConsumer(ClientSessionFactory factory, String destination, =
String id, DestinationServiceManager serviceManager) throws HornetQException
+ public QueueConsumer(ClientSessionFactory factory, String destination, =
String id, DestinationServiceManager serviceManager, String selector) throw=
s HornetQException
{
this.factory =3D factory;
this.destination =3D destination;
this.id =3D id;
this.serviceManager =3D serviceManager;
+ this.selector =3D selector;
=
- createSession(factory, destination);
+ createSession();
}
=
public String getId()
@@ -191,11 +194,18 @@
}
}
=
- protected void createSession(ClientSessionFactory factory, String desti=
nation)
+ protected void createSession()
throws HornetQException
{
session =3D factory.createSession(true, true);
- consumer =3D session.createConsumer(destination);
+ if (selector =3D=3D null)
+ {
+ consumer =3D session.createConsumer(destination);
+ }
+ else
+ {
+ consumer =3D session.createConsumer(destination, SelectorTranslat=
or.convertToHornetQFilterString(selector));
+ }
session.start();
}
=
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/qu=
eue/push/PushConsumer.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/pu=
sh/PushConsumer.java 2010-10-25 15:49:13 UTC (rev 9811)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/pu=
sh/PushConsumer.java 2010-10-25 19:35:10 UTC (rev 9812)
@@ -7,6 +7,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.core.logging.Logger;
+import org.hornetq.jms.client.SelectorTranslator;
import org.hornetq.rest.queue.push.xml.PushRegistration;
=
/**
@@ -68,7 +69,14 @@
strategy.start();
=
session =3D factory.createSession(false, false);
- consumer =3D session.createConsumer(destination);
+ if (registration.getSelector() !=3D null)
+ {
+ consumer =3D session.createConsumer(destination, SelectorTranslat=
or.convertToHornetQFilterString(registration.getSelector()));
+ }
+ else
+ {
+ consumer =3D session.createConsumer(destination);
+ }
consumer.setMessageHandler(this);
session.start();
log.info("Push consumer started for: " + registration.getTarget());
@@ -100,6 +108,7 @@
}
}
=
+ @Override
public void onMessage(ClientMessage clientMessage)
{
if (strategy.push(clientMessage) =3D=3D false)
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/qu=
eue/push/xml/PushRegistration.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/pu=
sh/xml/PushRegistration.java 2010-10-25 15:49:13 UTC (rev 9811)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/pu=
sh/xml/PushRegistration.java 2010-10-25 19:35:10 UTC (rev 9812)
@@ -18,7 +18,7 @@
*/
@XmlRootElement(name =3D "push-registration")
@XmlAccessorType(XmlAccessType.PROPERTY)
-(a)XmlType(propOrder =3D {"destination", "durable", "target", "authenticat=
ionMechanism", "headers"})
+(a)XmlType(propOrder =3D {"destination", "durable", "selector", "target", =
"authenticationMechanism", "headers"})
public class PushRegistration implements Serializable
{
private String id;
@@ -28,6 +28,7 @@
private List headers =3D new ArrayList();
private String destination;
private Object loadedFrom;
+ private String selector;
=
@XmlTransient
public Object getLoadedFrom()
@@ -73,6 +74,16 @@
this.durable =3D durable;
}
=
+ public String getSelector()
+ {
+ return selector;
+ }
+
+ public void setSelector(String selector)
+ {
+ this.selector =3D selector;
+ }
+
@XmlElementRef
public XmlLink getTarget()
{
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/to=
pic/AcknowledgedSubscriptionResource.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Ac=
knowledgedSubscriptionResource.java 2010-10-25 15:49:13 UTC (rev 9811)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Ac=
knowledgedSubscriptionResource.java 2010-10-25 19:35:10 UTC (rev 9812)
@@ -13,10 +13,10 @@
{
private boolean durable;
=
- public AcknowledgedSubscriptionResource(ClientSessionFactory factory, S=
tring destination, String id, DestinationServiceManager serviceManager)
+ public AcknowledgedSubscriptionResource(ClientSessionFactory factory, S=
tring destination, String id, DestinationServiceManager serviceManager, Str=
ing selector)
throws HornetQException
{
- super(factory, destination, id, serviceManager);
+ super(factory, destination, id, serviceManager, selector);
}
=
public boolean isDurable()
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/to=
pic/FileTopicPushStore.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Fi=
leTopicPushStore.java 2010-10-25 15:49:13 UTC (rev 9811)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Fi=
leTopicPushStore.java 2010-10-25 19:35:10 UTC (rev 9812)
@@ -18,6 +18,7 @@
super(dirname);
}
=
+ @Override
public synchronized List getByTopic(String topic)
{
List list =3D new ArrayList();
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/to=
pic/SubscriptionResource.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Su=
bscriptionResource.java 2010-10-25 15:49:13 UTC (rev 9811)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Su=
bscriptionResource.java 2010-10-25 19:35:10 UTC (rev 9812)
@@ -13,10 +13,10 @@
{
boolean durable;
=
- public SubscriptionResource(ClientSessionFactory factory, String destin=
ation, String id, DestinationServiceManager serviceManager)
+ public SubscriptionResource(ClientSessionFactory factory, String destin=
ation, String id, DestinationServiceManager serviceManager, String selector)
throws HornetQException
{
- super(factory, destination, id, serviceManager);
+ super(factory, destination, id, serviceManager, selector);
}
=
public boolean isDurable()
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/to=
pic/SubscriptionsResource.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Su=
bscriptionsResource.java 2010-10-25 15:49:13 UTC (rev 9811)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Su=
bscriptionsResource.java 2010-10-25 19:35:10 UTC (rev 9812)
@@ -84,6 +84,7 @@
=
private Object timeoutLock =3D new Object();
=
+ @Override
public void testTimeout(String target)
{
synchronized (timeoutLock)
@@ -127,6 +128,7 @@
public Response createSubscription(@FormParam("durable") @DefaultValue(=
"false") boolean durable,
@FormParam("autoAck") @DefaultValue(=
"true") boolean autoAck,
@FormParam("name") String subscripti=
onName,
+ @FormParam("selector") String select=
or,
@Context UriInfo uriInfo)
{
if (subscriptionName !=3D null)
@@ -185,7 +187,7 @@
session.createTemporaryQueue(destination, subscriptionName);
}
}
- QueueConsumer consumer =3D createConsumer(durable, autoAck, subsc=
riptionName);
+ QueueConsumer consumer =3D createConsumer(durable, autoAck, subsc=
riptionName, selector);
queueConsumers.put(consumer.getId(), consumer);
serviceManager.getTimeoutTask().add(this, consumer.getId());
=
@@ -225,19 +227,19 @@
}
}
=
- protected QueueConsumer createConsumer(boolean durable, boolean autoAck=
, String subscriptionName)
+ protected QueueConsumer createConsumer(boolean durable, boolean autoAck=
, String subscriptionName, String selector)
throws HornetQException
{
QueueConsumer consumer;
if (autoAck)
{
- SubscriptionResource subscription =3D new SubscriptionResource(se=
ssionFactory, subscriptionName, subscriptionName, serviceManager);
+ SubscriptionResource subscription =3D new SubscriptionResource(se=
ssionFactory, subscriptionName, subscriptionName, serviceManager, selector);
subscription.setDurable(durable);
consumer =3D subscription;
}
else
{
- AcknowledgedSubscriptionResource subscription =3D new Acknowledge=
dSubscriptionResource(sessionFactory, subscriptionName, subscriptionName, s=
erviceManager);
+ AcknowledgedSubscriptionResource subscription =3D new Acknowledge=
dSubscriptionResource(sessionFactory, subscriptionName, subscriptionName, s=
erviceManager, selector);
subscription.setDurable(durable);
consumer =3D subscription;
}
@@ -375,7 +377,7 @@
QueueConsumer tmp =3D null;
try
{
- tmp =3D createConsumer(true, autoAck, subscriptionId);
+ tmp =3D createConsumer(true, autoAck, subscriptionId, null);
}
catch (HornetQException e)
{
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/ut=
il/HttpMessageHelper.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/Htt=
pMessageHelper.java 2010-10-25 15:49:13 UTC (rev 9811)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/Htt=
pMessageHelper.java 2010-10-25 19:35:10 UTC (rev 9812)
@@ -81,17 +81,20 @@
int size =3D message.getBodySize();
if (size > 0)
{
- byte[] body =3D new byte[size];
- message.getBodyBuffer().readBytes(body);
Boolean aBoolean =3D message.getBooleanProperty(POSTED_AS_HTTP_ME=
SSAGE);
if (aBoolean !=3D null && aBoolean.booleanValue())
{
+ byte[] body =3D new byte[size];
+ message.getBodyBuffer().readBytes(body);
//System.out.println("Building Message from HTTP message");
request.body(contentType, body);
}
else
{
// assume posted as a JMS or HornetQ object message
+ size =3D message.getBodyBuffer().readInt();
+ byte[] body =3D new byte[size];
+ message.getBodyBuffer().readBytes(body);
ByteArrayInputStream bais =3D new ByteArrayInputStream(body);
Object obj =3D null;
try
Added: trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/=
SelectorTest.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/Sel=
ectorTest.java (rev 0)
+++ trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/Sel=
ectorTest.java 2010-10-25 19:35:10 UTC (rev 9812)
@@ -0,0 +1,304 @@
+package org.hornetq.rest.test;
+
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQDestination;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
+import org.hornetq.rest.HttpHeaderProperty;
+import org.hornetq.rest.queue.push.xml.XmlLink;
+import org.hornetq.rest.topic.PushTopicRegistration;
+import org.hornetq.rest.topic.TopicDeployment;
+import org.jboss.resteasy.client.ClientRequest;
+import org.jboss.resteasy.client.ClientResponse;
+import org.jboss.resteasy.spi.Link;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+
+import static org.jboss.resteasy.test.TestPortProvider.*;
+
+/**
+ * @author Bill Burke
+ * @version $Revision: 1 $
+ */
+public class SelectorTest extends MessageTestBase
+{
+ public static ConnectionFactory connectionFactory;
+ public static String topicName =3D HornetQDestination.createQueueAddres=
sFromName("testTopic").toString();
+
+ @BeforeClass
+ public static void setup() throws Exception
+ {
+ connectionFactory =3D new HornetQJMSConnectionFactory(manager.getQue=
ueManager().getSessionFactory());
+ System.out.println("Queue name: " + topicName);
+ TopicDeployment deployment =3D new TopicDeployment();
+ deployment.setDuplicatesAllowed(true);
+ deployment.setDurableSend(false);
+ deployment.setName(topicName);
+ manager.getTopicManager().deploy(deployment);
+ }
+
+ @XmlRootElement
+ public static class Order implements Serializable
+ {
+ private String name;
+ private String amount;
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public void setName(String name)
+ {
+ this.name =3D name;
+ }
+
+ public String getAmount()
+ {
+ return amount;
+ }
+
+ public void setAmount(String amount)
+ {
+ this.amount =3D amount;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this =3D=3D o) return true;
+ if (o =3D=3D null || getClass() !=3D o.getClass()) return false;
+
+ Order order =3D (Order) o;
+
+ if (!amount.equals(order.amount)) return false;
+ if (!name.equals(order.name)) return false;
+
+ return true;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Order{" +
+ "name=3D'" + name + '\'' +
+ '}';
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result =3D name.hashCode();
+ result =3D 31 * result + amount.hashCode();
+ return result;
+ }
+ }
+
+ public static Destination createDestination(String dest)
+ {
+ HornetQDestination destination =3D (HornetQDestination) HornetQDesti=
nation.fromAddress(dest);
+ System.out.println("SimpleAddress: " + destination.getSimpleAddress(=
));
+ return destination;
+ }
+
+ public static void publish(String dest, Serializable object, String con=
tentType, String tag) throws Exception
+ {
+ Connection conn =3D connectionFactory.createConnection();
+ try
+ {
+ Session session =3D conn.createSession(false, Session.AUTO_ACKNOW=
LEDGE);
+ Destination destination =3D createDestination(dest);
+ MessageProducer producer =3D session.createProducer(destination);
+ ObjectMessage message =3D session.createObjectMessage();
+
+ if (contentType !=3D null)
+ {
+ message.setStringProperty(HttpHeaderProperty.CONTENT_TYPE, con=
tentType);
+ }
+ if (tag !=3D null)
+ {
+ message.setStringProperty("MyTag", tag);
+ }
+ message.setObject(object);
+
+ producer.send(message);
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+
+ @Path("/push")
+ public static class PushReceiver
+ {
+ public static Order oneOrder;
+ public static Order twoOrder;
+
+ @POST
+ @Path("one")
+ public void one(Order order)
+ {
+ oneOrder =3D order;
+ }
+
+ @POST
+ @Path("two")
+ public void two(Order order)
+ {
+ twoOrder =3D order; =
+ }
+
+
+ }
+
+ @Test
+ public void testPush() throws Exception
+ {
+ server.getJaxrsServer().getDeployment().getRegistry().addPerRequestR=
esource(PushReceiver.class);
+ ClientRequest request =3D new ClientRequest(generateURL("/topics/" +=
topicName));
+
+ ClientResponse response =3D request.head();
+ Assert.assertEquals(200, response.getStatus());
+ Link consumers =3D MessageTestBase.getLinkByTitle(manager.getQueueMa=
nager().getLinkStrategy(), response, "push-subscriptions");
+ System.out.println("push: " + consumers);
+
+ PushTopicRegistration oneReg =3D new PushTopicRegistration();
+ oneReg.setDurable(false);
+ XmlLink target =3D new XmlLink();
+ target.setMethod("post");
+ target.setHref(generateURL("/push/one"));
+ target.setType("application/xml");
+ oneReg.setTarget(target);
+ oneReg.setSelector("MyTag =3D '1'");
+ response =3D consumers.request().body("application/xml", oneReg).pos=
t();
+ Link oneSubscription =3D response.getLocation();
+
+ PushTopicRegistration twoReg =3D new PushTopicRegistration();
+ twoReg.setDurable(false);
+ target =3D new XmlLink();
+ target.setMethod("post");
+ target.setHref(generateURL("/push/two"));
+ target.setType("application/xml");
+ twoReg.setTarget(target);
+ twoReg.setSelector("MyTag =3D '2'");
+ response =3D consumers.request().body("application/xml", twoReg).pos=
t();
+ Link twoSubscription =3D response.getLocation();
+
+ Order order =3D new Order();
+ order.setName("1");
+ order.setAmount("$5.00");
+ publish(topicName, order, null, "1");
+ Thread.sleep(200);
+ Assert.assertEquals(order, PushReceiver.oneOrder);
+
+ order.setName("2");
+ publish(topicName, order, null, "2");
+ Thread.sleep(200);
+ Assert.assertEquals(order, PushReceiver.twoOrder);
+
+ order.setName("3");
+ publish(topicName, order, null, "2");
+ Thread.sleep(200);
+ Assert.assertEquals(order, PushReceiver.twoOrder);
+
+ order.setName("4");
+ publish(topicName, order, null, "1");
+ Thread.sleep(200);
+ Assert.assertEquals(order, PushReceiver.oneOrder);
+
+ order.setName("5");
+ publish(topicName, order, null, "1");
+ Thread.sleep(200);
+ Assert.assertEquals(order, PushReceiver.oneOrder);
+
+ order.setName("6");
+ publish(topicName, order, null, "1");
+ Thread.sleep(200);
+ Assert.assertEquals(order, PushReceiver.oneOrder);
+
+ oneSubscription.request().delete();
+ twoSubscription.request().delete();
+
+
+ }
+
+
+ @Test
+ public void testPull() throws Exception
+ {
+ ClientRequest request =3D new ClientRequest(generateURL("/topics/" +=
topicName));
+
+ ClientResponse response =3D request.head();
+ Assert.assertEquals(200, response.getStatus());
+ Link consumers =3D MessageTestBase.getLinkByTitle(manager.getQueueMa=
nager().getLinkStrategy(), response, "pull-subscriptions");
+ System.out.println("pull: " + consumers);
+ response =3D consumers.request().formParameter("autoAck", "true")
+ .formParameter("selector", "MyTag =3D '1'").post();
+
+ Link consumeOne =3D MessageTestBase.getLinkByTitle(manager.getQueueM=
anager().getLinkStrategy(), response, "consume-next");
+ System.out.println("consumeOne: " + consumeOne);
+ response =3D consumers.request().formParameter("autoAck", "true")
+ .formParameter("selector", "MyTag =3D '2'").post();
+ Link consumeTwo =3D MessageTestBase.getLinkByTitle(manager.getQueueM=
anager().getLinkStrategy(), response, "consume-next");
+ System.out.println("consumeTwo: " + consumeTwo);
+
+
+ // test that Accept header is used to set content-type
+ {
+ Order order =3D new Order();
+ order.setName("1");
+ order.setAmount("$5.00");
+ publish(topicName, order, null, "1");
+ order.setName("2");
+ publish(topicName, order, null, "2");
+ order.setName("3");
+ publish(topicName, order, null, "2");
+ order.setName("4");
+ publish(topicName, order, null, "1");
+ order.setName("5");
+ publish(topicName, order, null, "1");
+ order.setName("6");
+ publish(topicName, order, null, "1");
+
+ {
+ order.setName("1");
+ consumeOne =3D consumeOrder(order, consumeOne);
+ order.setName("2");
+ consumeTwo =3D consumeOrder(order, consumeTwo);
+ order.setName("3");
+ consumeTwo =3D consumeOrder(order, consumeTwo);
+ order.setName("4");
+ consumeOne =3D consumeOrder(order, consumeOne);
+ order.setName("5");
+ consumeOne =3D consumeOrder(order, consumeOne);
+ order.setName("6");
+ consumeOne =3D consumeOrder(order, consumeOne);
+ }
+ }
+ }
+
+ private Link consumeOrder(Order order, Link consumeNext)
+ throws Exception
+ {
+ ClientResponse res =3D consumeNext.request().header("Accept-Wait", "=
4").accept("application/xml").post(String.class);
+ Assert.assertEquals(200, res.getStatus());
+ Assert.assertEquals("application/xml", res.getHeaders().getFirst("Co=
ntent-Type").toString().toLowerCase());
+ Order order2 =3D (Order) res.getEntity(Order.class);
+ Assert.assertEquals(order, order2);
+ consumeNext =3D MessageTestBase.getLinkByTitle(manager.getQueueManag=
er().getLinkStrategy(), res, "consume-next");
+ Assert.assertNotNull(consumeNext);
+ return consumeNext;
+ }
+}
\ No newline at end of file
--===============7188948122459675551==--