[hornetq-commits] JBoss hornetq SVN: r9249 - in trunk: tests/src/org/hornetq/tests/integration and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu May 20 08:48:36 EDT 2010


Author: ataylor
Date: 2010-05-20 08:48:35 -0400 (Thu, 20 May 2010)
New Revision: 9249

Added:
   trunk/tests/src/org/hornetq/tests/integration/ra/
   trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java
Modified:
   trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java
   trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
Log:
https://jira.jboss.org/browse/HORNETQ-389 - fixed selector issue and added test

Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java	2010-05-19 15:32:37 UTC (rev 9248)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java	2010-05-20 12:48:35 UTC (rev 9249)
@@ -458,6 +458,7 @@
          if (Topic.class.getName().equals(spec.getDestinationType()))
          {
             destination = (HornetQDestination)HornetQJMSClient.createTopic(spec.getDestination());
+            isTopic = true;
          }
          else
          {

Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java	2010-05-19 15:32:37 UTC (rev 9248)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java	2010-05-20 12:48:35 UTC (rev 9249)
@@ -124,11 +124,9 @@
             SimpleString oldFilterString = subResponse.getFilterString();
 
             boolean selectorChanged = selector == null && oldFilterString != null ||
-                                      oldFilterString == null &&
-                                      selector != null ||
-                                      oldFilterString != null &&
-                                      selector != null &&
-                                      !oldFilterString.equals(selector);
+                                      oldFilterString == null && selector != null ||
+                                      (oldFilterString != null && selector != null &&
+                                      !oldFilterString.toString().equals(selector));
 
             SimpleString oldTopicName = subResponse.getAddress();
 

Added: trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java	2010-05-20 12:48:35 UTC (rev 9249)
@@ -0,0 +1,353 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.ra;
+
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.jms.client.HornetQMessage;
+import org.hornetq.ra.HornetQResourceAdapter;
+import org.hornetq.ra.inflow.HornetQActivationSpec;
+import org.hornetq.tests.util.ServiceTestBase;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.resource.ResourceException;
+import javax.resource.spi.BootstrapContext;
+import javax.resource.spi.UnavailableException;
+import javax.resource.spi.XATerminator;
+import javax.resource.spi.endpoint.MessageEndpoint;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.resource.spi.work.ExecutionContext;
+import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkException;
+import javax.resource.spi.work.WorkListener;
+import javax.resource.spi.work.WorkManager;
+import javax.transaction.xa.XAResource;
+import java.lang.reflect.Method;
+import java.util.Timer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Integration tests for resource adapter inflow to a durable topic subscription, covering
+ * re-activation of an endpoint with a changed and with an unchanged message selector
+ * (https://jira.jboss.org/browse/HORNETQ-389).
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ *         Created May 20, 2010
+ */
+public class HornetQMessageHandlerTest  extends ServiceTestBase
+{
+   /** Core address the tests publish to; the activation spec names the destination "mdbTopic". */
+   private static final String TOPIC_ADDRESS = "jms.topic.mdbTopic";
+
+   /** Upper bound, in seconds, on how long a test waits for a single delivery. */
+   private static final long DELIVERY_TIMEOUT_SECONDS = 5;
+
+   private Configuration configuration;
+
+   private HornetQServer server;
+
+   /**
+    * Calls clearData(), then creates and starts a server from the default configuration with security disabled.
+    */
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      clearData();
+      configuration = createDefaultConfig();
+      configuration.setSecurityEnabled(false);
+      server = createServer(true, configuration);
+      server.start();
+   }
+
+   /**
+    * Stops the server on a best-effort basis, then runs the base class tear down.
+    */
+   @Override
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         if (server != null)
+         {
+            server.stop();
+         }
+      }
+      catch (Exception ignored)
+      {
+         // best effort: a failing stop must not prevent the base class clean-up below
+      }
+      finally
+      {
+         // drop the reference even when stop() failed so a broken server is never reused
+         server = null;
+      }
+      super.tearDown();
+   }
+
+   /**
+    * Re-activates a durable topic endpoint with a different selector and checks that a message
+    * matching the new selector is delivered.
+    */
+   public void testSelectorChangedWithTopic() throws Exception
+   {
+      HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+      qResourceAdapter.start(new MyBootstrapContext());
+      HornetQActivationSpec spec = createDurableTopicSpec(qResourceAdapter);
+      qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+      DummyMessageEndpoint endpoint = new DummyMessageEndpoint(new CountDownLatch(1));
+      DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint);
+      qResourceAdapter.endpointActivation(endpointFactory, spec);
+      ClientSession session = createFactory(false).createSession();
+      try
+      {
+         ClientProducer clientProducer = session.createProducer(TOPIC_ADDRESS);
+         sendMessage(session, clientProducer, "1", "bar");
+         assertDelivered(endpoint, "1");
+
+         qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+
+         // sent while the endpoint is deactivated; it only matches the old selector
+         sendMessage(session, clientProducer, "2", "bar");
+
+         endpoint = new DummyMessageEndpoint(new CountDownLatch(1));
+         //change the selector forcing the queue to be recreated
+         spec.setMessageSelector("foo='abar'");
+         endpointFactory = new DummyMessageEndpointFactory(endpoint);
+         qResourceAdapter.endpointActivation(endpointFactory, spec);
+         sendMessage(session, clientProducer, "3", "abar");
+         assertDelivered(endpoint, "3");
+         qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+      }
+      finally
+      {
+         session.close();
+      }
+   }
+
+   /**
+    * Re-activates a durable topic endpoint with the same selector and checks that a message sent
+    * while the endpoint was deactivated is delivered after re-activation.
+    */
+   public void testSelectorNotChangedWithTopic() throws Exception
+   {
+      HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
+      qResourceAdapter.start(new MyBootstrapContext());
+      HornetQActivationSpec spec = createDurableTopicSpec(qResourceAdapter);
+      qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+      DummyMessageEndpoint endpoint = new DummyMessageEndpoint(new CountDownLatch(1));
+      DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint);
+      qResourceAdapter.endpointActivation(endpointFactory, spec);
+      ClientSession session = createFactory(false).createSession();
+      try
+      {
+         ClientProducer clientProducer = session.createProducer(TOPIC_ADDRESS);
+         sendMessage(session, clientProducer, "1", "bar");
+         assertDelivered(endpoint, "1");
+
+         qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+
+         sendMessage(session, clientProducer, "2", "bar");
+
+         // same spec, same selector: the message sent above is expected to be delivered
+         endpoint = new DummyMessageEndpoint(new CountDownLatch(1));
+         endpointFactory = new DummyMessageEndpointFactory(endpoint);
+         qResourceAdapter.endpointActivation(endpointFactory, spec);
+         assertDelivered(endpoint, "2");
+         qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+      }
+      finally
+      {
+         session.close();
+      }
+   }
+
+   /**
+    * Builds the activation spec shared by both tests: a durable subscription named
+    * "durable-mdb" for client id "id-1" on topic "mdbTopic", with selector foo='bar' and no JNDI.
+    *
+    * @param qResourceAdapter the adapter the spec is associated with
+    * @return the populated spec
+    */
+   private HornetQActivationSpec createDurableTopicSpec(HornetQResourceAdapter qResourceAdapter) throws Exception
+   {
+      HornetQActivationSpec spec = new HornetQActivationSpec();
+      spec.setResourceAdapter(qResourceAdapter);
+      spec.setUseJNDI(false);
+      spec.setDestinationType("javax.jms.Topic");
+      spec.setDestination("mdbTopic");
+      spec.setSubscriptionDurability("Durable");
+      spec.setSubscriptionName("durable-mdb");
+      spec.setClientID("id-1");
+      spec.setMessageSelector("foo='bar'");
+      return spec;
+   }
+
+   /**
+    * Sends a durable message carrying the given body and "foo" property value.
+    *
+    * @param session  session used to create the message
+    * @param producer producer the message is sent with
+    * @param body     string written to the message body
+    * @param foo      value of the "foo" string property the selectors filter on
+    */
+   private void sendMessage(ClientSession session, ClientProducer producer, String body, String foo) throws Exception
+   {
+      ClientMessage message = session.createMessage(true);
+      message.getBodyBuffer().writeString(body);
+      message.putStringProperty("foo", foo);
+      producer.send(message);
+   }
+
+   /**
+    * Waits for the endpoint's latch and verifies the body of the last message it received.
+    * <p>
+    * The wait itself is asserted: without that, a timeout would only surface later as a
+    * misleading failure (or a stale read of the endpoint's state from another thread).
+    *
+    * @param endpoint     endpoint expected to have received a message
+    * @param expectedBody body the last received message must carry
+    */
+   private void assertDelivered(DummyMessageEndpoint endpoint, String expectedBody) throws Exception
+   {
+      boolean delivered = endpoint.latch.await(DELIVERY_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+      assertTrue("message '" + expectedBody + "' was not delivered in time", delivered);
+      assertNotNull(endpoint.lastMessage);
+      // JUnit convention: expected value first, so failure messages read correctly
+      assertEquals(expectedBody, endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString());
+   }
+
+   /**
+    * Endpoint factory that hands out the single endpoint it was constructed with.
+    */
+   class DummyMessageEndpointFactory implements MessageEndpointFactory
+   {
+      private final DummyMessageEndpoint endpoint;
+
+      public DummyMessageEndpointFactory(DummyMessageEndpoint endpoint)
+      {
+         this.endpoint = endpoint;
+      }
+
+      public MessageEndpoint createEndpoint(XAResource xaResource) throws UnavailableException
+      {
+         return endpoint;
+      }
+
+      public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException
+      {
+         return false;
+      }
+   }
+
+   /**
+    * Endpoint that remembers the last message passed to onMessage and counts its latch down
+    * in afterDelivery.
+    */
+   class DummyMessageEndpoint implements MessageEndpoint, MessageListener
+   {
+      public CountDownLatch latch;
+
+      private HornetQMessage lastMessage;
+
+      public DummyMessageEndpoint(CountDownLatch latch)
+      {
+         this.latch = latch;
+      }
+
+      public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException
+      {
+         // nothing to prepare
+      }
+
+      public void afterDelivery() throws ResourceException
+      {
+         if (latch != null)
+         {
+            latch.countDown();
+         }
+      }
+
+      public void release()
+      {
+         // nothing to release
+      }
+
+      public void onMessage(Message message)
+      {
+         lastMessage = (HornetQMessage) message;
+      }
+   }
+
+   /**
+    * Minimal bootstrap context: supplies a work manager but no timer and no XA terminator.
+    */
+   class MyBootstrapContext implements BootstrapContext
+   {
+      WorkManager workManager = new DummyWorkManager();
+
+      public Timer createTimer() throws UnavailableException
+      {
+         return null;
+      }
+
+      public WorkManager getWorkManager()
+      {
+         return workManager;
+      }
+
+      public XATerminator getXATerminator()
+      {
+         return null;
+      }
+
+      /**
+       * Work manager that runs work only from scheduleWork(Work), synchronously on the calling
+       * thread; every other entry point ignores the submitted work.
+       */
+      class DummyWorkManager implements WorkManager
+      {
+         public void doWork(Work work) throws WorkException
+         {
+         }
+
+         public void doWork(Work work, long l, ExecutionContext executionContext, WorkListener workListener) throws WorkException
+         {
+         }
+
+         public long startWork(Work work) throws WorkException
+         {
+            return 0;
+         }
+
+         public long startWork(Work work, long l, ExecutionContext executionContext, WorkListener workListener) throws WorkException
+         {
+            return 0;
+         }
+
+         public void scheduleWork(Work work) throws WorkException
+         {
+            work.run();
+         }
+
+         public void scheduleWork(Work work, long l, ExecutionContext executionContext, WorkListener workListener) throws WorkException
+         {
+         }
+      }
+   }
+}



More information about the hornetq-commits mailing list