From do-not-reply at jboss.org Mon Oct 25 15:35:11 2010 Content-Type: multipart/mixed; boundary="===============7188948122459675551==" MIME-Version: 1.0 From: do-not-reply at jboss.org To: hornetq-commits at lists.jboss.org Subject: [hornetq-commits] JBoss hornetq SVN: r9812 - in trunk: hornetq-rest/docbook/reference/en and 6 other directories. Date: Mon, 25 Oct 2010 15:35:11 -0400 Message-ID: <201010251935.o9PJZBng027029@svn01.web.mwc.hst.phx2.redhat.com> --===============7188948122459675551== Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: quoted-printable Author: bill.burke(a)jboss.com Date: 2010-10-25 15:35:10 -0400 (Mon, 25 Oct 2010) New Revision: 9812 Added: trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/Sele= ctorTest.java Modified: trunk/docs/user-manual/en/configuration-index.xml trunk/docs/user-manual/en/embedding-hornetq.xml trunk/hornetq-rest/docbook/reference/en/master.xml trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Ack= nowledgedQueueConsumer.java trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Con= sumersResource.java trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Pos= tMessage.java trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Que= ueConsumer.java trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/pus= h/PushConsumer.java trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/pus= h/xml/PushRegistration.java trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Ack= nowledgedSubscriptionResource.java trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Fil= eTopicPushStore.java trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Sub= scriptionResource.java trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Sub= scriptionsResource.java 
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/Http= MessageHelper.java Log: fix docs, merge REST from beta2 release Modified: trunk/docs/user-manual/en/configuration-index.xml =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/docs/user-manual/en/configuration-index.xml 2010-10-25 15:49:13 U= TC (rev 9811) +++ trunk/docs/user-manual/en/configuration-index.xml 2010-10-25 19:35:10 U= TC (rev 9812) @@ -1026,7 +1026,7 @@ generic - + connection-factory.xa Boolean Modified: trunk/docs/user-manual/en/embedding-hornetq.xml =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/docs/user-manual/en/embedding-hornetq.xml 2010-10-25 15:49:13 UTC= (rev 9811) +++ trunk/docs/user-manual/en/embedding-hornetq.xml 2010-10-25 19:35:10 UTC= (rev 9812) @@ -46,19 +46,8 @@ two different helper classes for this depending on whether your using = the HornetQ Core API or JMS. = -<<<<<<< .mine
Core API Only -=3D=3D=3D=3D=3D=3D=3D -config.setAcceptorConfigurations(transports); - You need to instantiate and start HornetQ server. The class = org.hornetq.api.core.server.HornetQ has a few s= tatic methods for creating - servers with common configurations. - import org.hornetq.core.server.HornetQServer; -import org.hornetq.core.server.HornetQServers; - = ->>>>>>> .r9629 - For instantiating a core HornetQ Server only, the steps are pr= etty simple. The example requires that you have defined a configuration f= ile hornetq-configuration.xml in your @@ -66,38 +55,9 @@ = ... = -<<<<<<< .mine EmbeddedHornetQ embedded =3D new EmbeddedHornetQ(); embedded.start(); -=3D=3D=3D=3D=3D=3D=3D -HornetQServer server =3D HornetQServers.newHornetQServer(config); ->>>>>>> .r9629 = -<<<<<<< .mine -// Assuming you defined an "in-vm" acceptor within your hornetq-configurat= ion.xml file -=3D=3D=3D=3D=3D=3D=3D -server.start(); - You also have the option of instantiating HornetQSe= rverImpl - directly: - HornetQServer server =3D new HornetQServerImpl(con= fig); -server.start(); -
-
- Dependency Frameworks - You may also choose to use a dependency injection framework = such as JBoss - Micro Container or Spring Framework= . - HornetQ standalone uses JBoss Micro Container as the injecti= on framework. HornetQBootstrapServer and hornetq-bea= ns.xml which are - part of the HornetQ distribution provide a very complete imple= mentation of what's needed - to bootstrap the server using JBoss Micro Container. - When using JBoss Micro Container, you need to provide an XML= file declaring the - HornetQServer and Configuratio= n object, you - can also inject a security manager and a MBean server if you w= ant, but those are - optional. - A very basic XML Bean declaration for the JBoss Micro Contai= ner would be: - <?xml version=3D"1.0" encoding=3D"UTF-8"?> ->>>>>>> .r9629 - ClientSessionFactory nettyFactory =3D HornetQClient.createClientSessionFa= ctory( new TransportConfiguration( InVMConnectorFactory.class.getN= ame())); Modified: trunk/hornetq-rest/docbook/reference/en/master.xml =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/hornetq-rest/docbook/reference/en/master.xml 2010-10-25 15:49:13 = UTC (rev 9811) +++ trunk/hornetq-rest/docbook/reference/en/master.xml 2010-10-25 19:35:10 = UTC (rev 9812) @@ -870,6 +870,16 @@ the server. Only usable on topics. + + selector + + + This is an optional JMS selector string. The HornetQ REST + interface adds HTTP headers to the JMS message for REST produced + messages. HTTP headers are prefixed with "http_" and every '-' + character is converted to a '$'. 
+ + = @@ -1480,6 +1490,10 @@ = <push-registration> <durable>false</durable> + <selector><![CDATA[ = + SomeAttribute > 1 = + ]]> + </selector> <link rel=3D"push" href=3D"http://somewhere.com" type=3D"application= /json" method=3D"PUT"/> </push-registration> @@ -1493,6 +1507,10 @@ queue-push-store-dir config variable defined in Chapter 2. (topic-push-store-dir for topics). = + The selector element is optional and define= s a + JMS message selector. You should enclose it within CDATA blocks as s= ome + of the selector characters are illegal XML. + The link element specifies the basis of the interaction. The href attribute contains the URL = you want to interact with. It is the only required attribute. The @@ -1562,11 +1580,16 @@ The Topic Push Subscription XML = The push XML for a topic is the same except the root element is - push-topic-registration. The rest of the document is the same. Here'= s an + push-topic-registration. (Also remember the selector + element is optional). The rest of the document is the same. 
Here's = an example of a template registration: = <push-topic-registration> <durable>true</durable> + <selector><![CDATA[ = + SomeAttribute > 1 = + ]]> + </selector> <link rel=3D"template" href=3D"http://somewhere.com/resources/{id}/m= essages" method=3D"POST"/> </push-topic registration> Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/qu= eue/AcknowledgedQueueConsumer.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Ac= knowledgedQueueConsumer.java 2010-10-25 15:49:13 UTC (rev 9811) +++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Ac= knowledgedQueueConsumer.java 2010-10-25 19:35:10 UTC (rev 9812) @@ -31,10 +31,10 @@ protected String startup =3D Long.toString(System.currentTimeMillis()); protected volatile Acknowledgement ack; = - public AcknowledgedQueueConsumer(ClientSessionFactory factory, String d= estination, String id, DestinationServiceManager serviceManager) + public AcknowledgedQueueConsumer(ClientSessionFactory factory, String d= estination, String id, DestinationServiceManager serviceManager, String sel= ector) throws HornetQException { - super(factory, destination, id, serviceManager); + super(factory, destination, id, serviceManager, selector); autoAck =3D false; } = @@ -187,9 +187,7 @@ = try { - session =3D factory.createSession(); - consumer =3D session.createConsumer(destination); - session.start(); + createSession(); } catch (Exception e) { Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/qu= eue/ConsumersResource.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- 
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Co= nsumersResource.java 2010-10-25 15:49:13 UTC (rev 9811) +++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Co= nsumersResource.java 2010-10-25 19:35:10 UTC (rev 9812) @@ -36,6 +36,9 @@ protected int consumerTimeoutSeconds; protected DestinationServiceManager serviceManager; = + protected static final int ACKNOWLEDGED =3D 0x01; + protected static final int SELECTOR_SET =3D 0x02; + public DestinationServiceManager getServiceManager() { return serviceManager; @@ -78,6 +81,7 @@ = private Object timeoutLock =3D new Object(); = + @Override public void testTimeout(String target) { synchronized (timeoutLock) @@ -107,32 +111,41 @@ = @POST public Response createSubscription(@FormParam("autoAck") @DefaultValue(= "true") boolean autoAck, + @FormParam("selector") String select= or, @Context UriInfo uriInfo) { try { QueueConsumer consumer =3D null; + int attributes =3D 0; + if (selector !=3D null) + { + attributes =3D attributes | SELECTOR_SET; + } + = if (autoAck) { - consumer =3D createConsumer(); + consumer =3D createConsumer(selector); } else { - consumer =3D createAcknowledgedConsumer(); + attributes |=3D ACKNOWLEDGED; + consumer =3D createAcknowledgedConsumer(selector); } = + String attributesSegment =3D "attributes-" + attributes; UriBuilder location =3D uriInfo.getAbsolutePathBuilder(); - if (autoAck) location.path("auto-ack"); - else location.path("acknowledged"); + location.path(attributesSegment); location.path(consumer.getId()); Response.ResponseBuilder builder =3D Response.created(location.bu= ild()); + if (autoAck) { - QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrateg= y(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/auto-ack/" + con= sumer.getId(), "-1"); + QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrateg= y(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/" + attributesSe= gment +"/" + consumer.getId(), "-1"); } else { - 
AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManage= r.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/= acknowledged/" + consumer.getId(), "-1"); + AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManage= r.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/= " + attributesSegment +"/" + consumer.getId(), "-1"); = } return builder.build(); @@ -147,11 +160,11 @@ } } = - public QueueConsumer createConsumer() + public QueueConsumer createConsumer(String selector) throws HornetQException { String genId =3D sessionCounter.getAndIncrement() + "-queue-" + dest= ination + "-" + startup; - QueueConsumer consumer =3D new QueueConsumer(sessionFactory, destina= tion, genId, serviceManager); + QueueConsumer consumer =3D new QueueConsumer(sessionFactory, destina= tion, genId, serviceManager, selector); synchronized (timeoutLock) { queueConsumers.put(genId, consumer); @@ -160,11 +173,11 @@ return consumer; } = - public QueueConsumer createAcknowledgedConsumer() + public QueueConsumer createAcknowledgedConsumer(String selector) throws HornetQException { String genId =3D sessionCounter.getAndIncrement() + "-queue-" + dest= ination + "-" + startup; - QueueConsumer consumer =3D new AcknowledgedQueueConsumer(sessionFact= ory, destination, genId, serviceManager); + QueueConsumer consumer =3D new AcknowledgedQueueConsumer(sessionFact= ory, destination, genId, serviceManager, selector); synchronized (timeoutLock) { queueConsumers.put(genId, consumer); @@ -173,85 +186,81 @@ return consumer; } = - @Path("auto-ack/{consumer-id}") + @Path("attributes-{attributes}/{consumer-id}") @GET - public Response getConsumer(@PathParam("consumer-id") String consumerId, + public Response getConsumer(@PathParam("attributes") int attributes, + @PathParam("consumer-id") String consumerId, @Context UriInfo uriInfo) throws Exception { - return headConsumer(consumerId, uriInfo); + return headConsumer(attributes, consumerId, uriInfo); } = - 
@Path("auto-ack/{consumer-id}") + @Path("attributes-{attributes}/{consumer-id}") @HEAD - public Response headConsumer(@PathParam("consumer-id") String consumerI= d, + public Response headConsumer(@PathParam("attributes") int attributes, + @PathParam("consumer-id") String consumerI= d, @Context UriInfo uriInfo) throws Exception { - QueueConsumer consumer =3D findConsumer(consumerId); + QueueConsumer consumer =3D findConsumer(attributes, consumerId, uriI= nfo); Response.ResponseBuilder builder =3D Response.noContent(); // we synchronize just in case a failed request is still processing synchronized (consumer) { - QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrategy()= , builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/acknowledged/" + co= nsumer.getId(), Long.toString(consumer.getConsumeIndex())); + if ( (attributes & ACKNOWLEDGED) > 0) + { + AcknowledgedQueueConsumer ackedConsumer =3D (AcknowledgedQueue= Consumer)consumer; + Acknowledgement ack =3D ackedConsumer.getAck(); + if (ack =3D=3D null || ack.wasSet()) + { + AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceMan= ager.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) += "/attributes-" + attributes + "/" + consumer.getId(), Long.toString(consum= er.getConsumeIndex())); + } + else + { + ackedConsumer.setAcknowledgementLink(builder, uriInfo, uriI= nfo.getMatchedURIs().get(1) + "/attributes-" + attributes + "/" + consumer.= getId()); + } + + } + else + { + QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrateg= y(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/attributes-" + a= ttributes + "/" + consumer.getId(), Long.toString(consumer.getConsumeIndex(= ))); + } } return builder.build(); } = - @Path("auto-ack/{consumer-id}") + @Path("attributes-{attributes}/{consumer-id}") public QueueConsumer findConsumer( - @PathParam("consumer-id") String consumerId) throws Exception + @PathParam("attributes") int attributes, + @PathParam("consumer-id") String 
consumerId, + @Context UriInfo uriInfo) throws Exception { QueueConsumer consumer =3D queueConsumers.get(consumerId); if (consumer =3D=3D null) { - QueueConsumer tmp =3D new QueueConsumer(sessionFactory, destinati= on, consumerId, serviceManager); - consumer =3D addConsumerToMap(consumerId, tmp); - } - return consumer; - } + if ( (attributes & SELECTOR_SET) > 0) + { = - @Path("acknowledged/{consumer-id}") - @GET - public Response getAcknowledgedConsumer(@PathParam("consumer-id") Strin= g consumerId, - @Context UriInfo uriInfo) throw= s Exception - { - return headAcknowledgedConsumer(consumerId, uriInfo); - } + Response.ResponseBuilder builder =3D Response.status(Response.= Status.GONE) + .entity("Cannot reconnect to selector-based consumer. = You must recreate the consumer session.") + .type("text/plain"); + UriBuilder uriBuilder =3D uriInfo.getBaseUriBuilder(); + uriBuilder.path(uriInfo.getMatchedURIs().get(1)); + serviceManager.getLinkStrategy().setLinkHeader(builder, "pull-= consumers", "pull-consumers", uriBuilder.build().toString(), null); + throw new WebApplicationException(builder.build()); + = + } + if ( (attributes & ACKNOWLEDGED) > 0) + { + QueueConsumer tmp =3D new AcknowledgedQueueConsumer(sessionFac= tory, destination, consumerId, serviceManager, null); + consumer =3D addConsumerToMap(consumerId, tmp); = - @Path("acknowledged/{consumer-id}") - @HEAD - public Response headAcknowledgedConsumer(@PathParam("consumer-id") Stri= ng consumerId, - @Context UriInfo uriInfo) thro= ws Exception - { - AcknowledgedQueueConsumer consumer =3D (AcknowledgedQueueConsumer) f= indAcknowledgedConsumer(consumerId); - Response.ResponseBuilder builder =3D Response.ok(); - // we synchronize just in case a failed request is still processing - synchronized (consumer) - { - Acknowledgement ack =3D consumer.getAck(); - if (ack =3D=3D null || ack.wasSet()) - { - AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManage= r.getLinkStrategy(), builder, uriInfo, 
uriInfo.getMatchedURIs().get(1) + "/= acknowledged/" + consumer.getId(), Long.toString(consumer.getConsumeIndex()= )); } else { - consumer.setAcknowledgementLink(builder, uriInfo, uriInfo.getM= atchedURIs().get(1) + "/acknowledged/" + consumer.getId()); + QueueConsumer tmp =3D new QueueConsumer(sessionFactory, destin= ation, consumerId, serviceManager, null); + consumer =3D addConsumerToMap(consumerId, tmp); } } - return builder.build(); - } - - - @Path("acknowledged/{consumer-id}") - public QueueConsumer findAcknowledgedConsumer( - @PathParam("consumer-id") String consumerId) throws Exception - { - QueueConsumer consumer =3D queueConsumers.get(consumerId); - if (consumer =3D=3D null) - { - QueueConsumer tmp =3D new AcknowledgedQueueConsumer(sessionFactor= y, destination, consumerId, serviceManager); - ; - consumer =3D addConsumerToMap(consumerId, tmp); - } return consumer; } = @@ -275,16 +284,8 @@ } = = - @Path("acknowledged/{consumer-id}") + @Path("attributes-{attributes}/{consumer-id}") @DELETE - public void closeAcknowledgedSession( - @PathParam("consumer-id") String consumerId) - { - closeSession(consumerId); - } - - @Path("auto-ack/{consumer-id}") - @DELETE public void closeSession( @PathParam("consumer-id") String consumerId) { Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/qu= eue/PostMessage.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Po= stMessage.java 2010-10-25 15:49:13 UTC (rev 9811) +++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Po= stMessage.java 2010-10-25 19:35:10 UTC (rev 9812) @@ -1,12 +1,12 @@ package org.hornetq.rest.queue; = import org.hornetq.api.core.HornetQException; -import org.hornetq.api.core.Message; import org.hornetq.api.core.client.ClientMessage; 
import org.hornetq.api.core.client.ClientProducer; import org.hornetq.api.core.client.ClientSession; import org.hornetq.api.core.client.ClientSessionFactory; import org.hornetq.rest.util.HttpMessageHelper; +import org.hornetq.api.core.Message; = import javax.ws.rs.POST; import javax.ws.rs.PUT; Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/qu= eue/QueueConsumer.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Qu= eueConsumer.java 2010-10-25 15:49:13 UTC (rev 9811) +++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Qu= eueConsumer.java 2010-10-25 19:35:10 UTC (rev 9812) @@ -5,6 +5,7 @@ import org.hornetq.api.core.client.ClientMessage; import org.hornetq.api.core.client.ClientSession; import org.hornetq.api.core.client.ClientSessionFactory; +import org.hornetq.jms.client.SelectorTranslator; import org.hornetq.rest.util.Constants; import org.hornetq.rest.util.LinkStrategy; = @@ -37,6 +38,7 @@ protected long lastPing =3D System.currentTimeMillis(); protected DestinationServiceManager serviceManager; protected boolean autoAck =3D true; + protected String selector; = /** * token used to create consume-next links @@ -70,14 +72,15 @@ lastPing =3D System.currentTimeMillis(); } = - public QueueConsumer(ClientSessionFactory factory, String destination, = String id, DestinationServiceManager serviceManager) throws HornetQException + public QueueConsumer(ClientSessionFactory factory, String destination, = String id, DestinationServiceManager serviceManager, String selector) throw= s HornetQException { this.factory =3D factory; this.destination =3D destination; this.id =3D id; this.serviceManager =3D serviceManager; + this.selector =3D selector; = - createSession(factory, destination); + 
createSession(); } = public String getId() @@ -191,11 +194,18 @@ } } = - protected void createSession(ClientSessionFactory factory, String desti= nation) + protected void createSession() throws HornetQException { session =3D factory.createSession(true, true); - consumer =3D session.createConsumer(destination); + if (selector =3D=3D null) + { + consumer =3D session.createConsumer(destination); + } + else + { + consumer =3D session.createConsumer(destination, SelectorTranslat= or.convertToHornetQFilterString(selector)); + } session.start(); } = Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/qu= eue/push/PushConsumer.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/pu= sh/PushConsumer.java 2010-10-25 15:49:13 UTC (rev 9811) +++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/pu= sh/PushConsumer.java 2010-10-25 19:35:10 UTC (rev 9812) @@ -7,6 +7,7 @@ import org.hornetq.api.core.client.ClientSessionFactory; import org.hornetq.api.core.client.MessageHandler; import org.hornetq.core.logging.Logger; +import org.hornetq.jms.client.SelectorTranslator; import org.hornetq.rest.queue.push.xml.PushRegistration; = /** @@ -68,7 +69,14 @@ strategy.start(); = session =3D factory.createSession(false, false); - consumer =3D session.createConsumer(destination); + if (registration.getSelector() !=3D null) + { + consumer =3D session.createConsumer(destination, SelectorTranslat= or.convertToHornetQFilterString(registration.getSelector())); + } + else + { + consumer =3D session.createConsumer(destination); + } consumer.setMessageHandler(this); session.start(); log.info("Push consumer started for: " + registration.getTarget()); @@ -100,6 +108,7 @@ } } = + @Override public void onMessage(ClientMessage 
clientMessage) { if (strategy.push(clientMessage) =3D=3D false) Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/qu= eue/push/xml/PushRegistration.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/pu= sh/xml/PushRegistration.java 2010-10-25 15:49:13 UTC (rev 9811) +++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/pu= sh/xml/PushRegistration.java 2010-10-25 19:35:10 UTC (rev 9812) @@ -18,7 +18,7 @@ */ @XmlRootElement(name =3D "push-registration") @XmlAccessorType(XmlAccessType.PROPERTY) -(a)XmlType(propOrder =3D {"destination", "durable", "target", "authenticat= ionMechanism", "headers"}) +(a)XmlType(propOrder =3D {"destination", "durable", "selector", "target", = "authenticationMechanism", "headers"}) public class PushRegistration implements Serializable { private String id; @@ -28,6 +28,7 @@ private List headers =3D new ArrayList(); private String destination; private Object loadedFrom; + private String selector; = @XmlTransient public Object getLoadedFrom() @@ -73,6 +74,16 @@ this.durable =3D durable; } = + public String getSelector() + { + return selector; + } + + public void setSelector(String selector) + { + this.selector =3D selector; + } + @XmlElementRef public XmlLink getTarget() { Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/to= pic/AcknowledgedSubscriptionResource.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Ac= knowledgedSubscriptionResource.java 2010-10-25 15:49:13 UTC (rev 9811) +++ 
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Ac= knowledgedSubscriptionResource.java 2010-10-25 19:35:10 UTC (rev 9812) @@ -13,10 +13,10 @@ { private boolean durable; = - public AcknowledgedSubscriptionResource(ClientSessionFactory factory, S= tring destination, String id, DestinationServiceManager serviceManager) + public AcknowledgedSubscriptionResource(ClientSessionFactory factory, S= tring destination, String id, DestinationServiceManager serviceManager, Str= ing selector) throws HornetQException { - super(factory, destination, id, serviceManager); + super(factory, destination, id, serviceManager, selector); } = public boolean isDurable() Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/to= pic/FileTopicPushStore.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Fi= leTopicPushStore.java 2010-10-25 15:49:13 UTC (rev 9811) +++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Fi= leTopicPushStore.java 2010-10-25 19:35:10 UTC (rev 9812) @@ -18,6 +18,7 @@ super(dirname); } = + @Override public synchronized List getByTopic(String topic) { List list =3D new ArrayList(); Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/to= pic/SubscriptionResource.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Su= bscriptionResource.java 2010-10-25 15:49:13 UTC (rev 9811) +++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Su= bscriptionResource.java 2010-10-25 19:35:10 UTC (rev 9812) @@ -13,10 +13,10 @@ { 
boolean durable; = - public SubscriptionResource(ClientSessionFactory factory, String destin= ation, String id, DestinationServiceManager serviceManager) + public SubscriptionResource(ClientSessionFactory factory, String destin= ation, String id, DestinationServiceManager serviceManager, String selector) throws HornetQException { - super(factory, destination, id, serviceManager); + super(factory, destination, id, serviceManager, selector); } = public boolean isDurable() Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/to= pic/SubscriptionsResource.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Su= bscriptionsResource.java 2010-10-25 15:49:13 UTC (rev 9811) +++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Su= bscriptionsResource.java 2010-10-25 19:35:10 UTC (rev 9812) @@ -84,6 +84,7 @@ = private Object timeoutLock =3D new Object(); = + @Override public void testTimeout(String target) { synchronized (timeoutLock) @@ -127,6 +128,7 @@ public Response createSubscription(@FormParam("durable") @DefaultValue(= "false") boolean durable, @FormParam("autoAck") @DefaultValue(= "true") boolean autoAck, @FormParam("name") String subscripti= onName, + @FormParam("selector") String select= or, @Context UriInfo uriInfo) { if (subscriptionName !=3D null) @@ -185,7 +187,7 @@ session.createTemporaryQueue(destination, subscriptionName); } } - QueueConsumer consumer =3D createConsumer(durable, autoAck, subsc= riptionName); + QueueConsumer consumer =3D createConsumer(durable, autoAck, subsc= riptionName, selector); queueConsumers.put(consumer.getId(), consumer); serviceManager.getTimeoutTask().add(this, consumer.getId()); = @@ -225,19 +227,19 @@ } } = - protected QueueConsumer createConsumer(boolean durable, 
boolean autoAck= , String subscriptionName) + protected QueueConsumer createConsumer(boolean durable, boolean autoAck= , String subscriptionName, String selector) throws HornetQException { QueueConsumer consumer; if (autoAck) { - SubscriptionResource subscription =3D new SubscriptionResource(se= ssionFactory, subscriptionName, subscriptionName, serviceManager); + SubscriptionResource subscription =3D new SubscriptionResource(se= ssionFactory, subscriptionName, subscriptionName, serviceManager, selector); subscription.setDurable(durable); consumer =3D subscription; } else { - AcknowledgedSubscriptionResource subscription =3D new Acknowledge= dSubscriptionResource(sessionFactory, subscriptionName, subscriptionName, s= erviceManager); + AcknowledgedSubscriptionResource subscription =3D new Acknowledge= dSubscriptionResource(sessionFactory, subscriptionName, subscriptionName, s= erviceManager, selector); subscription.setDurable(durable); consumer =3D subscription; } @@ -375,7 +377,7 @@ QueueConsumer tmp =3D null; try { - tmp =3D createConsumer(true, autoAck, subscriptionId); + tmp =3D createConsumer(true, autoAck, subscriptionId, null); } catch (HornetQException e) { Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/ut= il/HttpMessageHelper.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/Htt= pMessageHelper.java 2010-10-25 15:49:13 UTC (rev 9811) +++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/Htt= pMessageHelper.java 2010-10-25 19:35:10 UTC (rev 9812) @@ -81,17 +81,20 @@ int size =3D message.getBodySize(); if (size > 0) { - byte[] body =3D new byte[size]; - message.getBodyBuffer().readBytes(body); Boolean aBoolean =3D message.getBooleanProperty(POSTED_AS_HTTP_ME= SSAGE); if (aBoolean !=3D 
null && aBoolean.booleanValue()) { + byte[] body =3D new byte[size]; + message.getBodyBuffer().readBytes(body); //System.out.println("Building Message from HTTP message"); request.body(contentType, body); } else { // assume posted as a JMS or HornetQ object message + size =3D message.getBodyBuffer().readInt(); + byte[] body =3D new byte[size]; + message.getBodyBuffer().readBytes(body); ByteArrayInputStream bais =3D new ByteArrayInputStream(body); Object obj =3D null; try Added: trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/= SelectorTest.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/Sel= ectorTest.java (rev 0) +++ trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/Sel= ectorTest.java 2010-10-25 19:35:10 UTC (rev 9812) @@ -0,0 +1,304 @@ +package org.hornetq.rest.test; + +import org.hornetq.jms.client.HornetQConnectionFactory; +import org.hornetq.jms.client.HornetQDestination; +import org.hornetq.jms.client.HornetQJMSConnectionFactory; +import org.hornetq.rest.HttpHeaderProperty; +import org.hornetq.rest.queue.push.xml.XmlLink; +import org.hornetq.rest.topic.PushTopicRegistration; +import org.hornetq.rest.topic.TopicDeployment; +import org.jboss.resteasy.client.ClientRequest; +import org.jboss.resteasy.client.ClientResponse; +import org.jboss.resteasy.spi.Link; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.xml.bind.annotation.XmlRootElement; +import java.io.Serializable; + +import static 
org.jboss.resteasy.test.TestPortProvider.*; + +/** + * @author Bill Burke + * @version $Revision: 1 $ + */ +public class SelectorTest extends MessageTestBase +{ + public static ConnectionFactory connectionFactory; + public static String topicName =3D HornetQDestination.createQueueAddres= sFromName("testTopic").toString(); + + @BeforeClass + public static void setup() throws Exception + { + connectionFactory =3D new HornetQJMSConnectionFactory(manager.getQue= ueManager().getSessionFactory()); + System.out.println("Queue name: " + topicName); + TopicDeployment deployment =3D new TopicDeployment(); + deployment.setDuplicatesAllowed(true); + deployment.setDurableSend(false); + deployment.setName(topicName); + manager.getTopicManager().deploy(deployment); + } + + @XmlRootElement + public static class Order implements Serializable + { + private String name; + private String amount; + + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name =3D name; + } + + public String getAmount() + { + return amount; + } + + public void setAmount(String amount) + { + this.amount =3D amount; + } + + @Override + public boolean equals(Object o) + { + if (this =3D=3D o) return true; + if (o =3D=3D null || getClass() !=3D o.getClass()) return false; + + Order order =3D (Order) o; + + if (!amount.equals(order.amount)) return false; + if (!name.equals(order.name)) return false; + + return true; + } + + @Override + public String toString() + { + return "Order{" + + "name=3D'" + name + '\'' + + '}'; + } + + @Override + public int hashCode() + { + int result =3D name.hashCode(); + result =3D 31 * result + amount.hashCode(); + return result; + } + } + + public static Destination createDestination(String dest) + { + HornetQDestination destination =3D (HornetQDestination) HornetQDesti= nation.fromAddress(dest); + System.out.println("SimpleAddress: " + destination.getSimpleAddress(= )); + return destination; + } + + public static void publish(String 
dest, Serializable object, String con= tentType, String tag) throws Exception + { + Connection conn =3D connectionFactory.createConnection(); + try + { + Session session =3D conn.createSession(false, Session.AUTO_ACKNOW= LEDGE); + Destination destination =3D createDestination(dest); + MessageProducer producer =3D session.createProducer(destination); + ObjectMessage message =3D session.createObjectMessage(); + + if (contentType !=3D null) + { + message.setStringProperty(HttpHeaderProperty.CONTENT_TYPE, con= tentType); + } + if (tag !=3D null) + { + message.setStringProperty("MyTag", tag); + } + message.setObject(object); + + producer.send(message); + } + finally + { + conn.close(); + } + } + + @Path("/push") + public static class PushReceiver + { + public static Order oneOrder; + public static Order twoOrder; + + @POST + @Path("one") + public void one(Order order) + { + oneOrder =3D order; + } + + @POST + @Path("two") + public void two(Order order) + { + twoOrder =3D order; = + } + + + } + + @Test + public void testPush() throws Exception + { + server.getJaxrsServer().getDeployment().getRegistry().addPerRequestR= esource(PushReceiver.class); + ClientRequest request =3D new ClientRequest(generateURL("/topics/" += topicName)); + + ClientResponse response =3D request.head(); + Assert.assertEquals(200, response.getStatus()); + Link consumers =3D MessageTestBase.getLinkByTitle(manager.getQueueMa= nager().getLinkStrategy(), response, "push-subscriptions"); + System.out.println("push: " + consumers); + + PushTopicRegistration oneReg =3D new PushTopicRegistration(); + oneReg.setDurable(false); + XmlLink target =3D new XmlLink(); + target.setMethod("post"); + target.setHref(generateURL("/push/one")); + target.setType("application/xml"); + oneReg.setTarget(target); + oneReg.setSelector("MyTag =3D '1'"); + response =3D consumers.request().body("application/xml", oneReg).pos= t(); + Link oneSubscription =3D response.getLocation(); + + PushTopicRegistration twoReg =3D new 
PushTopicRegistration(); + twoReg.setDurable(false); + target =3D new XmlLink(); + target.setMethod("post"); + target.setHref(generateURL("/push/two")); + target.setType("application/xml"); + twoReg.setTarget(target); + twoReg.setSelector("MyTag =3D '2'"); + response =3D consumers.request().body("application/xml", twoReg).pos= t(); + Link twoSubscription =3D response.getLocation(); + + Order order =3D new Order(); + order.setName("1"); + order.setAmount("$5.00"); + publish(topicName, order, null, "1"); + Thread.sleep(200); + Assert.assertEquals(order, PushReceiver.oneOrder); + + order.setName("2"); + publish(topicName, order, null, "2"); + Thread.sleep(200); + Assert.assertEquals(order, PushReceiver.twoOrder); + + order.setName("3"); + publish(topicName, order, null, "2"); + Thread.sleep(200); + Assert.assertEquals(order, PushReceiver.twoOrder); + + order.setName("4"); + publish(topicName, order, null, "1"); + Thread.sleep(200); + Assert.assertEquals(order, PushReceiver.oneOrder); + + order.setName("5"); + publish(topicName, order, null, "1"); + Thread.sleep(200); + Assert.assertEquals(order, PushReceiver.oneOrder); + + order.setName("6"); + publish(topicName, order, null, "1"); + Thread.sleep(200); + Assert.assertEquals(order, PushReceiver.oneOrder); + + oneSubscription.request().delete(); + twoSubscription.request().delete(); + + + } + + + @Test + public void testPull() throws Exception + { + ClientRequest request =3D new ClientRequest(generateURL("/topics/" += topicName)); + + ClientResponse response =3D request.head(); + Assert.assertEquals(200, response.getStatus()); + Link consumers =3D MessageTestBase.getLinkByTitle(manager.getQueueMa= nager().getLinkStrategy(), response, "pull-subscriptions"); + System.out.println("pull: " + consumers); + response =3D consumers.request().formParameter("autoAck", "true") + .formParameter("selector", "MyTag =3D '1'").post(); + + Link consumeOne =3D MessageTestBase.getLinkByTitle(manager.getQueueM= anager().getLinkStrategy(), 
response, "consume-next"); + System.out.println("consumeOne: " + consumeOne); + response =3D consumers.request().formParameter("autoAck", "true") + .formParameter("selector", "MyTag =3D '2'").post(); + Link consumeTwo =3D MessageTestBase.getLinkByTitle(manager.getQueueM= anager().getLinkStrategy(), response, "consume-next"); + System.out.println("consumeTwo: " + consumeTwo); + + + // test that Accept header is used to set content-type + { + Order order =3D new Order(); + order.setName("1"); + order.setAmount("$5.00"); + publish(topicName, order, null, "1"); + order.setName("2"); + publish(topicName, order, null, "2"); + order.setName("3"); + publish(topicName, order, null, "2"); + order.setName("4"); + publish(topicName, order, null, "1"); + order.setName("5"); + publish(topicName, order, null, "1"); + order.setName("6"); + publish(topicName, order, null, "1"); + + { + order.setName("1"); + consumeOne =3D consumeOrder(order, consumeOne); + order.setName("2"); + consumeTwo =3D consumeOrder(order, consumeTwo); + order.setName("3"); + consumeTwo =3D consumeOrder(order, consumeTwo); + order.setName("4"); + consumeOne =3D consumeOrder(order, consumeOne); + order.setName("5"); + consumeOne =3D consumeOrder(order, consumeOne); + order.setName("6"); + consumeOne =3D consumeOrder(order, consumeOne); + } + } + } + + private Link consumeOrder(Order order, Link consumeNext) + throws Exception + { + ClientResponse res =3D consumeNext.request().header("Accept-Wait", "= 4").accept("application/xml").post(String.class); + Assert.assertEquals(200, res.getStatus()); + Assert.assertEquals("application/xml", res.getHeaders().getFirst("Co= ntent-Type").toString().toLowerCase()); + Order order2 =3D (Order) res.getEntity(Order.class); + Assert.assertEquals(order, order2); + consumeNext =3D MessageTestBase.getLinkByTitle(manager.getQueueManag= er().getLinkStrategy(), res, "consume-next"); + Assert.assertNotNull(consumeNext); + return consumeNext; + } +} \ No newline at end of file 
--===============7188948122459675551==--