[jboss-cvs] JBoss Messaging SVN: r4814 - in branches/Branch_Message_Chunking_new: examples/jms/src/org/jboss/jms/example and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Aug 18 04:59:08 EDT 2008


Author: ataylor
Date: 2008-08-18 04:59:07 -0400 (Mon, 18 Aug 2008)
New Revision: 4814

Modified:
   branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java
   branches/Branch_Message_Chunking_new/messaging.ipr
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/ClientMessage.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
   branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java
Log:
more tidying up

Modified: branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java
===================================================================
--- branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java	2008-08-18 08:55:18 UTC (rev 4813)
+++ branches/Branch_Message_Chunking_new/examples/jms/src/org/jboss/jms/example/QueueExample.java	2008-08-18 08:59:07 UTC (rev 4814)
@@ -49,7 +49,7 @@
          Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
          final MessageProducer producer = session.createProducer(queue);
          final BytesMessage message = session.createBytesMessage();
-         byte[] bytes = new byte[2048];
+         byte[] bytes = new byte[1024];
          for(int i = 0; i < bytes.length; i++)
          {
             bytes[i] = (byte) i;
@@ -62,7 +62,7 @@
             producer.send(message);
          }*/
          producer.send(message);
-         producer.send(message);
+         //producer.send(message);
          MessageConsumer messageConsumer = session.createConsumer(queue);
          connection.start();
          BytesMessage message2 = (BytesMessage) messageConsumer.receive(50000000);
@@ -80,7 +80,7 @@
                throw new RuntimeException(i+"");
             }
          }
-         message2 = (BytesMessage) messageConsumer.receive(50000000);
+         /*message2 = (BytesMessage) messageConsumer.receive(50000000);
          log.info("message received from queue");
          message2.readBytes(newbytes);
          //log.info("message = " + new String(newbytes));
@@ -94,7 +94,7 @@
             {
                throw new RuntimeException(i+"");
             }
-         }
+         }*/
       }
       catch (Exception e)
       {

Modified: branches/Branch_Message_Chunking_new/messaging.ipr
===================================================================
--- branches/Branch_Message_Chunking_new/messaging.ipr	2008-08-18 08:55:18 UTC (rev 4813)
+++ branches/Branch_Message_Chunking_new/messaging.ipr	2008-08-18 08:59:07 UTC (rev 4814)
@@ -463,14 +463,12 @@
         <root url="jar:///home/andy/devtools/apache-ant-1.7.0/lib/ant-junit.jar!/" />
         <root url="jar:///home/andy/devtools/apache-ant-1.7.0/lib/ant.jar!/" />
         <root url="jar://$PROJECT_DIR$/thirdparty/apache-mina/lib/mina-core-2.0.0-M3-20080730.120633-1.jar!/" />
-        <root url="file://$PROJECT_DIR$/examples/jms/config" />
       </CLASSES>
       <JAVADOC />
       <SOURCES>
         <root url="jar://$PROJECT_DIR$/thirdparty/jboss/jnpserver/lib/jnpserver-sources.jar!/" />
         <root url="jar://$PROJECT_DIR$/thirdparty/apache-mina/lib/mina-core-2.0.0-M3-20080730.120633-1-sources.jar!/" />
       </SOURCES>
-      <jarDirectory url="file://$PROJECT_DIR$/examples/jms/config" recursive="false" />
       <jarDirectory url="file://$PROJECT_DIR$/src/etc" recursive="false" />
       <jarDirectory url="file://$PROJECT_DIR$/tests/etc" recursive="false" />
       <jarDirectory url="file://$PROJECT_DIR$/tests/jms-tests/etc" recursive="false" />

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/ClientMessage.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/ClientMessage.java	2008-08-18 08:55:18 UTC (rev 4813)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/ClientMessage.java	2008-08-18 08:59:07 UTC (rev 4814)
@@ -40,6 +40,4 @@
    long getDeliveryID();
    
    void setDeliveryID(long deliveryID);
-
-   void prepareBuffers();
 }

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java	2008-08-18 08:55:18 UTC (rev 4813)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java	2008-08-18 08:59:07 UTC (rev 4814)
@@ -103,15 +103,4 @@
    {
       return this.deliveryID;
    }
-
-
-   public void prepareBuffers()
-   {
-      bodySize = 0;
-      for (MessagingBuffer buffer : buffers)
-      {
-         buffersSizes.add(buffer.limit() - 9);
-         bodySize += (buffer.limit() - 9);
-      }
-   }
 }

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-08-18 08:55:18 UTC (rev 4813)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-08-18 08:59:07 UTC (rev 4814)
@@ -21,63 +21,23 @@
  */ 
 package org.jboss.messaging.core.client.impl;
 
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import org.jboss.messaging.core.client.ClientBrowser;
-import org.jboss.messaging.core.client.ClientConnectionFactory;
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.*;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.MessagingBuffer;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCancelMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.*;
 import org.jboss.messaging.util.SimpleString;
 import org.jboss.messaging.util.TokenBucketLimiterImpl;
 
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.util.*;
+import java.util.concurrent.Executor;
+
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
@@ -531,21 +491,21 @@
 
    public ClientMessage createClientMessage(byte type, boolean durable, long expiration, long timestamp, byte priority)
    {
-      MessagingBuffer body = remotingConnection.createBuffer(INITIAL_MESSAGE_BODY_SIZE);
+      MessagingBuffer body = remotingConnection.createBuffer(INITIAL_MESSAGE_BODY_SIZE + PacketImpl.BUFFER_HEADER_SIZE);
       
       return new ClientMessageImpl(type, durable, expiration, timestamp, priority, body);
    }
 
    public ClientMessage createClientMessage(byte type, boolean durable)
    {
-      MessagingBuffer body = remotingConnection.createBuffer(INITIAL_MESSAGE_BODY_SIZE);
+      MessagingBuffer body = remotingConnection.createBuffer(INITIAL_MESSAGE_BODY_SIZE + PacketImpl.BUFFER_HEADER_SIZE);
       
       return new ClientMessageImpl(type, durable, body);
    }
 
    public ClientMessage createClientMessage(boolean durable)
    {
-      MessagingBuffer body = remotingConnection.createBuffer(INITIAL_MESSAGE_BODY_SIZE);
+      MessagingBuffer body = remotingConnection.createBuffer(INITIAL_MESSAGE_BODY_SIZE + PacketImpl.BUFFER_HEADER_SIZE);
       
       return new ClientMessageImpl(durable, body);
    }

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-08-18 08:55:18 UTC (rev 4813)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-08-18 08:59:07 UTC (rev 4814)
@@ -105,7 +105,7 @@
       this.priority = priority;
       buffers.add(body);
       pos = body.position();
-      bodySize = body.limit();
+      //bodySize = body.limit();
       //fill in the headers
       body.putInt(-1);
       body.putInt(-1);
@@ -154,12 +154,14 @@
          for (int i = 0; i < buffers.size(); i++)
          {
             MessagingBuffer buffer = buffers.get(i);
-            //todo fix horrible hack, there is a mina bug so slicing the buffer doesnt seem to work, so i need to make a copy in case of multiple sends
-            /*IoBuffer copied = IoBuffer.allocate(buffer.limit());
-            copied.put(buffer.array(), 0, buffer.limit());
-            copied.setAutoExpand(true);
-            buff.putBuffer(new IoBufferWrapper(copied));*/
-            buff.putBuffer(buffer.slice());
+            //todo it would be nice to always send the same buffer, but this is problematic since we need to change the packetid for each send!!!
+            //buff.putBuffer(buffer.slice());
+
+            MessagingBuffer copy = buffer.createNewBuffer(buffer.limit());
+            copy.putBytes(buffer.array());
+            copy.limit(buffer.limit());
+            copy.flip();
+            buff.putBuffer(copy);
          }
       }
    }
@@ -211,14 +213,7 @@
          for (int i = 0; i < buffers.size(); i++)
          {
             MessagingBuffer buffer = buffers.get(i);
-            try
-            {
-               buff.putBytes(buffer.array(), 9, buffersSizes.get(i));
-            }
-            catch (Exception e)
-            {
-               e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-            }
+            buff.putBytes(buffer.array(), PacketImpl.BUFFER_HEADER_SIZE, buffersSizes.get(i));
          }
       }
       else
@@ -254,7 +249,7 @@
       /* Timestamp */ SIZE_LONG +
       /* Priority */ SIZE_BYTE +
       /* PropertySize and Properties */ properties.getEncodeSize() +
-      /* BodySize and Body */ SIZE_INT + getBodySize();
+      /* BodySize and Body */ SIZE_INT + getBodySize() + (buffers.size() * PacketImpl.BUFFER_HEADER_SIZE);
    }
 
 

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java	2008-08-18 08:55:18 UTC (rev 4813)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/EncoderImpl.java	2008-08-18 08:59:07 UTC (rev 4814)
@@ -127,7 +127,7 @@
 
    public void putBytes(byte[] bytes, int i, int i1)
    {
-      if(getCurrentBuffer().remaining() > (i1 - i))
+      if(getCurrentBuffer().remaining() >= (i1 - i))
       {
          getCurrentBuffer().putBytes(bytes, i, i1);
          pos += (i1 - i);
@@ -429,7 +429,7 @@
       for (MessagingBuffer buffer : buffers)
       {
          buffer.flip();
-         bodySize += buffer.limit();
+         bodySize += buffer.limit() - PacketImpl.BUFFER_HEADER_SIZE;
       }
       pos = 0;
    }
@@ -452,16 +452,20 @@
    public void putBuffer(MessagingBuffer body)
    {
       buffers.add(body);
-      buffersSizes.add(body.limit() - 9);
-      bodySize += body.limit() - 9;
+      buffersSizes.add(body.limit() - PacketImpl.BUFFER_HEADER_SIZE);
+      bodySize += body.limit() - PacketImpl.BUFFER_HEADER_SIZE;
    }
 
    public void transferRemainingBuffer(Encoder buffer, int len)
    {
-      for(int i = bufferPos + 1; i < buffers.size(); i++)
+      while(buffers.size() > bufferPos + 1)
       {
+         buffer.putBuffer(buffers.remove(bufferPos + 1));
+      }
+      /*for(int i = bufferPos + 1; i < buffers.size(); i++)
+      {
          buffer.putBuffer(buffers.get(i));
-      }
+      }*/
    }
 
    private void checkReadSpace(int size)
@@ -482,7 +486,7 @@
 
    private void setNextFragment()
    {
-      MessagingBuffer buffer = buffers.get(0).createNewBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
+      MessagingBuffer buffer = buffers.get(0).createNewBuffer(PacketImpl.INITIAL_BUFFER_SIZE + PacketImpl.BUFFER_HEADER_SIZE);
       buffer.putInt(-1);
       buffer.putInt(-1);
       buffer.putBoolean(false);

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-08-18 08:55:18 UTC (rev 4813)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-08-18 08:59:07 UTC (rev 4814)
@@ -302,17 +302,11 @@
          throw new IllegalStateException("Cannot write packet to connection, it is destroyed");
       }
       
-      MessagingBuffer buffer = transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
+      MessagingBuffer buffer = transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE + PacketImpl.BUFFER_HEADER_SIZE);
 
       packet.encode(buffer, packetIdCreator.getAndIncrement());
-      log.info("sending packet type:" + packet.getType());
       for (MessagingBuffer buff : packet.getBuffers())
       {
-         if(packet.getType() == 100)
-         {
-            System.out.println(buff.getInt() + ":" + buff.getInt() + ":" + buff.getBoolean() + ":" + buff.getByte());
-            buff.position(0);
-         }
          transportConnection.write(buff);
       }
    }

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java	2008-08-18 08:55:18 UTC (rev 4813)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java	2008-08-18 08:59:07 UTC (rev 4814)
@@ -111,7 +111,6 @@
       if (lastPacket)
       {
          packetsToDeliver.get(connectionID).remove(packetId);
-         log.info("handling packet type:" + packet.getType());
          packet.decode();
          if (executorFactory != null)
          {

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-08-18 08:55:18 UTC (rev 4813)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-08-18 08:59:07 UTC (rev 4814)
@@ -50,6 +50,8 @@
    public static final long NO_ID_SET = -1L;
    
    public static final int INITIAL_BUFFER_SIZE = 1024;
+
+   public static final int BUFFER_HEADER_SIZE = 9;
    
    private int commandID;
 
@@ -197,11 +199,11 @@
    {
       //The standard header fields
       buffers.add(buffer);
-      //buffersSizes.add(-1);
       //length filled at end
       getCurrentBuffer().putInt(-1);
+      //the packetid filled at end
+      getCurrentBuffer().putInt(-1);
       //are we the last packet, filled at end.
-      getCurrentBuffer().putInt(-1);
       getCurrentBuffer().putBoolean(false);
       putByte(type);
       putInt(commandID);
@@ -218,7 +220,6 @@
          {
             messagingBuffer.flip();
          }
-        // buffersSizes.add(i, messagingBuffer.limit() - SIZE_INT);
          messagingBuffer.putInt(0, messagingBuffer.limit() - SIZE_INT);
          messagingBuffer.putInt(SIZE_INT, packetId);
          if(i == buffers.size() - 1)

Modified: branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java
===================================================================
--- branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java	2008-08-18 08:55:18 UTC (rev 4813)
+++ branches/Branch_Message_Chunking_new/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerSendMessage.java	2008-08-18 08:59:07 UTC (rev 4814)
@@ -82,7 +82,6 @@
    
    public void encodeBody()
    {
-      clientMessage.prepareBuffers();
       clientMessage.encodeForTransport(this);
    }
    




More information about the jboss-cvs-commits mailing list