[jboss-cvs] JBoss Messaging SVN: r5103 - in branches/Branch_Chunk_Clebert: src/main/org/jboss/messaging/core/journal/impl and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Oct 10 18:56:14 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-10-10 18:56:13 -0400 (Fri, 10 Oct 2008)
New Revision: 5103

Modified:
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/Message.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerSession.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/chunkmessage/MessageChunkTest.java
Log:
Server2client chunks
(a lot of work for a friday :-) )

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-10-10 15:55:41 UTC (rev 5102)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-10-10 22:56:13 UTC (rev 5103)
@@ -33,8 +33,8 @@
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionScheduledSendMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionScheduledSendMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
@@ -57,8 +57,8 @@
 
    // Attributes -----------------------------------------------------------------------------------
 
-   // TODO This is temporary, make this better
-   private static final int BIG_PACKAGE_SIZE = 10 * 1024;
+   // TODO This is temporary, make this configurable somewhere
+   public static final int BIG_PACKAGE_SIZE = 10 * 1024;
 
    // TODO This is temporary, make this better
    public static final int CHUNK_SIZE = 10 * 1024;
@@ -149,14 +149,14 @@
       doSend(address, msg, 0);
    }
 
-    public void send(final ClientMessage msg, long scheduleDeliveryTime) throws MessagingException
+   public void send(final ClientMessage msg, final long scheduleDeliveryTime) throws MessagingException
    {
       checkClosed();
 
       doSend(null, msg, scheduleDeliveryTime);
    }
 
-   public void send(final SimpleString address, final ClientMessage msg, long scheduleDeliveryTime) throws MessagingException
+   public void send(final SimpleString address, final ClientMessage msg, final long scheduleDeliveryTime) throws MessagingException
    {
       checkClosed();
 
@@ -305,7 +305,7 @@
       closed = true;
    }
 
-   private void doSend(final SimpleString address, final ClientMessage msg, long scheduledDeliveryTime) throws MessagingException
+   private void doSend(final SimpleString address, final ClientMessage msg, final long scheduledDeliveryTime) throws MessagingException
    {
       if (address != null)
       {
@@ -330,57 +330,17 @@
 
       boolean sendBlocking = msg.isDurable() ? blockOnPersistentSend : blockOnNonPersistentSend;
 
-
       if (msg.getEncodeSize() > BIG_PACKAGE_SIZE)
       {
-         int headerSize = msg.getPropertiesEncodeSize();
+         sendMessageInChunks(msg);
 
-         if (headerSize > BIG_PACKAGE_SIZE)
-         {
-            throw new MessagingException(MessagingException.ILLEGAL_STATE,
-                                         "Header size is too big, use the messageBody for large data");
-         }
-
-         MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(headerSize));
-         msg.encodeProperties(headerBuffer);
-
-         final int bodySize = msg.getBodyEncodeSize();
-
-         int bodyLength = BIG_PACKAGE_SIZE - headerSize;
-
-         MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(bodyLength));
-
-         msg.encodeBody(bodyBuffer, 0, bodyLength);
-
-         SessionSendChunkMessage chunk = new SessionSendChunkMessage(id,
-                                                                     headerBuffer.array(),
-                                                                     bodyBuffer.array(),
-                                                                     true,
-                                                                     true);
-
-         channel.sendBlocking(chunk);
-
-         for (int pos = bodyLength; pos < bodySize;)
-         {
-            bodyLength = Math.min(bodySize - pos, CHUNK_SIZE);
-            bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(bodyLength));
-
-            msg.encodeBody(bodyBuffer, pos, bodyLength);
-
-            chunk = new SessionSendChunkMessage(id, null, bodyBuffer.array(), pos + bodyLength < bodySize, true);
-
-            channel.sendBlocking(chunk);
-
-            pos += bodyLength;
-         }
-
       }
       else
       {
          SessionSendMessage message;
 
-         //check to see if this message need to be scheduled.
-         if(scheduledDeliveryTime <= 0)
+         // check to see if this message need to be scheduled.
+         if (scheduledDeliveryTime <= 0)
          {
             message = new SessionSendMessage(id, msg, sendBlocking);
          }
@@ -412,6 +372,54 @@
       }
    }
 
+   /**
+    * @param msg
+    * @throws MessagingException
+    */
+   private void sendMessageInChunks(final ClientMessage msg) throws MessagingException
+   {
+      int headerSize = msg.getPropertiesEncodeSize();
+
+      if (headerSize > BIG_PACKAGE_SIZE)
+      {
+         throw new MessagingException(MessagingException.ILLEGAL_STATE,
+                                      "Header size is too big, use the messageBody for large data");
+      }
+
+      MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(headerSize));
+      msg.encodeProperties(headerBuffer);
+
+      final int bodySize = msg.getBodySize();
+
+      int bodyLength = BIG_PACKAGE_SIZE - headerSize;
+
+      MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(bodyLength));
+
+      msg.encodeBody(bodyBuffer, 0, bodyLength);
+
+      SessionSendChunkMessage chunk = new SessionSendChunkMessage(id,
+                                                                  headerBuffer.array(),
+                                                                  bodyBuffer.array(),
+                                                                  bodyLength < bodySize,
+                                                                  true);
+
+      channel.sendBlocking(chunk);
+
+      for (int pos = bodyLength; pos < bodySize;)
+      {
+         bodyLength = Math.min(bodySize - pos, CHUNK_SIZE);
+         bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(bodyLength));
+
+         msg.encodeBody(bodyBuffer, pos, bodyLength);
+
+         chunk = new SessionSendChunkMessage(id, null, bodyBuffer.array(), pos + bodyLength < bodySize, true);
+
+         channel.sendBlocking(chunk);
+
+         pos += bodyLength;
+      }
+   }
+
    private void checkClosed() throws MessagingException
    {
       if (closed)

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2008-10-10 15:55:41 UTC (rev 5102)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2008-10-10 22:56:13 UTC (rev 5103)
@@ -25,14 +25,22 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVETOKENS;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CHUNK_SEND;
 
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.ChannelHandler;
 import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 
 /**
  *
@@ -46,6 +54,8 @@
    private static final Logger log = Logger.getLogger(ClientSessionPacketHandler.class);
 
    private final ClientSessionInternal clientSession;
+   
+   private Map<Long, ClientMessage> currentChunk = new ConcurrentHashMap<Long, ClientMessage>();
 
    public ClientSessionPacketHandler(final ClientSessionInternal clientSesssion)
    {     
@@ -68,6 +78,52 @@
                
                break;
             }
+            case SESS_CHUNK_SEND:
+            {
+               System.out.println("received a chunk");
+               SessionSendChunkMessage chunk = (SessionSendChunkMessage) packet;
+               
+               ClientMessage currentChunkMessage = null;
+               
+               if (chunk.getHeader() != null)
+               {
+
+                  MessagingBuffer header = new ByteBufferWrapper(ByteBuffer.wrap(chunk.getHeader()));
+
+                  currentChunkMessage = new ClientMessageImpl();
+                  
+                  currentChunkMessage.decodeProperties(header);
+                  
+                  MessagingBuffer initialBody = new ByteBufferWrapper(ByteBuffer.wrap(chunk.getBody()));
+                  
+                  currentChunkMessage.setBody(initialBody);
+                  
+                  currentChunk.put(chunk.getTargetID(), currentChunkMessage);
+               }
+               else
+               {
+                  ByteBuffer body = ByteBuffer.wrap(chunk.getBody());
+
+                  currentChunkMessage = currentChunk.get(chunk.getMessageID());
+                  
+                  MessagingBuffer currentBody = currentChunkMessage.getBody();
+                  
+                  MessagingBuffer newBody = new ByteBufferWrapper(ByteBuffer.allocate(currentBody.limit() + body.limit()));
+                  
+                  newBody.putBytes(currentBody.array());
+                  newBody.putBytes(body.array());
+                  
+                  currentChunkMessage.setBody(newBody);
+               }
+               
+               if (!chunk.isContinues())
+               {
+                  clientSession.handleReceiveMessage(chunk.getTargetID(), currentChunkMessage);
+               }
+               
+               
+               break;
+            }
             case SESS_RECEIVE_MSG:
             {
                SessionReceiveMessage message = (SessionReceiveMessage) packet;

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2008-10-10 15:55:41 UTC (rev 5102)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2008-10-10 22:56:13 UTC (rev 5103)
@@ -232,5 +232,10 @@
    {
       return (int)channel.position();
    }
+   
+   public String toString()
+   {
+      return "NIOSequentialFile " + this.fileName;
+   }
 
 }

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/Message.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/Message.java	2008-10-10 15:55:41 UTC (rev 5102)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/Message.java	2008-10-10 22:56:13 UTC (rev 5103)
@@ -82,7 +82,7 @@
    void decodeProperties(MessagingBuffer buffer);
    
    
-   int getBodyEncodeSize();
+   int getBodySize();
    
    
    // Used on Message chunk

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-10-10 15:55:41 UTC (rev 5102)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-10-10 22:56:13 UTC (rev 5103)
@@ -151,7 +151,7 @@
 
    public int getEncodeSize()
    {
-      return getPropertiesEncodeSize() + getBodyEncodeSize();
+      return getPropertiesEncodeSize() + SIZE_INT + getBodySize();
    }
 
    public int getPropertiesEncodeSize()
@@ -165,9 +165,9 @@
       /* PropertySize and Properties */properties.getEncodeSize();
    }
 
-   public int getBodyEncodeSize()
+   public int getBodySize()
    {
-      return /* BodySize and Body */SIZE_INT + body.limit();
+      return /* BodySize and Body */ body.limit();
    }
 
    

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java	2008-10-10 15:55:41 UTC (rev 5102)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java	2008-10-10 22:56:13 UTC (rev 5103)
@@ -24,7 +24,6 @@
 
 import java.nio.ByteBuffer;
 
-import org.jboss.messaging.core.client.impl.ClientProducerImpl;
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.core.server.ServerLargeMessage;
@@ -77,32 +76,31 @@
    }
 
    @Override
-   public void encodeBody(final MessagingBuffer bufferOut)
+   public void encodeBody(MessagingBuffer bufferOut, int start, int size)
    {
       try
       {
-         ByteBuffer bufferRead = ByteBuffer.allocate(ClientProducerImpl.CHUNK_SIZE);
+         // This could maybe be optimized (maybe reading directly into bufferOut)
+         ByteBuffer bufferRead = ByteBuffer.allocate(size);
          if (!file.isOpen())
          {
             file.open();
          }
 
          int bytesRead = 0;
-         file.position(0);
-         do
-         {
-            bufferRead.clear();
-            bytesRead = file.read(bufferRead);
-            bufferRead.flip();
+         file.position(start);
 
-            if (bytesRead > 0)
-            {
-               bufferOut.putBytes(bufferRead.array(), 0, bytesRead);
-            }
+         bytesRead = file.read(bufferRead);
+         
+         
+         bufferRead.flip();
+
+         if (bytesRead > 0)
+         {
+            bufferOut.putBytes(bufferRead.array(), 0, bytesRead);
          }
-         while (bytesRead == ClientProducerImpl.CHUNK_SIZE);
 
-         releaseResources();
+         //releaseResources();
       }
       catch (Exception e)
       {
@@ -111,7 +109,7 @@
    }
 
    @Override
-   public int getBodyEncodeSize()
+   public int getBodySize()
    {
       try
       {

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-10-10 15:55:41 UTC (rev 5102)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-10-10 22:56:13 UTC (rev 5103)
@@ -147,7 +147,7 @@
 
    ServerLargeMessage getCurrentLargeMessage(long producerID);
    
-   ServerLargeMessage createLargeMessageStorage(long producerID, byte[] header) throws Exception;
+   ServerLargeMessage createLargeMessageStorage(long producerID, long messageID, byte[] header) throws Exception;
    
    void clearCurrentLargeMessage(long producerID);
    

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-10-10 15:55:41 UTC (rev 5102)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-10-10 22:56:13 UTC (rev 5103)
@@ -22,20 +22,26 @@
 
 package org.jboss.messaging.core.server.impl;
 
+import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.core.server.HandleStatus;
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerConsumer;
+import org.jboss.messaging.core.server.ServerLargeMessage;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.ServerSession;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
@@ -61,6 +67,12 @@
    // Static
    // ---------------------------------------------------------------------------------------
 
+   // TODO This is temporary, make this configurable somewhere
+   public static final int BIG_PACKAGE_SIZE = 10 * 1024;
+
+   // TODO This is temporary, make this better
+   public static final int CHUNK_SIZE = 10 * 1024;
+
    // Attributes
    // -----------------------------------------------------------------------------------
 
@@ -182,15 +194,23 @@
          }
 
          deliveringRefs.add(ref);
-         
-         SessionReceiveMessage packet = new SessionReceiveMessage(id, ref.getMessage(), ref.getDeliveryCount() + 1);
 
-         channel.send(packet);
+         if (message instanceof ServerLargeMessage)
+         {
+            sendChunks((ServerLargeMessage)message);
+            
+         }
+         else
+         {
+             SessionReceiveMessage packet = new SessionReceiveMessage(id, ref.getMessage(), ref.getDeliveryCount() + 1);
+             channel.send(packet);
+         }
 
+
          return HandleStatus.HANDLED;
       }
    }
-   
+
    public void close() throws Exception
    {     
       setStarted(false);
@@ -300,6 +320,46 @@
    // Private
    // --------------------------------------------------------------------------------------
 
+   /**
+    * @param message
+    * @throws MessagingException
+    */
+   private void sendChunks(ServerLargeMessage message) throws MessagingException
+   {
+      int headerSize = message.getPropertiesEncodeSize();
+
+      final int bodySize = message.getBodySize();
+      
+      int bodyLength = BIG_PACKAGE_SIZE - headerSize;
+
+      MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(message.getPropertiesEncodeSize()));
+      message.encodeProperties(headerBuffer);
+      
+      MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(bodyLength));
+      message.encodeBody(bodyBuffer, 0, bodyLength);
+
+      SessionSendChunkMessage chunk = new SessionSendChunkMessage(id,
+                                                                  headerBuffer.array(),
+                                                                  bodyBuffer.array(),
+                                                                  bodyLength < bodySize,
+                                                                  false);
+      channel.send(chunk);
+
+      for (int pos = bodyLength; pos < bodySize;)
+      {
+         bodyLength = Math.min(bodySize - pos, CHUNK_SIZE);
+         bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(bodyLength));
+
+         message.encodeBody(bodyBuffer, pos, bodyLength);
+
+         chunk = new SessionSendChunkMessage(id, null, bodyBuffer.array(), pos + bodyLength < bodySize, false);
+
+         channel.send(chunk);
+
+         pos += bodyLength;
+      }
+   }
+   
    private void promptDelivery()
    {
       session.promptDelivery(messageQueue);

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-10 15:55:41 UTC (rev 5102)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-10 22:56:13 UTC (rev 5103)
@@ -1172,13 +1172,16 @@
    /* (non-Javadoc)
     * @see org.jboss.messaging.core.server.ServerSession#createLargeMessage(long, int, byte[])
     */
-   public ServerLargeMessage createLargeMessageStorage(long producerID, byte[] header) throws Exception
+   public ServerLargeMessage createLargeMessageStorage(long producerID, long messageID, byte[] header) throws Exception
    {
-      ServerLargeMessage largeMessage = storageManager.createLargeMessageStorage(storageManager.generateUniqueID());
+      ServerLargeMessage largeMessage = storageManager.createLargeMessageStorage(messageID);
       
       MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.wrap(header));
       
       largeMessage.decodeProperties(headerBuffer);
+
+      // decodeProperties will clean this, as the client didn send the ID originally
+      largeMessage.setMessageID(messageID);
       
       ServerProducer producer = producers.get(producerID);
        

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-10-10 15:55:41 UTC (rev 5102)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-10-10 22:56:13 UTC (rev 5103)
@@ -397,8 +397,7 @@
                
                if (message.getHeader() != null)
                {
-                  largeMessage = session.createLargeMessageStorage(message.getTargetID(), message.getHeader());
-                  largeMessage.setMessageID(message.getMessageID());
+                  largeMessage = session.createLargeMessageStorage(message.getTargetID(), message.getMessageID(), message.getHeader());
                }
                else
                {

Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/chunkmessage/MessageChunkTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/chunkmessage/MessageChunkTest.java	2008-10-10 15:55:41 UTC (rev 5102)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/chunkmessage/MessageChunkTest.java	2008-10-10 22:56:13 UTC (rev 5103)
@@ -75,8 +75,10 @@
       {
          body.putInt(i);
       }
-      body.flip();      
+      body.flip();
       
+      printBuffer("body to be sent : " , body);
+      
       ClientMessage message = session.createClientMessage(true); 
 
       message.setBody(body);
@@ -106,6 +108,8 @@
       
       System.out.println("msg on client = " + message2.getMessageID());
       
+      printBuffer("message received : ", message2.getBody());
+      
       assertEqualsByteArrays(body.limit(), body.array(), message2.getBody().array());
       
       session.close();
@@ -117,7 +121,7 @@
 
    protected void setUp() throws Exception
    {
-      this.realFiles = false;
+      this.realFiles = true;
       super.setUp();
    }
 
@@ -125,9 +129,38 @@
    {
       super.tearDown();
    }
-
+   
+   
    // Private -------------------------------------------------------
+   
+   public static void printBuffer(String msg, MessagingBuffer buffer)
+   {
+      
+      buffer.rewind();
+      
+      int size = buffer.limit();
+    
+      System.out.print(msg);
+    
+      
+      for (int i = 0; i < size; i ++)
+      {
+         System.out.print(String.format("%1$X", buffer.getByte()));
+         if (i % 40 != 0 || i == 0)
+         {
+            System.out.print(", ");
+         }
+         else
+         {
+            System.out.println();
+            System.out.print(msg);
+         }
+      }
+      buffer.rewind();
 
+      
+   }
+
    // Inner classes -------------------------------------------------
 
 }




More information about the jboss-cvs-commits mailing list