From do-not-reply at jboss.org Mon Jan 3 19:35:40 2011 Content-Type: multipart/mixed; boundary="===============6160395679801446487==" MIME-Version: 1.0 From: do-not-reply at jboss.org To: hornetq-commits at lists.jboss.org Subject: [hornetq-commits] JBoss hornetq SVN: r10105 - in trunk: hornetq-rest/docbook/reference/en and 3 other directories. Date: Mon, 03 Jan 2011 19:35:39 -0500 Message-ID: <201101040035.p040Zd4q015502@svn01.web.mwc.hst.phx2.redhat.com> --===============6160395679801446487== Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: quoted-printable Author: bill.burke(a)jboss.com Date: 2011-01-03 19:35:39 -0500 (Mon, 03 Jan 2011) New Revision: 10105 Modified: trunk/hornetq-rest/docbook/reference/en/master.xml trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Con= sumersResource.java trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Pos= tMessage.java trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Pos= tMessageDupsOk.java trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Sub= scriptionsResource.java trunk/pom.xml trunk/src/Hornetq.iml Log: deadlock bug and expiration/priority support Modified: trunk/hornetq-rest/docbook/reference/en/master.xml =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/hornetq-rest/docbook/reference/en/master.xml 2011-01-02 05:04:27 = UTC (rev 10104) +++ trunk/hornetq-rest/docbook/reference/en/master.xml 2011-01-04 00:35:39 = UTC (rev 10105) @@ -605,7 +605,7 @@ msg-create header. = POST /queues/jms.queue.bar/create -Host: example.xom +Host: example.com Content-Type: application/xml = <order> @@ -687,7 +687,7 @@ msg-create header. = POST /queues/jms.queue.bar/create -Host: example.xom +Host: example.com Content-Type: application/xml = <order> @@ -799,7 +799,7 @@ that. = POST /queues/jms.queue.bar/create?durable=3Dtrue -Host: example.xom +Host: example.com Content-Type: application/xml = <order> @@ -809,6 +809,20 @@ </order> + + Expiration and Priority + You can set he expiration and the priority of the message in th= e queue or topic by setting an additional query parameter. The ex= piration query parameter is an integer expressing the time in mil= liseconds until the message should be expired. The priority is another query parameter with an integer value between 0 and 9 expre= ssing the priority of the message. i.e.: + POST /queues/jms.queue.bar/create?expiration=3D30000= &priority=3D3 +Host: example.com +Content-Type: application/xml + +<order> + <name>Bill</name> + <item>iPhone4</item> + <cost>$199.99</cost> +</order> + + = Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/qu= eue/ConsumersResource.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Co= nsumersResource.java 2011-01-02 05:04:27 UTC (rev 10104) +++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Co= nsumersResource.java 2011-01-04 00:35:39 UTC (rev 10105) @@ -79,24 +79,18 @@ this.consumerTimeoutSeconds =3D consumerTimeoutSeconds; } = - private Object timeoutLock =3D new Object(); - - @Override public void testTimeout(String target) { - synchronized (timeoutLock) + QueueConsumer consumer =3D queueConsumers.get(target); + if (consumer =3D=3D null) return; + synchronized (consumer) { - QueueConsumer consumer =3D queueConsumers.get(target); - if (consumer =3D=3D null) return; - synchronized (consumer) + if (System.currentTimeMillis() - consumer.getLastPingTime() > con= sumerTimeoutSeconds * 1000) { - if (System.currentTimeMillis() - consumer.getLastPingTime() > = consumerTimeoutSeconds * 1000) - { - log.warn("shutdown REST consumer because of timeout for: " = + consumer.getId()); - consumer.shutdown(); - queueConsumers.remove(consumer.getId()); - serviceManager.getTimeoutTask().remove(consumer.getId()); - } + log.warn("shutdown REST consumer because of timeout for: " + c= onsumer.getId()); + consumer.shutdown(); + queueConsumers.remove(consumer.getId()); + serviceManager.getTimeoutTask().remove(consumer.getId()); } } } @@ -122,7 +116,7 @@ { attributes =3D attributes | SELECTOR_SET; } - = + if (autoAck) { consumer =3D createConsumer(selector); @@ -141,11 +135,11 @@ = if (autoAck) { - QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrateg= y(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/" + attributesSe= gment +"/" + consumer.getId(), "-1"); + QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrateg= y(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/" + attributesSe= gment + "/" + consumer.getId(), "-1"); } else { - AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManage= r.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/= " + attributesSegment +"/" + consumer.getId(), "-1"); + AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManage= r.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/= " + attributesSegment + "/" + consumer.getId(), "-1"); = } return builder.build(); @@ -160,16 +154,18 @@ } } = + protected void addConsumer(QueueConsumer consumer) + { + queueConsumers.put(consumer.getId(), consumer); + serviceManager.getTimeoutTask().add(this, consumer.getId()); + } + public QueueConsumer createConsumer(String selector) throws HornetQException { String genId =3D sessionCounter.getAndIncrement() + "-queue-" + dest= ination + "-" + startup; QueueConsumer consumer =3D new QueueConsumer(sessionFactory, destina= tion, genId, serviceManager, selector); - synchronized (timeoutLock) - { - queueConsumers.put(genId, consumer); - serviceManager.getTimeoutTask().add(this, consumer.getId()); - } + addConsumer(consumer); return consumer; } = @@ -178,11 +174,7 @@ { String genId =3D sessionCounter.getAndIncrement() + "-queue-" + dest= ination + "-" + startup; QueueConsumer consumer =3D new AcknowledgedQueueConsumer(sessionFact= ory, destination, genId, serviceManager, selector); - synchronized (timeoutLock) - { - queueConsumers.put(genId, consumer); - serviceManager.getTimeoutTask().add(this, consumer.getId()); - } + addConsumer(consumer); return consumer; } = @@ -206,9 +198,9 @@ // we synchronize just in case a failed request is still processing synchronized (consumer) { - if ( (attributes & ACKNOWLEDGED) > 0) + if ((attributes & ACKNOWLEDGED) > 0) { - AcknowledgedQueueConsumer ackedConsumer =3D (AcknowledgedQueue= Consumer)consumer; + AcknowledgedQueueConsumer ackedConsumer =3D (AcknowledgedQueue= Consumer) consumer; Acknowledgement ack =3D ackedConsumer.getAck(); if (ack =3D=3D null || ack.wasSet()) { @@ -237,7 +229,7 @@ QueueConsumer consumer =3D queueConsumers.get(consumerId); if (consumer =3D=3D null) { - if ( (attributes & SELECTOR_SET) > 0) + if ((attributes & SELECTOR_SET) > 0) { = Response.ResponseBuilder builder =3D Response.status(Response.= Status.GONE) @@ -247,40 +239,37 @@ uriBuilder.path(uriInfo.getMatchedURIs().get(1)); serviceManager.getLinkStrategy().setLinkHeader(builder, "pull-= consumers", "pull-consumers", uriBuilder.build().toString(), null); throw new WebApplicationException(builder.build()); - = + } - if ( (attributes & ACKNOWLEDGED) > 0) + if ((attributes & ACKNOWLEDGED) > 0) { QueueConsumer tmp =3D new AcknowledgedQueueConsumer(sessionFac= tory, destination, consumerId, serviceManager, null); - consumer =3D addConsumerToMap(consumerId, tmp); + consumer =3D addReconnectedConsumerToMap(consumerId, tmp); = } else { QueueConsumer tmp =3D new QueueConsumer(sessionFactory, destin= ation, consumerId, serviceManager, null); - consumer =3D addConsumerToMap(consumerId, tmp); + consumer =3D addReconnectedConsumerToMap(consumerId, tmp); } } return consumer; } = - private QueueConsumer addConsumerToMap(String consumerId, QueueConsumer= tmp) + private QueueConsumer addReconnectedConsumerToMap(String consumerId, Qu= eueConsumer tmp) { - synchronized (timeoutLock) + QueueConsumer consumer; + consumer =3D queueConsumers.putIfAbsent(consumerId, tmp); + if (consumer !=3D null) { - QueueConsumer consumer; - consumer =3D queueConsumers.putIfAbsent(consumerId, tmp); - if (consumer !=3D null) - { - tmp.shutdown(); - } - else - { - consumer =3D tmp; - serviceManager.getTimeoutTask().add(this, consumer.getId()); - } - return consumer; + tmp.shutdown(); } + else + { + consumer =3D tmp; + serviceManager.getTimeoutTask().add(this, consumer.getId()); + } + return consumer; } = = Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/qu= eue/PostMessage.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Po= stMessage.java 2011-01-02 05:04:27 UTC (rev 10104) +++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Po= stMessage.java 2011-01-04 00:35:39 UTC (rev 10105) @@ -42,13 +42,16 @@ return startupTime + Long.toString(counter.incrementAndGet()); } = - public void publish(HttpHeaders headers, byte[] body, String dup, boole= an durable) throws Exception + public void publish(HttpHeaders headers, byte[] body, String dup, + boolean durable, + Long expiration, + Integer priority) throws Exception { Pooled pooled =3D getPooled(); try { ClientProducer producer =3D pooled.producer; - ClientMessage message =3D createHornetQMessage(headers, body, dur= able, pooled.session); + ClientMessage message =3D createHornetQMessage(headers, body, dur= able, expiration, priority, pooled.session); message.putStringProperty(ClientMessage.HDR_DUPLICATE_DETECTION_I= D.toString(), dup); producer.send(message); pool.add(pooled); @@ -69,14 +72,20 @@ = @PUT @Path("{id}") - public Response putWithId(@PathParam("id") String dupId, @QueryParam("d= urable") Boolean durable, @Context HttpHeaders headers, @Context UriInfo ur= iInfo, byte[] body) + public Response putWithId(@PathParam("id") String dupId, @QueryParam("d= urable") Boolean durable, + @QueryParam("expiration") Long expiration, + @QueryParam("priority") Integer priority, + @Context HttpHeaders headers, @Context UriInf= o uriInfo, byte[] body) { - return postWithId(dupId, durable, headers, uriInfo, body); + return postWithId(dupId, durable, expiration, priority, headers, uri= Info, body); } = @POST @Path("{id}") - public Response postWithId(@PathParam("id") String dupId, @QueryParam("= durable") Boolean durable, @Context HttpHeaders headers, @Context UriInfo u= riInfo, byte[] body) + public Response postWithId(@PathParam("id") String dupId, @QueryParam("= durable") Boolean durable, + @QueryParam("expiration") Long expiration, + @QueryParam("priority") Integer priority, + @Context HttpHeaders headers, @Context UriIn= fo uriInfo, byte[] body) { String matched =3D uriInfo.getMatchedURIs().get(1); UriBuilder nextBuilder =3D uriInfo.getBaseUriBuilder(); @@ -91,7 +100,7 @@ } try { - publish(headers, body, dupId, isDurable); + publish(headers, body, dupId, isDurable, expiration, priority); } catch (Exception e) { @@ -217,10 +226,27 @@ } = = - protected ClientMessage createHornetQMessage(HttpHeaders headers, byte[= ] body, boolean durable, ClientSession session) throws Exception + protected ClientMessage createHornetQMessage(HttpHeaders headers, byte[= ] body, + boolean durable, + Long expiration, + Integer priority, + ClientSession session) thr= ows Exception { ClientMessage message =3D session.createMessage(Message.BYTES_TYPE, = durable); + if (expiration !=3D null) + { + message.setExpiration(expiration.longValue()); + } + if (priority !=3D null) + { + byte p =3D priority.byteValue(); + if (p >=3D 0 && p <=3D9) + { + message.setPriority(p); + } + } HttpMessageHelper.writeHttpMessage(headers, body, message); return message; } + } Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/qu= eue/PostMessageDupsOk.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Po= stMessageDupsOk.java 2011-01-02 05:04:27 UTC (rev 10104) +++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Po= stMessageDupsOk.java 2011-01-04 00:35:39 UTC (rev 10105) @@ -23,13 +23,15 @@ public class PostMessageDupsOk extends PostMessage { = - public void publish(HttpHeaders headers, byte[] body, boolean durable) = throws Exception + public void publish(HttpHeaders headers, byte[] body, boolean durable, + Long expiration, + Integer priority) throws Exception { Pooled pooled =3D getPooled(); try { ClientProducer producer =3D pooled.producer; - ClientMessage message =3D createHornetQMessage(headers, body, dur= able, pooled.session); + ClientMessage message =3D createHornetQMessage(headers, body, dur= able, expiration, priority, pooled.session); producer.send(message); pool.add(pooled); } @@ -50,6 +52,8 @@ @POST public Response create(@Context HttpHeaders headers, @QueryParam("durable") Boolean durable, + @QueryParam("expiration") Long expiration, + @QueryParam("priority") Integer priority, @Context UriInfo uriInfo, byte[] body) { @@ -60,7 +64,7 @@ { isDurable =3D durable.booleanValue(); } - publish(headers, body, isDurable); + publish(headers, body, isDurable, expiration, priority); } catch (Exception e) { Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/to= pic/SubscriptionsResource.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Su= bscriptionsResource.java 2011-01-02 05:04:27 UTC (rev 10104) +++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Su= bscriptionsResource.java 2011-01-04 00:35:39 UTC (rev 10105) @@ -82,24 +82,19 @@ this.destination =3D destination; } = - private Object timeoutLock =3D new Object(); = - @Override public void testTimeout(String target) { - synchronized (timeoutLock) + QueueConsumer consumer =3D queueConsumers.get(target); + if (consumer =3D=3D null) return; + synchronized (consumer) { - QueueConsumer consumer =3D queueConsumers.get(target); - if (consumer =3D=3D null) return; - synchronized (consumer) + if (System.currentTimeMillis() - consumer.getLastPingTime() > con= sumerTimeoutSeconds * 1000) { - if (System.currentTimeMillis() - consumer.getLastPingTime() > = consumerTimeoutSeconds * 1000) - { - log.warn("shutdown REST consumer because of session timeout= for: " + consumer.getId()); - consumer.shutdown(); - queueConsumers.remove(consumer.getId()); - serviceManager.getTimeoutTask().remove(consumer.getId()); - } + log.warn("shutdown REST consumer because of session timeout fo= r: " + consumer.getId()); + consumer.shutdown(); + queueConsumers.remove(consumer.getId()); + serviceManager.getTimeoutTask().remove(consumer.getId()); } } } @@ -372,28 +367,25 @@ QueueConsumer consumer; if (subscriptionExists(subscriptionId)) { - synchronized (timeoutLock) + QueueConsumer tmp =3D null; + try { - QueueConsumer tmp =3D null; - try - { - tmp =3D createConsumer(true, autoAck, subscriptionId, null); - } - catch (HornetQException e) - { - throw new RuntimeException(e); - } - consumer =3D queueConsumers.putIfAbsent(subscriptionId, tmp); - if (consumer =3D=3D null) - { - consumer =3D tmp; - serviceManager.getTimeoutTask().add(this, subscriptionId); - } - else - { - tmp.shutdown(); - } + tmp =3D createConsumer(true, autoAck, subscriptionId, null); } + catch (HornetQException e) + { + throw new RuntimeException(e); + } + consumer =3D queueConsumers.putIfAbsent(subscriptionId, tmp); + if (consumer =3D=3D null) + { + consumer =3D tmp; + serviceManager.getTimeoutTask().add(this, subscriptionId); + } + else + { + tmp.shutdown(); + } } else { Modified: trunk/pom.xml =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/pom.xml 2011-01-02 05:04:27 UTC (rev 10104) +++ trunk/pom.xml 2011-01-04 00:35:39 UTC (rev 10105) @@ -22,7 +22,7 @@ 2.2.0.CR1 = - 2.0.1.GA + 2.1.0.GA = HornetQ Modified: trunk/src/Hornetq.iml =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/src/Hornetq.iml 2011-01-02 05:04:27 UTC (rev 10104) +++ trunk/src/Hornetq.iml 2011-01-04 00:35:39 UTC (rev 10105) @@ -7,28 +7,31 @@ + + - + - + + + - + - - + @@ -38,6 +41,8 @@ + + @@ -47,8 +52,6 @@ - - @@ -58,9 +61,6 @@ - - - = --===============6160395679801446487==--