[hornetq-commits] JBoss hornetq SVN: r10105 - in trunk: hornetq-rest/docbook/reference/en and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Jan 3 19:35:39 EST 2011


Author: bill.burke at jboss.com
Date: 2011-01-03 19:35:39 -0500 (Mon, 03 Jan 2011)
New Revision: 10105

Modified:
   trunk/hornetq-rest/docbook/reference/en/master.xml
   trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java
   trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java
   trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessageDupsOk.java
   trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java
   trunk/pom.xml
   trunk/src/Hornetq.iml
Log:
deadlock bug and expiration/priority support



Modified: trunk/hornetq-rest/docbook/reference/en/master.xml
===================================================================
--- trunk/hornetq-rest/docbook/reference/en/master.xml	2011-01-02 05:04:27 UTC (rev 10104)
+++ trunk/hornetq-rest/docbook/reference/en/master.xml	2011-01-04 00:35:39 UTC (rev 10105)
@@ -605,7 +605,7 @@
         <literal>msg-create</literal> header.</para>
 
         <para><programlisting>POST /queues/jms.queue.bar/create
-Host: example.xom
+Host: example.com
 Content-Type: application/xml
 
 &lt;order&gt;
@@ -687,7 +687,7 @@
           <literal>msg-create</literal> header.</para>
 
           <para><programlisting>POST /queues/jms.queue.bar/create
-Host: example.xom
+Host: example.com
 Content-Type: application/xml
 
 &lt;order&gt;
@@ -799,7 +799,7 @@
       that.</para>
 
       <programlisting>POST /queues/jms.queue.bar/create?durable=true
-Host: example.xom
+Host: example.com
 Content-Type: application/xml
 
 &lt;order&gt;
@@ -809,6 +809,20 @@
 &lt;/order&gt;
 </programlisting>
     </sect1>
+   <sect1>
+     <title>Expiration and Priority</title>
+     <para>You can set he expiration and the priority of the message in the queue or topic by setting an additional query parameter.  The <literal>expiration</literal> query parameter is an integer expressing the time in milliseconds until the message should be expired.  The <literal>priority</literal> is another query parameter with an integer value between 0 and 9 expressing the priority of the message. i.e.:</para>
+      <programlisting>POST /queues/jms.queue.bar/create?expiration=30000&amp;priority=3
+Host: example.com
+Content-Type: application/xml
+
+&lt;order&gt;
+   &lt;name&gt;Bill&lt;/name&gt;
+   &lt;item&gt;iPhone4&lt;/item&gt;
+   &lt;cost&gt;$199.99&lt;/cost&gt;
+&lt;/order&gt;
+</programlisting>
+   </sect1>
   </chapter>
 
   <chapter>

Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java	2011-01-02 05:04:27 UTC (rev 10104)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java	2011-01-04 00:35:39 UTC (rev 10105)
@@ -79,24 +79,18 @@
       this.consumerTimeoutSeconds = consumerTimeoutSeconds;
    }
 
-   private Object timeoutLock = new Object();
-
-   @Override
    public void testTimeout(String target)
    {
-      synchronized (timeoutLock)
+      QueueConsumer consumer = queueConsumers.get(target);
+      if (consumer == null) return;
+      synchronized (consumer)
       {
-         QueueConsumer consumer = queueConsumers.get(target);
-         if (consumer == null) return;
-         synchronized (consumer)
+         if (System.currentTimeMillis() - consumer.getLastPingTime() > consumerTimeoutSeconds * 1000)
          {
-            if (System.currentTimeMillis() - consumer.getLastPingTime() > consumerTimeoutSeconds * 1000)
-            {
-               log.warn("shutdown REST consumer because of timeout for: " + consumer.getId());
-               consumer.shutdown();
-               queueConsumers.remove(consumer.getId());
-               serviceManager.getTimeoutTask().remove(consumer.getId());
-            }
+            log.warn("shutdown REST consumer because of timeout for: " + consumer.getId());
+            consumer.shutdown();
+            queueConsumers.remove(consumer.getId());
+            serviceManager.getTimeoutTask().remove(consumer.getId());
          }
       }
    }
@@ -122,7 +116,7 @@
          {
             attributes = attributes | SELECTOR_SET;
          }
-         
+
          if (autoAck)
          {
             consumer = createConsumer(selector);
@@ -141,11 +135,11 @@
 
          if (autoAck)
          {
-            QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/" + attributesSegment +"/" + consumer.getId(), "-1");
+            QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/" + attributesSegment + "/" + consumer.getId(), "-1");
          }
          else
          {
-            AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManager.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/" + attributesSegment +"/" + consumer.getId(), "-1");
+            AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManager.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/" + attributesSegment + "/" + consumer.getId(), "-1");
 
          }
          return builder.build();
@@ -160,16 +154,18 @@
       }
    }
 
+   protected void addConsumer(QueueConsumer consumer)
+   {
+      queueConsumers.put(consumer.getId(), consumer);
+      serviceManager.getTimeoutTask().add(this, consumer.getId());
+   }
+
    public QueueConsumer createConsumer(String selector)
            throws HornetQException
    {
       String genId = sessionCounter.getAndIncrement() + "-queue-" + destination + "-" + startup;
       QueueConsumer consumer = new QueueConsumer(sessionFactory, destination, genId, serviceManager, selector);
-      synchronized (timeoutLock)
-      {
-         queueConsumers.put(genId, consumer);
-         serviceManager.getTimeoutTask().add(this, consumer.getId());
-      }
+      addConsumer(consumer);
       return consumer;
    }
 
@@ -178,11 +174,7 @@
    {
       String genId = sessionCounter.getAndIncrement() + "-queue-" + destination + "-" + startup;
       QueueConsumer consumer = new AcknowledgedQueueConsumer(sessionFactory, destination, genId, serviceManager, selector);
-      synchronized (timeoutLock)
-      {
-         queueConsumers.put(genId, consumer);
-         serviceManager.getTimeoutTask().add(this, consumer.getId());
-      }
+      addConsumer(consumer);
       return consumer;
    }
 
@@ -206,9 +198,9 @@
       // we synchronize just in case a failed request is still processing
       synchronized (consumer)
       {
-         if ( (attributes & ACKNOWLEDGED) > 0)
+         if ((attributes & ACKNOWLEDGED) > 0)
          {
-            AcknowledgedQueueConsumer ackedConsumer = (AcknowledgedQueueConsumer)consumer;
+            AcknowledgedQueueConsumer ackedConsumer = (AcknowledgedQueueConsumer) consumer;
             Acknowledgement ack = ackedConsumer.getAck();
             if (ack == null || ack.wasSet())
             {
@@ -237,7 +229,7 @@
       QueueConsumer consumer = queueConsumers.get(consumerId);
       if (consumer == null)
       {
-         if ( (attributes & SELECTOR_SET) > 0)
+         if ((attributes & SELECTOR_SET) > 0)
          {
 
             Response.ResponseBuilder builder = Response.status(Response.Status.GONE)
@@ -247,40 +239,37 @@
             uriBuilder.path(uriInfo.getMatchedURIs().get(1));
             serviceManager.getLinkStrategy().setLinkHeader(builder, "pull-consumers", "pull-consumers", uriBuilder.build().toString(), null);
             throw new WebApplicationException(builder.build());
-            
+
          }
-         if ( (attributes & ACKNOWLEDGED) > 0)
+         if ((attributes & ACKNOWLEDGED) > 0)
          {
             QueueConsumer tmp = new AcknowledgedQueueConsumer(sessionFactory, destination, consumerId, serviceManager, null);
-            consumer = addConsumerToMap(consumerId, tmp);
+            consumer = addReconnectedConsumerToMap(consumerId, tmp);
 
          }
          else
          {
             QueueConsumer tmp = new QueueConsumer(sessionFactory, destination, consumerId, serviceManager, null);
-            consumer = addConsumerToMap(consumerId, tmp);
+            consumer = addReconnectedConsumerToMap(consumerId, tmp);
          }
       }
       return consumer;
    }
 
-   private QueueConsumer addConsumerToMap(String consumerId, QueueConsumer tmp)
+   private QueueConsumer addReconnectedConsumerToMap(String consumerId, QueueConsumer tmp)
    {
-      synchronized (timeoutLock)
+      QueueConsumer consumer;
+      consumer = queueConsumers.putIfAbsent(consumerId, tmp);
+      if (consumer != null)
       {
-         QueueConsumer consumer;
-         consumer = queueConsumers.putIfAbsent(consumerId, tmp);
-         if (consumer != null)
-         {
-            tmp.shutdown();
-         }
-         else
-         {
-            consumer = tmp;
-            serviceManager.getTimeoutTask().add(this, consumer.getId());
-         }
-         return consumer;
+         tmp.shutdown();
       }
+      else
+      {
+         consumer = tmp;
+         serviceManager.getTimeoutTask().add(this, consumer.getId());
+      }
+      return consumer;
    }
 
 

Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java	2011-01-02 05:04:27 UTC (rev 10104)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java	2011-01-04 00:35:39 UTC (rev 10105)
@@ -42,13 +42,16 @@
       return startupTime + Long.toString(counter.incrementAndGet());
    }
 
-   public void publish(HttpHeaders headers, byte[] body, String dup, boolean durable) throws Exception
+   public void publish(HttpHeaders headers, byte[] body, String dup,
+                       boolean durable,
+                       Long expiration,
+                       Integer priority) throws Exception
    {
       Pooled pooled = getPooled();
       try
       {
          ClientProducer producer = pooled.producer;
-         ClientMessage message = createHornetQMessage(headers, body, durable, pooled.session);
+         ClientMessage message = createHornetQMessage(headers, body, durable, expiration, priority, pooled.session);
          message.putStringProperty(ClientMessage.HDR_DUPLICATE_DETECTION_ID.toString(), dup);
          producer.send(message);
          pool.add(pooled);
@@ -69,14 +72,20 @@
 
    @PUT
    @Path("{id}")
-   public Response putWithId(@PathParam("id") String dupId, @QueryParam("durable") Boolean durable, @Context HttpHeaders headers, @Context UriInfo uriInfo, byte[] body)
+   public Response putWithId(@PathParam("id") String dupId, @QueryParam("durable") Boolean durable,
+                             @QueryParam("expiration") Long expiration,
+                             @QueryParam("priority") Integer priority,
+                             @Context HttpHeaders headers, @Context UriInfo uriInfo, byte[] body)
    {
-      return postWithId(dupId, durable, headers, uriInfo, body);
+      return postWithId(dupId, durable, expiration, priority, headers, uriInfo, body);
    }
 
    @POST
    @Path("{id}")
-   public Response postWithId(@PathParam("id") String dupId, @QueryParam("durable") Boolean durable, @Context HttpHeaders headers, @Context UriInfo uriInfo, byte[] body)
+   public Response postWithId(@PathParam("id") String dupId, @QueryParam("durable") Boolean durable,
+                              @QueryParam("expiration") Long expiration,
+                              @QueryParam("priority") Integer priority,
+                              @Context HttpHeaders headers, @Context UriInfo uriInfo, byte[] body)
    {
       String matched = uriInfo.getMatchedURIs().get(1);
       UriBuilder nextBuilder = uriInfo.getBaseUriBuilder();
@@ -91,7 +100,7 @@
       }
       try
       {
-         publish(headers, body, dupId, isDurable);
+         publish(headers, body, dupId, isDurable, expiration, priority);
       }
       catch (Exception e)
       {
@@ -217,10 +226,27 @@
    }
 
 
-   protected ClientMessage createHornetQMessage(HttpHeaders headers, byte[] body, boolean durable, ClientSession session) throws Exception
+   protected ClientMessage createHornetQMessage(HttpHeaders headers, byte[] body,
+                                                boolean durable,
+                                                Long expiration,
+                                                Integer priority,
+                                                ClientSession session) throws Exception
    {
       ClientMessage message = session.createMessage(Message.BYTES_TYPE, durable);
+      if (expiration != null)
+      {
+         message.setExpiration(expiration.longValue());
+      }
+      if (priority != null)
+      {
+         byte p = priority.byteValue();
+         if (p >= 0 && p <=9)
+         {
+            message.setPriority(p);
+         }
+      }
       HttpMessageHelper.writeHttpMessage(headers, body, message);
       return message;
    }
+
 }

Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessageDupsOk.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessageDupsOk.java	2011-01-02 05:04:27 UTC (rev 10104)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessageDupsOk.java	2011-01-04 00:35:39 UTC (rev 10105)
@@ -23,13 +23,15 @@
 public class PostMessageDupsOk extends PostMessage
 {
 
-   public void publish(HttpHeaders headers, byte[] body, boolean durable) throws Exception
+   public void publish(HttpHeaders headers, byte[] body, boolean durable,
+                       Long expiration,
+                       Integer priority) throws Exception
    {
       Pooled pooled = getPooled();
       try
       {
          ClientProducer producer = pooled.producer;
-         ClientMessage message = createHornetQMessage(headers, body, durable, pooled.session);
+         ClientMessage message = createHornetQMessage(headers, body, durable, expiration, priority, pooled.session);
          producer.send(message);
          pool.add(pooled);
       }
@@ -50,6 +52,8 @@
    @POST
    public Response create(@Context HttpHeaders headers,
                           @QueryParam("durable") Boolean durable,
+                          @QueryParam("expiration") Long expiration,
+                          @QueryParam("priority") Integer priority,
                           @Context UriInfo uriInfo,
                           byte[] body)
    {
@@ -60,7 +64,7 @@
          {
             isDurable = durable.booleanValue();
          }
-         publish(headers, body, isDurable);
+         publish(headers, body, isDurable, expiration, priority);
       }
       catch (Exception e)
       {

Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java	2011-01-02 05:04:27 UTC (rev 10104)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java	2011-01-04 00:35:39 UTC (rev 10105)
@@ -82,24 +82,19 @@
       this.destination = destination;
    }
 
-   private Object timeoutLock = new Object();
 
-   @Override
    public void testTimeout(String target)
    {
-      synchronized (timeoutLock)
+      QueueConsumer consumer = queueConsumers.get(target);
+      if (consumer == null) return;
+      synchronized (consumer)
       {
-         QueueConsumer consumer = queueConsumers.get(target);
-         if (consumer == null) return;
-         synchronized (consumer)
+         if (System.currentTimeMillis() - consumer.getLastPingTime() > consumerTimeoutSeconds * 1000)
          {
-            if (System.currentTimeMillis() - consumer.getLastPingTime() > consumerTimeoutSeconds * 1000)
-            {
-               log.warn("shutdown REST consumer because of session timeout for: " + consumer.getId());
-               consumer.shutdown();
-               queueConsumers.remove(consumer.getId());
-               serviceManager.getTimeoutTask().remove(consumer.getId());
-            }
+            log.warn("shutdown REST consumer because of session timeout for: " + consumer.getId());
+            consumer.shutdown();
+            queueConsumers.remove(consumer.getId());
+            serviceManager.getTimeoutTask().remove(consumer.getId());
          }
       }
    }
@@ -372,28 +367,25 @@
       QueueConsumer consumer;
       if (subscriptionExists(subscriptionId))
       {
-         synchronized (timeoutLock)
+         QueueConsumer tmp = null;
+         try
          {
-            QueueConsumer tmp = null;
-            try
-            {
-               tmp = createConsumer(true, autoAck, subscriptionId, null);
-            }
-            catch (HornetQException e)
-            {
-               throw new RuntimeException(e);
-            }
-            consumer = queueConsumers.putIfAbsent(subscriptionId, tmp);
-            if (consumer == null)
-            {
-               consumer = tmp;
-               serviceManager.getTimeoutTask().add(this, subscriptionId);
-            }
-            else
-            {
-               tmp.shutdown();
-            }
+            tmp = createConsumer(true, autoAck, subscriptionId, null);
          }
+         catch (HornetQException e)
+         {
+            throw new RuntimeException(e);
+         }
+         consumer = queueConsumers.putIfAbsent(subscriptionId, tmp);
+         if (consumer == null)
+         {
+            consumer = tmp;
+            serviceManager.getTimeoutTask().add(this, subscriptionId);
+         }
+         else
+         {
+            tmp.shutdown();
+         }
       }
       else
       {

Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml	2011-01-02 05:04:27 UTC (rev 10104)
+++ trunk/pom.xml	2011-01-04 00:35:39 UTC (rev 10105)
@@ -22,7 +22,7 @@
    <version>2.2.0.CR1</version>
 
     <properties>
-        <resteasy.version>2.0.1.GA</resteasy.version>
+        <resteasy.version>2.1.0.GA</resteasy.version>
     </properties>
 
    <name>HornetQ</name>

Modified: trunk/src/Hornetq.iml
===================================================================
--- trunk/src/Hornetq.iml	2011-01-02 05:04:27 UTC (rev 10104)
+++ trunk/src/Hornetq.iml	2011-01-04 00:35:39 UTC (rev 10105)
@@ -7,28 +7,31 @@
     </content>
     <orderEntry type="inheritedJdk" />
     <orderEntry type="sourceFolder" forTests="false" />
+    <orderEntry type="library" name="jars" level="project" />
+    <orderEntry type="library" name="lib" level="project" />
     <orderEntry type="library" name="lib10" level="project" />
-    <orderEntry type="library" name="lib13" level="project" />
+    <orderEntry type="library" name="lib3" level="project" />
     <orderEntry type="module-library">
       <library>
         <CLASSES>
-          <root url="jar://$MODULE_DIR$/../thirdparty/org/jboss/metadata/lib/jboss-metadata.jar!/" />
+          <root url="jar://$MODULE_DIR$/../thirdparty/org/twitter4j/lib/twitter4j-core.jar!/" />
         </CLASSES>
         <JAVADOC />
         <SOURCES />
       </library>
     </orderEntry>
+    <orderEntry type="library" name="lib4" level="project" />
+    <orderEntry type="library" name="lib7" level="project" />
     <orderEntry type="module-library">
       <library>
         <CLASSES>
-          <root url="jar://$MODULE_DIR$/../thirdparty/org/twitter4j/lib/twitter4j-core.jar!/" />
+          <root url="jar://$MODULE_DIR$/../thirdparty/org/jboss/metadata/lib/jboss-metadata.jar!/" />
         </CLASSES>
         <JAVADOC />
         <SOURCES />
       </library>
     </orderEntry>
-    <orderEntry type="library" name="lib3" level="project" />
-    <orderEntry type="library" name="lib5" level="project" />
+    <orderEntry type="library" name="lib2" level="project" />
     <orderEntry type="module-library">
       <library>
         <CLASSES>
@@ -38,6 +41,8 @@
         <SOURCES />
       </library>
     </orderEntry>
+    <orderEntry type="library" name="lib5" level="project" />
+    <orderEntry type="library" name="lib13" level="project" />
     <orderEntry type="module-library">
       <library>
         <CLASSES>
@@ -47,8 +52,6 @@
         <SOURCES />
       </library>
     </orderEntry>
-    <orderEntry type="library" name="jars" level="project" />
-    <orderEntry type="library" name="lib6" level="project" />
     <orderEntry type="module-library">
       <library>
         <CLASSES>
@@ -58,9 +61,6 @@
         <SOURCES />
       </library>
     </orderEntry>
-    <orderEntry type="library" name="lib9" level="project" />
-    <orderEntry type="library" name="lib4" level="project" />
-    <orderEntry type="library" name="lib" level="project" />
   </component>
 </module>
 



More information about the hornetq-commits mailing list