[jboss-cvs] JBoss Messaging SVN: r2002 - in trunk: src/main/org/jboss/jms/client/remoting and 8 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Jan 20 07:37:39 EST 2007


Author: ovidiu.feodorov at jboss.com
Date: 2007-01-20 07:37:39 -0500 (Sat, 20 Jan 2007)
New Revision: 2002

Added:
   trunk/tests/src/org/jboss/test/messaging/jms/HTTPTransportTest.java
Modified:
   trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
   trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
   trunk/src/main/org/jboss/jms/server/endpoint/ClientDelivery.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
   trunk/src/main/org/jboss/jms/tx/ResourceManager.java
   trunk/tests/build.xml
   trunk/tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java
   trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
   trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
   trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
   trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java
   trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-699


Modified: trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java	2007-01-20 10:38:29 UTC (rev 2001)
+++ trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java	2007-01-20 12:37:39 UTC (rev 2002)
@@ -104,7 +104,7 @@
     */
    public Object invoke(Invocation invocation) throws Throwable
    {
-      if (trace) { log.trace(this + " invoking " + ((MethodInvocation)invocation).getMethod().getName() + " on server"); }
+      String methodName = ((MethodInvocation)invocation).getMethod().getName();
 
       invocation.getMetaData().addMetaData(Dispatcher.DISPATCHER,
                                            Dispatcher.OID,
@@ -115,11 +115,27 @@
       byte version = getState().getVersionToUse().getProviderIncrementingVersion();
       MessagingMarshallable request = new MessagingMarshallable(version, invocation);
 
-      MessagingMarshallable response = (MessagingMarshallable)client.invoke(request, null);
+      // select invocations ought to be sent "one way" for increased performance
+      if ("changeRate".equals(methodName))
+      {
+         if (trace) { log.trace(this + " invoking " + methodName + " asynchronously on server"); }
 
-      if (trace) { log.trace(this + " got server response for " + ((MethodInvocation)invocation).getMethod().getName()); }
+         client.invokeOneway(request);
 
-      return response.getLoad();
+         if (trace) { log.trace(this + " asynchronously invoked " + methodName + " on server, no response expected"); }
+
+         return null;
+      }
+      else
+      {
+         if (trace) { log.trace(this + " invoking " + methodName + " synchronously on server"); }
+
+         MessagingMarshallable response = (MessagingMarshallable)client.invoke(request, null);
+
+         if (trace) { log.trace(this + " got server response for " + methodName); }
+
+         return response.getLoad();
+      }
    }
 
    // Initializable implemenation ------------------------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2007-01-20 10:38:29 UTC (rev 2001)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2007-01-20 12:37:39 UTC (rev 2002)
@@ -508,7 +508,7 @@
          //if we've already sent it - hence the check
          startSendingMessageSent = true;
             
-         if (trace) { log.trace("Telling server to start resume sending messages, buffer size is " + buffer.size()); }
+         if (trace) { log.trace("telling server to start resume sending messages, buffer size is " + buffer.size()); }
          
          sendChangeRateMessage(1);                    
       }
@@ -610,12 +610,10 @@
    
    private void sendChangeRateMessage(float newRate) 
    {
-      // FIXME - when the latest remoting changes make it into a release, we need
-      //to use server side one way invocations here.
-      //I.e. we want to sent the invocation to the transport on the this thread and 
-      //return immediately without waiting for a response.
       try
       {
+         // this invocation will be sent asynchronously to the server; it's DelegateSupport.invoke()
+         // job to detect it and turn it into a remoting one way invocation.
          consumerDelegate.changeRate(newRate);
       }
       catch (JMSException e)
@@ -821,8 +819,8 @@
          }
          
          
-         //Tell the server we need more messages - but we don't want to keep sending the message
-         //if we've already sent it - hence the check
+         // Tell the server we need more messages - but we don't want to keep sending the message
+         // if we've already sent it - hence the check
          if (!startSendingMessageSent && buffer.size() <= minBufferSize)
          {                    
             startSendingMessageSent = true;

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ClientDelivery.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ClientDelivery.java	2007-01-20 10:38:29 UTC (rev 2001)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ClientDelivery.java	2007-01-20 12:37:39 UTC (rev 2002)
@@ -113,6 +113,11 @@
       return consumerId;
    }
 
+   public String toString()
+   {
+      return "ClientDelivery[" + msg + "]";
+   }
+
    // Package protected ---------------------------------------------
    
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-01-20 10:38:29 UTC (rev 2001)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-01-20 12:37:39 UTC (rev 2002)
@@ -243,10 +243,9 @@
            
          try
          {
-            // FIXME - when the latest remoting changes make it into a release, we need to use
-            // server side one way callbacks here. I.e. we want to sent the invocation to the
-            // transport on the this thread and return immediately without waiting for a response.
-            callbackHandler.handleCallback(callback);  
+            // one way invocation, no acknowledgment sent back by the client
+            if (trace) { log.trace(this + " submitting message " + message + " to the remoting layer to be sent asynchronously"); }
+            callbackHandler.handleCallbackOneway(callback);
          }
          catch (HandleCallbackException e)
          {

Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java	2007-01-20 10:38:29 UTC (rev 2001)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java	2007-01-20 12:37:39 UTC (rev 2002)
@@ -55,6 +55,7 @@
 import org.jboss.remoting.InvocationResponse;
 import org.jboss.remoting.callback.Callback;
 import org.jboss.remoting.invocation.InternalInvocation;
+import org.jboss.remoting.invocation.OnewayInvocation;
 import org.jboss.remoting.marshal.Marshaller;
 import org.jboss.remoting.marshal.UnMarshaller;
 import org.jboss.serial.io.JBossObjectInputStream;
@@ -101,10 +102,7 @@
    protected static final byte GET_ID_BLOCK = 8;
    protected static final byte RECOVER_DELIVERIES = 9;
    protected static final byte CANCEL_INFLIGHT_MESSAGES = 10;
-   
 
- 
-
    // The response codes - start from 100
 
    protected static final byte MESSAGE_DELIVERY = 100;
@@ -145,14 +143,14 @@
       }
       else
       {
-         //Further sanity check
+         // Further sanity check
          if (out instanceof ObjectOutputStream)
          {
-            throw new IllegalArgumentException("Invalid stream - are you sure you have configured socket wrappers?");
+            throw new IllegalArgumentException("Invalid stream - are you sure you have " +
+                                               "configured socket wrappers?");
          }
          
-         //This would be the case for the HTTP transport for example
-         //Wrap the stream
+         // This would be the case for the HTTP transport for example. Wrap the stream.
          
          //TODO Ideally remoting would let us wrap this before invoking the marshaller
          //but this does not appear to be possible
@@ -169,12 +167,20 @@
    
             InvocationRequest req = (InvocationRequest)obj;
    
-            Object param;
+            Object param = req.getParameter();
             
-            // Unwrap Callback.
-            if (req.getParameter() instanceof InternalInvocation)
+            if (param instanceof OnewayInvocation)
             {
-               InternalInvocation ii = (InternalInvocation) req.getParameter();
+               // an oneway invocation is only using the first slot in its parameter array so we're
+               // taking some shortcuts here.
+               param = ((OnewayInvocation)param).getParameters()[0];
+            }
+
+            if (param instanceof InternalInvocation)
+            {
+               // unwrap callback
+
+               InternalInvocation ii = (InternalInvocation)param;
                Object[] params = ii.getParameters();
                
                if (params != null && params.length > 0 && params[0] instanceof Callback)
@@ -190,20 +196,12 @@
                      param = callback.getParameter();
                   }
                }
-               else
-               {
-                  param = req.getParameter();
-               }
             }
-            else if (req.getParameter() instanceof MessagingMarshallable)
+            else if (param instanceof MessagingMarshallable)
             {
-               param = ((MessagingMarshallable)req.getParameter()).getLoad();
+               param = ((MessagingMarshallable)param).getLoad();
             }
-            else
-            {
-               param = req.getParameter();
-            }
-   
+
             if (trace) { log.trace("param is " + param); }
    
             if (param instanceof MethodInvocation)
@@ -242,7 +240,7 @@
    
                   dos.flush();
    
-                  if (trace) { log.trace("wrote activate()"); }
+                  if (trace) { log.trace("wrote changeRate()"); }
                }           
                else if ("acknowledgeDelivery".equals(methodName))
                {
@@ -403,7 +401,7 @@
             }
             else if (param instanceof ClientDelivery)
             {
-               //Message delivery callback
+               // Message delivery callback
    
                if (trace) { log.trace("DeliveryRunnable"); }
    
@@ -518,10 +516,9 @@
                dos.writeUTF(resp.getSessionId());
                dos.writeInt(callbackList.size());
                
-               Iterator it = callbackList.iterator();
-               while (it.hasNext())
+               for(Iterator i = callbackList.iterator(); i.hasNext(); )
                {
-                  Callback callback = (Callback)it.next();
+                  Callback callback = (Callback)i.next();
 
                   // We don't use acknowledgeable push callbacks
 
@@ -529,6 +526,8 @@
                   ClientDelivery delivery = (ClientDelivery)mm.getLoad();
                   delivery.write(dos);
                   dos.flush();
+
+                  if (trace) { log.trace("wrote delivery " + delivery); }
                }
             }
             else
@@ -571,14 +570,13 @@
       }
       else
       {        
-         //Further sanity check
+         // Further sanity check
          if (in instanceof ObjectInputStream)
          {
             throw new IllegalArgumentException("Invalid stream - are you sure you have configured socket wrappers?");
          }
          
-         //This would be the case for the HTTP transport for example
-         //Wrap the stream
+         // This would be the case for the HTTP transport for example. Wrap the stream
          
          //TODO Ideally remoting would let us wrap this before invoking the marshaller
          //but this does not appear to be possible
@@ -631,6 +629,8 @@
             }
             case CHANGE_RATE:
             {
+               // asynchronous invocation
+
                MethodInvocation mi = readHeader(dis);
                
                float f = dis.readFloat();
@@ -638,12 +638,14 @@
                Object[] args = new Object[] {new Float(f)};
                
                mi.setArguments(args);
+
+               OnewayInvocation oi = new OnewayInvocation(new MessagingMarshallable(version, mi));
                   
                InvocationRequest request =
                   new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
-                                        new MessagingMarshallable(version, mi), null, null, null);
+                                        oi, null, null, null);
    
-               if (trace) { log.trace("read activate()"); }
+               if (trace) { log.trace("read changeRate()"); }
    
                return request;
             }         
@@ -898,19 +900,24 @@
             }
             case MESSAGE_DELIVERY:
             {
+               // asynchronous invocation
+
                String sessionId = dis.readUTF();
                ClientDelivery dr = new ClientDelivery();
                
                dr.read(dis);
 
-               // Recreate Callback.
+               // recreate callback
                MessagingMarshallable mm = new MessagingMarshallable(version, dr);
                Callback callback = new Callback(mm);
                InternalInvocation ii
                   = new InternalInvocation(InternalInvocation.HANDLECALLBACK, new Object[]{callback});
+
+               OnewayInvocation oi = new OnewayInvocation(ii);
+
                InvocationRequest request
                   = new InvocationRequest(sessionId, CallbackManager.JMS_CALLBACK_SUBSYSTEM,
-                                          ii, null, null, null);
+                                          oi, null, null, null);
    
                if (trace) { log.trace("read callback()"); }
    
@@ -928,6 +935,9 @@
 
                   ClientDelivery delivery = new ClientDelivery();
                   delivery.read(dis);
+
+                  if (trace) { log.trace("read delivery " + delivery); }
+
                   MessagingMarshallable mm = new MessagingMarshallable(version, delivery);
                   Callback callback = new Callback(mm);
 

Modified: trunk/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ResourceManager.java	2007-01-20 10:38:29 UTC (rev 2001)
+++ trunk/src/main/org/jboss/jms/tx/ResourceManager.java	2007-01-20 12:37:39 UTC (rev 2002)
@@ -29,7 +29,6 @@
 
 import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
-import javax.jms.TransactionRolledBackException;
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;

Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml	2007-01-20 10:38:29 UTC (rev 2001)
+++ trunk/tests/build.xml	2007-01-20 12:37:39 UTC (rev 2002)
@@ -352,14 +352,10 @@
    <target name="tests" depends="tests-jar, prepare-testdirs, clear-test-logs">
     <antcall target="crash-tests"/>
       <antcall target="invm-tests"/>
-
       <antcall target="remote-tests"/>  <!-- default remoting configuration (socket) -->
-      <!--
       <antcall target="remote-tests">
          <param name="test.remoting" value="http"/>
       </antcall>
-
-      -->
       <antcall target="clustering-tests"/>
    </target>
 

Added: trunk/tests/src/org/jboss/test/messaging/jms/HTTPTransportTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/HTTPTransportTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/jms/HTTPTransportTest.java	2007-01-20 12:37:39 UTC (rev 2002)
@@ -0,0 +1,139 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.jms;
+
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+import javax.naming.InitialContext;
+import javax.jms.ConnectionFactory;
+import javax.jms.Queue;
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+import javax.management.ObjectName;
+
+/**
+ * This class contain tests that only make sense for a HTTP transport. They will be ignored for
+ * any other kind of transport.
+ *
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public class HTTPTransportTest extends MessagingTestCase
+{
+   // Constants ------------------------------------------------------------------------------------
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   private InitialContext ic;
+   private ConnectionFactory cf;
+   private Queue queue;
+   private ObjectName queueObjectName;
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   public HTTPTransportTest(String name)
+   {
+      super(name);
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   public void testCallbackList() throws Exception
+   {
+      if (!"http".equals(ServerManagement.getRemotingTransport(0)))
+      {
+         log.warn("The server we are connecting to did not start its remoting service " +
+            "with HTTP transport enabled, skipping test ...");
+         return;
+      }
+
+      // send a bunch of messages and let them accumulate in the queue
+      Connection conn = cf.createConnection();
+      Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer prod = session.createProducer(queue);
+
+      int messageCount = 20;
+
+      for(int i = 0; i < messageCount; i++)
+      {
+         Message m = session.createTextMessage("krakatau" + i);
+         prod.send(m);
+      }
+
+      conn.close();
+
+      // make sure messages made it to the queue
+      Integer count = (Integer)ServerManagement.getAttribute(queueObjectName, "MessageCount");
+      assertEquals(messageCount, count.intValue());
+
+
+      conn = cf.createConnection();
+      session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer cons = session.createConsumer(queue);
+
+      conn.start();
+
+      // messages will be sent in bulk from server side, on the next HTTP client listner poll
+
+      for(int i = 0; i < messageCount; i++)
+      {
+         TextMessage t = (TextMessage)cons.receive(2000);
+         assertNotNull(t);
+         assertEquals("krakatau" + i, t.getText());
+      }
+      
+      conn.close();
+
+   }
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      ServerManagement.start("all");
+
+      ic = new InitialContext(ServerManagement.getJNDIEnvironment());
+
+      ServerManagement.deployQueue("HTTPTestQueue");
+
+      cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+
+      queue = (Queue)ic.lookup("/queue/HTTPTestQueue");
+
+      queueObjectName =
+         new ObjectName("jboss.messaging.destination:service=Queue,name=HTTPTestQueue");
+
+      ServerManagement.invoke(queueObjectName, "removeAllMessages", new Object[0], new String[0]);
+
+      log.debug("setup done");
+   }
+
+   public void tearDown() throws Exception
+   {
+      ServerManagement.undeployQueue("HTTPTestQueue");
+
+      ic.close();
+
+      super.tearDown();
+   }
+
+   // Private --------------------------------------------------------------------------------------
+
+   // Inner classes --------------------------------------------------------------------------------
+}


Property changes on: trunk/tests/src/org/jboss/test/messaging/jms/HTTPTransportTest.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Modified: trunk/tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java	2007-01-20 10:38:29 UTC (rev 2001)
+++ trunk/tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java	2007-01-20 12:37:39 UTC (rev 2002)
@@ -1179,8 +1179,6 @@
 
    }
    
-   
-   
    // Package protected ---------------------------------------------
    
    // Protected -----------------------------------------------------

Modified: trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2007-01-20 10:38:29 UTC (rev 2001)
+++ trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2007-01-20 12:37:39 UTC (rev 2002)
@@ -1096,6 +1096,12 @@
       return s;
    }
 
+   public static String getRemotingTransport(int serverIndex) throws Exception
+   {
+      insureStarted(serverIndex);
+      return servers[serverIndex].getServer().getRemotingTransport();
+   }
+
    // Attributes ----------------------------------------------------
 
    // Constructors --------------------------------------------------

Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java	2007-01-20 10:38:29 UTC (rev 2001)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java	2007-01-20 12:37:39 UTC (rev 2002)
@@ -819,6 +819,11 @@
       return config.getDatabaseType();
    }
 
+   public String getRemotingTransport()
+   {
+      return config.getRemotingTransport();
+   }
+
    public boolean isClustered()
    {
       return config.isClustered();

Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java	2007-01-20 10:38:29 UTC (rev 2001)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java	2007-01-20 12:37:39 UTC (rev 2002)
@@ -256,6 +256,11 @@
       return sc.getDatabaseType();
    }
 
+   public String getRemotingTransport()
+   {
+      return sc.getRemotingTransport();
+   }
+
    public void log(int level, String text)
    {
       if (ServerManagement.FATAL == level)

Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java	2007-01-20 10:38:29 UTC (rev 2001)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java	2007-01-20 12:37:39 UTC (rev 2002)
@@ -246,6 +246,11 @@
       return server.getDatabaseType();
    }
 
+   public String getRemotingTransport()
+   {
+      return server.getRemotingTransport();
+   }
+
    public void log(int level, String text) throws Exception
    {
       server.log(level, text);

Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java	2007-01-20 10:38:29 UTC (rev 2001)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java	2007-01-20 12:37:39 UTC (rev 2002)
@@ -108,6 +108,11 @@
    String getDatabaseType() throws Exception;
 
    /**
+    * @return one of "socket", "http", ...
+    */
+   String getRemotingTransport() throws Exception;
+
+   /**
     * Only for remote use!
     */
    void log(int level, String text) throws Exception;




More information about the jboss-cvs-commits mailing list