[jboss-cvs] JBoss Messaging SVN: r5445 - in trunk: src/main/org/jboss/messaging/core/client/impl and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Dec 2 04:06:42 EST 2008


Author: jmesnil
Date: 2008-12-02 04:06:42 -0500 (Tue, 02 Dec 2008)
New Revision: 5445

Added:
   trunk/src/main/org/jboss/messaging/core/client/ClientRequestor.java
   trunk/tests/src/org/jboss/messaging/tests/integration/basic/ClientRequestorTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
   trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java
Log:
added a ClientRequestor to JBM Core API

this class is similar to JMS QueueRequestor and can be used to send simple blocking request/reply messages

Added: trunk/src/main/org/jboss/messaging/core/client/ClientRequestor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientRequestor.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientRequestor.java	2008-12-02 09:06:42 UTC (rev 5445)
@@ -0,0 +1,90 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.client;
+
+import java.util.UUID;
+
+import org.jboss.messaging.core.client.impl.ClientMessageImpl;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * a ClientRequestor.
+ * 
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class ClientRequestor
+{
+   private ClientSession queueSession;
+
+   private ClientProducer requestProducer;
+
+   private ClientConsumer replyConsumer;
+
+   private SimpleString replyAddress;
+
+   private SimpleString replyQueue;
+
+   public ClientRequestor(ClientSession session, SimpleString requestAddress) throws Exception
+   {
+      queueSession = session;
+
+      requestProducer = queueSession.createProducer(requestAddress);
+      replyAddress = new SimpleString(UUID.randomUUID().toString());
+      replyQueue = new SimpleString(UUID.randomUUID().toString());
+      queueSession.addDestination(replyAddress, false, true);
+      queueSession.createQueue(replyAddress, replyQueue, null, false, true, false);
+      replyConsumer = queueSession.createConsumer(replyQueue);
+   }
+
+   public ClientMessage request(ClientMessage request) throws Exception
+   {
+      return request(request, 0);
+   }
+
+   public ClientMessage request(ClientMessage request, int timeout) throws Exception
+   {
+      request.putStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME, replyAddress);
+      requestProducer.send(request);
+      return replyConsumer.receive(timeout);
+   }
+
+   public void close() throws Exception
+   {
+      try
+      {
+         replyConsumer.close();
+      }
+      catch (Exception ignored)
+      {
+      }
+      try
+      {
+         queueSession.deleteQueue(replyQueue);
+         queueSession.removeDestination(replyAddress, false);
+      }
+      catch (Exception ignored)
+      {
+      }
+      queueSession.close();
+   }
+
+ }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java	2008-12-01 23:32:29 UTC (rev 5444)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java	2008-12-02 09:06:42 UTC (rev 5445)
@@ -24,9 +24,9 @@
 
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.message.Message;
 import org.jboss.messaging.core.message.impl.MessageImpl;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.util.SimpleString;
 
 /**
  * 
@@ -38,6 +38,9 @@
  */
 public class ClientMessageImpl extends MessageImpl implements ClientMessage
 {
+   // added this constant here so that the client package have no dependency on JMS
+   public static final SimpleString REPLYTO_HEADER_NAME = new SimpleString("JMSReplyTo");
+   
    private int deliveryCount;
 
    private ClientConsumerInternal consumer;

Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java	2008-12-01 23:32:29 UTC (rev 5444)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java	2008-12-02 09:06:42 UTC (rev 5445)
@@ -57,8 +57,8 @@
 {
    // Constants -----------------------------------------------------
 
-   public static final SimpleString REPLYTO_HEADER_NAME = new SimpleString("JMSReplyTo");
-
+   public static final SimpleString REPLYTO_HEADER_NAME = ClientMessageImpl.REPLYTO_HEADER_NAME;
+   
    public static final SimpleString CORRELATIONID_HEADER_NAME = new SimpleString("JMSCorrelationID");
 
    public static final SimpleString JBM_MESSAGE_ID = new SimpleString("JMSMessageID");

Added: trunk/tests/src/org/jboss/messaging/tests/integration/basic/ClientRequestorTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/basic/ClientRequestorTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/basic/ClientRequestorTest.java	2008-12-02 09:06:42 UTC (rev 5445)
@@ -0,0 +1,283 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.basic;
+
+import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
+import static org.jboss.messaging.tests.util.RandomUtil.randomSimpleString;
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientRequestor;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.client.impl.ClientMessageImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A ClientRequestorTest
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class ClientRequestorTest extends TestCase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private MessagingService service;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testRequest() throws Exception
+   {
+      final SimpleString key = randomSimpleString();
+      long value = randomLong();
+      SimpleString requestAddress = randomSimpleString();
+      SimpleString requestQueue = randomSimpleString();
+
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+      final ClientSession session = sf.createSession(false, true, true);
+
+      session.start();
+
+      session.addDestination(requestAddress, false, true);
+      session.createQueue(requestAddress, requestQueue, null, false, true, false);
+
+      ClientConsumer requestConsumer = session.createConsumer(requestQueue);
+      requestConsumer.setMessageHandler(new SimpleMessageHandler(key, session));
+
+      ClientRequestor requestor = new ClientRequestor(session, requestAddress);
+      ClientMessage request = session.createClientMessage(false);
+      request.putLongProperty(key, value);
+
+      ClientMessage reply = requestor.request(request, 500);
+      assertNotNull("reply was not received", reply);
+      assertEquals(value, reply.getProperty(key));
+
+      session.close();
+   }
+
+   public void testTwoRequests() throws Exception
+   {
+      final SimpleString key = randomSimpleString();
+      long value = randomLong();
+      SimpleString requestAddress = randomSimpleString();
+      SimpleString requestQueue = randomSimpleString();
+
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+      final ClientSession session = sf.createSession(false, true, true);
+
+      session.start();
+
+      session.addDestination(requestAddress, false, true);
+      session.createQueue(requestAddress, requestQueue, null, false, true, false);
+
+      ClientConsumer requestConsumer = session.createConsumer(requestQueue);
+      requestConsumer.setMessageHandler(new SimpleMessageHandler(key, session));
+
+      ClientRequestor requestor = new ClientRequestor(session, requestAddress);
+      ClientMessage request = session.createClientMessage(false);
+      request.putLongProperty(key, value);
+
+      ClientMessage reply = requestor.request(request, 500);
+      assertNotNull("reply was not received", reply);
+      assertEquals(value, reply.getProperty(key));
+
+      request = session.createClientMessage(false);
+      request.putLongProperty(key, value + 1);
+
+      reply = requestor.request(request, 500);
+      assertNotNull("reply was not received", reply);
+      assertEquals(value + 1, reply.getProperty(key));
+
+      session.close();
+   }
+
+   public void testRequestWithRequestConsumerWhichDoesNotReply() throws Exception
+   {
+      SimpleString requestAddress = randomSimpleString();
+      SimpleString requestQueue = randomSimpleString();
+
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+      final ClientSession session = sf.createSession(false, true, true);
+
+      session.start();
+
+      session.addDestination(requestAddress, false, true);
+      session.createQueue(requestAddress, requestQueue, null, false, true, false);
+
+      ClientConsumer requestConsumer = session.createConsumer(requestQueue);
+      requestConsumer.setMessageHandler(new MessageHandler()
+      {
+         // return a message with the negative request's value
+         public void onMessage(ClientMessage request)
+         {
+            // do nothing -> no reply
+         }
+      });
+
+      ClientRequestor requestor = new ClientRequestor(session, requestAddress);
+      ClientMessage request = session.createClientMessage(false);
+
+      ClientMessage reply = requestor.request(request, 500);
+      assertNull(reply);
+
+      session.close();
+   }
+
+   public void testClientRequestorConstructorWithClosedSession() throws Exception
+   {
+      SimpleString requestAddress = randomSimpleString();
+
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+      final ClientSession session = sf.createSession(false, true, true);
+
+      session.close();
+
+      try
+      {
+         new ClientRequestor(session, requestAddress);
+         fail("ClientRequestor's session must not be closed");
+      }
+      catch (Exception e)
+      {
+      }
+   }
+
+   public void testClose() throws Exception
+   {
+      final SimpleString key = randomSimpleString();
+      long value = randomLong();
+      SimpleString requestAddress = randomSimpleString();
+      SimpleString requestQueue = randomSimpleString();
+
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+      final ClientSession session = sf.createSession(false, true, true);
+
+      session.start();
+
+      session.addDestination(requestAddress, false, true);
+      session.createQueue(requestAddress, requestQueue, null, false, true, false);
+
+      ClientConsumer requestConsumer = session.createConsumer(requestQueue);
+      requestConsumer.setMessageHandler(new SimpleMessageHandler(key, session));
+
+      ClientRequestor requestor = new ClientRequestor(session, requestAddress);
+      ClientMessage request = session.createClientMessage(false);
+      request.putLongProperty(key, value);
+
+      ClientMessage reply = requestor.request(request, 500);
+      assertNotNull("reply was not received", reply);
+      assertEquals(value, reply.getProperty(key));
+
+      request = session.createClientMessage(false);
+      request.putLongProperty(key, value + 1);
+
+      requestor.close();
+
+      try
+      {
+         reply = requestor.request(request, 500);
+         fail("can not send a request on a closed ClientRequestor");
+      }
+      catch (Exception e)
+      {
+
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+
+      Configuration conf = new ConfigurationImpl();
+      conf.setSecurityEnabled(false);
+      conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+      service = MessagingServiceImpl.newNullStorageMessagingServer(conf);
+      service.start();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      service.stop();
+
+      super.tearDown();
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+   private final class SimpleMessageHandler implements MessageHandler
+   {
+      private final SimpleString key;
+
+      private final ClientSession session;
+
+      private SimpleMessageHandler(SimpleString key, ClientSession session)
+      {
+         this.key = key;
+         this.session = session;
+      }
+
+      public void onMessage(ClientMessage request)
+      {
+         try
+         {
+            ClientMessage reply = session.createClientMessage(false);
+            SimpleString replyTo = (SimpleString)request.getProperty(ClientMessageImpl.REPLYTO_HEADER_NAME);
+            long value = (Long)request.getProperty(key);
+            reply.putLongProperty(key, value);
+            ClientProducer replyProducer = session.createProducer(replyTo);
+            replyProducer.send(reply);
+            request.acknowledge();
+         }
+         catch (MessagingException e)
+         {
+            e.printStackTrace();
+         }
+      }
+   }
+}




More information about the jboss-cvs-commits mailing list