[jboss-cvs] jboss-jms/src/main/org/jboss/jms/server/remoting ...

Timothy Fox tim.fox at jboss.com
Mon Jul 17 13:14:46 EDT 2006


  User: timfox  
  Date: 06/07/17 13:14:46

  Modified:    src/main/org/jboss/jms/server/remoting   JMSWireFormat.java
                        MetaDataConstants.java
  Log:
  Many changes including implementation of prefetch, SEDAisation of server, changing of recovery
  
  Revision  Changes    Path
  1.21      +99 -171   jboss-jms/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: JMSWireFormat.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java,v
  retrieving revision 1.20
  retrieving revision 1.21
  diff -u -b -r1.20 -r1.21
  --- JMSWireFormat.java	24 Jun 2006 09:05:37 -0000	1.20
  +++ JMSWireFormat.java	17 Jul 2006 17:14:46 -0000	1.21
  @@ -34,11 +34,12 @@
   import org.jboss.aop.Dispatcher;
   import org.jboss.aop.joinpoint.MethodInvocation;
   import org.jboss.jms.client.remoting.CallbackServerFactory;
  +import org.jboss.jms.client.remoting.HandleMessageResponse;
   import org.jboss.jms.message.JBossMessage;
  -import org.jboss.jms.message.MessageProxy;
   import org.jboss.jms.server.ServerPeer;
   import org.jboss.jms.server.Version;
  -import org.jboss.jms.server.endpoint.DeliveryRunnable;
  +import org.jboss.jms.server.endpoint.ClientDelivery;
  +import org.jboss.jms.tx.AckInfo;
   import org.jboss.jms.tx.TransactionRequest;
   import org.jboss.logging.Logger;
   import org.jboss.messaging.core.message.MessageFactory;
  @@ -80,23 +81,25 @@
      // The request codes  - start from zero
   
      protected static final byte SERIALIZED = 0;
  -   protected static final byte SEND = 1;
  -   protected static final byte ACTIVATE = 2;
  -   protected static final byte DEACTIVATE = 3;
  -   protected static final byte GETMESSAGENOW = 4;
  -   protected static final byte ACKNOWLEDGE = 5;
  +   
  +   protected static final byte ACKNOWLEDGE = 1;
  +   protected static final byte ACKNOWLEDGE_BATCH = 2;
  +   protected static final byte SEND = 3;
  +   
  +   protected static final byte CANCEL_DELIVERIES = 4;
  +   protected static final byte MORE = 5;
  +
      protected static final byte SEND_TRANSACTION = 6;
      protected static final byte GET_ID_BLOCK = 7;
  -   protected static final byte CANCEL_DELIVERY = 8;
  -   protected static final byte CANCEL_DELIVERIES = 9;
  + 
   
      // The response codes - start from 100
   
      protected static final byte CALLBACK = 100;
      protected static final byte NULL_RESPONSE = 101;
  -   protected static final byte MESSAGE_RESPONSE = 102;
  -   protected static final byte ID_BLOCK_RESPONSE = 103;
  -   protected static final byte DEACTIVATE_RESPONSE = 104;
  +   protected static final byte ID_BLOCK_RESPONSE = 102;
  +   protected static final byte HANDLE_MESSAGE_RESPONSE = 103;
  +
   
      // Static --------------------------------------------------------
   
  @@ -175,9 +178,9 @@
   
                  if (trace) { log.trace("wrote send()"); }
               }
  -            else if ("activate".equals(methodName))
  +            else if ("more".equals(methodName))
               {
  -               oos.writeByte(ACTIVATE);
  +               oos.writeByte(MORE);
   
                  writeHeader(mi, oos);
   
  @@ -185,35 +188,37 @@
   
                  if (trace) { log.trace("wrote activate()"); }
               }
  -            else if ("deactivate".equals(methodName))
  +            else if ("acknowledge".equals(methodName))
               {
  -               oos.writeByte(DEACTIVATE);
  +               oos.writeByte(ACKNOWLEDGE);
   
                  writeHeader(mi, oos);
   
  +               AckInfo ack = (AckInfo)mi.getArguments()[0];
  +               
  +               ack.writeExternal(oos);
  +
                  oos.flush();
   
  -               if (trace) { log.trace("wrote deactivate()"); }
  +               if (trace) { log.trace("wrote acknowledge()"); }
               }
  -            else if ("getMessageNow".equals(methodName))
  +            else if ("acknowledgeBatch".equals(methodName))
               {
  -               oos.writeByte(GETMESSAGENOW);
  +               oos.writeByte(ACKNOWLEDGE_BATCH);
   
                  writeHeader(mi, oos);
   
  -               boolean wait = ((Boolean)mi.getArguments()[0]).booleanValue();
  +               List acks = (List)mi.getArguments()[0];
   
  -               oos.writeBoolean(wait);
  +               oos.writeInt(acks.size());
   
  -               oos.flush();
  +               Iterator iter = acks.iterator();
   
  -               if (trace) { log.trace("wrote getMessageNow()"); }
  -            }
  -            else if ("acknowledge".equals(methodName))
  +               while (iter.hasNext())
               {
  -               oos.writeByte(ACKNOWLEDGE);
  -
  -               writeHeader(mi, oos);
  +                  AckInfo ack = (AckInfo)iter.next();
  +                  ack.writeExternal(oos);
  +               }
   
                  oos.flush();
   
  @@ -247,20 +252,6 @@
   
                  if (trace) { log.trace("wrote getIdBlock()"); }
               }
  -            else if ("cancelDelivery".equals(methodName))
  -            {
  -               oos.writeByte(CANCEL_DELIVERY);
  -
  -               writeHeader(mi, oos);
  -
  -               long id = ((Long)mi.getArguments()[0]).longValue();
  -
  -               oos.writeLong(id);
  -
  -               oos.flush();
  -
  -               if (trace) { log.trace("wrote cancelDelivery)"); }
  -            }
               else if ("cancelDeliveries".equals(methodName) && mi.getArguments() != null)
               {
                  oos.writeByte(CANCEL_DELIVERIES);
  @@ -275,8 +266,8 @@
   
                  while (iter.hasNext())
                  {
  -                  Long l = (Long)iter.next();
  -                  oos.writeLong(l.longValue());
  +                  AckInfo ack = (AckInfo)iter.next();
  +                  ack.writeExternal(oos);
                  }
   
                  oos.flush();
  @@ -293,27 +284,17 @@
                  if (trace) { log.trace("wrote using standard serialization"); }
               }
            }
  -         else if (param instanceof DeliveryRunnable)
  +         else if (param instanceof ClientDelivery)
            {
               //Message delivery callback
   
               if (trace) { log.trace("DeliveryRunnable"); }
   
  -            DeliveryRunnable dr = (DeliveryRunnable)param;
  +            ClientDelivery dr = (ClientDelivery)param;
   
               oos.writeByte(CALLBACK);
   
  -            int consumerID = dr.getConsumerID();
  -
  -            MessageProxy del = dr.getMessageProxy();
  -
  -            oos.writeInt(consumerID);
  -
  -            oos.writeByte(del.getMessage().getType());
  -
  -            oos.writeInt(del.getDeliveryCount());
  -
  -            del.getMessage().writeExternal(oos);
  +            dr.writeExternal(oos);
   
               oos.flush();
   
  @@ -358,23 +339,6 @@
   
               if (trace) { log.trace("wrote null response"); }
            }
  -         else if (res instanceof MessageProxy)
  -         {
  -            // return value from getMessageNow
  -            oos.write(MESSAGE_RESPONSE);
  -
  -            MessageProxy mp = (MessageProxy)res;
  -
  -            oos.writeByte(mp.getMessage().getType());
  -
  -            oos.writeInt(mp.getDeliveryCount());
  -
  -            mp.getMessage().writeExternal(oos);
  -
  -            oos.flush();
  -
  -            if (trace) { log.trace("wrote message response"); }
  -         }
            else if (res instanceof IdBlock)
            {
               //Return value from getMessageNow
  @@ -386,20 +350,20 @@
   
               oos.flush();
   
  -            if (trace) { log.trace("wrote message response"); }
  +            if (trace) { log.trace("wrote id block response"); }
            }
  -         else if (res instanceof Long)
  +         else if (res instanceof HandleMessageResponse)
            {
  -            //Return value from deactivate
  -            oos.write(DEACTIVATE_RESPONSE);
  +            //Return value from delivering messages to client
  +            oos.write(HANDLE_MESSAGE_RESPONSE);
   
  -            Long l = (Long)res;
  +            HandleMessageResponse response = (HandleMessageResponse)res;
   
  -            oos.writeLong(l.longValue());
  +            response.writeExternal(oos);
   
               oos.flush();
   
  -            if (trace) { log.trace("wrote deactivate response"); }
  +            if (trace) { log.trace("wrote handle message response"); }
            }
            else
            {
  @@ -483,7 +447,7 @@
   
               return request;
            }
  -         case ACTIVATE:
  +         case MORE:
            {
               MethodInvocation mi = readHeader(ois);
   
  @@ -495,37 +459,6 @@
   
               return request;
            }
  -         case DEACTIVATE:
  -         {
  -            MethodInvocation mi = readHeader(ois);
  -
  -            InvocationRequest request =
  -               new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
  -                                     new MessagingMarshallable(version, mi), null, null, null);
  -
  -
  -            if (trace) { log.trace("read deactivate()"); }
  -
  -            return request;
  -         }
  -         case GETMESSAGENOW:
  -         {
  -            MethodInvocation mi = readHeader(ois);
  -
  -            boolean wait = ois.readBoolean();
  -
  -            Object[] args = new Object[] {Boolean.valueOf(wait)};
  -
  -            mi.setArguments(args);
  -
  -            InvocationRequest request =
  -               new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
  -                                     new MessagingMarshallable(version, mi), null, null, null);
  -
  -            if (trace) { log.trace("read getMessageNow()"); }
  -
  -            return request;
  -         }
            case SEND_TRANSACTION:
            {
               MethodInvocation mi = readHeader(ois);
  @@ -568,6 +501,14 @@
            {
               MethodInvocation mi = readHeader(ois);
   
  +            AckInfo info = new AckInfo();
  +            
  +            info.readExternal(ois);
  +            
  +            Object[] args = new Object[] {info};
  +
  +            mi.setArguments(args);
  +
               InvocationRequest request =
                  new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
                                        new MessagingMarshallable(version, mi), null, null, null);
  @@ -576,13 +517,24 @@
   
               return request;
            }
  -         case CANCEL_DELIVERY:
  +         case ACKNOWLEDGE_BATCH:
            {
               MethodInvocation mi = readHeader(ois);
   
  -            long id = ois.readLong();
  +            int num = ois.readInt();
  +            
  +            List acks = new ArrayList(num);
   
  -            Object[] args = new Object[] {new Long(id)};
  +            for (int i = 0; i < num; i++)
  +            {
  +               AckInfo ack = new AckInfo();
  +               
  +               ack.readExternal(ois);
  +               
  +               acks.add(ack);
  +            }
  +                        
  +            Object[] args = new Object[] {acks};
   
               mi.setArguments(args);
   
  @@ -590,7 +542,7 @@
                  new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
                                        new MessagingMarshallable(version, mi), null, null, null);
   
  -            if (trace) { log.trace("read cancelDelivery()"); }
  +            if (trace) { log.trace("read acknowledge()"); }
   
               return request;
            }
  @@ -600,16 +552,18 @@
   
               int size = ois.readInt();
   
  -            List ids = new ArrayList(size);
  +            List acks = new ArrayList(size);
   
               for (int i = 0; i < size; i++)
               {
  -               long id = ois.readLong();
  +               AckInfo ack = new AckInfo();
  +               
  +               ack.readExternal(ois);
   
  -               ids.add(new Long(id));
  +               acks.add(ack);
               }
   
  -            Object[] args = new Object[] {ids};
  +            Object[] args = new Object[] {acks};
   
               mi.setArguments(args);
   
  @@ -621,24 +575,6 @@
   
               return request;
            }
  -         case MESSAGE_RESPONSE:
  -         {
  -            byte type = ois.readByte();
  -
  -            int deliveryCount = ois.readInt();
  -
  -            JBossMessage m = (JBossMessage)MessageFactory.createMessage(type);
  -
  -            m.readExternal(ois);
  -
  -            MessageProxy md = JBossMessage.createThinDelegate(m, deliveryCount);
  -
  -            InvocationResponse resp = new InvocationResponse(null, new MessagingMarshallable(version, md), false, null);
  -
  -            if (trace) { log.trace("read message response"); }
  -
  -            return resp;
  -         }
            case ID_BLOCK_RESPONSE:
            {
               IdBlock block = new IdBlock();
  @@ -647,17 +583,19 @@
   
               InvocationResponse resp = new InvocationResponse(null, new MessagingMarshallable(version, block), false, null);
   
  -            if (trace) { log.trace("read message response"); }
  +            if (trace) { log.trace("read id block response"); }
   
               return resp;
            }
  -         case DEACTIVATE_RESPONSE:
  +         case HANDLE_MESSAGE_RESPONSE:
            {
  -            long id = ois.readLong();
  +            HandleMessageResponse res = new HandleMessageResponse();
  +
  +            res.readExternal(ois);
   
  -            InvocationResponse resp = new InvocationResponse(null, new MessagingMarshallable(version, new Long(id)), false, null);
  +            InvocationResponse resp = new InvocationResponse(null, new MessagingMarshallable(version, res), false, null);
   
  -            if (trace) { log.trace("read deactivate response"); }
  +            if (trace) { log.trace("read handle message response"); }
   
               return resp;
            }
  @@ -672,19 +610,9 @@
            }
            case CALLBACK:
            {
  -            int consumerID = ois.readInt();
  -
  -            byte type = ois.readByte();
  -
  -            int deliveryCount = ois.readInt();
  -
  -            JBossMessage m = (JBossMessage)MessageFactory.createMessage(type);
  -
  -            m.readExternal(ois);
  -
  -            MessageProxy md = JBossMessage.createThinDelegate(m, deliveryCount);
  +            ClientDelivery dr = new ClientDelivery();
   
  -            DeliveryRunnable dr = new DeliveryRunnable(md, consumerID, null, trace);
  +            dr.readExternal(ois);
   
               InvocationRequest request =
                  new InvocationRequest(null, CallbackServerFactory.JMS_CALLBACK_SUBSYSTEM,
  
  
  
  1.12      +3 -1      jboss-jms/src/main/org/jboss/jms/server/remoting/MetaDataConstants.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: MetaDataConstants.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/remoting/MetaDataConstants.java,v
  retrieving revision 1.11
  retrieving revision 1.12
  diff -u -b -r1.11 -r1.12
  --- MetaDataConstants.java	20 Apr 2006 20:42:26 -0000	1.11
  +++ MetaDataConstants.java	17 Jul 2006 17:14:46 -0000	1.12
  @@ -27,7 +27,7 @@
    * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
    * @version <tt>$Revision 1.1 $</tt>
    *
  - * $Id: MetaDataConstants.java,v 1.11 2006/04/20 20:42:26 timfox Exp $
  + * $Id: MetaDataConstants.java,v 1.12 2006/07/17 17:14:46 timfox Exp $
    */
   public class MetaDataConstants
   {
  @@ -39,6 +39,8 @@
      
      public static final String CONSUMER_ID = "CONSUMER_ID";   
      
  +   public static final String PREFETCH_SIZE = "BUFFER_SIZE";
  +   
      public static final String CLIENT_CONNECTION_ID = "CC_ID";
      
      public static final String VERSION_NUMBER = "VERSION_NUMBER";
  
  
  



More information about the jboss-cvs-commits mailing list