[jboss-cvs] JBoss Messaging SVN: r3545 - in trunk: src/main/org/jboss/messaging/core/remoting/impl/invm and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jan 8 10:26:31 EST 2008
Author: jmesnil
Date: 2008-01-08 10:26:30 -0500 (Tue, 08 Jan 2008)
New Revision: 3545
Modified:
trunk/src/main/org/jboss/messaging/core/remoting/Client.java
trunk/src/main/org/jboss/messaging/core/remoting/NIOSession.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java
trunk/tests/src/org/jboss/messaging/core/remoting/impl/ClientTestBase.java
Log:
* moved the responsibility for setting the correlation ID from the Client to the NIOSession implementations
* added a correlation counter to NIOSession implementations + test in ClientTestBase
Modified: trunk/src/main/org/jboss/messaging/core/remoting/Client.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Client.java 2008-01-08 14:13:48 UTC (rev 3544)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Client.java 2008-01-08 15:26:30 UTC (rev 3545)
@@ -41,7 +41,7 @@
// By default, a blocking request will timeout after 5 seconds
private int blockingRequestTimeout = 5;
private TimeUnit blockingRequestTimeUnit = SECONDS;
-
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -95,14 +95,10 @@
assert packet != null;
checkConnected();
- // FIXME: must use a real counter for correlation ID
- packet.setCorrelationID(System.nanoTime());
-
try
{
- AbstractPacket response = (AbstractPacket) session.writeAndBlock(
- packet.getCorrelationID(), packet, blockingRequestTimeout,
- blockingRequestTimeUnit);
+ AbstractPacket response = (AbstractPacket) session.writeAndBlock(packet,
+ blockingRequestTimeout, blockingRequestTimeUnit);
return response;
} catch (Throwable t)
{
Modified: trunk/src/main/org/jboss/messaging/core/remoting/NIOSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/NIOSession.java 2008-01-08 14:13:48 UTC (rev 3544)
+++ trunk/src/main/org/jboss/messaging/core/remoting/NIOSession.java 2008-01-08 15:26:30 UTC (rev 3545)
@@ -8,6 +8,8 @@
import java.util.concurrent.TimeUnit;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
*
@@ -21,7 +23,7 @@
void write(Object object);
- Object writeAndBlock(long requestID, Object object, long timeout,
+ Object writeAndBlock(AbstractPacket packet, long timeout,
TimeUnit timeUnit) throws Throwable;
boolean isConnected();
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java 2008-01-08 14:13:48 UTC (rev 3544)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java 2008-01-08 15:26:30 UTC (rev 3545)
@@ -32,6 +32,7 @@
private long id;
private ExecutorService executor;
+ private long correlationCounter;
// Static --------------------------------------------------------
@@ -42,6 +43,7 @@
// FIXME have a real ID
this.id = System.currentTimeMillis();
this.executor = Executors.newSingleThreadExecutor();
+ this.correlationCounter = 0;
}
// Public --------------------------------------------------------
@@ -81,13 +83,12 @@
});
}
- public Object writeAndBlock(long requestID, final Object request,
+ public Object writeAndBlock(final AbstractPacket request,
long timeout, TimeUnit timeUnit) throws Throwable
{
- assert request instanceof AbstractPacket;
-
+ request.setCorrelationID(correlationCounter++);
Future<AbstractPacket> future = executor
- .submit(new PacketDispatcherCallable((AbstractPacket) request));
+ .submit(new PacketDispatcherCallable(request));
return future.get(timeout, timeUnit);
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java 2008-01-08 14:13:48 UTC (rev 3544)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java 2008-01-08 15:26:30 UTC (rev 3545)
@@ -12,6 +12,7 @@
import org.apache.mina.filter.reqres.Request;
import org.apache.mina.filter.reqres.Response;
import org.jboss.messaging.core.remoting.NIOSession;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -27,6 +28,8 @@
private final IoSession session;
+ private long correlationCounter;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -36,6 +39,7 @@
assert session != null;
this.session = session;
+ correlationCounter = 0;
}
// Public --------------------------------------------------------
@@ -50,10 +54,11 @@
session.write(object);
}
- public Object writeAndBlock(long requestID, Object object, long timeout,
+ public Object writeAndBlock(AbstractPacket packet, long timeout,
TimeUnit timeUnit) throws Throwable
{
- Request req = new Request(requestID, object, timeout, timeUnit);
+ packet.setCorrelationID(correlationCounter++);
+ Request req = new Request(packet.getCorrelationID(), packet, timeout, timeUnit);
session.write(req);
Response response = req.awaitResponse();
return response.getMessage();
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/ClientTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/ClientTestBase.java 2008-01-08 14:13:48 UTC (rev 3544)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/ClientTestBase.java 2008-01-08 15:26:30 UTC (rev 3545)
@@ -156,6 +156,23 @@
}
}
+ public void testCorrelationCounter() throws Exception
+ {
+ TextPacket request = new TextPacket("testSendBlocking");
+ request.setVersion((byte) 1);
+ request.setTargetID(serverPacketHandler.getID());
+
+ AbstractPacket receivedPacket = client.sendBlocking(request);
+ long correlationID = request.getCorrelationID();
+
+ assertNotNull(receivedPacket);
+ assertEquals(request.getCorrelationID(), receivedPacket.getCorrelationID());
+
+ receivedPacket = client.sendBlocking(request);
+ assertEquals(correlationID + 1, request.getCorrelationID());
+ assertEquals(correlationID + 1, receivedPacket.getCorrelationID());
+ }
+
public void testClientHandlePacketSentByServer() throws Exception
{
TestPacketHandler clientHandler = new TestPacketHandler();
More information about the jboss-cvs-commits mailing list