Author: clebert.suconic
Date: 2011-10-11 22:27:00 -0400 (Tue, 11 Oct 2011)
New Revision: 11517
Modified:
trunk/hornetq-rest/examples/javascript-chat/src/main/webapp/index.html
trunk/hornetq-rest/examples/jms-to-rest/src/main/java/JmsHelper.java
trunk/hornetq-rest/examples/push/src/main/java/JmsHelper.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/MessageServiceConfiguration.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/MessageServiceManager.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/DestinationServiceManager.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessageDupsOk.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/QueueConsumer.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/QueueDestinationsResource.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumer.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumerResource.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushStrategy.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/UriStrategy.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/xml/PushRegistration.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/AcknowledgedSubscriptionResource.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/PushSubscriptionsResource.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Subscription.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionResource.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/TopicDestinationsResource.java
trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushQueueConsumerTest.java
trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushTopicConsumerTest.java
trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/PushQueueConsumerTest.java
Log:
Reverting my commits to rest
Modified: trunk/hornetq-rest/examples/javascript-chat/src/main/webapp/index.html
===================================================================
--- trunk/hornetq-rest/examples/javascript-chat/src/main/webapp/index.html 2011-10-12 02:08:03 UTC (rev 11516)
+++ trunk/hornetq-rest/examples/javascript-chat/src/main/webapp/index.html 2011-10-12 02:27:00 UTC (rev 11517)
@@ -50,12 +50,17 @@
{
if (xhr.status == 200)
{
+ // getting the links from the rest resource
topicSender = xhr.getResponseHeader("msg-create");
subscriptions =
xhr.getResponseHeader("msg-pull-subscriptions");
+
+ // just adding the report
document.getElementById("errors").innerHTML =
"Subscriptions URL: " + subscriptions;
}
}
}
+
+ // this will send the request from javascript
xhr.send(null);
}
@@ -162,4 +167,4 @@
</pre>
</body>
-</html>
\ No newline at end of file
+</html>
Modified: trunk/hornetq-rest/examples/jms-to-rest/src/main/java/JmsHelper.java
===================================================================
--- trunk/hornetq-rest/examples/jms-to-rest/src/main/java/JmsHelper.java 2011-10-12 02:08:03 UTC (rev 11516)
+++ trunk/hornetq-rest/examples/jms-to-rest/src/main/java/JmsHelper.java 2011-10-12 02:27:00 UTC (rev 11517)
@@ -1,9 +1,11 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.impl.FileConfiguration;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
@@ -20,8 +22,7 @@
config.setConfigurationUrl(configFile);
config.start();
TransportConfiguration transport =
config.getConnectorConfigurations().get("netty-connector");
- ClientSessionFactory factory =
HornetQClient.createClientSessionFactory(transport);
- return new HornetQConnectionFactory(factory);
+ return new
HornetQJMSConnectionFactory(HornetQClient.createServerLocatorWithoutHA(transport));
}
Modified: trunk/hornetq-rest/examples/push/src/main/java/JmsHelper.java
===================================================================
--- trunk/hornetq-rest/examples/push/src/main/java/JmsHelper.java 2011-10-12 02:08:03 UTC (rev 11516)
+++ trunk/hornetq-rest/examples/push/src/main/java/JmsHelper.java 2011-10-12 02:27:00 UTC (rev 11517)
@@ -4,6 +4,7 @@
import org.hornetq.core.config.impl.FileConfiguration;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
@@ -20,9 +21,7 @@
config.setConfigurationUrl(configFile);
config.start();
TransportConfiguration transport =
config.getConnectorConfigurations().get("netty-connector");
- ClientSessionFactory factory =
HornetQClient.createClientSessionFactory(transport);
- return new HornetQConnectionFactory(factory);
+ return new
HornetQJMSConnectionFactory(HornetQClient.createServerLocatorWithoutHA(transport));
}
-
}
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/MessageServiceConfiguration.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/MessageServiceConfiguration.java 2011-10-12 02:08:03 UTC (rev 11516)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/MessageServiceConfiguration.java 2011-10-12 02:27:00 UTC (rev 11517)
@@ -11,6 +11,7 @@
public class MessageServiceConfiguration
{
private int producerSessionPoolSize = 10;
+ private long producerTimeToLive = -1;
private int timeoutTaskInterval = 1;
private int consumerSessionTimeoutSeconds = 300;
private int consumerWindowSize = -1;
@@ -87,6 +88,17 @@
this.queuePushStoreDirectory = queuePushStoreDirectory;
}
+ @XmlElement(name="producer-time-to-live")
+ public long getProducerTimeToLive()
+ {
+ return producerTimeToLive;
+ }
+
+ public void setProducerTimeToLive(long producerTimeToLive)
+ {
+ this.producerTimeToLive = producerTimeToLive;
+ }
+
@XmlElement(name = "producer-session-pool-size")
public int getProducerSessionPoolSize()
{
@@ -130,4 +142,5 @@
{
this.consumerWindowSize = consumerWindowSize;
}
+
}
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/MessageServiceManager.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/MessageServiceManager.java 2011-10-12 02:08:03 UTC (rev 11516)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/MessageServiceManager.java 2011-10-12 02:27:00 UTC (rev 11517)
@@ -33,7 +33,7 @@
{
protected ExecutorService threadPool;
protected QueueServiceManager queueManager = new QueueServiceManager();
- protected TopicServiceManager topicManager = new TopicServiceManager();
+ protected TopicServiceManager topicManager = new TopicServiceManager();
protected TimeoutTask timeoutTask;
protected int timeoutTaskInterval = 1;
protected MessageServiceConfiguration configuration = new
MessageServiceConfiguration();
@@ -175,6 +175,7 @@
queueManager.setDefaultSettings(defaultSettings);
queueManager.setPushStoreFile(configuration.getQueuePushStoreDirectory());
queueManager.setProducerPoolSize(configuration.getProducerSessionPoolSize());
+ queueManager.setProducerTimeToLive(configuration.getProducerTimeToLive());
queueManager.setLinkStrategy(linkStrategy);
queueManager.setRegistry(registry);
@@ -186,6 +187,7 @@
topicManager.setDefaultSettings(defaultSettings);
topicManager.setPushStoreFile(configuration.getTopicPushStoreDirectory());
topicManager.setProducerPoolSize(configuration.getProducerSessionPoolSize());
+ queueManager.setProducerTimeToLive(configuration.getProducerTimeToLive());
topicManager.setLinkStrategy(linkStrategy);
topicManager.setRegistry(registry);
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/DestinationServiceManager.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/DestinationServiceManager.java 2011-10-12 02:08:03 UTC (rev 11516)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/DestinationServiceManager.java 2011-10-12 02:27:00 UTC (rev 11517)
@@ -24,6 +24,7 @@
protected DestinationSettings defaultSettings = DestinationSettings.defaultSettings;
protected TimeoutTask timeoutTask;
protected int producerPoolSize;
+ protected long producerTimeToLive;
protected LinkStrategy linkStrategy;
protected BindingRegistry registry;
@@ -47,7 +48,16 @@
this.linkStrategy = linkStrategy;
}
+ public long getProducerTimeToLive()
+ {
+ return producerTimeToLive;
+ }
+ public void setProducerTimeToLive(long producerTimeToLive)
+ {
+ this.producerTimeToLive = producerTimeToLive;
+ }
+
public int getProducerPoolSize()
{
return producerPoolSize;
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java 2011-10-12 02:08:03 UTC (rev 11516)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java 2011-10-12 02:27:00 UTC (rev 11517)
@@ -36,7 +36,23 @@
protected DestinationServiceManager serviceManager;
private AtomicLong counter = new AtomicLong(1);
private final String startupTime = Long.toString(System.currentTimeMillis());
+ protected long producerTimeToLive;
+ protected static class Pooled
+ {
+ public ClientSession session;
+ public ClientProducer producer;
+
+ private Pooled(ClientSession session, ClientProducer producer)
+ {
+ this.session = session;
+ this.producer = producer;
+ }
+ }
+
+ protected ArrayBlockingQueue<Pooled> pool;
+ protected int poolSize = 10;
+
protected String generateDupId()
{
return startupTime + Long.toString(counter.incrementAndGet());
@@ -44,6 +60,7 @@
public void publish(HttpHeaders headers, byte[] body, String dup,
boolean durable,
+ Long ttl,
Long expiration,
Integer priority) throws Exception
{
@@ -51,7 +68,7 @@
try
{
ClientProducer producer = pooled.producer;
- ClientMessage message = createHornetQMessage(headers, body, durable, expiration,
priority, pooled.session);
+ ClientMessage message = createHornetQMessage(headers, body, durable, ttl,
expiration, priority, pooled.session);
message.putStringProperty(ClientMessage.HDR_DUPLICATE_DETECTION_ID.toString(),
dup);
producer.send(message);
pool.add(pooled);
@@ -73,16 +90,18 @@
@PUT
@Path("{id}")
public Response putWithId(@PathParam("id") String dupId,
@QueryParam("durable") Boolean durable,
+ @QueryParam("ttl") Long ttl,
@QueryParam("expiration") Long expiration,
@QueryParam("priority") Integer priority,
@Context HttpHeaders headers, @Context UriInfo uriInfo,
byte[] body)
{
- return postWithId(dupId, durable, expiration, priority, headers, uriInfo, body);
+ return postWithId(dupId, durable, ttl, expiration, priority, headers, uriInfo,
body);
}
@POST
@Path("{id}")
public Response postWithId(@PathParam("id") String dupId,
@QueryParam("durable") Boolean durable,
+ @QueryParam("ttl") Long ttl,
@QueryParam("expiration") Long expiration,
@QueryParam("priority") Integer priority,
@Context HttpHeaders headers, @Context UriInfo uriInfo,
byte[] body)
@@ -100,7 +119,7 @@
}
try
{
- publish(headers, body, dupId, isDurable, expiration, priority);
+ publish(headers, body, dupId, isDurable, ttl, expiration, priority);
}
catch (Exception e)
{
@@ -115,22 +134,16 @@
return builder.build();
}
+ public long getProducerTimeToLive()
+ {
+ return producerTimeToLive;
+ }
- protected static class Pooled
+ public void setProducerTimeToLive(long producerTimeToLive)
{
- public ClientSession session;
- public ClientProducer producer;
-
- private Pooled(ClientSession session, ClientProducer producer)
- {
- this.session = session;
- this.producer = producer;
- }
+ this.producerTimeToLive = producerTimeToLive;
}
- protected ArrayBlockingQueue<Pooled> pool;
- protected int poolSize = 10;
-
public DestinationServiceManager getServiceManager()
{
return serviceManager;
@@ -228,6 +241,7 @@
protected ClientMessage createHornetQMessage(HttpHeaders headers, byte[] body,
boolean durable,
+ Long ttl,
Long expiration,
Integer priority,
ClientSession session) throws Exception
@@ -237,6 +251,14 @@
{
message.setExpiration(expiration.longValue());
}
+ else if (ttl != null)
+ {
+ message.setExpiration(System.currentTimeMillis() + ttl.longValue());
+ }
+ else if (producerTimeToLive > 0)
+ {
+ message.setExpiration(System.currentTimeMillis() + producerTimeToLive);
+ }
if (priority != null)
{
byte p = priority.byteValue();
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessageDupsOk.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessageDupsOk.java 2011-10-12 02:08:03 UTC (rev 11516)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessageDupsOk.java 2011-10-12 02:27:00 UTC (rev 11517)
@@ -24,6 +24,7 @@
{
public void publish(HttpHeaders headers, byte[] body, boolean durable,
+ Long ttl,
Long expiration,
Integer priority) throws Exception
{
@@ -31,7 +32,7 @@
try
{
ClientProducer producer = pooled.producer;
- ClientMessage message = createHornetQMessage(headers, body, durable, expiration,
priority, pooled.session);
+ ClientMessage message = createHornetQMessage(headers, body, durable, ttl,
expiration, priority, pooled.session);
producer.send(message);
pool.add(pooled);
}
@@ -52,6 +53,7 @@
@POST
public Response create(@Context HttpHeaders headers,
@QueryParam("durable") Boolean durable,
+ @QueryParam("ttl") Long ttl,
@QueryParam("expiration") Long expiration,
@QueryParam("priority") Integer priority,
@Context UriInfo uriInfo,
@@ -64,7 +66,7 @@
{
isDurable = durable.booleanValue();
}
- publish(headers, body, isDurable, expiration, priority);
+ publish(headers, body, isDurable, ttl, expiration, priority);
}
catch (Exception e)
{
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/QueueConsumer.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/QueueConsumer.java 2011-10-12 02:08:03 UTC (rev 11516)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/QueueConsumer.java 2011-10-12 02:27:00 UTC (rev 11517)
@@ -139,12 +139,6 @@
return checkIndexAndPoll(wait, info, info.getMatchedURIs().get(1), index);
}
- public synchronized Response runPoll(long wait, UriInfo info, String basePath)
- {
- ping();
- return pollWithIndex(wait, info, basePath, -1);
- }
-
protected Response checkIndexAndPoll(long wait, UriInfo info, String basePath, long
index)
{
ping();
@@ -167,7 +161,14 @@
}
- return pollWithIndex(wait, info, basePath, index);
+ try
+ {
+ return pollWithIndex(wait, info, basePath, index);
+ }
+ finally
+ {
+ ping(); // ping again as we don't want wait time included in timeout.
+ }
}
protected Response pollWithIndex(long wait, UriInfo info, String basePath, long
index)
@@ -197,7 +198,7 @@
protected void createSession()
throws HornetQException
{
- session = factory.createSession(true, true);
+ session = factory.createSession(true, true, 0);
if (selector == null)
{
consumer = session.createConsumer(destination);
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/QueueDestinationsResource.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/QueueDestinationsResource.java 2011-10-12 02:08:03 UTC (rev 11516)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/QueueDestinationsResource.java 2011-10-12 02:27:00 UTC (rev 11517)
@@ -204,6 +204,7 @@
sender.setDestination(queueName);
sender.setSessionFactory(manager.getSessionFactory());
sender.setPoolSize(manager.getProducerPoolSize());
+ sender.setProducerTimeToLive(manager.getProducerTimeToLive());
sender.init();
queueResource.setSender(sender);
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumer.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumer.java 2011-10-12 02:08:03 UTC (rev 11516)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumer.java 2011-10-12 02:27:00 UTC (rev 11517)
@@ -17,20 +17,22 @@
public class PushConsumer implements MessageHandler
{
private static final Logger log = Logger.getLogger(PushConsumer.class);
- private PushRegistration registration;
+ protected PushRegistration registration;
protected ClientSessionFactory factory;
protected ClientSession session;
protected ClientConsumer consumer;
protected String destination;
protected String id;
protected PushStrategy strategy;
+ protected PushStore store;
- public PushConsumer(ClientSessionFactory factory, String destination, String id,
PushRegistration registration)
+ public PushConsumer(ClientSessionFactory factory, String destination, String id,
PushRegistration registration, PushStore store)
{
this.factory = factory;
this.destination = destination;
this.id = id;
this.registration = registration;
+ this.store = store;
}
public PushRegistration getRegistration()
@@ -68,7 +70,7 @@
strategy.setRegistration(registration);
strategy.start();
- session = factory.createSession(false, false);
+ session = factory.createSession(false, false, 0);
if (registration.getSelector() != null)
{
consumer = session.createConsumer(destination,
SelectorTranslator.convertToHornetQFilterString(registration.getSelector()));
@@ -108,24 +110,64 @@
}
}
+ public void disableFromFailure()
+ {
+ registration.setEnabled(false);
+ try
+ {
+ if (registration.isDurable()) store.update(registration);
+ }
+ catch (Exception e)
+ {
+ log.error(e);
+ }
+ stop();
+ }
+
@Override
public void onMessage(ClientMessage clientMessage)
{
- if (strategy.push(clientMessage) == false)
+
+ try
{
- throw new RuntimeException("Failed to push message to " +
registration.getTarget());
+ clientMessage.acknowledge();
}
- else
+ catch (HornetQException e)
{
+ throw new RuntimeException(e.getMessage(), e);
+ }
+
+ boolean acknowledge = strategy.push(clientMessage);
+
+ if (acknowledge)
+ {
try
{
log.debug("Acknowledging: " + clientMessage.getMessageID());
- clientMessage.acknowledge();
+ session.commit();
+ return;
}
catch (HornetQException e)
{
throw new RuntimeException(e);
}
}
+ else
+ {
+ try
+ {
+ session.rollback();
+ }
+ catch (HornetQException e)
+ {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ if (registration.isDisableOnFailure())
+ {
+ log.error("Failed to push message to " + registration.getTarget() +
" disabling push registration...");
+ disableFromFailure();
+ return;
+ }
+ }
}
}
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumerResource.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumerResource.java 2011-10-12 02:08:03 UTC (rev 11516)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumerResource.java 2011-10-12 02:27:00 UTC (rev 11517)
@@ -57,7 +57,8 @@
public void addRegistration(PushRegistration reg) throws Exception
{
- PushConsumer consumer = new PushConsumer(sessionFactory, destination, reg.getId(),
reg);
+ if (reg.isEnabled() == false) return;
+ PushConsumer consumer = new PushConsumer(sessionFactory, destination, reg.getId(),
reg, pushStore);
consumer.start();
consumers.put(reg.getId(), consumer);
}
@@ -72,7 +73,7 @@
String genId = sessionCounter.getAndIncrement() + "-" + startup;
registration.setId(genId);
registration.setDestination(destination);
- PushConsumer consumer = new PushConsumer(sessionFactory, destination, genId,
registration);
+ PushConsumer consumer = new PushConsumer(sessionFactory, destination, genId,
registration, pushStore);
try
{
consumer.start();
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushStrategy.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushStrategy.java 2011-10-12 02:08:03 UTC (rev 11516)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushStrategy.java 2011-10-12 02:27:00 UTC (rev 11517)
@@ -9,6 +9,14 @@
*/
public interface PushStrategy
{
+ /**
+ * Return false if unable to connect. Push consumer may be disabled if configured to do so when unable to connect.
+ * Throw an exception if the message sent was unaccepted by the receiver. Hornetq's retry and dead letter logic
+ * will take over from there.
+ *
+ * @param message
+ * @return
+ */
public boolean push(ClientMessage message);
public void setRegistration(PushRegistration reg);
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/UriStrategy.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/UriStrategy.java 2011-10-12 02:08:03 UTC (rev 11516)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/UriStrategy.java 2011-10-12 02:27:00 UTC (rev 11517)
@@ -69,9 +69,9 @@
public boolean push(ClientMessage message)
{
String uri = createUri(message);
- for (int i = 0; i < 3; i++)
+ for (int i = 0; i < registration.getMaxRetries(); i++)
{
- int wait = 0;
+ long wait = registration.getRetryWaitMillis();
ClientRequest request = executor.createRequest(uri);
request.followRedirects(false);
@@ -85,31 +85,67 @@
{
log.debug(method + " " + uri);
res = request.httpMethod(method);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- if (res.getStatus() == 503)
- {
- String retryAfter = (String)
res.getHeaders().getFirst("Retry-After");
- if (retryAfter != null)
+ int status = res.getStatus();
+ if (status == 503)
{
- wait = Integer.parseInt(retryAfter);
+ String retryAfter = (String)
res.getHeaders().getFirst("Retry-After");
+ if (retryAfter != null)
+ {
+ wait = Long.parseLong(retryAfter) * 1000;
+ }
}
+ else if (status == 307)
+ {
+ uri = res.getLocation().getHref();
+ wait = 0;
+ }
+ else if ((status >= 200 && status < 299) || status == 303 ||
status == 304)
+ {
+ log.debug("Success");
+ return true;
+ }
+ else if (status >= 400)
+ {
+ switch (status)
+ {
+ case 400: // these usually mean the message you are trying to send is
crap, let dead letter logic take over
+ case 411:
+ case 412:
+ case 413:
+ case 414:
+ case 415:
+ case 416:
+ throw new RuntimeException("Something is wrong with the
message, status returned: " + status + " for push registration of URI: " +
uri);
+ case 401: // might as well consider these critical failures and abort.
Immediately signal to disable push registration depending on config
+ case 402:
+ case 403:
+ case 405:
+ case 406:
+ case 407:
+ case 417:
+ case 505:
+ return false;
+ case 404: // request timeout, gone, and not found treat as a retry
+ case 408:
+ case 409:
+ case 410:
+ break;
+ default: // all 50x requests just retry (except 505)
+ break;
+ }
+ }
}
- else if (res.getStatus() == 307)
+ catch (Exception e)
{
- uri = res.getLocation().getHref();
+ //throw new RuntimeException(e);
}
- else if ((res.getStatus() >= 200 && res.getStatus() < 299) ||
res.getStatus() == 303 || res.getStatus() == 304)
+ try
{
- log.debug("Success");
- return true;
+ if (wait > 0) Thread.sleep(wait);
}
- else
+ catch (InterruptedException e)
{
- throw new RuntimeException("failed to push message to: " + uri +
" status code: " + res.getStatus());
+ throw new RuntimeException("Interrupted");
}
}
return false;
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/xml/PushRegistration.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/xml/PushRegistration.java 2011-10-12 02:08:03 UTC (rev 11516)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/xml/PushRegistration.java 2011-10-12 02:27:00 UTC (rev 11517)
@@ -18,7 +18,7 @@
*/
@XmlRootElement(name = "push-registration")
@XmlAccessorType(XmlAccessType.PROPERTY)
-@XmlType(propOrder = {"destination", "durable", "selector",
"target", "authenticationMechanism", "headers"})
+@XmlType(propOrder = {"enabled", "destination", "durable",
"selector", "target", "maxRetries",
"retryWaitMillis", "disableOnFailure",
"authenticationMechanism", "headers"})
public class PushRegistration implements Serializable
{
private String id;
@@ -29,7 +29,55 @@
private String destination;
private Object loadedFrom;
private String selector;
+ private long retryWaitMillis = 1000;
+ private boolean disableOnFailure;
+ private int maxRetries = 10;
+ private boolean enabled = true;
+ @XmlElement
+ public int getMaxRetries()
+ {
+ return maxRetries;
+ }
+
+ public void setMaxRetries(int maxRetries)
+ {
+ this.maxRetries = maxRetries;
+ }
+
+ @XmlElement
+ public long getRetryWaitMillis()
+ {
+ return retryWaitMillis;
+ }
+
+ public void setRetryWaitMillis(long retryWaitMillis)
+ {
+ this.retryWaitMillis = retryWaitMillis;
+ }
+
+ @XmlElement
+ public boolean isDisableOnFailure()
+ {
+ return disableOnFailure;
+ }
+
+ public void setDisableOnFailure(boolean disableOnFailure)
+ {
+ this.disableOnFailure = disableOnFailure;
+ }
+
+ @XmlElement
+ public boolean isEnabled()
+ {
+ return enabled;
+ }
+
+ public void setEnabled(boolean enabled)
+ {
+ this.enabled = enabled;
+ }
+
@XmlTransient
public Object getLoadedFrom()
{
@@ -121,11 +169,17 @@
public String toString()
{
return "PushRegistration{" +
- "durable=" + durable +
+ "id='" + id + '\'' +
+ ", durable=" + durable +
", target=" + target +
", authenticationMechanism=" + authenticationMechanism +
", headers=" + headers +
", destination='" + destination + '\'' +
+ ", selector='" + selector + '\'' +
+ ", retryWaitMillis=" + retryWaitMillis +
+ ", disableOnFailure=" + disableOnFailure +
+ ", maxRetries=" + maxRetries +
+ ", enabled=" + enabled +
'}';
}
}
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/AcknowledgedSubscriptionResource.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/AcknowledgedSubscriptionResource.java 2011-10-12 02:08:03 UTC (rev 11516)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/AcknowledgedSubscriptionResource.java 2011-10-12 02:27:00 UTC (rev 11517)
@@ -12,11 +12,15 @@
public class AcknowledgedSubscriptionResource extends AcknowledgedQueueConsumer
implements Subscription
{
private boolean durable;
+ private long timeout;
+ private boolean deleteWhenIdle;
- public AcknowledgedSubscriptionResource(ClientSessionFactory factory, String
destination, String id, DestinationServiceManager serviceManager, String selector)
+ public AcknowledgedSubscriptionResource(ClientSessionFactory factory, String
destination, String id, DestinationServiceManager serviceManager, String selector, boolean
durable, Long timeout)
throws HornetQException
{
super(factory, destination, id, serviceManager, selector);
+ this.timeout = timeout;
+ this.durable = durable;
}
public boolean isDurable()
@@ -29,4 +33,23 @@
this.durable = durable;
}
+ public long getTimeout()
+ {
+ return timeout;
+ }
+
+ public void setTimeout(long timeout)
+ {
+ this.timeout = timeout;
+ }
+
+ public boolean isDeleteWhenIdle()
+ {
+ return deleteWhenIdle;
+ }
+
+ public void setDeleteWhenIdle(boolean deleteWhenIdle)
+ {
+ this.deleteWhenIdle = deleteWhenIdle;
+ }
}
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/PushSubscriptionsResource.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/PushSubscriptionsResource.java 2011-10-12 02:08:03 UTC (rev 11516)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/PushSubscriptionsResource.java 2011-10-12 02:27:00 UTC (rev 11517)
@@ -27,7 +27,7 @@
*/
public class PushSubscriptionsResource
{
- protected Map<String, PushConsumer> consumers = new ConcurrentHashMap<String,
PushConsumer>();
+ protected Map<String, PushSubscription> consumers = new
ConcurrentHashMap<String, PushSubscription>();
protected ClientSessionFactory sessionFactory;
protected String destination;
protected final String startup = Long.toString(System.currentTimeMillis());
@@ -81,6 +81,7 @@
public void addRegistration(PushTopicRegistration reg) throws Exception
{
+ if (reg.isEnabled() == false) return;
String destination = reg.getDestination();
ClientSession session = sessionFactory.createSession(false, false, false);
ClientSession.QueueQuery query = session.queueQuery(new
SimpleString(destination));
@@ -89,7 +90,7 @@
{
createSession = createSubscription(destination, reg.isDurable());
}
- PushConsumer consumer = new PushConsumer(sessionFactory, reg.getDestination(),
reg.getId(), reg);
+ PushSubscription consumer = new PushSubscription(sessionFactory,
reg.getDestination(), reg.getId(), reg, pushStore);
try
{
consumer.start();
@@ -139,7 +140,7 @@
ClientSession createSession = createSubscription(genId, registration.isDurable());
try
{
- PushConsumer consumer = new PushConsumer(sessionFactory, genId, genId,
registration);
+ PushSubscription consumer = new PushSubscription(sessionFactory, genId, genId,
registration, pushStore);
try
{
consumer.start();
@@ -191,7 +192,7 @@
deleteSubscriberQueue(consumer);
}
- public Map<String, PushConsumer> getConsumers()
+ public Map<String, PushSubscription> getConsumers()
{
return consumers;
}
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Subscription.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Subscription.java 2011-10-12 02:08:03 UTC (rev 11516)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/Subscription.java 2011-10-12 02:27:00 UTC (rev 11517)
@@ -9,4 +9,12 @@
boolean isDurable();
void setDurable(boolean isDurable);
+
+ long getTimeout();
+
+ void setTimeout(long timeout);
+
+ boolean isDeleteWhenIdle();
+
+ void setDeleteWhenIdle(boolean deleteWhenIdle);
}
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionResource.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionResource.java 2011-10-12 02:08:03 UTC (rev 11516)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionResource.java 2011-10-12 02:27:00 UTC (rev 11517)
@@ -11,12 +11,16 @@
*/
public class SubscriptionResource extends QueueConsumer implements Subscription
{
- boolean durable;
+ protected boolean durable;
+ protected long timeout;
+ private boolean deleteWhenIdle;
- public SubscriptionResource(ClientSessionFactory factory, String destination, String
id, DestinationServiceManager serviceManager, String selector)
+ public SubscriptionResource(ClientSessionFactory factory, String destination, String
id, DestinationServiceManager serviceManager, String selector, boolean durable, long
timeout)
throws HornetQException
{
super(factory, destination, id, serviceManager, selector);
+ this.durable = durable;
+ this.timeout = timeout;
}
public boolean isDurable()
@@ -28,4 +32,24 @@
{
this.durable = durable;
}
+
+ public long getTimeout()
+ {
+ return timeout;
+ }
+
+ public void setTimeout(long timeout)
+ {
+ this.timeout = timeout;
+ }
+
+ public boolean isDeleteWhenIdle()
+ {
+ return deleteWhenIdle;
+ }
+
+ public void setDeleteWhenIdle(boolean deleteWhenIdle)
+ {
+ this.deleteWhenIdle = deleteWhenIdle;
+ }
}
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java 2011-10-12 02:08:03 UTC (rev 11516)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java 2011-10-12 02:27:00 UTC (rev 11517)
@@ -86,15 +86,17 @@
public void testTimeout(String target)
{
QueueConsumer consumer = queueConsumers.get(target);
+ Subscription subscription = (Subscription)consumer;
if (consumer == null) return;
synchronized (consumer)
{
- if (System.currentTimeMillis() - consumer.getLastPingTime() > consumerTimeoutSeconds * 1000)
+ if (System.currentTimeMillis() - consumer.getLastPingTime() > subscription.getTimeout())
{
- log.warn("shutdown REST consumer because of session timeout for: " + consumer.getId());
+ log.warn("shutdown REST subscription because of session timeout for: " + consumer.getId());
consumer.shutdown();
queueConsumers.remove(consumer.getId());
serviceManager.getTimeoutTask().remove(consumer.getId());
+ if (subscription.isDeleteWhenIdle()) deleteSubscriberQueue(consumer);
}
}
}
@@ -124,8 +126,15 @@
@FormParam("autoAck") @DefaultValue("true") boolean autoAck,
@FormParam("name") String subscriptionName,
@FormParam("selector") String selector,
+ @FormParam("delete-when-idle") Boolean destroyWhenIdle,
+ @FormParam("idle-timeout") Long timeout,
@Context UriInfo uriInfo)
{
+
+ if (timeout == null) timeout = new Long(consumerTimeoutSeconds * 1000);
+ boolean deleteWhenIdle = !durable; // default is true if non-durable
+ if (destroyWhenIdle != null) deleteWhenIdle = destroyWhenIdle.booleanValue();
+
if (subscriptionName != null)
{
// see if this is a reconnect
@@ -182,7 +191,7 @@
session.createTemporaryQueue(destination, subscriptionName);
}
}
- QueueConsumer consumer = createConsumer(durable, autoAck, subscriptionName, selector);
+ QueueConsumer consumer = createConsumer(durable, autoAck, subscriptionName, selector, timeout, deleteWhenIdle);
queueConsumers.put(consumer.getId(), consumer);
serviceManager.getTimeoutTask().add(this, consumer.getId());
@@ -222,20 +231,22 @@
}
}
- protected QueueConsumer createConsumer(boolean durable, boolean autoAck, String subscriptionName, String selector)
+ protected QueueConsumer createConsumer(boolean durable, boolean autoAck, String subscriptionName, String selector, long timeout, boolean deleteWhenIdle)
throws HornetQException
{
QueueConsumer consumer;
if (autoAck)
{
- SubscriptionResource subscription = new SubscriptionResource(sessionFactory, subscriptionName, subscriptionName, serviceManager, selector);
+ SubscriptionResource subscription = new SubscriptionResource(sessionFactory, subscriptionName, subscriptionName, serviceManager, selector, durable, timeout);
subscription.setDurable(durable);
+ subscription.setDeleteWhenIdle(deleteWhenIdle);
consumer = subscription;
}
else
{
- AcknowledgedSubscriptionResource subscription = new AcknowledgedSubscriptionResource(sessionFactory, subscriptionName, subscriptionName, serviceManager, selector);
+ AcknowledgedSubscriptionResource subscription = new AcknowledgedSubscriptionResource(sessionFactory, subscriptionName, subscriptionName, serviceManager, selector, durable, timeout);
subscription.setDurable(durable);
+ subscription.setDeleteWhenIdle(deleteWhenIdle);
consumer = subscription;
}
return consumer;
@@ -370,7 +381,7 @@
QueueConsumer tmp = null;
try
{
- tmp = createConsumer(true, autoAck, subscriptionId, null);
+ tmp = createConsumer(true, autoAck, subscriptionId, null, consumerTimeoutSeconds * 1000, false);
}
catch (HornetQException e)
{
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/TopicDestinationsResource.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/TopicDestinationsResource.java 2011-10-12 02:08:03 UTC (rev 11516)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/TopicDestinationsResource.java 2011-10-12 02:27:00 UTC (rev 11517)
@@ -197,6 +197,7 @@
sender.setDestination(topicName);
sender.setSessionFactory(manager.getSessionFactory());
sender.setPoolSize(manager.getProducerPoolSize());
+ sender.setProducerTimeToLive(manager.getProducerTimeToLive());
sender.setServiceManager(manager);
sender.init();
topicResource.setSender(sender);
Modified: trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushQueueConsumerTest.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushQueueConsumerTest.java 2011-10-12 02:08:03 UTC (rev 11516)
+++ trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushQueueConsumerTest.java 2011-10-12 02:27:00 UTC (rev 11517)
@@ -74,7 +74,7 @@
Link sender = MessageTestBase.getLinkByTitle(manager.getQueueManager().getLinkStrategy(), response, "create");
System.out.println("create: " + sender);
Link pushSubscriptions = MessageTestBase.getLinkByTitle(manager.getQueueManager().getLinkStrategy(), response, "push-consumers");
- System.out.println("push consumers: " + pushSubscriptions);
+ System.out.println("push subscriptions: " + pushSubscriptions);
request = new ClientRequest(generateURL("/queues/forwardQueue"));
response = request.head();
@@ -87,6 +87,7 @@
PushRegistration reg = new PushRegistration();
reg.setDurable(true);
+ reg.setDisableOnFailure(true);
XmlLink target = new XmlLink();
target.setHref(generateURL("/queues/forwardQueue"));
target.setRelationship("destination");
@@ -112,6 +113,49 @@
shutdown();
}
+ @Test
+ public void testFailure() throws Exception
+ {
+ startup();
+ deployQueues();
+
+ ClientRequest request = new ClientRequest(generateURL("/queues/testQueue"));
+
+ ClientResponse<?> response = request.head();
+ Assert.assertEquals(200, response.getStatus());
+ Link sender = MessageTestBase.getLinkByTitle(manager.getQueueManager().getLinkStrategy(), response, "create");
+ System.out.println("create: " + sender);
+ Link pushSubscriptions = MessageTestBase.getLinkByTitle(manager.getQueueManager().getLinkStrategy(), response, "push-consumers");
+ System.out.println("push subscriptions: " + pushSubscriptions);
+
+ PushRegistration reg = new PushRegistration();
+ reg.setDurable(true);
+ XmlLink target = new XmlLink();
+ target.setHref("http://localhost:3333/error");
+ target.setRelationship("uri");
+ reg.setTarget(target);
+ reg.setDisableOnFailure(true);
+ reg.setMaxRetries(3);
+ reg.setRetryWaitMillis(10);
+ response = pushSubscriptions.request().body("application/xml", reg).post();
+ Assert.assertEquals(201, response.getStatus());
+ Link pushSubscription = response.getLocation();
+
+ ClientResponse res = sender.request().body("text/plain", Integer.toString(1)).post();
+ Assert.assertEquals(201, res.getStatus());
+
+ Thread.sleep(1000);
+
+ response = pushSubscription.request().get();
+ PushRegistration reg2 = response.getEntity(PushRegistration.class);
+ Assert.assertEquals(reg.isDurable(), reg2.isDurable());
+ Assert.assertEquals(reg.getTarget().getHref(), reg2.getTarget().getHref());
+ Assert.assertFalse(reg2.isEnabled());
+
+ manager.getQueueManager().getPushStore().removeAll();
+ shutdown();
+ }
+
private void deployQueues()
throws Exception
{
Modified: trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushTopicConsumerTest.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushTopicConsumerTest.java 2011-10-12 02:08:03 UTC (rev 11516)
+++ trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushTopicConsumerTest.java 2011-10-12 02:27:00 UTC (rev 11517)
@@ -1,12 +1,15 @@
package org.hornetq.rest.test;
+import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSession;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.rest.MessageServiceManager;
+import org.hornetq.rest.queue.push.xml.PushRegistration;
import org.hornetq.rest.queue.push.xml.XmlLink;
import org.hornetq.rest.topic.PushTopicRegistration;
import org.hornetq.rest.topic.TopicDeployment;
@@ -82,6 +85,56 @@
}
@Test
+ public void testFailure() throws Exception
+ {
+ startup();
+ deployTopic();
+
+ ClientRequest request = new ClientRequest(generateURL("/topics/testTopic"));
+
+ ClientResponse<?> response = request.head();
+ Assert.assertEquals(200, response.getStatus());
+ Link sender = MessageTestBase.getLinkByTitle(manager.getQueueManager().getLinkStrategy(), response, "create");
+ System.out.println("create: " + sender);
+ Link pushSubscriptions = MessageTestBase.getLinkByTitle(manager.getQueueManager().getLinkStrategy(), response, "push-subscriptions");
+ System.out.println("push subscriptions: " + pushSubscriptions);
+
+ PushTopicRegistration reg = new PushTopicRegistration();
+ reg.setDurable(true);
+ XmlLink target = new XmlLink();
+ target.setHref("http://localhost:3333/error");
+ target.setRelationship("uri");
+ reg.setTarget(target);
+ reg.setDisableOnFailure(true);
+ reg.setMaxRetries(3);
+ reg.setRetryWaitMillis(10);
+ response = pushSubscriptions.request().body("application/xml", reg).post();
+ Assert.assertEquals(201, response.getStatus());
+ Link pushSubscription = response.getLocation();
+
+ ClientResponse res = sender.request().body("text/plain", Integer.toString(1)).post();
+ Assert.assertEquals(201, res.getStatus());
+
+ Thread.sleep(1000);
+
+ response = pushSubscription.request().get();
+ PushTopicRegistration reg2 = response.getEntity(PushTopicRegistration.class);
+ Assert.assertEquals(reg.isDurable(), reg2.isDurable());
+ Assert.assertEquals(reg.getTarget().getHref(), reg2.getTarget().getHref());
+ Assert.assertFalse(reg2.isEnabled());
+
+ String destination = reg2.getDestination();
+ ClientSession session = manager.getQueueManager().getSessionFactory().createSession(false, false, false);
+ ClientSession.QueueQuery query = session.queueQuery(new SimpleString(destination));
+ Assert.assertFalse(query.isExists());
+
+
+ manager.getQueueManager().getPushStore().removeAll();
+ shutdown();
+ }
+
+
+ @Test
public void testSuccessFirst() throws Exception
{
startup();
Modified: trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/PushQueueConsumerTest.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/PushQueueConsumerTest.java 2011-10-12 02:08:03 UTC (rev 11516)
+++ trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/PushQueueConsumerTest.java 2011-10-12 02:27:00 UTC (rev 11517)
@@ -202,4 +202,5 @@
Assert.assertEquals("1", MyResource.gotit);
Assert.assertEquals(204, pushSubscription.request().delete().getStatus());
}
+
}