From do-not-reply at jboss.org Mon Jan 3 19:35:40 2011
Content-Type: multipart/mixed; boundary="===============4416222408873919115=="
MIME-Version: 1.0
From: do-not-reply at jboss.org
To: hornetq-commits at lists.jboss.org
Subject: [hornetq-commits] JBoss hornetq SVN: r10105 - in trunk:
hornetq-rest/docbook/reference/en and 3 other directories.
Date: Mon, 03 Jan 2011 19:35:39 -0500
Message-ID: <201101040035.p040Zd4q015502@svn01.web.mwc.hst.phx2.redhat.com>
--===============4416222408873919115==
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: quoted-printable
Author: bill.burke(a)jboss.com
Date: 2011-01-03 19:35:39 -0500 (Mon, 03 Jan 2011)
New Revision: 10105
Modified:
trunk/hornetq-rest/docbook/reference/en/master.xml
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Con=
sumersResource.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Pos=
tMessage.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Pos=
tMessageDupsOk.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Sub=
scriptionsResource.java
trunk/pom.xml
trunk/src/Hornetq.iml
Log:
deadlock bug and expiration/priority support
Modified: trunk/hornetq-rest/docbook/reference/en/master.xml
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/hornetq-rest/docbook/reference/en/master.xml 2011-01-02 05:04:27 =
UTC (rev 10104)
+++ trunk/hornetq-rest/docbook/reference/en/master.xml 2011-01-04 00:35:39 =
UTC (rev 10105)
@@ -605,7 +605,7 @@
msg-create header.
=
POST /queues/jms.queue.bar/create
-Host: example.xom
+Host: example.com
Content-Type: application/xml
=
<order>
@@ -687,7 +687,7 @@
msg-create header.
=
POST /queues/jms.queue.bar/create
-Host: example.xom
+Host: example.com
Content-Type: application/xml
=
<order>
@@ -799,7 +799,7 @@
that.
=
POST /queues/jms.queue.bar/create?durable=3Dtrue
-Host: example.xom
+Host: example.com
Content-Type: application/xml
=
<order>
@@ -809,6 +809,20 @@
</order>
+
+ Expiration and Priority
+ You can set he expiration and the priority of the message in th=
e queue or topic by setting an additional query parameter. The ex=
piration query parameter is an integer expressing the time in mil=
liseconds until the message should be expired. The priority is another query parameter with an integer value between 0 and 9 expre=
ssing the priority of the message. i.e.:
+ POST /queues/jms.queue.bar/create?expiration=3D30000=
&priority=3D3
+Host: example.com
+Content-Type: application/xml
+
+<order>
+ <name>Bill</name>
+ <item>iPhone4</item>
+ <cost>$199.99</cost>
+</order>
+
+
=
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/qu=
eue/ConsumersResource.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Co=
nsumersResource.java 2011-01-02 05:04:27 UTC (rev 10104)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Co=
nsumersResource.java 2011-01-04 00:35:39 UTC (rev 10105)
@@ -79,24 +79,18 @@
this.consumerTimeoutSeconds =3D consumerTimeoutSeconds;
}
=
- private Object timeoutLock =3D new Object();
-
- @Override
public void testTimeout(String target)
{
- synchronized (timeoutLock)
+ QueueConsumer consumer =3D queueConsumers.get(target);
+ if (consumer =3D=3D null) return;
+ synchronized (consumer)
{
- QueueConsumer consumer =3D queueConsumers.get(target);
- if (consumer =3D=3D null) return;
- synchronized (consumer)
+ if (System.currentTimeMillis() - consumer.getLastPingTime() > con=
sumerTimeoutSeconds * 1000)
{
- if (System.currentTimeMillis() - consumer.getLastPingTime() > =
consumerTimeoutSeconds * 1000)
- {
- log.warn("shutdown REST consumer because of timeout for: " =
+ consumer.getId());
- consumer.shutdown();
- queueConsumers.remove(consumer.getId());
- serviceManager.getTimeoutTask().remove(consumer.getId());
- }
+ log.warn("shutdown REST consumer because of timeout for: " + c=
onsumer.getId());
+ consumer.shutdown();
+ queueConsumers.remove(consumer.getId());
+ serviceManager.getTimeoutTask().remove(consumer.getId());
}
}
}
@@ -122,7 +116,7 @@
{
attributes =3D attributes | SELECTOR_SET;
}
- =
+
if (autoAck)
{
consumer =3D createConsumer(selector);
@@ -141,11 +135,11 @@
=
if (autoAck)
{
- QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrateg=
y(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/" + attributesSe=
gment +"/" + consumer.getId(), "-1");
+ QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrateg=
y(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/" + attributesSe=
gment + "/" + consumer.getId(), "-1");
}
else
{
- AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManage=
r.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/=
" + attributesSegment +"/" + consumer.getId(), "-1");
+ AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManage=
r.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/=
" + attributesSegment + "/" + consumer.getId(), "-1");
=
}
return builder.build();
@@ -160,16 +154,18 @@
}
}
=
+ protected void addConsumer(QueueConsumer consumer)
+ {
+ queueConsumers.put(consumer.getId(), consumer);
+ serviceManager.getTimeoutTask().add(this, consumer.getId());
+ }
+
public QueueConsumer createConsumer(String selector)
throws HornetQException
{
String genId =3D sessionCounter.getAndIncrement() + "-queue-" + dest=
ination + "-" + startup;
QueueConsumer consumer =3D new QueueConsumer(sessionFactory, destina=
tion, genId, serviceManager, selector);
- synchronized (timeoutLock)
- {
- queueConsumers.put(genId, consumer);
- serviceManager.getTimeoutTask().add(this, consumer.getId());
- }
+ addConsumer(consumer);
return consumer;
}
=
@@ -178,11 +174,7 @@
{
String genId =3D sessionCounter.getAndIncrement() + "-queue-" + dest=
ination + "-" + startup;
QueueConsumer consumer =3D new AcknowledgedQueueConsumer(sessionFact=
ory, destination, genId, serviceManager, selector);
- synchronized (timeoutLock)
- {
- queueConsumers.put(genId, consumer);
- serviceManager.getTimeoutTask().add(this, consumer.getId());
- }
+ addConsumer(consumer);
return consumer;
}
=
@@ -206,9 +198,9 @@
// we synchronize just in case a failed request is still processing
synchronized (consumer)
{
- if ( (attributes & ACKNOWLEDGED) > 0)
+ if ((attributes & ACKNOWLEDGED) > 0)
{
- AcknowledgedQueueConsumer ackedConsumer =3D (AcknowledgedQueue=
Consumer)consumer;
+ AcknowledgedQueueConsumer ackedConsumer =3D (AcknowledgedQueue=
Consumer) consumer;
Acknowledgement ack =3D ackedConsumer.getAck();
if (ack =3D=3D null || ack.wasSet())
{
@@ -237,7 +229,7 @@
QueueConsumer consumer =3D queueConsumers.get(consumerId);
if (consumer =3D=3D null)
{
- if ( (attributes & SELECTOR_SET) > 0)
+ if ((attributes & SELECTOR_SET) > 0)
{
=
Response.ResponseBuilder builder =3D Response.status(Response.=
Status.GONE)
@@ -247,40 +239,37 @@
uriBuilder.path(uriInfo.getMatchedURIs().get(1));
serviceManager.getLinkStrategy().setLinkHeader(builder, "pull-=
consumers", "pull-consumers", uriBuilder.build().toString(), null);
throw new WebApplicationException(builder.build());
- =
+
}
- if ( (attributes & ACKNOWLEDGED) > 0)
+ if ((attributes & ACKNOWLEDGED) > 0)
{
QueueConsumer tmp =3D new AcknowledgedQueueConsumer(sessionFac=
tory, destination, consumerId, serviceManager, null);
- consumer =3D addConsumerToMap(consumerId, tmp);
+ consumer =3D addReconnectedConsumerToMap(consumerId, tmp);
=
}
else
{
QueueConsumer tmp =3D new QueueConsumer(sessionFactory, destin=
ation, consumerId, serviceManager, null);
- consumer =3D addConsumerToMap(consumerId, tmp);
+ consumer =3D addReconnectedConsumerToMap(consumerId, tmp);
}
}
return consumer;
}
=
- private QueueConsumer addConsumerToMap(String consumerId, QueueConsumer=
tmp)
+ private QueueConsumer addReconnectedConsumerToMap(String consumerId, Qu=
eueConsumer tmp)
{
- synchronized (timeoutLock)
+ QueueConsumer consumer;
+ consumer =3D queueConsumers.putIfAbsent(consumerId, tmp);
+ if (consumer !=3D null)
{
- QueueConsumer consumer;
- consumer =3D queueConsumers.putIfAbsent(consumerId, tmp);
- if (consumer !=3D null)
- {
- tmp.shutdown();
- }
- else
- {
- consumer =3D tmp;
- serviceManager.getTimeoutTask().add(this, consumer.getId());
- }
- return consumer;
+ tmp.shutdown();
}
+ else
+ {
+ consumer =3D tmp;
+ serviceManager.getTimeoutTask().add(this, consumer.getId());
+ }
+ return consumer;
}
=
=
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/qu=
eue/PostMessage.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Po=
stMessage.java 2011-01-02 05:04:27 UTC (rev 10104)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Po=
stMessage.java 2011-01-04 00:35:39 UTC (rev 10105)
@@ -42,13 +42,16 @@
return startupTime + Long.toString(counter.incrementAndGet());
}
=
- public void publish(HttpHeaders headers, byte[] body, String dup, boole=
an durable) throws Exception
+ public void publish(HttpHeaders headers, byte[] body, String dup,
+ boolean durable,
+ Long expiration,
+ Integer priority) throws Exception
{
Pooled pooled =3D getPooled();
try
{
ClientProducer producer =3D pooled.producer;
- ClientMessage message =3D createHornetQMessage(headers, body, dur=
able, pooled.session);
+ ClientMessage message =3D createHornetQMessage(headers, body, dur=
able, expiration, priority, pooled.session);
message.putStringProperty(ClientMessage.HDR_DUPLICATE_DETECTION_I=
D.toString(), dup);
producer.send(message);
pool.add(pooled);
@@ -69,14 +72,20 @@
=
@PUT
@Path("{id}")
- public Response putWithId(@PathParam("id") String dupId, @QueryParam("d=
urable") Boolean durable, @Context HttpHeaders headers, @Context UriInfo ur=
iInfo, byte[] body)
+ public Response putWithId(@PathParam("id") String dupId, @QueryParam("d=
urable") Boolean durable,
+ @QueryParam("expiration") Long expiration,
+ @QueryParam("priority") Integer priority,
+ @Context HttpHeaders headers, @Context UriInf=
o uriInfo, byte[] body)
{
- return postWithId(dupId, durable, headers, uriInfo, body);
+ return postWithId(dupId, durable, expiration, priority, headers, uri=
Info, body);
}
=
@POST
@Path("{id}")
- public Response postWithId(@PathParam("id") String dupId, @QueryParam("=
durable") Boolean durable, @Context HttpHeaders headers, @Context UriInfo u=
riInfo, byte[] body)
+ public Response postWithId(@PathParam("id") String dupId, @QueryParam("=
durable") Boolean durable,
+ @QueryParam("expiration") Long expiration,
+ @QueryParam("priority") Integer priority,
+ @Context HttpHeaders headers, @Context UriIn=
fo uriInfo, byte[] body)
{
String matched =3D uriInfo.getMatchedURIs().get(1);
UriBuilder nextBuilder =3D uriInfo.getBaseUriBuilder();
@@ -91,7 +100,7 @@
}
try
{
- publish(headers, body, dupId, isDurable);
+ publish(headers, body, dupId, isDurable, expiration, priority);
}
catch (Exception e)
{
@@ -217,10 +226,27 @@
}
=
=
- protected ClientMessage createHornetQMessage(HttpHeaders headers, byte[=
] body, boolean durable, ClientSession session) throws Exception
+ protected ClientMessage createHornetQMessage(HttpHeaders headers, byte[=
] body,
+ boolean durable,
+ Long expiration,
+ Integer priority,
+ ClientSession session) thr=
ows Exception
{
ClientMessage message =3D session.createMessage(Message.BYTES_TYPE, =
durable);
+ if (expiration !=3D null)
+ {
+ message.setExpiration(expiration.longValue());
+ }
+ if (priority !=3D null)
+ {
+ byte p =3D priority.byteValue();
+ if (p >=3D 0 && p <=3D9)
+ {
+ message.setPriority(p);
+ }
+ }
HttpMessageHelper.writeHttpMessage(headers, body, message);
return message;
}
+
}
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/qu=
eue/PostMessageDupsOk.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Po=
stMessageDupsOk.java 2011-01-02 05:04:27 UTC (rev 10104)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/Po=
stMessageDupsOk.java 2011-01-04 00:35:39 UTC (rev 10105)
@@ -23,13 +23,15 @@
public class PostMessageDupsOk extends PostMessage
{
=
- public void publish(HttpHeaders headers, byte[] body, boolean durable) =
throws Exception
+ public void publish(HttpHeaders headers, byte[] body, boolean durable,
+ Long expiration,
+ Integer priority) throws Exception
{
Pooled pooled =3D getPooled();
try
{
ClientProducer producer =3D pooled.producer;
- ClientMessage message =3D createHornetQMessage(headers, body, dur=
able, pooled.session);
+ ClientMessage message =3D createHornetQMessage(headers, body, dur=
able, expiration, priority, pooled.session);
producer.send(message);
pool.add(pooled);
}
@@ -50,6 +52,8 @@
@POST
public Response create(@Context HttpHeaders headers,
@QueryParam("durable") Boolean durable,
+ @QueryParam("expiration") Long expiration,
+ @QueryParam("priority") Integer priority,
@Context UriInfo uriInfo,
byte[] body)
{
@@ -60,7 +64,7 @@
{
isDurable =3D durable.booleanValue();
}
- publish(headers, body, isDurable);
+ publish(headers, body, isDurable, expiration, priority);
}
catch (Exception e)
{
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/to=
pic/SubscriptionsResource.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Su=
bscriptionsResource.java 2011-01-02 05:04:27 UTC (rev 10104)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Su=
bscriptionsResource.java 2011-01-04 00:35:39 UTC (rev 10105)
@@ -82,24 +82,19 @@
this.destination =3D destination;
}
=
- private Object timeoutLock =3D new Object();
=
- @Override
public void testTimeout(String target)
{
- synchronized (timeoutLock)
+ QueueConsumer consumer =3D queueConsumers.get(target);
+ if (consumer =3D=3D null) return;
+ synchronized (consumer)
{
- QueueConsumer consumer =3D queueConsumers.get(target);
- if (consumer =3D=3D null) return;
- synchronized (consumer)
+ if (System.currentTimeMillis() - consumer.getLastPingTime() > con=
sumerTimeoutSeconds * 1000)
{
- if (System.currentTimeMillis() - consumer.getLastPingTime() > =
consumerTimeoutSeconds * 1000)
- {
- log.warn("shutdown REST consumer because of session timeout=
for: " + consumer.getId());
- consumer.shutdown();
- queueConsumers.remove(consumer.getId());
- serviceManager.getTimeoutTask().remove(consumer.getId());
- }
+ log.warn("shutdown REST consumer because of session timeout fo=
r: " + consumer.getId());
+ consumer.shutdown();
+ queueConsumers.remove(consumer.getId());
+ serviceManager.getTimeoutTask().remove(consumer.getId());
}
}
}
@@ -372,28 +367,25 @@
QueueConsumer consumer;
if (subscriptionExists(subscriptionId))
{
- synchronized (timeoutLock)
+ QueueConsumer tmp =3D null;
+ try
{
- QueueConsumer tmp =3D null;
- try
- {
- tmp =3D createConsumer(true, autoAck, subscriptionId, null);
- }
- catch (HornetQException e)
- {
- throw new RuntimeException(e);
- }
- consumer =3D queueConsumers.putIfAbsent(subscriptionId, tmp);
- if (consumer =3D=3D null)
- {
- consumer =3D tmp;
- serviceManager.getTimeoutTask().add(this, subscriptionId);
- }
- else
- {
- tmp.shutdown();
- }
+ tmp =3D createConsumer(true, autoAck, subscriptionId, null);
}
+ catch (HornetQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ consumer =3D queueConsumers.putIfAbsent(subscriptionId, tmp);
+ if (consumer =3D=3D null)
+ {
+ consumer =3D tmp;
+ serviceManager.getTimeoutTask().add(this, subscriptionId);
+ }
+ else
+ {
+ tmp.shutdown();
+ }
}
else
{
Modified: trunk/pom.xml
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/pom.xml 2011-01-02 05:04:27 UTC (rev 10104)
+++ trunk/pom.xml 2011-01-04 00:35:39 UTC (rev 10105)
@@ -22,7 +22,7 @@
2.2.0.CR1
=
- 2.0.1.GA
+ 2.1.0.GA
=
HornetQ
Modified: trunk/src/Hornetq.iml
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/src/Hornetq.iml 2011-01-02 05:04:27 UTC (rev 10104)
+++ trunk/src/Hornetq.iml 2011-01-04 00:35:39 UTC (rev 10105)
@@ -7,28 +7,31 @@
+
+
-
+
-
+
+
+
-
+
-
-
+
@@ -38,6 +41,8 @@
+
+
@@ -47,8 +52,6 @@
-
-
@@ -58,9 +61,6 @@
-
-
-
=
--===============4416222408873919115==--