[jboss-cvs] JBoss Messaging SVN: r1855 - in trunk: src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/remoting tests/etc tests/src/org/jboss/test/messaging/jms
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Dec 23 13:38:10 EST 2006
Author: timfox
Date: 2006-12-23 13:38:04 -0500 (Sat, 23 Dec 2006)
New Revision: 1855
Modified:
trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
trunk/tests/etc/log4j.xml
trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
Log:
A few tweaks, new test
Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-12-23 14:48:44 UTC (rev 1854)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-12-23 18:38:04 UTC (rev 1855)
@@ -38,7 +38,6 @@
import org.jboss.jms.server.endpoint.DeliveryInfo;
import org.jboss.logging.Logger;
import org.jboss.messaging.util.Future;
-import org.jboss.remoting.callback.HandleCallbackException;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
@@ -452,11 +451,15 @@
//if we've already sent it - hence the check
startSendingMessageSent = true;
+ if (trace) { log.trace("Telling server to start resume sending messages, buffer size is " + buffer.size()); }
+
sendChangeRateMessage(1);
}
m.incDeliveryCount();
+ if (trace) { log.trace(this + " receive() returning " + m); }
+
return m;
}
@@ -548,12 +551,10 @@
private void sendChangeRateMessage(float newRate)
{
- //FIXME - We should be able to execute this invocation as a true
- //remoting asynchronous invocation - i.e. it is written to the transport
- //and no response is waited for
- //Therefore there is no need to execute it here on a separate thread.
- //Unfortunately remoting does not currently support this so this
- //will be SLOW now.
+ // FIXME - when the latest remoting changes make it into a release, we need
+ //to use server side one way invocations here.
+ //I.e. we want to sent the invocation to the transport on the this thread and
+ //return immediately without waiting for a response.
try
{
consumerDelegate.changeRate(newRate);
@@ -671,7 +672,6 @@
m = (MessageProxy)buffer.removeFirst();
}
- if (trace) { log.trace(this + ".getMessage() returning " + m); }
return m;
}
@@ -768,6 +768,8 @@
{
startSendingMessageSent = true;
+ if (trace) { log.trace("Telling server to start resume sending messages, buffer size is " + buffer.size()); }
+
sendChangeRateMessage(1);
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-12-23 14:48:44 UTC (rev 1854)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-12-23 18:38:04 UTC (rev 1855)
@@ -47,9 +47,6 @@
import org.jboss.remoting.callback.HandleCallbackException;
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
-import EDU.oswego.cs.dl.util.concurrent.FIFOSemaphore;
-import EDU.oswego.cs.dl.util.concurrent.Sync;
-
/**
* Concrete implementation of ConsumerEndpoint.
*
@@ -221,6 +218,10 @@
try
{
+ //FIXME - when the latest remoting changes make it into a release, we need
+ //to use server side one way callbacks here.
+ //I.e. we want to sent the invocation to the transport on the this thread and
+ //return immediately without waiting for a response.
callbackHandler.handleCallback(callback);
}
catch (HandleCallbackException e)
Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java 2006-12-23 14:48:44 UTC (rev 1854)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java 2006-12-23 18:38:04 UTC (rev 1855)
@@ -96,16 +96,13 @@
protected static final byte CANCEL = 3;
protected static final byte CANCEL_LIST = 4;
protected static final byte SEND = 5;
- //protected static final byte MORE = 6;
-
- protected static final byte CHANGE_RATE = 6;
-
+ protected static final byte CHANGE_RATE = 6;
protected static final byte SEND_TRANSACTION = 7;
protected static final byte GET_ID_BLOCK = 8;
protected static final byte RECOVER_DELIVERIES = 9;
- //protected static final byte CONFIRM_DELIVERY = 10;
+ protected static final byte CANCEL_INFLIGHT_MESSAGES = 10;
-
+
// The response codes - start from 100
@@ -367,7 +364,18 @@
if (trace) { log.trace("wrote getIdBlock()"); }
}
-
+ else if ("cancelInflightMessages".equals(methodName))
+ {
+ dos.writeByte(CANCEL_INFLIGHT_MESSAGES);
+
+ writeHeader(mi, dos);
+
+ long lastDeliveryId = ((Long)mi.getArguments()[0]).longValue();
+
+ dos.writeLong(lastDeliveryId);
+
+ dos.flush();
+ }
else
{
dos.write(SERIALIZED);
@@ -787,6 +795,26 @@
return request;
}
+ case CANCEL_INFLIGHT_MESSAGES:
+ {
+ MethodInvocation mi = readHeader(dis);
+
+ long lastDeliveryId = dis.readLong();
+
+ Object[] args = new Object[] {new Long(lastDeliveryId)};
+
+ mi.setArguments(args);
+
+ InvocationRequest request =
+ new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
+ new MessagingMarshallable(version, mi), null, null, null);
+
+ if (trace) { log.trace("read cancelInflightMessages()"); }
+
+ return request;
+
+
+ }
case ID_BLOCK_RESPONSE:
{
IDBlock block = new IDBlock();
Modified: trunk/tests/etc/log4j.xml
===================================================================
--- trunk/tests/etc/log4j.xml 2006-12-23 14:48:44 UTC (rev 1854)
+++ trunk/tests/etc/log4j.xml 2006-12-23 18:38:04 UTC (rev 1855)
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
-<!-- $Id: log4j.xml 1184 2006-08-03 18:52:12Z ovidiu.feodorov at jboss.com $ -->
+<!-- $Id: log4j.xml 1019 2006-07-17 17:15:04Z timfox $ -->
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
@@ -14,8 +14,9 @@
"crash-server", etc.
-->
<param name="File" value="${module.output}/logs/messaging-${test.logfile.suffix}.log"/>
+
<param name="DatePattern" value="'.'yyyy-MM-dd"/>
- <param name="Threshold" value="TRACE#org.jboss.logging.XLevel"/>
+ <param name="Threshold" value="INFO"/>
<!-- since majority of the tests are ran in fork mode by ant, the log file is overwritten
for each test. We need to append if we want to preserve a full testsuite run log.
@@ -24,7 +25,7 @@
<param name="Append" value="true"/>
<layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d{ABSOLUTE} %-5p @%t [%c{1}] %m%n"/>
+ <param name="ConversionPattern" value="%d %-5r %-5p [%c] @%t %m%n"/>
</layout>
</appender>
@@ -32,9 +33,8 @@
<errorHandler class="org.jboss.logging.util.OnlyOnceErrorHandler"/>
<param name="Target" value="System.out"/>
<param name="Threshold" value="INFO"/>
- <!-- <param name="Threshold" value="TRACE#org.jboss.logging.XLevel"/> -->
<layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="@%t %d{ABSOLUTE} %-5p [%c{1}] %m%n"/>
+ <param name="ConversionPattern" value="%t %d{ABSOLUTE} %-5p [%c{1}] %m%n"/>
</layout>
</appender>
@@ -46,44 +46,10 @@
<priority value="WARN"/>
</category>
- <category name="org.jboss.remoting">
- <priority value="INFO"/>
- </category>
-
<category name="org.jboss">
<priority value="INFO"/>
</category>
- <category name="org.jboss.messaging">
- <priority value="TRACE" class="org.jboss.logging.XLevel"/>
- </category>
-
- <category name="org.jboss.jms">
- <priority value="TRACE" class="org.jboss.logging.XLevel"/>
- </category>
-
- <category name="org.jboss.test">
- <priority value="TRACE" class="org.jboss.logging.XLevel"/>
- </category>
-
- <!-- Ignoring trace from these: -->
-
- <category name="org.jboss.jms.server.remoting.JMSServerInvocationHandler">
- <priority value="DEBUG"/>
- </category>
-
- <category name="org.jboss.messaging.core.plugin.JDBCSupport">
- <priority value="INFO"/>
- </category>
-
- <category name="org.jboss.test.messaging.tools.jmx.MockJBossSecurityManager">
- <priority value="DEBUG"/>
- </category>
-
- <category name="org.jboss.jms.server.remoting.JMSWireFormat">
- <priority value="DEBUG"/>
- </category>
-
<root>
<appender-ref ref="CONSOLE"/>
<appender-ref ref="FILE"/>
Modified: trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2006-12-23 14:48:44 UTC (rev 1854)
+++ trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2006-12-23 18:38:04 UTC (rev 1855)
@@ -99,12 +99,14 @@
protected Method cancelDeliveryMethod;
- protected Method cancelDeliveriesMethod;
+ protected Method cancelDeliveriesMethod;
//Consumer
protected Method changeRateMethod;
+ protected Method cancelInflightMessagesMethod;
+
//connection
@@ -150,6 +152,8 @@
//Consumer
changeRateMethod = consumerDelegate.getMethod("changeRate", new Class[] { Float.TYPE });
+
+ cancelInflightMessagesMethod = consumerDelegate.getMethod("cancelInflightMessages", new Class[] { Long.TYPE });
//Connection
@@ -197,7 +201,12 @@
wf.testChangeRate();
}
+ public void testCancelInflightMessages() throws Exception
+ {
+ wf.testCancelInflightMessages();
+ }
+
//Connection
public void testSendTransaction() throws Exception
@@ -680,7 +689,92 @@
}
+ public void testCancelInflightMessages() throws Exception
+ {
+ long methodHash = 6236354;
+
+ int objectId = 543271;
+
+ MethodInvocation mi = new MethodInvocation(null, methodHash, cancelInflightMessagesMethod, cancelInflightMessagesMethod, null);
+
+ mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId));
+
+ long lastDeliveryId = 765;
+
+ Object[] args = new Object[] { new Long(lastDeliveryId) };
+
+ mi.setArguments(args);
+
+ MessagingMarshallable mm = new MessagingMarshallable((byte)77, mi);
+
+ InvocationRequest ir = new InvocationRequest(null, null, mm, null, null, null);
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+ OutputStream oos = new DataOutputStream(bos);
+
+ wf.write(ir, oos);
+
+ oos.flush();
+
+ byte[] bytes = bos.toByteArray();
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+
+ DataInputStream dis = new DataInputStream(bis);
+
+ //Check the bytes
+
+ //First byte should be version
+ assertEquals(77, dis.readByte());
+
+ //First byte should be CANCEL
+ assertEquals(JMSWireFormat.CANCEL_INFLIGHT_MESSAGES, dis.readByte());
+
+ //Next int should be objectId
+ assertEquals(objectId, dis.readInt());
+
+ //Next long should be methodHash
+ assertEquals(methodHash, dis.readLong());
+
+ //Next should be the lastdeliveryid
+ long l = dis.readLong();
+
+ assertEquals(lastDeliveryId, l);
+
+ //Now eos
+ try
+ {
+ dis.readByte();
+ fail("End of stream expected");
+ }
+ catch (EOFException e)
+ {
+ //Ok
+ }
+
+ bis.reset();
+
+ InputStream ois = new DataInputStream(bis);
+
+ InvocationRequest ir2 = (InvocationRequest)wf.read(ois, null);
+
+ mm = (MessagingMarshallable)ir2.getParameter();
+
+ assertEquals(77, mm.getVersion());
+
+ MethodInvocation mi2 = (MethodInvocation)mm.getLoad();
+
+ assertEquals(methodHash, mi2.getMethodHash());
+
+ assertEquals(objectId, ((Integer)mi2.getMetaData().getMetaData(Dispatcher.DISPATCHER, Dispatcher.OID)).intValue());
+
+ Long l2 = (Long)mi2.getArguments()[0];
+
+ assertEquals(lastDeliveryId, l2.longValue());
+ }
+
/*
* Test that general serializable invocation requests are marshalled correctky
*/
More information about the jboss-cvs-commits
mailing list