[hornetq-commits] JBoss hornetq SVN: r12299 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/ra and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Mar 13 23:07:02 EDT 2012


Author: gaohoward
Date: 2012-03-13 23:06:58 -0400 (Tue, 13 Mar 2012)
New Revision: 12299

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java
Log:
JBPAPP-8017


Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java	2012-03-13 16:45:44 UTC (rev 12298)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java	2012-03-14 03:06:58 UTC (rev 12299)
@@ -161,7 +161,6 @@
             if (activation.getTopicTemporaryQueue() == null)
             {
                queueName = new SimpleString(UUID.randomUUID().toString());
-               session.createQueue(activation.getAddress(), queueName, selectorString, false);
                activation.setTopicTemporaryQueue(queueName);
             }
             else
@@ -173,6 +172,20 @@
          {
             queueName = activation.getAddress();
          }
+
+         QueueQuery subResponse = session.queueQuery(queueName);
+
+         if (!subResponse.isExists())
+         {
+            if (activation.isTopic())
+            {
+               session.createTemporaryQueue(activation.getAddress(), queueName, selectorString);
+            }
+            else
+            {
+               session.createQueue(activation.getAddress(), queueName, selectorString, true);
+            }
+         }
          consumer = session.createConsumer(queueName, selectorString);
       }
 

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java	2012-03-13 16:45:44 UTC (rev 12298)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java	2012-03-14 03:06:58 UTC (rev 12299)
@@ -12,17 +12,23 @@
  */
 package org.hornetq.tests.integration.ra;
 
+import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ClientProducer;
 import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSession.QueueQuery;
 import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.impl.LocalQueueBinding;
 import org.hornetq.ra.HornetQResourceAdapter;
+import org.hornetq.ra.inflow.HornetQActivation;
 import org.hornetq.ra.inflow.HornetQActivationSpec;
 import org.hornetq.tests.util.UnitTestCase;
 
 import javax.jms.Message;
+import javax.resource.spi.ActivationSpec;
+
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -347,6 +353,53 @@
       qResourceAdapter.stop();
    }
 
+   //https://issues.jboss.org/browse/JBPAPP-8017
+   public void testNonDurableSubscriptionDeleteAfterCrash() throws Exception
+   {
+      // Activate an endpoint on topic "mdbTopic"; the spec configures no durable subscription.
+      HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+      qResourceAdapter.setConnectorClassName(UnitTestCase.INVM_CONNECTOR_FACTORY);
+      MyBootstrapContext ctx = new MyBootstrapContext();
+      qResourceAdapter.start(ctx);
+      HornetQActivationSpec spec = new HornetQActivationSpec();
+      spec.setResourceAdapter(qResourceAdapter);
+      spec.setUseJNDI(false);
+      spec.setDestinationType("javax.jms.Topic");
+      spec.setDestination("mdbTopic");
+      qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+      CountDownLatch latch = new CountDownLatch(1);
+      DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
+      DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
+      qResourceAdapter.endpointActivation(endpointFactory, spec);
+      // Publish one message to the topic address and wait for the endpoint to receive it.
+      ClientSession session = locator.createSessionFactory().createSession();
+      ClientProducer clientProducer = session.createProducer("jms.topic.mdbTopic");
+      ClientMessage message = session.createMessage(true);
+      message.getBodyBuffer().writeString("1");
+      clientProducer.send(message);
+
+      assertTrue("message was not delivered to the endpoint within 5 seconds", latch.await(5, TimeUnit.SECONDS));
+      assertNotNull(endpoint.lastMessage);
+      assertEquals("1", endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString());
+      // Exactly one activation is expected; fetch the queue name it recorded for the topic.
+      Map<ActivationSpec, HornetQActivation> activations = qResourceAdapter.getActivations();
+      assertEquals(1,activations.size());
+
+      HornetQActivation activation = activations.values().iterator().next();
+      SimpleString tempQueueName = activation.getTopicTemporaryQueue();
+
+      QueueQuery query = session.queueQuery(tempQueueName);
+      assertTrue(query.isExists());
+
+      //this should be enough to simulate the crash
+      qResourceAdapter.getDefaultHornetQConnectionFactory().close();
+      qResourceAdapter.stop();
+      // The subscription queue must no longer exist after the simulated crash (JBPAPP-8017).
+      query = session.queueQuery(tempQueueName);
+
+      assertFalse(query.isExists());
+   }
+
    public void testSelectorChangedWithTopic() throws Exception
    {
       HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();



More information about the hornetq-commits mailing list