[jboss-cvs] JBoss Messaging SVN: r3545 - in trunk: src/main/org/jboss/messaging/core/remoting/impl/invm and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jan 8 10:26:31 EST 2008


Author: jmesnil
Date: 2008-01-08 10:26:30 -0500 (Tue, 08 Jan 2008)
New Revision: 3545

Modified:
   trunk/src/main/org/jboss/messaging/core/remoting/Client.java
   trunk/src/main/org/jboss/messaging/core/remoting/NIOSession.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java
   trunk/tests/src/org/jboss/messaging/core/remoting/impl/ClientTestBase.java
Log:
* moved the responsibility to set a correlationID from the Client to the NIOSession objects
* added a correlation counter to NIOSession implementations + test in ClientTestBase


Modified: trunk/src/main/org/jboss/messaging/core/remoting/Client.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Client.java	2008-01-08 14:13:48 UTC (rev 3544)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Client.java	2008-01-08 15:26:30 UTC (rev 3545)
@@ -41,7 +41,7 @@
    // By default, a blocking request will timeout after 5 seconds
    private int blockingRequestTimeout = 5;
    private TimeUnit blockingRequestTimeUnit = SECONDS;
-
+   
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -95,14 +95,10 @@
       assert packet != null;
       checkConnected();
 
-      // FIXME: must use a real counter for correlation ID
-      packet.setCorrelationID(System.nanoTime());
-
       try
       {
-         AbstractPacket response = (AbstractPacket) session.writeAndBlock(
-               packet.getCorrelationID(), packet, blockingRequestTimeout,
-               blockingRequestTimeUnit);
+         AbstractPacket response = (AbstractPacket) session.writeAndBlock(packet, 
+               blockingRequestTimeout, blockingRequestTimeUnit);
          return response;
       } catch (Throwable t)
       {

Modified: trunk/src/main/org/jboss/messaging/core/remoting/NIOSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/NIOSession.java	2008-01-08 14:13:48 UTC (rev 3544)
+++ trunk/src/main/org/jboss/messaging/core/remoting/NIOSession.java	2008-01-08 15:26:30 UTC (rev 3545)
@@ -8,6 +8,8 @@
 
 import java.util.concurrent.TimeUnit;
 
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * 
@@ -21,7 +23,7 @@
 
    void write(Object object);
 
-   Object writeAndBlock(long requestID, Object object, long timeout,
+   Object writeAndBlock(AbstractPacket packet, long timeout,
          TimeUnit timeUnit) throws Throwable;
 
    boolean isConnected();

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java	2008-01-08 14:13:48 UTC (rev 3544)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java	2008-01-08 15:26:30 UTC (rev 3545)
@@ -32,6 +32,7 @@
 
    private long id;
    private ExecutorService executor;
+   private long correlationCounter;
 
    // Static --------------------------------------------------------
 
@@ -42,6 +43,7 @@
       // FIXME have a real ID
       this.id = System.currentTimeMillis();
       this.executor = Executors.newSingleThreadExecutor();
+      this.correlationCounter = 0;
    }
 
    // Public --------------------------------------------------------
@@ -81,13 +83,12 @@
             });
    }
 
-   public Object writeAndBlock(long requestID, final Object request,
+   public Object writeAndBlock(final AbstractPacket request,
          long timeout, TimeUnit timeUnit) throws Throwable
    {
-      assert request instanceof AbstractPacket;
-
+      request.setCorrelationID(correlationCounter++);
       Future<AbstractPacket> future = executor
-            .submit(new PacketDispatcherCallable((AbstractPacket) request));
+            .submit(new PacketDispatcherCallable(request));
       return future.get(timeout, timeUnit);
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java	2008-01-08 14:13:48 UTC (rev 3544)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java	2008-01-08 15:26:30 UTC (rev 3545)
@@ -12,6 +12,7 @@
 import org.apache.mina.filter.reqres.Request;
 import org.apache.mina.filter.reqres.Response;
 import org.jboss.messaging.core.remoting.NIOSession;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -27,6 +28,8 @@
 
    private final IoSession session;
 
+   private long correlationCounter;
+   
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -36,6 +39,7 @@
       assert session != null;
 
       this.session = session;
+      correlationCounter = 0;
    }
 
    // Public --------------------------------------------------------
@@ -50,10 +54,11 @@
       session.write(object);
    }
 
-   public Object writeAndBlock(long requestID, Object object, long timeout,
+   public Object writeAndBlock(AbstractPacket packet, long timeout,
          TimeUnit timeUnit) throws Throwable
    {
-      Request req = new Request(requestID, object, timeout, timeUnit);
+      packet.setCorrelationID(correlationCounter++);
+      Request req = new Request(packet.getCorrelationID(), packet, timeout, timeUnit);
       session.write(req);
       Response response = req.awaitResponse();
       return response.getMessage();

Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/ClientTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/ClientTestBase.java	2008-01-08 14:13:48 UTC (rev 3544)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/ClientTestBase.java	2008-01-08 15:26:30 UTC (rev 3545)
@@ -156,6 +156,23 @@
       }
    }
    
+   public void testCorrelationCounter() throws Exception
+   {
+      TextPacket request = new TextPacket("testSendBlocking");
+      request.setVersion((byte) 1);
+      request.setTargetID(serverPacketHandler.getID());
+
+      AbstractPacket receivedPacket = client.sendBlocking(request);
+      long correlationID = request.getCorrelationID();
+      
+      assertNotNull(receivedPacket);      
+      assertEquals(request.getCorrelationID(), receivedPacket.getCorrelationID());
+      
+      receivedPacket = client.sendBlocking(request);
+      assertEquals(correlationID + 1, request.getCorrelationID());
+      assertEquals(correlationID + 1, receivedPacket.getCorrelationID());      
+   }
+
    public void testClientHandlePacketSentByServer() throws Exception
    {
       TestPacketHandler clientHandler = new TestPacketHandler();




More information about the jboss-cvs-commits mailing list