[jboss-cvs] JBoss Messaging SVN: r1855 - in trunk: src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/remoting tests/etc tests/src/org/jboss/test/messaging/jms

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Dec 23 13:38:10 EST 2006


Author: timfox
Date: 2006-12-23 13:38:04 -0500 (Sat, 23 Dec 2006)
New Revision: 1855

Modified:
   trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
   trunk/tests/etc/log4j.xml
   trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
Log:
A few tweaks, new test



Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-12-23 14:48:44 UTC (rev 1854)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-12-23 18:38:04 UTC (rev 1855)
@@ -38,7 +38,6 @@
 import org.jboss.jms.server.endpoint.DeliveryInfo;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.util.Future;
-import org.jboss.remoting.callback.HandleCallbackException;
 
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 
@@ -452,11 +451,15 @@
          //if we've already sent it - hence the check
          startSendingMessageSent = true;
             
+         if (trace) { log.trace("Telling server to start resume sending messages, buffer size is " + buffer.size()); }
+         
          sendChangeRateMessage(1);                    
       }
       
       m.incDeliveryCount();
       
+      if (trace) { log.trace(this + " receive() returning " + m); }
+      
       return m;
    }    
    
@@ -548,12 +551,10 @@
    
    private void sendChangeRateMessage(float newRate) 
    {
-      //FIXME - We should be able to execute this invocation as a true
-      //remoting asynchronous invocation - i.e. it is written to the transport
-      //and no response is waited for
-      //Therefore there is no need to execute it here on a separate thread.
-      //Unfortunately remoting does not currently support this so this
-      //will be SLOW now.
+      // FIXME - when the latest remoting changes make it into a release, we need
+      //to use server side one way invocations here.
+      //I.e. we want to sent the invocation to the transport on the this thread and 
+      //return immediately without waiting for a response.
       try
       {
          consumerDelegate.changeRate(newRate);
@@ -671,7 +672,6 @@
          m = (MessageProxy)buffer.removeFirst();
       }
 
-      if (trace) { log.trace(this + ".getMessage() returning " + m); }
       return m;
    }
    
@@ -768,6 +768,8 @@
          {                    
             startSendingMessageSent = true;
             
+            if (trace) { log.trace("Telling server to start resume sending messages, buffer size is " + buffer.size()); }            
+            
             sendChangeRateMessage(1);
          } 
          

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-12-23 14:48:44 UTC (rev 1854)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-12-23 18:38:04 UTC (rev 1855)
@@ -47,9 +47,6 @@
 import org.jboss.remoting.callback.HandleCallbackException;
 import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
 
-import EDU.oswego.cs.dl.util.concurrent.FIFOSemaphore;
-import EDU.oswego.cs.dl.util.concurrent.Sync;
-
 /**
  * Concrete implementation of ConsumerEndpoint.
  * 
@@ -221,6 +218,10 @@
            
          try
          {
+            //FIXME - when the latest remoting changes make it into a release, we need
+            //to use server side one way callbacks here.
+            //I.e. we want to sent the invocation to the transport on the this thread and 
+            //return immediately without waiting for a response.
             callbackHandler.handleCallback(callback);  
          }
          catch (HandleCallbackException e)

Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java	2006-12-23 14:48:44 UTC (rev 1854)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java	2006-12-23 18:38:04 UTC (rev 1855)
@@ -96,16 +96,13 @@
    protected static final byte CANCEL = 3;
    protected static final byte CANCEL_LIST = 4;
    protected static final byte SEND = 5;   
-   //protected static final byte MORE = 6;
-   
-   protected static final byte CHANGE_RATE = 6;
-   
+   protected static final byte CHANGE_RATE = 6; 
    protected static final byte SEND_TRANSACTION = 7;
    protected static final byte GET_ID_BLOCK = 8;
    protected static final byte RECOVER_DELIVERIES = 9;
-   //protected static final byte CONFIRM_DELIVERY = 10;
+   protected static final byte CANCEL_INFLIGHT_MESSAGES = 10;
    
-   
+
  
 
    // The response codes - start from 100
@@ -367,7 +364,18 @@
    
                   if (trace) { log.trace("wrote getIdBlock()"); }
                }           
-               
+               else if ("cancelInflightMessages".equals(methodName))
+               {
+                  dos.writeByte(CANCEL_INFLIGHT_MESSAGES);
+                  
+                  writeHeader(mi, dos);
+                  
+                  long lastDeliveryId = ((Long)mi.getArguments()[0]).longValue();
+                  
+                  dos.writeLong(lastDeliveryId);
+                  
+                  dos.flush();
+               }
                else
                {
                   dos.write(SERIALIZED);
@@ -787,6 +795,26 @@
    
                return request;
             }
+            case CANCEL_INFLIGHT_MESSAGES:
+            {
+               MethodInvocation mi = readHeader(dis);
+               
+               long lastDeliveryId = dis.readLong();
+               
+               Object[] args = new Object[] {new Long(lastDeliveryId)};
+               
+               mi.setArguments(args);
+   
+               InvocationRequest request =
+                  new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
+                                        new MessagingMarshallable(version, mi), null, null, null);
+   
+               if (trace) { log.trace("read cancelInflightMessages()"); }
+   
+               return request;
+               
+               
+            }
             case ID_BLOCK_RESPONSE:
             {
                IDBlock block = new IDBlock();

Modified: trunk/tests/etc/log4j.xml
===================================================================
--- trunk/tests/etc/log4j.xml	2006-12-23 14:48:44 UTC (rev 1854)
+++ trunk/tests/etc/log4j.xml	2006-12-23 18:38:04 UTC (rev 1855)
@@ -1,7 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
 
-<!-- $Id: log4j.xml 1184 2006-08-03 18:52:12Z ovidiu.feodorov at jboss.com $ -->
+<!-- $Id: log4j.xml 1019 2006-07-17 17:15:04Z timfox $ -->
 
 <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
 
@@ -14,8 +14,9 @@
           "crash-server", etc.
       -->
       <param name="File" value="${module.output}/logs/messaging-${test.logfile.suffix}.log"/>
+
       <param name="DatePattern" value="'.'yyyy-MM-dd"/>
-      <param name="Threshold" value="TRACE#org.jboss.logging.XLevel"/>
+      <param name="Threshold" value="INFO"/>
 
       <!-- since majority of the tests are ran in fork mode by ant, the log file is overwritten
            for each test. We need to append if we want to preserve a full testsuite run log.
@@ -24,7 +25,7 @@
       <param name="Append" value="true"/>
 
       <layout class="org.apache.log4j.PatternLayout">
-         <param name="ConversionPattern" value="%d{ABSOLUTE} %-5p @%t [%c{1}] %m%n"/>
+         <param name="ConversionPattern" value="%d %-5r %-5p [%c] @%t %m%n"/>
       </layout>
    </appender>
 
@@ -32,9 +33,8 @@
       <errorHandler class="org.jboss.logging.util.OnlyOnceErrorHandler"/>
       <param name="Target" value="System.out"/>
       <param name="Threshold" value="INFO"/>
-      <!-- <param name="Threshold" value="TRACE#org.jboss.logging.XLevel"/> -->
       <layout class="org.apache.log4j.PatternLayout">
-         <param name="ConversionPattern" value="@%t %d{ABSOLUTE} %-5p [%c{1}] %m%n"/>
+         <param name="ConversionPattern" value="%t %d{ABSOLUTE} %-5p [%c{1}] %m%n"/>
       </layout>
    </appender>
 
@@ -46,44 +46,10 @@
       <priority value="WARN"/>
    </category>
 
-   <category name="org.jboss.remoting">
-      <priority value="INFO"/>
-   </category>
-
    <category name="org.jboss">
       <priority value="INFO"/>
    </category>
 
-   <category name="org.jboss.messaging">
-      <priority value="TRACE" class="org.jboss.logging.XLevel"/>
-   </category>
-
-   <category name="org.jboss.jms">
-      <priority value="TRACE" class="org.jboss.logging.XLevel"/>
-   </category>
-
-   <category name="org.jboss.test">
-      <priority value="TRACE" class="org.jboss.logging.XLevel"/>
-   </category>
-
-   <!-- Ignoring trace from these: -->
-
-   <category name="org.jboss.jms.server.remoting.JMSServerInvocationHandler">
-      <priority value="DEBUG"/>
-   </category>
-
-   <category name="org.jboss.messaging.core.plugin.JDBCSupport">
-      <priority value="INFO"/>
-   </category>
-
-   <category name="org.jboss.test.messaging.tools.jmx.MockJBossSecurityManager">
-      <priority value="DEBUG"/>
-   </category>
-
-   <category name="org.jboss.jms.server.remoting.JMSWireFormat">
-      <priority value="DEBUG"/>
-   </category>
-
    <root>
       <appender-ref ref="CONSOLE"/>
       <appender-ref ref="FILE"/>

Modified: trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2006-12-23 14:48:44 UTC (rev 1854)
+++ trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2006-12-23 18:38:04 UTC (rev 1855)
@@ -99,12 +99,14 @@
    
    protected Method cancelDeliveryMethod;
    
-   protected Method cancelDeliveriesMethod;
+   protected Method cancelDeliveriesMethod;      
    
    //Consumer
         
    protected Method changeRateMethod;
    
+   protected Method cancelInflightMessagesMethod;
+   
  
    //connection
    
@@ -150,6 +152,8 @@
       //Consumer
             
       changeRateMethod = consumerDelegate.getMethod("changeRate", new Class[] { Float.TYPE });
+      
+      cancelInflightMessagesMethod = consumerDelegate.getMethod("cancelInflightMessages", new Class[] { Long.TYPE });
 
       //Connection
       
@@ -197,7 +201,12 @@
       wf.testChangeRate();
    }
    
+   public void testCancelInflightMessages() throws Exception
+   {
+      wf.testCancelInflightMessages();
+   }
    
+   
    //Connection
    
    public void testSendTransaction() throws Exception
@@ -680,7 +689,92 @@
          
       }  
       
+      public void testCancelInflightMessages() throws Exception
+      {
+         long methodHash = 6236354;
+         
+         int objectId = 543271;
+         
+         MethodInvocation mi = new MethodInvocation(null, methodHash, cancelInflightMessagesMethod, cancelInflightMessagesMethod, null);
+         
+         mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId));   
+         
+         long lastDeliveryId = 765;
+         
+         Object[] args = new Object[] { new Long(lastDeliveryId) };
+         
+         mi.setArguments(args);
+         
+         MessagingMarshallable mm = new MessagingMarshallable((byte)77, mi);
+         
+         InvocationRequest ir = new InvocationRequest(null, null, mm, null, null, null);
+         
+         ByteArrayOutputStream bos = new ByteArrayOutputStream();
+         
+         OutputStream oos = new DataOutputStream(bos);
+                  
+         wf.write(ir, oos);
+         
+         oos.flush();
+         
+         byte[] bytes = bos.toByteArray();
+         
+         ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+                  
+         DataInputStream dis = new DataInputStream(bis); 
+                 
+         //Check the bytes
+         
+         //First byte should be version
+         assertEquals(77, dis.readByte());
+         
+         //First byte should be CANCEL
+         assertEquals(JMSWireFormat.CANCEL_INFLIGHT_MESSAGES, dis.readByte());
+         
+         //Next int should be objectId
+         assertEquals(objectId, dis.readInt());
+         
+         //Next long should be methodHash
+         assertEquals(methodHash, dis.readLong());
+         
+         //Next should be the lastdeliveryid
+         long l = dis.readLong();
+         
+         assertEquals(lastDeliveryId, l);
+         
+         //Now eos
+         try
+         {
+            dis.readByte();
+            fail("End of stream expected");
+         }
+         catch (EOFException e)
+         {
+            //Ok
+         }
+         
+         bis.reset();
+         
+         InputStream ois = new DataInputStream(bis);
+         
+         InvocationRequest ir2 = (InvocationRequest)wf.read(ois, null);
+         
+         mm = (MessagingMarshallable)ir2.getParameter();
+         
+         assertEquals(77, mm.getVersion());
+         
+         MethodInvocation mi2 = (MethodInvocation)mm.getLoad();
+         
+         assertEquals(methodHash, mi2.getMethodHash());
+         
+         assertEquals(objectId, ((Integer)mi2.getMetaData().getMetaData(Dispatcher.DISPATCHER, Dispatcher.OID)).intValue());
+         
+         Long l2 = (Long)mi2.getArguments()[0];
+         
+         assertEquals(lastDeliveryId, l2.longValue());
+      }
       
+      
       /*
        * Test that general serializable invocation requests are marshalled correctky
        */




More information about the jboss-cvs-commits mailing list