[hornetq-commits] JBoss hornetq SVN: r12299 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/ra and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Mar 13 23:07:02 EDT 2012
Author: gaohoward
Date: 2012-03-13 23:06:58 -0400 (Tue, 13 Mar 2012)
New Revision: 12299
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java
Log:
JBPAPP-8017
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2012-03-13 16:45:44 UTC (rev 12298)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2012-03-14 03:06:58 UTC (rev 12299)
@@ -161,7 +161,6 @@
if (activation.getTopicTemporaryQueue() == null)
{
queueName = new SimpleString(UUID.randomUUID().toString());
- session.createQueue(activation.getAddress(), queueName, selectorString, false);
activation.setTopicTemporaryQueue(queueName);
}
else
@@ -173,6 +172,20 @@
{
queueName = activation.getAddress();
}
+
+ QueueQuery subResponse = session.queueQuery(queueName);
+
+ if (!subResponse.isExists())
+ {
+ if (activation.isTopic())
+ {
+ session.createTemporaryQueue(activation.getAddress(), queueName, selectorString);
+ }
+ else
+ {
+ session.createQueue(activation.getAddress(), queueName, selectorString, true);
+ }
+ }
consumer = session.createConsumer(queueName, selectorString);
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java 2012-03-13 16:45:44 UTC (rev 12298)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java 2012-03-14 03:06:58 UTC (rev 12299)
@@ -12,17 +12,23 @@
*/
package org.hornetq.tests.integration.ra;
+import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSession.QueueQuery;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.ra.HornetQResourceAdapter;
+import org.hornetq.ra.inflow.HornetQActivation;
import org.hornetq.ra.inflow.HornetQActivationSpec;
import org.hornetq.tests.util.UnitTestCase;
import javax.jms.Message;
+import javax.resource.spi.ActivationSpec;
+
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -347,6 +353,53 @@
qResourceAdapter.stop();
}
+   /**
+    * Regression test for https://issues.jboss.org/browse/JBPAPP-8017.
+    * <p>
+    * Activates an endpoint on the topic "mdbTopic" (no JNDI), checks that a message sent to
+    * "jms.topic.mdbTopic" reaches the endpoint, and then verifies that the queue named by
+    * {@link HornetQActivation#getTopicTemporaryQueue()} exists while the activation is live
+    * and no longer exists once the connection factory has been closed and the adapter stopped.
+    *
+    * @throws Exception if any step of the scenario fails
+    */
+   public void testNonDurableSubscriptionDeleteAfterCrash() throws Exception
+   {
+      HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+      qResourceAdapter.setConnectorClassName(UnitTestCase.INVM_CONNECTOR_FACTORY);
+      MyBootstrapContext ctx = new MyBootstrapContext();
+      qResourceAdapter.start(ctx);
+      HornetQActivationSpec spec = new HornetQActivationSpec();
+      spec.setResourceAdapter(qResourceAdapter);
+      spec.setUseJNDI(false);
+      spec.setDestinationType("javax.jms.Topic");
+      spec.setDestination("mdbTopic");
+      // NOTE(review): repeats the connector setting made before start(); looks redundant - confirm
+      qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+      CountDownLatch latch = new CountDownLatch(1);
+      DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
+      DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
+      qResourceAdapter.endpointActivation(endpointFactory, spec);
+
+      ClientSession session = locator.createSessionFactory().createSession();
+      try
+      {
+         ClientProducer clientProducer = session.createProducer("jms.topic.mdbTopic");
+         ClientMessage message = session.createMessage(true);
+         message.getBodyBuffer().writeString("1");
+         clientProducer.send(message);
+
+         // fail right here on a timeout instead of relying on the later assertions
+         assertTrue("message was not delivered within 5 seconds", latch.await(5, TimeUnit.SECONDS));
+
+         assertNotNull(endpoint.lastMessage);
+         // expected value first, so a failure message reads the right way round
+         assertEquals("1", endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString());
+
+         Map<ActivationSpec, HornetQActivation> activations = qResourceAdapter.getActivations();
+         assertEquals(1, activations.size());
+
+         HornetQActivation activation = activations.values().iterator().next();
+         SimpleString tempQueueName = activation.getTopicTemporaryQueue();
+
+         // the subscription queue must exist while the activation is live
+         QueueQuery query = session.queueQuery(tempQueueName);
+         assertTrue(query.isExists());
+
+         //this should be enough to simulate the crash
+         qResourceAdapter.getDefaultHornetQConnectionFactory().close();
+         qResourceAdapter.stop();
+
+         // queried through the test's own session, which is still open at this point
+         query = session.queueQuery(tempQueueName);
+
+         assertFalse(query.isExists());
+      }
+      finally
+      {
+         // release the client session even when an assertion above fails
+         session.close();
+      }
+   }
+
public void testSelectorChangedWithTopic() throws Exception
{
HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
More information about the hornetq-commits
mailing list