[jboss-cvs] JBoss Messaging SVN: r2002 - in trunk: src/main/org/jboss/jms/client/remoting and 8 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Jan 20 07:37:39 EST 2007
Author: ovidiu.feodorov at jboss.com
Date: 2007-01-20 07:37:39 -0500 (Sat, 20 Jan 2007)
New Revision: 2002
Added:
trunk/tests/src/org/jboss/test/messaging/jms/HTTPTransportTest.java
Modified:
trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
trunk/src/main/org/jboss/jms/server/endpoint/ClientDelivery.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
trunk/src/main/org/jboss/jms/tx/ResourceManager.java
trunk/tests/build.xml
trunk/tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java
trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-699
Modified: trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2007-01-20 10:38:29 UTC (rev 2001)
+++ trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2007-01-20 12:37:39 UTC (rev 2002)
@@ -104,7 +104,7 @@
*/
public Object invoke(Invocation invocation) throws Throwable
{
- if (trace) { log.trace(this + " invoking " + ((MethodInvocation)invocation).getMethod().getName() + " on server"); }
+ String methodName = ((MethodInvocation)invocation).getMethod().getName();
invocation.getMetaData().addMetaData(Dispatcher.DISPATCHER,
Dispatcher.OID,
@@ -115,11 +115,27 @@
byte version = getState().getVersionToUse().getProviderIncrementingVersion();
MessagingMarshallable request = new MessagingMarshallable(version, invocation);
- MessagingMarshallable response = (MessagingMarshallable)client.invoke(request, null);
+ // select invocations ought to be sent "one way" for increased performance
+ if ("changeRate".equals(methodName))
+ {
+ if (trace) { log.trace(this + " invoking " + methodName + " asynchronously on server"); }
- if (trace) { log.trace(this + " got server response for " + ((MethodInvocation)invocation).getMethod().getName()); }
+ client.invokeOneway(request);
- return response.getLoad();
+ if (trace) { log.trace(this + " asynchronously invoked " + methodName + " on server, no response expected"); }
+
+ return null;
+ }
+ else
+ {
+ if (trace) { log.trace(this + " invoking " + methodName + " synchronously on server"); }
+
+ MessagingMarshallable response = (MessagingMarshallable)client.invoke(request, null);
+
+ if (trace) { log.trace(this + " got server response for " + methodName); }
+
+ return response.getLoad();
+ }
}
// Initializable implemenation ------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-01-20 10:38:29 UTC (rev 2001)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-01-20 12:37:39 UTC (rev 2002)
@@ -508,7 +508,7 @@
//if we've already sent it - hence the check
startSendingMessageSent = true;
- if (trace) { log.trace("Telling server to start resume sending messages, buffer size is " + buffer.size()); }
+ if (trace) { log.trace("telling server to start resume sending messages, buffer size is " + buffer.size()); }
sendChangeRateMessage(1);
}
@@ -610,12 +610,10 @@
private void sendChangeRateMessage(float newRate)
{
- // FIXME - when the latest remoting changes make it into a release, we need
- //to use server side one way invocations here.
- //I.e. we want to sent the invocation to the transport on the this thread and
- //return immediately without waiting for a response.
try
{
+ // this invocation will be sent asynchronously to the server; it's DelegateSupport.invoke()
+ // job to detect it and turn it into a remoting one way invocation.
consumerDelegate.changeRate(newRate);
}
catch (JMSException e)
@@ -821,8 +819,8 @@
}
- //Tell the server we need more messages - but we don't want to keep sending the message
- //if we've already sent it - hence the check
+ // Tell the server we need more messages - but we don't want to keep sending the message
+ // if we've already sent it - hence the check
if (!startSendingMessageSent && buffer.size() <= minBufferSize)
{
startSendingMessageSent = true;
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ClientDelivery.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ClientDelivery.java 2007-01-20 10:38:29 UTC (rev 2001)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ClientDelivery.java 2007-01-20 12:37:39 UTC (rev 2002)
@@ -113,6 +113,11 @@
return consumerId;
}
+ public String toString()
+ {
+ return "ClientDelivery[" + msg + "]";
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-01-20 10:38:29 UTC (rev 2001)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-01-20 12:37:39 UTC (rev 2002)
@@ -243,10 +243,9 @@
try
{
- // FIXME - when the latest remoting changes make it into a release, we need to use
- // server side one way callbacks here. I.e. we want to sent the invocation to the
- // transport on the this thread and return immediately without waiting for a response.
- callbackHandler.handleCallback(callback);
+ // one way invocation, no acknowledgment sent back by the client
+ if (trace) { log.trace(this + " submitting message " + message + " to the remoting layer to be sent asynchronously"); }
+ callbackHandler.handleCallbackOneway(callback);
}
catch (HandleCallbackException e)
{
Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java 2007-01-20 10:38:29 UTC (rev 2001)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java 2007-01-20 12:37:39 UTC (rev 2002)
@@ -55,6 +55,7 @@
import org.jboss.remoting.InvocationResponse;
import org.jboss.remoting.callback.Callback;
import org.jboss.remoting.invocation.InternalInvocation;
+import org.jboss.remoting.invocation.OnewayInvocation;
import org.jboss.remoting.marshal.Marshaller;
import org.jboss.remoting.marshal.UnMarshaller;
import org.jboss.serial.io.JBossObjectInputStream;
@@ -101,10 +102,7 @@
protected static final byte GET_ID_BLOCK = 8;
protected static final byte RECOVER_DELIVERIES = 9;
protected static final byte CANCEL_INFLIGHT_MESSAGES = 10;
-
-
-
// The response codes - start from 100
protected static final byte MESSAGE_DELIVERY = 100;
@@ -145,14 +143,14 @@
}
else
{
- //Further sanity check
+ // Further sanity check
if (out instanceof ObjectOutputStream)
{
- throw new IllegalArgumentException("Invalid stream - are you sure you have configured socket wrappers?");
+ throw new IllegalArgumentException("Invalid stream - are you sure you have " +
+ "configured socket wrappers?");
}
- //This would be the case for the HTTP transport for example
- //Wrap the stream
+ // This would be the case for the HTTP transport for example. Wrap the stream.
//TODO Ideally remoting would let us wrap this before invoking the marshaller
//but this does not appear to be possible
@@ -169,12 +167,20 @@
InvocationRequest req = (InvocationRequest)obj;
- Object param;
+ Object param = req.getParameter();
- // Unwrap Callback.
- if (req.getParameter() instanceof InternalInvocation)
+ if (param instanceof OnewayInvocation)
{
- InternalInvocation ii = (InternalInvocation) req.getParameter();
+ // an oneway invocation is only using the first slot in its parameter array so we're
+ // taking some shortcuts here.
+ param = ((OnewayInvocation)param).getParameters()[0];
+ }
+
+ if (param instanceof InternalInvocation)
+ {
+ // unwrap callback
+
+ InternalInvocation ii = (InternalInvocation)param;
Object[] params = ii.getParameters();
if (params != null && params.length > 0 && params[0] instanceof Callback)
@@ -190,20 +196,12 @@
param = callback.getParameter();
}
}
- else
- {
- param = req.getParameter();
- }
}
- else if (req.getParameter() instanceof MessagingMarshallable)
+ else if (param instanceof MessagingMarshallable)
{
- param = ((MessagingMarshallable)req.getParameter()).getLoad();
+ param = ((MessagingMarshallable)param).getLoad();
}
- else
- {
- param = req.getParameter();
- }
-
+
if (trace) { log.trace("param is " + param); }
if (param instanceof MethodInvocation)
@@ -242,7 +240,7 @@
dos.flush();
- if (trace) { log.trace("wrote activate()"); }
+ if (trace) { log.trace("wrote changeRate()"); }
}
else if ("acknowledgeDelivery".equals(methodName))
{
@@ -403,7 +401,7 @@
}
else if (param instanceof ClientDelivery)
{
- //Message delivery callback
+ // Message delivery callback
if (trace) { log.trace("DeliveryRunnable"); }
@@ -518,10 +516,9 @@
dos.writeUTF(resp.getSessionId());
dos.writeInt(callbackList.size());
- Iterator it = callbackList.iterator();
- while (it.hasNext())
+ for(Iterator i = callbackList.iterator(); i.hasNext(); )
{
- Callback callback = (Callback)it.next();
+ Callback callback = (Callback)i.next();
// We don't use acknowledgeable push callbacks
@@ -529,6 +526,8 @@
ClientDelivery delivery = (ClientDelivery)mm.getLoad();
delivery.write(dos);
dos.flush();
+
+ if (trace) { log.trace("wrote delivery " + delivery); }
}
}
else
@@ -571,14 +570,13 @@
}
else
{
- //Further sanity check
+ // Further sanity check
if (in instanceof ObjectInputStream)
{
throw new IllegalArgumentException("Invalid stream - are you sure you have configured socket wrappers?");
}
- //This would be the case for the HTTP transport for example
- //Wrap the stream
+ // This would be the case for the HTTP transport for example. Wrap the stream
//TODO Ideally remoting would let us wrap this before invoking the marshaller
//but this does not appear to be possible
@@ -631,6 +629,8 @@
}
case CHANGE_RATE:
{
+ // asynchronous invocation
+
MethodInvocation mi = readHeader(dis);
float f = dis.readFloat();
@@ -638,12 +638,14 @@
Object[] args = new Object[] {new Float(f)};
mi.setArguments(args);
+
+ OnewayInvocation oi = new OnewayInvocation(new MessagingMarshallable(version, mi));
InvocationRequest request =
new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
- new MessagingMarshallable(version, mi), null, null, null);
+ oi, null, null, null);
- if (trace) { log.trace("read activate()"); }
+ if (trace) { log.trace("read changeRate()"); }
return request;
}
@@ -898,19 +900,24 @@
}
case MESSAGE_DELIVERY:
{
+ // asynchronous invocation
+
String sessionId = dis.readUTF();
ClientDelivery dr = new ClientDelivery();
dr.read(dis);
- // Recreate Callback.
+ // recreate callback
MessagingMarshallable mm = new MessagingMarshallable(version, dr);
Callback callback = new Callback(mm);
InternalInvocation ii
= new InternalInvocation(InternalInvocation.HANDLECALLBACK, new Object[]{callback});
+
+ OnewayInvocation oi = new OnewayInvocation(ii);
+
InvocationRequest request
= new InvocationRequest(sessionId, CallbackManager.JMS_CALLBACK_SUBSYSTEM,
- ii, null, null, null);
+ oi, null, null, null);
if (trace) { log.trace("read callback()"); }
@@ -928,6 +935,9 @@
ClientDelivery delivery = new ClientDelivery();
delivery.read(dis);
+
+ if (trace) { log.trace("read delivery " + delivery); }
+
MessagingMarshallable mm = new MessagingMarshallable(version, delivery);
Callback callback = new Callback(mm);
Modified: trunk/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ResourceManager.java 2007-01-20 10:38:29 UTC (rev 2001)
+++ trunk/src/main/org/jboss/jms/tx/ResourceManager.java 2007-01-20 12:37:39 UTC (rev 2002)
@@ -29,7 +29,6 @@
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
-import javax.jms.TransactionRolledBackException;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml 2007-01-20 10:38:29 UTC (rev 2001)
+++ trunk/tests/build.xml 2007-01-20 12:37:39 UTC (rev 2002)
@@ -352,14 +352,10 @@
<target name="tests" depends="tests-jar, prepare-testdirs, clear-test-logs">
<antcall target="crash-tests"/>
<antcall target="invm-tests"/>
-
<antcall target="remote-tests"/> <!-- default remoting configuration (socket) -->
- <!--
<antcall target="remote-tests">
<param name="test.remoting" value="http"/>
</antcall>
-
- -->
<antcall target="clustering-tests"/>
</target>
Added: trunk/tests/src/org/jboss/test/messaging/jms/HTTPTransportTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/HTTPTransportTest.java (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/jms/HTTPTransportTest.java 2007-01-20 12:37:39 UTC (rev 2002)
@@ -0,0 +1,139 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.jms;
+
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+import javax.naming.InitialContext;
+import javax.jms.ConnectionFactory;
+import javax.jms.Queue;
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+import javax.management.ObjectName;
+
+/**
+ * This class contain tests that only make sense for a HTTP transport. They will be ignored for
+ * any other kind of transport.
+ *
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public class HTTPTransportTest extends MessagingTestCase
+{
+ // Constants ------------------------------------------------------------------------------------
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ private InitialContext ic;
+ private ConnectionFactory cf;
+ private Queue queue;
+ private ObjectName queueObjectName;
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ public HTTPTransportTest(String name)
+ {
+ super(name);
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ public void testCallbackList() throws Exception
+ {
+ if (!"http".equals(ServerManagement.getRemotingTransport(0)))
+ {
+ log.warn("The server we are connecting to did not start its remoting service " +
+ "with HTTP transport enabled, skipping test ...");
+ return;
+ }
+
+ // send a bunch of messages and let them accumulate in the queue
+ Connection conn = cf.createConnection();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = session.createProducer(queue);
+
+ int messageCount = 20;
+
+ for(int i = 0; i < messageCount; i++)
+ {
+ Message m = session.createTextMessage("krakatau" + i);
+ prod.send(m);
+ }
+
+ conn.close();
+
+ // make sure messages made it to the queue
+ Integer count = (Integer)ServerManagement.getAttribute(queueObjectName, "MessageCount");
+ assertEquals(messageCount, count.intValue());
+
+
+ conn = cf.createConnection();
+ session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons = session.createConsumer(queue);
+
+ conn.start();
+
+ // messages will be sent in bulk from server side, on the next HTTP client listner poll
+
+ for(int i = 0; i < messageCount; i++)
+ {
+ TextMessage t = (TextMessage)cons.receive(2000);
+ assertNotNull(t);
+ assertEquals("krakatau" + i, t.getText());
+ }
+
+ conn.close();
+
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ ServerManagement.start("all");
+
+ ic = new InitialContext(ServerManagement.getJNDIEnvironment());
+
+ ServerManagement.deployQueue("HTTPTestQueue");
+
+ cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+
+ queue = (Queue)ic.lookup("/queue/HTTPTestQueue");
+
+ queueObjectName =
+ new ObjectName("jboss.messaging.destination:service=Queue,name=HTTPTestQueue");
+
+ ServerManagement.invoke(queueObjectName, "removeAllMessages", new Object[0], new String[0]);
+
+ log.debug("setup done");
+ }
+
+ public void tearDown() throws Exception
+ {
+ ServerManagement.undeployQueue("HTTPTestQueue");
+
+ ic.close();
+
+ super.tearDown();
+ }
+
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+}
Property changes on: trunk/tests/src/org/jboss/test/messaging/jms/HTTPTransportTest.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Modified: trunk/tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java 2007-01-20 10:38:29 UTC (rev 2001)
+++ trunk/tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java 2007-01-20 12:37:39 UTC (rev 2002)
@@ -1179,8 +1179,6 @@
}
-
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2007-01-20 10:38:29 UTC (rev 2001)
+++ trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2007-01-20 12:37:39 UTC (rev 2002)
@@ -1096,6 +1096,12 @@
return s;
}
+ public static String getRemotingTransport(int serverIndex) throws Exception
+ {
+ insureStarted(serverIndex);
+ return servers[serverIndex].getServer().getRemotingTransport();
+ }
+
// Attributes ----------------------------------------------------
// Constructors --------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java 2007-01-20 10:38:29 UTC (rev 2001)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java 2007-01-20 12:37:39 UTC (rev 2002)
@@ -819,6 +819,11 @@
return config.getDatabaseType();
}
+ public String getRemotingTransport()
+ {
+ return config.getRemotingTransport();
+ }
+
public boolean isClustered()
{
return config.isClustered();
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2007-01-20 10:38:29 UTC (rev 2001)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2007-01-20 12:37:39 UTC (rev 2002)
@@ -256,6 +256,11 @@
return sc.getDatabaseType();
}
+ public String getRemotingTransport()
+ {
+ return sc.getRemotingTransport();
+ }
+
public void log(int level, String text)
{
if (ServerManagement.FATAL == level)
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java 2007-01-20 10:38:29 UTC (rev 2001)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java 2007-01-20 12:37:39 UTC (rev 2002)
@@ -246,6 +246,11 @@
return server.getDatabaseType();
}
+ public String getRemotingTransport()
+ {
+ return server.getRemotingTransport();
+ }
+
public void log(int level, String text) throws Exception
{
server.log(level, text);
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java 2007-01-20 10:38:29 UTC (rev 2001)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java 2007-01-20 12:37:39 UTC (rev 2002)
@@ -108,6 +108,11 @@
String getDatabaseType() throws Exception;
/**
+ * @return one of "socket", "http", ...
+ */
+ String getRemotingTransport() throws Exception;
+
+ /**
* Only for remote use!
*/
void log(int level, String text) throws Exception;
More information about the jboss-cvs-commits
mailing list