Author: bill.burke(a)jboss.com
Date: 2011-01-03 19:35:39 -0500 (Mon, 03 Jan 2011)
New Revision: 10105
Modified:
trunk/hornetq-rest/docbook/reference/en/master.xml
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessageDupsOk.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java
trunk/pom.xml
trunk/src/Hornetq.iml
Log:
deadlock bug and expiration/priority support
Modified: trunk/hornetq-rest/docbook/reference/en/master.xml
===================================================================
--- trunk/hornetq-rest/docbook/reference/en/master.xml 2011-01-02 05:04:27 UTC (rev
10104)
+++ trunk/hornetq-rest/docbook/reference/en/master.xml 2011-01-04 00:35:39 UTC (rev
10105)
@@ -605,7 +605,7 @@
<literal>msg-create</literal> header.</para>
<para><programlisting>POST /queues/jms.queue.bar/create
-Host: example.xom
+Host:
example.com
Content-Type: application/xml
<order>
@@ -687,7 +687,7 @@
<literal>msg-create</literal> header.</para>
<para><programlisting>POST /queues/jms.queue.bar/create
-Host: example.xom
+Host:
example.com
Content-Type: application/xml
<order>
@@ -799,7 +799,7 @@
that.</para>
<programlisting>POST /queues/jms.queue.bar/create?durable=true
-Host: example.xom
+Host:
example.com
Content-Type: application/xml
<order>
@@ -809,6 +809,20 @@
</order>
</programlisting>
</sect1>
+ <sect1>
+ <title>Expiration and Priority</title>
+ <para>You can set the expiration and the priority of the message in the queue
or topic by setting an additional query parameter. The
<literal>expiration</literal> query parameter is an integer expressing the
time in milliseconds until the message should be expired. The
<literal>priority</literal> is another query parameter with an integer value
between 0 and 9 expressing the priority of the message. For example:</para>
+ <programlisting>POST
/queues/jms.queue.bar/create?expiration=30000&priority=3
+Host:
example.com
+Content-Type: application/xml
+
+<order>
+ <name>Bill</name>
+ <item>iPhone4</item>
+ <cost>$199.99</cost>
+</order>
+</programlisting>
+ </sect1>
</chapter>
<chapter>
Modified:
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java
===================================================================
---
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java 2011-01-02
05:04:27 UTC (rev 10104)
+++
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java 2011-01-04
00:35:39 UTC (rev 10105)
@@ -79,24 +79,18 @@
this.consumerTimeoutSeconds = consumerTimeoutSeconds;
}
- private Object timeoutLock = new Object();
-
- @Override
public void testTimeout(String target)
{
- synchronized (timeoutLock)
+ QueueConsumer consumer = queueConsumers.get(target);
+ if (consumer == null) return;
+ synchronized (consumer)
{
- QueueConsumer consumer = queueConsumers.get(target);
- if (consumer == null) return;
- synchronized (consumer)
+ if (System.currentTimeMillis() - consumer.getLastPingTime() >
consumerTimeoutSeconds * 1000)
{
- if (System.currentTimeMillis() - consumer.getLastPingTime() >
consumerTimeoutSeconds * 1000)
- {
- log.warn("shutdown REST consumer because of timeout for: " +
consumer.getId());
- consumer.shutdown();
- queueConsumers.remove(consumer.getId());
- serviceManager.getTimeoutTask().remove(consumer.getId());
- }
+ log.warn("shutdown REST consumer because of timeout for: " +
consumer.getId());
+ consumer.shutdown();
+ queueConsumers.remove(consumer.getId());
+ serviceManager.getTimeoutTask().remove(consumer.getId());
}
}
}
@@ -122,7 +116,7 @@
{
attributes = attributes | SELECTOR_SET;
}
-
+
if (autoAck)
{
consumer = createConsumer(selector);
@@ -141,11 +135,11 @@
if (autoAck)
{
- QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrategy(), builder,
uriInfo, uriInfo.getMatchedURIs().get(1) + "/" + attributesSegment
+"/" + consumer.getId(), "-1");
+ QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrategy(), builder,
uriInfo, uriInfo.getMatchedURIs().get(1) + "/" + attributesSegment +
"/" + consumer.getId(), "-1");
}
else
{
-
AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManager.getLinkStrategy(),
builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/" + attributesSegment
+"/" + consumer.getId(), "-1");
+
AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManager.getLinkStrategy(),
builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/" + attributesSegment +
"/" + consumer.getId(), "-1");
}
return builder.build();
@@ -160,16 +154,18 @@
}
}
+ protected void addConsumer(QueueConsumer consumer)
+ {
+ queueConsumers.put(consumer.getId(), consumer);
+ serviceManager.getTimeoutTask().add(this, consumer.getId());
+ }
+
public QueueConsumer createConsumer(String selector)
throws HornetQException
{
String genId = sessionCounter.getAndIncrement() + "-queue-" + destination
+ "-" + startup;
QueueConsumer consumer = new QueueConsumer(sessionFactory, destination, genId,
serviceManager, selector);
- synchronized (timeoutLock)
- {
- queueConsumers.put(genId, consumer);
- serviceManager.getTimeoutTask().add(this, consumer.getId());
- }
+ addConsumer(consumer);
return consumer;
}
@@ -178,11 +174,7 @@
{
String genId = sessionCounter.getAndIncrement() + "-queue-" + destination
+ "-" + startup;
QueueConsumer consumer = new AcknowledgedQueueConsumer(sessionFactory, destination,
genId, serviceManager, selector);
- synchronized (timeoutLock)
- {
- queueConsumers.put(genId, consumer);
- serviceManager.getTimeoutTask().add(this, consumer.getId());
- }
+ addConsumer(consumer);
return consumer;
}
@@ -206,9 +198,9 @@
// we synchronize just in case a failed request is still processing
synchronized (consumer)
{
- if ( (attributes & ACKNOWLEDGED) > 0)
+ if ((attributes & ACKNOWLEDGED) > 0)
{
- AcknowledgedQueueConsumer ackedConsumer =
(AcknowledgedQueueConsumer)consumer;
+ AcknowledgedQueueConsumer ackedConsumer = (AcknowledgedQueueConsumer)
consumer;
Acknowledgement ack = ackedConsumer.getAck();
if (ack == null || ack.wasSet())
{
@@ -237,7 +229,7 @@
QueueConsumer consumer = queueConsumers.get(consumerId);
if (consumer == null)
{
- if ( (attributes & SELECTOR_SET) > 0)
+ if ((attributes & SELECTOR_SET) > 0)
{
Response.ResponseBuilder builder = Response.status(Response.Status.GONE)
@@ -247,40 +239,37 @@
uriBuilder.path(uriInfo.getMatchedURIs().get(1));
serviceManager.getLinkStrategy().setLinkHeader(builder,
"pull-consumers", "pull-consumers", uriBuilder.build().toString(),
null);
throw new WebApplicationException(builder.build());
-
+
}
- if ( (attributes & ACKNOWLEDGED) > 0)
+ if ((attributes & ACKNOWLEDGED) > 0)
{
QueueConsumer tmp = new AcknowledgedQueueConsumer(sessionFactory,
destination, consumerId, serviceManager, null);
- consumer = addConsumerToMap(consumerId, tmp);
+ consumer = addReconnectedConsumerToMap(consumerId, tmp);
}
else
{
QueueConsumer tmp = new QueueConsumer(sessionFactory, destination,
consumerId, serviceManager, null);
- consumer = addConsumerToMap(consumerId, tmp);
+ consumer = addReconnectedConsumerToMap(consumerId, tmp);
}
}
return consumer;
}
- private QueueConsumer addConsumerToMap(String consumerId, QueueConsumer tmp)
+ private QueueConsumer addReconnectedConsumerToMap(String consumerId, QueueConsumer
tmp)
{
- synchronized (timeoutLock)
+ QueueConsumer consumer;
+ consumer = queueConsumers.putIfAbsent(consumerId, tmp);
+ if (consumer != null)
{
- QueueConsumer consumer;
- consumer = queueConsumers.putIfAbsent(consumerId, tmp);
- if (consumer != null)
- {
- tmp.shutdown();
- }
- else
- {
- consumer = tmp;
- serviceManager.getTimeoutTask().add(this, consumer.getId());
- }
- return consumer;
+ tmp.shutdown();
}
+ else
+ {
+ consumer = tmp;
+ serviceManager.getTimeoutTask().add(this, consumer.getId());
+ }
+ return consumer;
}
Modified:
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java
===================================================================
---
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java 2011-01-02
05:04:27 UTC (rev 10104)
+++
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java 2011-01-04
00:35:39 UTC (rev 10105)
@@ -42,13 +42,16 @@
return startupTime + Long.toString(counter.incrementAndGet());
}
- public void publish(HttpHeaders headers, byte[] body, String dup, boolean durable)
throws Exception
+ public void publish(HttpHeaders headers, byte[] body, String dup,
+ boolean durable,
+ Long expiration,
+ Integer priority) throws Exception
{
Pooled pooled = getPooled();
try
{
ClientProducer producer = pooled.producer;
- ClientMessage message = createHornetQMessage(headers, body, durable,
pooled.session);
+ ClientMessage message = createHornetQMessage(headers, body, durable, expiration,
priority, pooled.session);
message.putStringProperty(ClientMessage.HDR_DUPLICATE_DETECTION_ID.toString(),
dup);
producer.send(message);
pool.add(pooled);
@@ -69,14 +72,20 @@
@PUT
@Path("{id}")
- public Response putWithId(@PathParam("id") String dupId,
@QueryParam("durable") Boolean durable, @Context HttpHeaders headers, @Context
UriInfo uriInfo, byte[] body)
+ public Response putWithId(@PathParam("id") String dupId,
@QueryParam("durable") Boolean durable,
+ @QueryParam("expiration") Long expiration,
+ @QueryParam("priority") Integer priority,
+ @Context HttpHeaders headers, @Context UriInfo uriInfo,
byte[] body)
{
- return postWithId(dupId, durable, headers, uriInfo, body);
+ return postWithId(dupId, durable, expiration, priority, headers, uriInfo, body);
}
@POST
@Path("{id}")
- public Response postWithId(@PathParam("id") String dupId,
@QueryParam("durable") Boolean durable, @Context HttpHeaders headers, @Context
UriInfo uriInfo, byte[] body)
+ public Response postWithId(@PathParam("id") String dupId,
@QueryParam("durable") Boolean durable,
+ @QueryParam("expiration") Long expiration,
+ @QueryParam("priority") Integer priority,
+ @Context HttpHeaders headers, @Context UriInfo uriInfo,
byte[] body)
{
String matched = uriInfo.getMatchedURIs().get(1);
UriBuilder nextBuilder = uriInfo.getBaseUriBuilder();
@@ -91,7 +100,7 @@
}
try
{
- publish(headers, body, dupId, isDurable);
+ publish(headers, body, dupId, isDurable, expiration, priority);
}
catch (Exception e)
{
@@ -217,10 +226,27 @@
}
- protected ClientMessage createHornetQMessage(HttpHeaders headers, byte[] body, boolean
durable, ClientSession session) throws Exception
+ protected ClientMessage createHornetQMessage(HttpHeaders headers, byte[] body,
+ boolean durable,
+ Long expiration,
+ Integer priority,
+ ClientSession session) throws Exception
{
ClientMessage message = session.createMessage(Message.BYTES_TYPE, durable);
+ if (expiration != null)
+ {
+ message.setExpiration(expiration.longValue());
+ }
+ if (priority != null)
+ {
+ byte p = priority.byteValue();
+ if (p >= 0 && p <=9)
+ {
+ message.setPriority(p);
+ }
+ }
HttpMessageHelper.writeHttpMessage(headers, body, message);
return message;
}
+
}
Modified:
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessageDupsOk.java
===================================================================
---
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessageDupsOk.java 2011-01-02
05:04:27 UTC (rev 10104)
+++
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessageDupsOk.java 2011-01-04
00:35:39 UTC (rev 10105)
@@ -23,13 +23,15 @@
public class PostMessageDupsOk extends PostMessage
{
- public void publish(HttpHeaders headers, byte[] body, boolean durable) throws
Exception
+ public void publish(HttpHeaders headers, byte[] body, boolean durable,
+ Long expiration,
+ Integer priority) throws Exception
{
Pooled pooled = getPooled();
try
{
ClientProducer producer = pooled.producer;
- ClientMessage message = createHornetQMessage(headers, body, durable,
pooled.session);
+ ClientMessage message = createHornetQMessage(headers, body, durable, expiration,
priority, pooled.session);
producer.send(message);
pool.add(pooled);
}
@@ -50,6 +52,8 @@
@POST
public Response create(@Context HttpHeaders headers,
@QueryParam("durable") Boolean durable,
+ @QueryParam("expiration") Long expiration,
+ @QueryParam("priority") Integer priority,
@Context UriInfo uriInfo,
byte[] body)
{
@@ -60,7 +64,7 @@
{
isDurable = durable.booleanValue();
}
- publish(headers, body, isDurable);
+ publish(headers, body, isDurable, expiration, priority);
}
catch (Exception e)
{
Modified:
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java
===================================================================
---
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java 2011-01-02
05:04:27 UTC (rev 10104)
+++
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java 2011-01-04
00:35:39 UTC (rev 10105)
@@ -82,24 +82,19 @@
this.destination = destination;
}
- private Object timeoutLock = new Object();
- @Override
public void testTimeout(String target)
{
- synchronized (timeoutLock)
+ QueueConsumer consumer = queueConsumers.get(target);
+ if (consumer == null) return;
+ synchronized (consumer)
{
- QueueConsumer consumer = queueConsumers.get(target);
- if (consumer == null) return;
- synchronized (consumer)
+ if (System.currentTimeMillis() - consumer.getLastPingTime() >
consumerTimeoutSeconds * 1000)
{
- if (System.currentTimeMillis() - consumer.getLastPingTime() >
consumerTimeoutSeconds * 1000)
- {
- log.warn("shutdown REST consumer because of session timeout for:
" + consumer.getId());
- consumer.shutdown();
- queueConsumers.remove(consumer.getId());
- serviceManager.getTimeoutTask().remove(consumer.getId());
- }
+ log.warn("shutdown REST consumer because of session timeout for: "
+ consumer.getId());
+ consumer.shutdown();
+ queueConsumers.remove(consumer.getId());
+ serviceManager.getTimeoutTask().remove(consumer.getId());
}
}
}
@@ -372,28 +367,25 @@
QueueConsumer consumer;
if (subscriptionExists(subscriptionId))
{
- synchronized (timeoutLock)
+ QueueConsumer tmp = null;
+ try
{
- QueueConsumer tmp = null;
- try
- {
- tmp = createConsumer(true, autoAck, subscriptionId, null);
- }
- catch (HornetQException e)
- {
- throw new RuntimeException(e);
- }
- consumer = queueConsumers.putIfAbsent(subscriptionId, tmp);
- if (consumer == null)
- {
- consumer = tmp;
- serviceManager.getTimeoutTask().add(this, subscriptionId);
- }
- else
- {
- tmp.shutdown();
- }
+ tmp = createConsumer(true, autoAck, subscriptionId, null);
}
+ catch (HornetQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ consumer = queueConsumers.putIfAbsent(subscriptionId, tmp);
+ if (consumer == null)
+ {
+ consumer = tmp;
+ serviceManager.getTimeoutTask().add(this, subscriptionId);
+ }
+ else
+ {
+ tmp.shutdown();
+ }
}
else
{
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2011-01-02 05:04:27 UTC (rev 10104)
+++ trunk/pom.xml 2011-01-04 00:35:39 UTC (rev 10105)
@@ -22,7 +22,7 @@
<version>2.2.0.CR1</version>
<properties>
- <resteasy.version>2.0.1.GA</resteasy.version>
+ <resteasy.version>2.1.0.GA</resteasy.version>
</properties>
<name>HornetQ</name>
Modified: trunk/src/Hornetq.iml
===================================================================
--- trunk/src/Hornetq.iml 2011-01-02 05:04:27 UTC (rev 10104)
+++ trunk/src/Hornetq.iml 2011-01-04 00:35:39 UTC (rev 10105)
@@ -7,28 +7,31 @@
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
+ <orderEntry type="library" name="jars"
level="project" />
+ <orderEntry type="library" name="lib"
level="project" />
<orderEntry type="library" name="lib10"
level="project" />
- <orderEntry type="library" name="lib13"
level="project" />
+ <orderEntry type="library" name="lib3"
level="project" />
<orderEntry type="module-library">
<library>
<CLASSES>
- <root
url="jar://$MODULE_DIR$/../thirdparty/org/jboss/metadata/lib/jboss-metadata.jar!/"
/>
+ <root
url="jar://$MODULE_DIR$/../thirdparty/org/twitter4j/lib/twitter4j-core.jar!/"
/>
</CLASSES>
<JAVADOC />
<SOURCES />
</library>
</orderEntry>
+ <orderEntry type="library" name="lib4"
level="project" />
+ <orderEntry type="library" name="lib7"
level="project" />
<orderEntry type="module-library">
<library>
<CLASSES>
- <root
url="jar://$MODULE_DIR$/../thirdparty/org/twitter4j/lib/twitter4j-core.jar!/"
/>
+ <root
url="jar://$MODULE_DIR$/../thirdparty/org/jboss/metadata/lib/jboss-metadata.jar!/"
/>
</CLASSES>
<JAVADOC />
<SOURCES />
</library>
</orderEntry>
- <orderEntry type="library" name="lib3"
level="project" />
- <orderEntry type="library" name="lib5"
level="project" />
+ <orderEntry type="library" name="lib2"
level="project" />
<orderEntry type="module-library">
<library>
<CLASSES>
@@ -38,6 +41,8 @@
<SOURCES />
</library>
</orderEntry>
+ <orderEntry type="library" name="lib5"
level="project" />
+ <orderEntry type="library" name="lib13"
level="project" />
<orderEntry type="module-library">
<library>
<CLASSES>
@@ -47,8 +52,6 @@
<SOURCES />
</library>
</orderEntry>
- <orderEntry type="library" name="jars"
level="project" />
- <orderEntry type="library" name="lib6"
level="project" />
<orderEntry type="module-library">
<library>
<CLASSES>
@@ -58,9 +61,6 @@
<SOURCES />
</library>
</orderEntry>
- <orderEntry type="library" name="lib9"
level="project" />
- <orderEntry type="library" name="lib4"
level="project" />
- <orderEntry type="library" name="lib"
level="project" />
</component>
</module>