[jboss-cvs] jboss-jms/src/main/org/jboss/jms/server/remoting ...
Timothy Fox
tim.fox at jboss.com
Mon Jul 17 13:14:46 EDT 2006
User: timfox
Date: 06/07/17 13:14:46
Modified: src/main/org/jboss/jms/server/remoting JMSWireFormat.java
MetaDataConstants.java
Log:
Many changes, including implementation of prefetch, SEDA-isation of the server, and changes to recovery
Revision Changes Path
1.21 +99 -171 jboss-jms/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: JMSWireFormat.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java,v
retrieving revision 1.20
retrieving revision 1.21
diff -u -b -r1.20 -r1.21
--- JMSWireFormat.java 24 Jun 2006 09:05:37 -0000 1.20
+++ JMSWireFormat.java 17 Jul 2006 17:14:46 -0000 1.21
@@ -34,11 +34,12 @@
import org.jboss.aop.Dispatcher;
import org.jboss.aop.joinpoint.MethodInvocation;
import org.jboss.jms.client.remoting.CallbackServerFactory;
+import org.jboss.jms.client.remoting.HandleMessageResponse;
import org.jboss.jms.message.JBossMessage;
-import org.jboss.jms.message.MessageProxy;
import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.server.Version;
-import org.jboss.jms.server.endpoint.DeliveryRunnable;
+import org.jboss.jms.server.endpoint.ClientDelivery;
+import org.jboss.jms.tx.AckInfo;
import org.jboss.jms.tx.TransactionRequest;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.message.MessageFactory;
@@ -80,23 +81,25 @@
// The request codes - start from zero
protected static final byte SERIALIZED = 0;
- protected static final byte SEND = 1;
- protected static final byte ACTIVATE = 2;
- protected static final byte DEACTIVATE = 3;
- protected static final byte GETMESSAGENOW = 4;
- protected static final byte ACKNOWLEDGE = 5;
+
+ protected static final byte ACKNOWLEDGE = 1;
+ protected static final byte ACKNOWLEDGE_BATCH = 2;
+ protected static final byte SEND = 3;
+
+ protected static final byte CANCEL_DELIVERIES = 4;
+ protected static final byte MORE = 5;
+
protected static final byte SEND_TRANSACTION = 6;
protected static final byte GET_ID_BLOCK = 7;
- protected static final byte CANCEL_DELIVERY = 8;
- protected static final byte CANCEL_DELIVERIES = 9;
+
// The response codes - start from 100
protected static final byte CALLBACK = 100;
protected static final byte NULL_RESPONSE = 101;
- protected static final byte MESSAGE_RESPONSE = 102;
- protected static final byte ID_BLOCK_RESPONSE = 103;
- protected static final byte DEACTIVATE_RESPONSE = 104;
+ protected static final byte ID_BLOCK_RESPONSE = 102;
+ protected static final byte HANDLE_MESSAGE_RESPONSE = 103;
+
// Static --------------------------------------------------------
@@ -175,9 +178,9 @@
if (trace) { log.trace("wrote send()"); }
}
- else if ("activate".equals(methodName))
+ else if ("more".equals(methodName))
{
- oos.writeByte(ACTIVATE);
+ oos.writeByte(MORE);
writeHeader(mi, oos);
@@ -185,35 +188,37 @@
if (trace) { log.trace("wrote activate()"); }
}
- else if ("deactivate".equals(methodName))
+ else if ("acknowledge".equals(methodName))
{
- oos.writeByte(DEACTIVATE);
+ oos.writeByte(ACKNOWLEDGE);
writeHeader(mi, oos);
+ AckInfo ack = (AckInfo)mi.getArguments()[0];
+
+ ack.writeExternal(oos);
+
oos.flush();
- if (trace) { log.trace("wrote deactivate()"); }
+ if (trace) { log.trace("wrote acknowledge()"); }
}
- else if ("getMessageNow".equals(methodName))
+ else if ("acknowledgeBatch".equals(methodName))
{
- oos.writeByte(GETMESSAGENOW);
+ oos.writeByte(ACKNOWLEDGE_BATCH);
writeHeader(mi, oos);
- boolean wait = ((Boolean)mi.getArguments()[0]).booleanValue();
+ List acks = (List)mi.getArguments()[0];
- oos.writeBoolean(wait);
+ oos.writeInt(acks.size());
- oos.flush();
+ Iterator iter = acks.iterator();
- if (trace) { log.trace("wrote getMessageNow()"); }
- }
- else if ("acknowledge".equals(methodName))
+ while (iter.hasNext())
{
- oos.writeByte(ACKNOWLEDGE);
-
- writeHeader(mi, oos);
+ AckInfo ack = (AckInfo)iter.next();
+ ack.writeExternal(oos);
+ }
oos.flush();
@@ -247,20 +252,6 @@
if (trace) { log.trace("wrote getIdBlock()"); }
}
- else if ("cancelDelivery".equals(methodName))
- {
- oos.writeByte(CANCEL_DELIVERY);
-
- writeHeader(mi, oos);
-
- long id = ((Long)mi.getArguments()[0]).longValue();
-
- oos.writeLong(id);
-
- oos.flush();
-
- if (trace) { log.trace("wrote cancelDelivery)"); }
- }
else if ("cancelDeliveries".equals(methodName) && mi.getArguments() != null)
{
oos.writeByte(CANCEL_DELIVERIES);
@@ -275,8 +266,8 @@
while (iter.hasNext())
{
- Long l = (Long)iter.next();
- oos.writeLong(l.longValue());
+ AckInfo ack = (AckInfo)iter.next();
+ ack.writeExternal(oos);
}
oos.flush();
@@ -293,27 +284,17 @@
if (trace) { log.trace("wrote using standard serialization"); }
}
}
- else if (param instanceof DeliveryRunnable)
+ else if (param instanceof ClientDelivery)
{
//Message delivery callback
if (trace) { log.trace("DeliveryRunnable"); }
- DeliveryRunnable dr = (DeliveryRunnable)param;
+ ClientDelivery dr = (ClientDelivery)param;
oos.writeByte(CALLBACK);
- int consumerID = dr.getConsumerID();
-
- MessageProxy del = dr.getMessageProxy();
-
- oos.writeInt(consumerID);
-
- oos.writeByte(del.getMessage().getType());
-
- oos.writeInt(del.getDeliveryCount());
-
- del.getMessage().writeExternal(oos);
+ dr.writeExternal(oos);
oos.flush();
@@ -358,23 +339,6 @@
if (trace) { log.trace("wrote null response"); }
}
- else if (res instanceof MessageProxy)
- {
- // return value from getMessageNow
- oos.write(MESSAGE_RESPONSE);
-
- MessageProxy mp = (MessageProxy)res;
-
- oos.writeByte(mp.getMessage().getType());
-
- oos.writeInt(mp.getDeliveryCount());
-
- mp.getMessage().writeExternal(oos);
-
- oos.flush();
-
- if (trace) { log.trace("wrote message response"); }
- }
else if (res instanceof IdBlock)
{
//Return value from getMessageNow
@@ -386,20 +350,20 @@
oos.flush();
- if (trace) { log.trace("wrote message response"); }
+ if (trace) { log.trace("wrote id block response"); }
}
- else if (res instanceof Long)
+ else if (res instanceof HandleMessageResponse)
{
- //Return value from deactivate
- oos.write(DEACTIVATE_RESPONSE);
+ //Return value from delivering messages to client
+ oos.write(HANDLE_MESSAGE_RESPONSE);
- Long l = (Long)res;
+ HandleMessageResponse response = (HandleMessageResponse)res;
- oos.writeLong(l.longValue());
+ response.writeExternal(oos);
oos.flush();
- if (trace) { log.trace("wrote deactivate response"); }
+ if (trace) { log.trace("wrote handle message response"); }
}
else
{
@@ -483,7 +447,7 @@
return request;
}
- case ACTIVATE:
+ case MORE:
{
MethodInvocation mi = readHeader(ois);
@@ -495,37 +459,6 @@
return request;
}
- case DEACTIVATE:
- {
- MethodInvocation mi = readHeader(ois);
-
- InvocationRequest request =
- new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
- new MessagingMarshallable(version, mi), null, null, null);
-
-
- if (trace) { log.trace("read deactivate()"); }
-
- return request;
- }
- case GETMESSAGENOW:
- {
- MethodInvocation mi = readHeader(ois);
-
- boolean wait = ois.readBoolean();
-
- Object[] args = new Object[] {Boolean.valueOf(wait)};
-
- mi.setArguments(args);
-
- InvocationRequest request =
- new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
- new MessagingMarshallable(version, mi), null, null, null);
-
- if (trace) { log.trace("read getMessageNow()"); }
-
- return request;
- }
case SEND_TRANSACTION:
{
MethodInvocation mi = readHeader(ois);
@@ -568,6 +501,14 @@
{
MethodInvocation mi = readHeader(ois);
+ AckInfo info = new AckInfo();
+
+ info.readExternal(ois);
+
+ Object[] args = new Object[] {info};
+
+ mi.setArguments(args);
+
InvocationRequest request =
new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
new MessagingMarshallable(version, mi), null, null, null);
@@ -576,13 +517,24 @@
return request;
}
- case CANCEL_DELIVERY:
+ case ACKNOWLEDGE_BATCH:
{
MethodInvocation mi = readHeader(ois);
- long id = ois.readLong();
+ int num = ois.readInt();
+
+ List acks = new ArrayList(num);
- Object[] args = new Object[] {new Long(id)};
+ for (int i = 0; i < num; i++)
+ {
+ AckInfo ack = new AckInfo();
+
+ ack.readExternal(ois);
+
+ acks.add(ack);
+ }
+
+ Object[] args = new Object[] {acks};
mi.setArguments(args);
@@ -590,7 +542,7 @@
new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
new MessagingMarshallable(version, mi), null, null, null);
- if (trace) { log.trace("read cancelDelivery()"); }
+ if (trace) { log.trace("read acknowledge()"); }
return request;
}
@@ -600,16 +552,18 @@
int size = ois.readInt();
- List ids = new ArrayList(size);
+ List acks = new ArrayList(size);
for (int i = 0; i < size; i++)
{
- long id = ois.readLong();
+ AckInfo ack = new AckInfo();
+
+ ack.readExternal(ois);
- ids.add(new Long(id));
+ acks.add(ack);
}
- Object[] args = new Object[] {ids};
+ Object[] args = new Object[] {acks};
mi.setArguments(args);
@@ -621,24 +575,6 @@
return request;
}
- case MESSAGE_RESPONSE:
- {
- byte type = ois.readByte();
-
- int deliveryCount = ois.readInt();
-
- JBossMessage m = (JBossMessage)MessageFactory.createMessage(type);
-
- m.readExternal(ois);
-
- MessageProxy md = JBossMessage.createThinDelegate(m, deliveryCount);
-
- InvocationResponse resp = new InvocationResponse(null, new MessagingMarshallable(version, md), false, null);
-
- if (trace) { log.trace("read message response"); }
-
- return resp;
- }
case ID_BLOCK_RESPONSE:
{
IdBlock block = new IdBlock();
@@ -647,17 +583,19 @@
InvocationResponse resp = new InvocationResponse(null, new MessagingMarshallable(version, block), false, null);
- if (trace) { log.trace("read message response"); }
+ if (trace) { log.trace("read id block response"); }
return resp;
}
- case DEACTIVATE_RESPONSE:
+ case HANDLE_MESSAGE_RESPONSE:
{
- long id = ois.readLong();
+ HandleMessageResponse res = new HandleMessageResponse();
+
+ res.readExternal(ois);
- InvocationResponse resp = new InvocationResponse(null, new MessagingMarshallable(version, new Long(id)), false, null);
+ InvocationResponse resp = new InvocationResponse(null, new MessagingMarshallable(version, res), false, null);
- if (trace) { log.trace("read deactivate response"); }
+ if (trace) { log.trace("read handle message response"); }
return resp;
}
@@ -672,19 +610,9 @@
}
case CALLBACK:
{
- int consumerID = ois.readInt();
-
- byte type = ois.readByte();
-
- int deliveryCount = ois.readInt();
-
- JBossMessage m = (JBossMessage)MessageFactory.createMessage(type);
-
- m.readExternal(ois);
-
- MessageProxy md = JBossMessage.createThinDelegate(m, deliveryCount);
+ ClientDelivery dr = new ClientDelivery();
- DeliveryRunnable dr = new DeliveryRunnable(md, consumerID, null, trace);
+ dr.readExternal(ois);
InvocationRequest request =
new InvocationRequest(null, CallbackServerFactory.JMS_CALLBACK_SUBSYSTEM,
1.12 +3 -1 jboss-jms/src/main/org/jboss/jms/server/remoting/MetaDataConstants.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: MetaDataConstants.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/remoting/MetaDataConstants.java,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -b -r1.11 -r1.12
--- MetaDataConstants.java 20 Apr 2006 20:42:26 -0000 1.11
+++ MetaDataConstants.java 17 Jul 2006 17:14:46 -0000 1.12
@@ -27,7 +27,7 @@
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision 1.1 $</tt>
*
- * $Id: MetaDataConstants.java,v 1.11 2006/04/20 20:42:26 timfox Exp $
+ * $Id: MetaDataConstants.java,v 1.12 2006/07/17 17:14:46 timfox Exp $
*/
public class MetaDataConstants
{
@@ -39,6 +39,8 @@
public static final String CONSUMER_ID = "CONSUMER_ID";
+ public static final String PREFETCH_SIZE = "BUFFER_SIZE";
+
public static final String CLIENT_CONNECTION_ID = "CC_ID";
public static final String VERSION_NUMBER = "VERSION_NUMBER";
More information about the jboss-cvs-commits mailing list