[jboss-cvs] JBoss Messaging SVN: r4801 - in branches/Branch_Message_Chunking_new: src/main/org/jboss/messaging/core/message/impl and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Aug 14 05:50:35 EDT 2008


Author: ataylor
Date: 2008-08-14 05:50:34 -0400 (Thu, 14 Aug 2008)
New Revision: 4801

Modified:
   branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/PerfExample.java
   branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
Log:
more tidying up

Modified: branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/PerfExample.java
===================================================================
--- branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/PerfExample.java	2008-08-13 14:43:30 UTC (rev 4800)
+++ branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/PerfExample.java	2008-08-14 09:50:34 UTC (rev 4801)
@@ -21,27 +21,16 @@
    */
 package org.jboss.jms.example;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.naming.InitialContext;
-
 import org.jboss.jms.util.PerfParams;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.util.TokenBucketLimiter;
 import org.jboss.messaging.util.TokenBucketLimiterImpl;
 
+import javax.jms.*;
+import javax.naming.InitialContext;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * A simple example that can be used to gather basic performance measurements.
  * 
@@ -169,6 +158,10 @@
       producer.setDeliveryMode(deliveryMode);
       BytesMessage bytesMessage = session.createBytesMessage();
       byte[] payload = new byte[messageSize];
+      for(int i = 0; i < payload.length; i++)
+      {
+         payload[i] = (byte) i;
+      }
       bytesMessage.writeBytes(payload);
       
       final int modulo = 2000;

Modified: branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java
===================================================================
--- branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java	2008-08-13 14:43:30 UTC (rev 4800)
+++ branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java	2008-08-14 09:50:34 UTC (rev 4801)
@@ -49,7 +49,7 @@
          Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
          final MessageProducer producer = session.createProducer(queue);
          final BytesMessage message = session.createBytesMessage();
-         byte[] bytes = new byte[40000];
+         byte[] bytes = new byte[2048];
          for(int i = 0; i < bytes.length; i++)
          {
             bytes[i] = (byte) i;
@@ -57,9 +57,12 @@
          byte[] newbytes = new byte[bytes.length];
          message.writeBytes(bytes);
          log.info("sending message to queue");
+         /*for(int i = 0; i < 100000; i++)
+         {
+            producer.send(message);
+         }*/
          producer.send(message);
-         //producer.send(message);
-
+         producer.send(message);
          MessageConsumer messageConsumer = session.createConsumer(queue);
          connection.start();
          BytesMessage message2 = (BytesMessage) messageConsumer.receive(50000000);
@@ -77,7 +80,7 @@
                throw new RuntimeException(i+"");
             }
          }
-         /*message2 = (BytesMessage) messageConsumer.receive(50000000);
+         message2 = (BytesMessage) messageConsumer.receive(50000000);
          log.info("message received from queue");
          message2.readBytes(newbytes);
          //log.info("message = " + new String(newbytes));
@@ -91,22 +94,7 @@
             {
                throw new RuntimeException(i+"");
             }
-         }*/
-         /*message2 = (BytesMessage) messageConsumer.receive(50000000);
-         log.info("message received from queue");
-         message2.readBytes(newbytes);
-         //log.info("message = " + new String(newbytes));
-         for (int i = 0; i < newbytes.length; i++)
-         {
-            byte newbyte = newbytes[i];
-            System.out.print(newbyte);
-            System.out.print(" = ");
-            System.out.println(bytes[i]);
-            if(bytes[i] != newbyte)
-            {
-               throw new RuntimeException(i+"");
-            }
-         }*/
+         }
       }
       catch (Exception e)
       {

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-08-13 14:43:30 UTC (rev 4800)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-08-14 09:50:34 UTC (rev 4801)
@@ -22,14 +22,12 @@
 
 package org.jboss.messaging.core.message.impl;
 
-import org.apache.mina.core.buffer.IoBuffer;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.message.Message;
 import org.jboss.messaging.core.remoting.Encoder;
 import org.jboss.messaging.core.remoting.MessagingBuffer;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.remoting.impl.EncoderImpl;
-import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import static org.jboss.messaging.util.DataConstants.*;
 import org.jboss.messaging.util.SimpleString;
@@ -157,10 +155,11 @@
          {
             MessagingBuffer buffer = buffers.get(i);
             //todo fix horrible hack, there is a mina bug so slicing the buffer doesnt seem to work, so i need to make a copy in case of multiple sends
-            IoBuffer copied = IoBuffer.allocate(buffer.limit());
+            /*IoBuffer copied = IoBuffer.allocate(buffer.limit());
             copied.put(buffer.array(), 0, buffer.limit());
             copied.setAutoExpand(true);
-            buff.putBuffer(new IoBufferWrapper(copied));
+            buff.putBuffer(new IoBufferWrapper(copied));*/
+            buff.putBuffer(buffer.slice());
          }
       }
    }
@@ -197,7 +196,7 @@
 
     abstract protected void resetBodyForUse();
 
-    public void encode(Encoder buff)
+   public void encode(Encoder buff)
    {
       buff.putSimpleString(getDestination());
       buff.putByte(getType());
@@ -212,7 +211,14 @@
          for (int i = 0; i < buffers.size(); i++)
          {
             MessagingBuffer buffer = buffers.get(i);
-            buff.putBytes(buffer.array(), 9, buffersSizes.get(i));
+            try
+            {
+               buff.putBytes(buffer.array(), 9, buffersSizes.get(i));
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            }
          }
       }
       else

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java	2008-08-13 14:43:30 UTC (rev 4800)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java	2008-08-14 09:50:34 UTC (rev 4801)
@@ -452,8 +452,8 @@
    public void putBuffer(MessagingBuffer body)
    {
       buffers.add(body);
-      buffersSizes.add(body.limit() - 5);
-      bodySize += body.limit() - 5;
+      buffersSizes.add(body.limit() - 9);
+      bodySize += body.limit() - 9;
    }
 
    public void transferRemainingBuffer(Encoder buffer, int len)

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-08-13 14:43:30 UTC (rev 4800)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-08-14 09:50:34 UTC (rev 4801)
@@ -305,10 +305,15 @@
       MessagingBuffer buffer = transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
 
       packet.encode(buffer, packetIdCreator.getAndIncrement());
-
+      log.info("sending packet type:" + packet.getType());
       for (MessagingBuffer buff : packet.getBuffers())
       {
-         transportConnection.write(buff);   
+         if(packet.getType() == 100)
+         {
+            System.out.println(buff.getInt() + ":" + buff.getInt() + ":" + buff.getBoolean() + ":" + buff.getByte());
+            buff.position(0);
+         }
+         transportConnection.write(buff);
       }
    }
   

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java	2008-08-13 14:43:30 UTC (rev 4800)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java	2008-08-14 09:50:34 UTC (rev 4801)
@@ -111,6 +111,7 @@
       if (lastPacket)
       {
          packetsToDeliver.get(connectionID).remove(packetId);
+         log.info("handling packet type:" + packet.getType());
          packet.decode();
          if (executorFactory != null)
          {

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-08-13 14:43:30 UTC (rev 4800)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-08-14 09:50:34 UTC (rev 4801)
@@ -30,6 +30,7 @@
 import static org.jboss.messaging.util.DataConstants.SIZE_INT;
 
 import java.nio.charset.Charset;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -130,7 +131,6 @@
    public static final byte PROD_RECEIVETOKENS = 101;
    
    public static final byte RECEIVE_MSG = 110;
-   private int packetid;
 
    // Static --------------------------------------------------------
 
@@ -193,16 +193,15 @@
       setTargetID(other.getResponseTargetID());
    }
 
-   public void encode(MessagingBuffer buffer, int packetId)
-   {      
-      this.packetid = packetId;
+   public void encode(MessagingBuffer buffer,int packetId)
+   {
       //The standard header fields
       buffers.add(buffer);
       //buffersSizes.add(-1);
       //length filled at end
       getCurrentBuffer().putInt(-1);
       //are we the last packet, filled at end.
-      getCurrentBuffer().putInt(packetId);
+      getCurrentBuffer().putInt(-1);
       getCurrentBuffer().putBoolean(false);
       putByte(type);
       putInt(commandID);
@@ -219,7 +218,13 @@
          {
             messagingBuffer.flip();
          }
-         buffersSizes.add(i, messagingBuffer.limit() - SIZE_INT);
+        // buffersSizes.add(i, messagingBuffer.limit() - SIZE_INT);
+         messagingBuffer.putInt(0, messagingBuffer.limit() - SIZE_INT);
+         messagingBuffer.putInt(SIZE_INT, packetId);
+         if(i == buffers.size() - 1)
+         {
+            messagingBuffer.putBoolean(SIZE_INT * 2, true);
+         }
       }
    }
 
@@ -264,20 +269,11 @@
    }
    public List<MessagingBuffer> getBuffers()
    {
-      for (int i = 0; i < buffers.size(); i++)
+      List<MessagingBuffer> buffers = new ArrayList<MessagingBuffer>();
+      for (int i = 0; i < this.buffers.size(); i++)
       {
-         MessagingBuffer buffer = buffers.get(i);
-         if(buffer.position() != 0)
-         {
-            buffer.flip();
-         }
-         buffer.putInt(0, buffersSizes.get(i));
-         buffer.putInt(SIZE_INT, packetid);
-         if(i == buffers.size() - 1)
-         {
-            buffer.putBoolean(SIZE_INT * 2, true);
-         }
-
+         MessagingBuffer buffer = this.buffers.get(i).slice();
+         buffers.add(buffer);
       }
       return buffers;
    }




More information about the jboss-cvs-commits mailing list