[jboss-cvs] JBoss Messaging SVN: r5418 - in trunk: src/main/org/jboss/messaging/core/client/impl and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Nov 21 12:28:00 EST 2008


Author: timfox
Date: 2008-11-21 12:28:00 -0500 (Fri, 21 Nov 2008)
New Revision: 5418

Added:
   trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientFileMessageImpl.java
Removed:
   trunk/src/main/org/jboss/messaging/core/client/FileClientMessage.java
   trunk/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
Log:
Some changes on large message support


Copied: trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java (from rev 5412, trunk/src/main/org/jboss/messaging/core/client/FileClientMessage.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java	2008-11-21 17:28:00 UTC (rev 5418)
@@ -0,0 +1,48 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.client;
+
+import java.io.File;
+import java.nio.channels.FileChannel;
+
+import org.jboss.messaging.core.exception.MessagingException;
+
+/**
+ * A ClientFileMessage
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created Oct 14, 2008 3:21:15 PM
+ *
+ *
+ */
+public interface ClientFileMessage extends ClientMessage
+{
+   File getFile();
+
+   void setFile(File file);
+   
+   FileChannel getChannel() throws MessagingException;
+   
+   void closeChannel() throws MessagingException;     
+}

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2008-11-21 17:28:00 UTC (rev 5418)
@@ -124,7 +124,7 @@
 
    ClientMessage createClientMessage(final boolean durable);
 
-   FileClientMessage createFileMessage(final boolean durable);
+   ClientFileMessage createFileMessage(final boolean durable);
 
    void start() throws MessagingException;
 

Deleted: trunk/src/main/org/jboss/messaging/core/client/FileClientMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/FileClientMessage.java	2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/src/main/org/jboss/messaging/core/client/FileClientMessage.java	2008-11-21 17:28:00 UTC (rev 5418)
@@ -1,50 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.client;
-
-import java.io.File;
-import java.nio.channels.FileChannel;
-
-import org.jboss.messaging.core.exception.MessagingException;
-
-/**
- * A FileClientMessage
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * 
- * Created Oct 14, 2008 3:21:15 PM
- *
- *
- */
-public interface FileClientMessage extends ClientMessage
-{
-   File getFile();
-
-   void setFile(File file);
-   
-   FileChannel getChannel() throws MessagingException;
-   
-   void closeChannel() throws MessagingException;  
-   
-
-}

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-11-21 17:28:00 UTC (rev 5418)
@@ -20,7 +20,7 @@
 import java.util.concurrent.Executor;
 
 import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.FileClientMessage;
+import org.jboss.messaging.core.client.ClientFileMessage;
 import org.jboss.messaging.core.client.MessageHandler;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
@@ -73,7 +73,6 @@
    private File directory;
 
    private ClientMessage currentChunkMessage;
-
    
    private volatile Thread receiverThread;
 
@@ -217,32 +216,7 @@
          receiverThread = null;
       }
    }
-
-   
-   public ClientMessage createFileMessage(MessagingBuffer propertiesBuffer) throws Exception
-   {
-      if (isFileConsumer())
-      {
-         if (!this.directory.exists())
-         {
-            directory.mkdirs();
-         }
-         
-         FileClientMessageImpl message = new FileClientMessageImpl();
-         message.decodeProperties(propertiesBuffer);
-         message.setFile(new File(this.directory, message.getMessageID() + "-" + this.session.getName() + "-" + this.getID() + ".jbm"));
-         message.setLargeMessage(true);
-         return message;
-      }
-      else
-      {
-         ClientMessageImpl message = new ClientMessageImpl();
-         message.decodeProperties(propertiesBuffer);
-         message.setLargeMessage(true);
-         return message;
-      }
-   }
-   
+        
    public ClientMessage receive() throws MessagingException
    {
       return receive(0);
@@ -338,8 +312,7 @@
       }
       
       ClientMessage messageToHandle = message;
-      
-      
+            
       if (isFileConsumer())
       {
          messageToHandle = cloneAsFileMessage(message);
@@ -372,19 +345,17 @@
       
       flowControl(chunk.getBody().length);
       
-
       if (chunk.getHeader() != null)
       {
-
          // The Header only comes on the first message, so a buffer has to be created on the client
          // to hold either a file or a big message
          MessagingBuffer header = new ByteBufferWrapper(ByteBuffer.wrap(chunk.getHeader()));
 
          currentChunkMessage = createFileMessage(header);
 
-         if (currentChunkMessage instanceof FileClientMessage)
+         if (currentChunkMessage instanceof ClientFileMessage)
          {
-            FileClientMessage fileMessage = (FileClientMessage)currentChunkMessage;
+            ClientFileMessage fileMessage = (ClientFileMessage)currentChunkMessage;
             addBytesBody(fileMessage, chunk.getBody());
          }
          else
@@ -398,9 +369,9 @@
          // No header.. this is then a continuation of a previous message
          ByteBuffer body = ByteBuffer.wrap(chunk.getBody());
 
-         if (currentChunkMessage instanceof FileClientMessage)
+         if (currentChunkMessage instanceof ClientFileMessage)
          {
-            FileClientMessage fileMessage = (FileClientMessage)currentChunkMessage;
+            ClientFileMessage fileMessage = (ClientFileMessage)currentChunkMessage;
             addBytesBody(fileMessage, chunk.getBody());
          }
          else
@@ -419,18 +390,17 @@
       if (!chunk.isContinues())
       {
          // Close the file that was being generated
-         if (currentChunkMessage instanceof FileClientMessage)
+         if (currentChunkMessage instanceof ClientFileMessage)
          {
-            ((FileClientMessage)currentChunkMessage).closeChannel();
+            ((ClientFileMessage)currentChunkMessage).closeChannel();
          }
+         
          ClientMessage msgToSend = currentChunkMessage;
          currentChunkMessage = null;
          handleMessage(msgToSend);
-      }
-      
+      }      
    }
    
-
    public void clear()
    {
       synchronized (this)
@@ -642,19 +612,19 @@
       session.acknowledge(id, message.getMessageID());
    }
    
-
-   private FileClientMessage cloneAsFileMessage(ClientMessage message) throws Exception
+   private ClientFileMessage cloneAsFileMessage(final ClientMessage message) throws Exception
    {
       int propertiesSize = message.getPropertiesEncodeSize();
+      
       MessagingBuffer bufferProperties = message.getBody().createNewBuffer(propertiesSize);
 
-      // FIXME: Find a better way to clone this ClientMessageImpl as FileClientMessageImpl without using the MessagingBuffer.
+      // FIXME: Find a better way to clone this ClientMessageImpl as ClientFileMessageImpl without using the MessagingBuffer.
       //        There is no direct access into the Properties, and I couldn't add a direct cast to this method without loose abstraction
       message.encodeProperties(bufferProperties);
       
       bufferProperties.rewind();
 
-      FileClientMessageImpl cloneMessage = new FileClientMessageImpl();
+      ClientFileMessageImpl cloneMessage = new ClientFileMessageImpl();
       
       cloneMessage.decodeProperties(bufferProperties);
       
@@ -667,15 +637,35 @@
       addBytesBody(cloneMessage, message.getBody().array());
       
       cloneMessage.closeChannel();
-     
-      
+           
       return cloneMessage;
    }
    
+   private ClientMessage createFileMessage(final MessagingBuffer propertiesBuffer) throws Exception
+   {
+      if (isFileConsumer())
+      {
+         if (!this.directory.exists())
+         {
+            directory.mkdirs();
+         }
+         
+         ClientFileMessageImpl message = new ClientFileMessageImpl();
+         message.decodeProperties(propertiesBuffer);
+         message.setFile(new File(this.directory, message.getMessageID() + "-" + this.session.getName() + "-" + this.getID() + ".jbm"));
+         message.setLargeMessage(true);
+         return message;
+      }
+      else
+      {
+         ClientMessageImpl message = new ClientMessageImpl();
+         message.decodeProperties(propertiesBuffer);
+         message.setLargeMessage(true);
+         return message;
+      }
+   }
 
-
-
-   private void addBytesBody(FileClientMessage fileMessage, byte[] body) throws Exception
+   private void addBytesBody(final ClientFileMessage fileMessage, final byte[] body) throws Exception
    {
       FileChannel channel = fileMessage.getChannel();
       channel.write(ByteBuffer.wrap(body));

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2008-11-21 17:28:00 UTC (rev 5418)
@@ -26,7 +26,6 @@
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 
 /**
  * 
@@ -53,8 +52,6 @@
 
    void cleanUp() throws Exception;
    
-   ClientMessage createFileMessage(MessagingBuffer propertiesBuffer) throws Exception;
-   
    void acknowledge(ClientMessage message) throws MessagingException;
    
    void flushAcks() throws MessagingException;

Copied: trunk/src/main/org/jboss/messaging/core/client/impl/ClientFileMessageImpl.java (from rev 5412, trunk/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientFileMessageImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientFileMessageImpl.java	2008-11-21 17:28:00 UTC (rev 5418)
@@ -0,0 +1,231 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.client.impl;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.jboss.messaging.core.client.ClientFileMessage;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+/**
+ * A ClientFileMessageImpl
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created Oct 13, 2008 4:33:56 PM
+ *
+ *
+ */
+public class ClientFileMessageImpl extends ClientMessageImpl implements ClientFileMessage
+{
+   private File file;
+
+   private FileChannel currentChannel;
+
+   public ClientFileMessageImpl()
+   {
+   }
+
+   public ClientFileMessageImpl(final boolean durable)
+   {
+      super(durable, null);
+   }
+
+   /**
+    * @param type
+    * @param durable
+    * @param expiration
+    * @param timestamp
+    * @param priority
+    * @param body
+    */
+   public ClientFileMessageImpl(final byte type,
+                                final boolean durable,
+                                final long expiration,
+                                final long timestamp,
+                                final byte priority,
+                                final MessagingBuffer body)
+   {
+      super(type, durable, expiration, timestamp, priority, body);
+   }
+
+   /**
+    * @param type
+    * @param durable
+    * @param body
+    */
+   public ClientFileMessageImpl(final byte type, final boolean durable, final MessagingBuffer body)
+   {
+      super(type, durable, body);
+   }
+
+   /**
+    * @param deliveryCount
+    */
+   public ClientFileMessageImpl(final int deliveryCount)
+   {
+      super(deliveryCount);
+   }
+
+   /**
+    * @return the file
+    */
+   public File getFile()
+   {
+      return file;
+   }
+
+   /**
+    * @param file the file to set
+    */
+   public void setFile(final File file)
+   {
+      this.file = file;
+   }
+
+   @Override
+   public MessagingBuffer getBody()
+   {
+      // TODO: Throw an unsuported exception. (Make sure no tests are using this method first)
+
+      FileChannel channel = null;
+      try
+      {
+         // We open a new channel on getBody.
+         // for a better performance, users should be using the channels when using file
+         channel = newChannel();
+
+         ByteBuffer buffer = ByteBuffer.allocate((int)channel.size());
+
+         channel.position(0);
+         channel.read(buffer);
+
+         return new ByteBufferWrapper(buffer);
+      }
+      catch (Exception e)
+      {
+         throw new RuntimeException(e);
+      }
+      finally
+      {
+         try
+         {
+            channel.close();
+         }
+         catch (Throwable ignored)
+         {
+
+         }
+      }
+   }
+
+   @Override
+   public synchronized void encodeBody(final MessagingBuffer buffer, final long start, final int size)
+   {
+      try
+      {
+         FileChannel channel = getChannel();
+
+         ByteBuffer bufferRead = ByteBuffer.allocate(size);
+
+         channel.position(start);
+         channel.read(bufferRead);
+
+         buffer.putBytes(bufferRead.array());
+      }
+      catch (Exception e)
+      {
+         throw new RuntimeException(e.getMessage(), e);
+      }
+   }
+
+   @Override
+   public void setBody(final MessagingBuffer body)
+   {
+      throw new RuntimeException("Not supported");
+   }
+
+   public synchronized FileChannel getChannel() throws MessagingException
+   {
+      if (currentChannel == null)
+      {
+         currentChannel = newChannel();
+      }
+
+      return currentChannel;
+   }
+
+   public synchronized void closeChannel() throws MessagingException
+   {
+      if (currentChannel != null)
+      {
+         try
+         {
+            currentChannel.close();
+         }
+         catch (IOException e)
+         {
+            throw new MessagingException(MessagingException.INTERNAL_ERROR, e.getMessage(), e);
+         }
+         currentChannel = null;
+      }
+
+   }
+
+   @Override
+   public synchronized int getBodySize()
+   {
+      return (int)file.length();
+   }
+
+   /**
+    * @return
+    * @throws FileNotFoundException
+    * @throws IOException
+    */
+   private FileChannel newChannel() throws MessagingException
+   {
+      try
+      {
+         RandomAccessFile randomFile = new RandomAccessFile(getFile(), "rw");
+         
+         randomFile.seek(0);
+
+         FileChannel channel = randomFile.getChannel();
+         
+         return channel;
+      }
+      catch (IOException e)
+      {
+         throw new MessagingException(MessagingException.INTERNAL_ERROR, e.getMessage(), e);
+      }
+   }
+
+}

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-11-21 17:28:00 UTC (rev 5418)
@@ -25,7 +25,7 @@
 import java.nio.ByteBuffer;
 
 import org.jboss.messaging.core.client.AcknowledgementHandler;
-import org.jboss.messaging.core.client.FileClientMessage;
+import org.jboss.messaging.core.client.ClientFileMessage;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.message.Message;
@@ -74,14 +74,14 @@
    private final boolean blockOnPersistentSend;
 
    private final SimpleString groupID;
-   
+
    private final int minLargeMessageSize;
 
    // Static ---------------------------------------------------------------------------------------
 
    // Constructors ---------------------------------------------------------------------------------
 
-   public ClientProducerImpl(final ClientSessionInternal session,                           
+   public ClientProducerImpl(final ClientSessionInternal session,
                              final SimpleString address,
                              final TokenBucketLimiter rateLimiter,
                              final boolean blockOnNonPersistentSend,
@@ -93,13 +93,13 @@
       this.channel = channel;
 
       this.session = session;
-      
+
       this.address = address;
 
       this.rateLimiter = rateLimiter;
 
       this.blockOnNonPersistentSend = blockOnNonPersistentSend;
-      
+
       this.blockOnPersistentSend = blockOnPersistentSend;
 
       if (autoGroup)
@@ -110,9 +110,8 @@
       {
          this.groupID = null;
       }
-      
-      this.minLargeMessageSize = minLargeMessageSize;
 
+      this.minLargeMessageSize = minLargeMessageSize;
    }
 
    // ClientProducer implementation ----------------------------------------------------------------
@@ -153,7 +152,7 @@
          return;
       }
 
-      doCleanup();      
+      doCleanup();
    }
 
    public void cleanUp()
@@ -225,15 +224,14 @@
       }
 
       boolean sendBlocking = msg.isDurable() ? blockOnPersistentSend : blockOnNonPersistentSend;
-      
+
       SessionSendMessage message = new SessionSendMessage(msg, sendBlocking);
 
       if (msg.getEncodeSize() > minLargeMessageSize)
       {
          sendMessageInChunks(true, msg);
       }
-      else
-      if (sendBlocking)
+      else if (sendBlocking)
       {
          channel.sendBlocking(message);
       }
@@ -254,7 +252,7 @@
       if (headerSize > minLargeMessageSize)
       {
          throw new MessagingException(MessagingException.ILLEGAL_STATE,
-                                      "Header size is too big, use the messageBody for large data");
+                                      "Header size is too big, use the messageBody for large data, or increase minLargeMessageSize");
       }
 
       MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(headerSize));
@@ -262,16 +260,16 @@
 
       final int bodySize = msg.getBodySize();
 
-      int bodyLength = minLargeMessageSize - headerSize;
+      int chunkLength = minLargeMessageSize - headerSize;
 
-      MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(bodyLength));
+      MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(chunkLength));
 
-      msg.encodeBody(bodyBuffer, 0, bodyLength);
+      msg.encodeBody(bodyBuffer, 0, chunkLength);
 
       SessionSendChunkMessage chunk = new SessionSendChunkMessage(-1,
                                                                   headerBuffer.array(),
                                                                   bodyBuffer.array(),
-                                                                  bodyLength < bodySize,
+                                                                  chunkLength < bodySize,
                                                                   sendBlocking);
 
       if (sendBlocking)
@@ -283,14 +281,15 @@
          channel.send(chunk);
       }
 
-      for (int pos = bodyLength; pos < bodySize;)
+      for (int pos = chunkLength; pos < bodySize;)
       {
-         bodyLength = Math.min(bodySize - pos, minLargeMessageSize);
-         bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(bodyLength));
+         chunkLength = Math.min(bodySize - pos, minLargeMessageSize);
+         
+         bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(chunkLength));
 
-         msg.encodeBody(bodyBuffer, pos, bodyLength);
+         msg.encodeBody(bodyBuffer, pos, chunkLength);
 
-         chunk = new SessionSendChunkMessage(-1, null, bodyBuffer.array(), pos + bodyLength < bodySize, sendBlocking);
+         chunk = new SessionSendChunkMessage(-1, null, bodyBuffer.array(), pos + chunkLength < bodySize, sendBlocking);
 
          if (sendBlocking)
          {
@@ -301,14 +300,14 @@
             channel.send(chunk);
          }
 
-         pos += bodyLength;
+         pos += chunkLength;
       }
-      
-      if (msg instanceof FileClientMessage)
+
+      if (msg instanceof ClientFileMessage)
       {
          try
          {
-            ((FileClientMessage)msg).closeChannel();
+            ((ClientFileMessage)msg).closeChannel();
          }
          catch (Exception e)
          {

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-11-21 17:28:00 UTC (rev 5418)
@@ -37,7 +37,7 @@
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.FileClientMessage;
+import org.jboss.messaging.core.client.ClientFileMessage;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.Channel;
@@ -476,9 +476,9 @@
       return new ClientMessageImpl(durable, body);
    }
 
-   public FileClientMessage createFileMessage(final boolean durable)
+   public ClientFileMessage createFileMessage(final boolean durable)
    {
-      return new FileClientMessageImpl(durable);
+      return new ClientFileMessageImpl(durable);
    }
 
    public boolean isClosed()
@@ -622,7 +622,6 @@
       {
          consumer.handleChunk(chunk);
       }
-
    }
 
    public void close() throws MessagingException

Deleted: trunk/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java	2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java	2008-11-21 17:28:00 UTC (rev 5418)
@@ -1,256 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.client.impl;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.FileClientMessage;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.message.Message;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-
-/**
- * A FileClientMessageImpl
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * 
- * Created Oct 13, 2008 4:33:56 PM
- *
- *
- */
-public class FileClientMessageImpl extends ClientMessageImpl implements FileClientMessage
-{
-
-   File file;
-
-   FileChannel currentChannel;
-
-   /**
-    * 
-    */
-   public FileClientMessageImpl()
-   {
-      super();
-   }
-
-   public FileClientMessageImpl(final boolean durable)
-   {
-      super(durable, null);
-   }
-
-   /**
-    * @param type
-    * @param durable
-    * @param expiration
-    * @param timestamp
-    * @param priority
-    * @param body
-    */
-   public FileClientMessageImpl(final byte type,
-                                final boolean durable,
-                                final long expiration,
-                                final long timestamp,
-                                final byte priority,
-                                final MessagingBuffer body)
-   {
-      super(type, durable, expiration, timestamp, priority, body);
-   }
-
-   /**
-    * @param type
-    * @param durable
-    * @param body
-    */
-   public FileClientMessageImpl(final byte type, final boolean durable, final MessagingBuffer body)
-   {
-      super(type, durable, body);
-   }
-
-   /**
-    * @param deliveryCount
-    */
-   public FileClientMessageImpl(final int deliveryCount)
-   {
-      super(deliveryCount);
-   }
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   /**
-    * @return the file
-    */
-   public File getFile()
-   {
-      return file;
-   }
-
-   /**
-    * @param file the file to set
-    */
-   public void setFile(final File file)
-   {
-      this.file = file;
-   }
-
-   @Override
-   public MessagingBuffer getBody()
-   {
-      // TODO: Throw an unsuported exception. (Make sure no tests are using this method first)
-
-      FileChannel channel = null;
-      try
-      {
-         // We open a new channel on getBody.
-         // for a better performance, users should be using the channels when using file
-         channel = newChannel();
-
-         ByteBuffer buffer = ByteBuffer.allocate((int)channel.size());
-
-         channel.position(0);
-         channel.read(buffer);
-
-         return new ByteBufferWrapper(buffer);
-      }
-      catch (Exception e)
-      {
-         throw new RuntimeException(e);
-      }
-      finally
-      {
-         try
-         {
-            channel.close();
-         }
-         catch (Throwable ignored)
-         {
-
-         }
-      }
-   }
-
-   @Override
-   public synchronized void encodeBody(final MessagingBuffer buffer, final long start, final int size)
-   {
-      try
-      {
-         FileChannel channel = getChannel();
-
-         ByteBuffer bufferRead = ByteBuffer.allocate(size);
-
-         channel.position(start);
-         channel.read(bufferRead);
-
-         buffer.putBytes(bufferRead.array());
-      }
-      catch (Exception e)
-      {
-         throw new RuntimeException(e.getMessage(), e);
-      }
-
-   }
-
-   @Override
-   public void setBody(final MessagingBuffer body)
-   {
-
-      throw new RuntimeException("Not supported");
-   }
-
-   public synchronized FileChannel getChannel() throws MessagingException
-   {
-      if (currentChannel == null)
-      {
-         currentChannel = newChannel();
-      }
-
-      return currentChannel;
-   }
-
-   public synchronized void closeChannel() throws MessagingException
-   {
-      if (currentChannel != null)
-      {
-         try
-         {
-            currentChannel.close();
-         }
-         catch (IOException e)
-         {
-            throw new MessagingException(MessagingException.INTERNAL_ERROR, e.getMessage(), e);
-         }
-         currentChannel = null;
-      }
-
-   }
-
-   @Override
-   public synchronized int getBodySize()
-   {
-      return (int)file.length();
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   /**
-    * @return
-    * @throws FileNotFoundException
-    * @throws IOException
-    */
-   private FileChannel newChannel() throws MessagingException
-   {
-      try
-      {
-         RandomAccessFile randomFile = new RandomAccessFile(getFile(), "rw");
-         randomFile.seek(0);
-
-         FileChannel channel = randomFile.getChannel();
-         return channel;
-      }
-      catch (IOException e)
-      {
-         throw new MessagingException(MessagingException.INTERNAL_ERROR, e.getMessage(), e);
-      }
-   }
-
-   // Inner classes -------------------------------------------------
-
-}

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java	2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java	2008-11-21 17:28:00 UTC (rev 5418)
@@ -84,7 +84,6 @@
       file.position(file.size());
 
       file.write(ByteBuffer.wrap(bytes), false);
-
    }
 
    @Override

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java	2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java	2008-11-21 17:28:00 UTC (rev 5418)
@@ -44,7 +44,7 @@
 
    private boolean continues;
 
-   private long messageID = 0;
+   private long messageID;
 
    private boolean requiresResponse;
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java	2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java	2008-11-21 17:28:00 UTC (rev 5418)
@@ -86,7 +86,6 @@
    
    public void encodeBody(final MessagingBuffer buffer)
    {
-
       if (clientMessage != null)
       {
          clientMessage.encode(buffer);

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-11-21 17:28:00 UTC (rev 5418)
@@ -33,7 +33,6 @@
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.remoting.Channel;
@@ -88,7 +87,7 @@
    private final Filter filter;
 
    private final int minLargeMessageSize;
-   
+
    private final ServerSession session;
 
    private final Lock lock = new ReentrantLock();
@@ -96,9 +95,9 @@
    private AtomicInteger availableCredits = new AtomicInteger(0);
 
    private boolean started;
-   
-   private volatile LargeMessageControl largeMessage = null;
 
+   private volatile LargeMessageSender largeMessageSender = null;
+
    /**
     * if we are a browse only consumer we don't need to worry about acknowledgemenets or being started/stopeed by the session.
     */
@@ -111,12 +110,8 @@
    private final PostOffice postOffice;
 
    private final java.util.Queue<MessageReference> deliveringRefs = new ConcurrentLinkedQueue<MessageReference>();
-   
-   
 
    private final Channel channel;
-   
-   private final PagingManager pager;
 
    private volatile boolean closed;
 
@@ -135,7 +130,6 @@
                              final HierarchicalRepository<QueueSettings> queueSettingsRepository,
                              final PostOffice postOffice,
                              final Channel channel,
-                             final PagingManager pager,
                              final boolean preCommitAcks)
    {
       this.id = id;
@@ -149,7 +143,7 @@
       this.started = browseOnly || started;
 
       this.browseOnly = browseOnly;
-     
+
       this.storageManager = storageManager;
 
       this.queueSettingsRepository = queueSettingsRepository;
@@ -157,13 +151,11 @@
       this.postOffice = postOffice;
 
       this.channel = channel;
-      
-      this.pager = pager;
 
       this.preCommitAcks = preCommitAcks;
 
       messageQueue.addConsumer(this);
-      
+
       this.minLargeMessageSize = session.getMinLargeMessageSize();
    }
 
@@ -317,16 +309,16 @@
    }
 
    public void receiveCredits(final int credits) throws Exception
-   {      
+   {
       if (credits == -1)
       {
-         //No flow control
+         // No flow control
          availableCredits = null;
       }
       else
       {
          int previous = availableCredits.getAndAdd(credits);
-   
+
          if (previous <= 0 && previous + credits > 0)
          {
             promptDelivery();
@@ -339,8 +331,7 @@
       return messageQueue;
    }
 
-   public void acknowledge(final boolean autoCommitAcks, final Transaction tx, final long messageID)
-      throws Exception
+   public void acknowledge(final boolean autoCommitAcks, final Transaction tx, final long messageID) throws Exception
    {
       if (browseOnly)
       {
@@ -375,7 +366,7 @@
             // Del count is not actually updated in storage unless it's
             // cancelled
             ref.incrementDeliveryCount();
-         }         
+         }
       }
       while (ref.getMessage().getMessageID() != messageID);
 
@@ -387,27 +378,27 @@
       {
          return null;
       }
-      
-      //Expiries can come in our of sequence with respect to delivery order
-      
+
+      // Expiries can come in our of sequence with respect to delivery order
+
       Iterator<MessageReference> iter = deliveringRefs.iterator();
-      
+
       MessageReference ref = null;
-      
+
       while (iter.hasNext())
       {
          MessageReference theRef = iter.next();
-         
+
          if (theRef.getMessage().getMessageID() == messageID)
          {
             iter.remove();
-            
+
             ref = theRef;
-            
+
             break;
          }
       }
-      
+
       if (ref == null)
       {
          throw new IllegalStateException("Could not find reference with id " + messageID +
@@ -416,7 +407,7 @@
                                          " closed " +
                                          closed);
       }
-      
+
       return ref;
    }
 
@@ -492,12 +483,12 @@
 
       queue.referenceAcknowledged(ref);
    }
-   
+
    private void promptDelivery()
    {
-      if (largeMessage != null)
+      if (largeMessageSender != null)
       {
-         if (largeMessage.sendLargeMessage())
+         if (largeMessageSender.sendLargeMessage())
          {
             // prompt Delivery only if chunk was finished
             session.promptDelivery(messageQueue);
@@ -522,7 +513,7 @@
       {
          // If there is a pendingLargeMessage we can't take another message
          // This has to be checked inside the lock as the set to null is done inside the lock
-         if (largeMessage != null)
+         if (largeMessageSender != null)
          {
             return HandleStatus.BUSY;
          }
@@ -547,19 +538,19 @@
             deliveringRefs.add(ref);
          }
 
-
          if (message instanceof ServerLargeMessage)
          {
             // TODO: How to inform the backup node about the LargeMessage being sent?
-            largeMessage = new LargeMessageControl((ServerLargeMessage)message);
-            largeMessage.sendLargeMessage();
+            largeMessageSender = new LargeMessageSender((ServerLargeMessage)message);
+
+            largeMessageSender.sendLargeMessage();
          }
          else
          {
-            sendRegularMessage(ref, message);
+            sendStandardMessage(ref, message);
          }
 
-         if(preCommitAcks)
+         if (preCommitAcks)
          {
             doAck(ref);
          }
@@ -576,7 +567,7 @@
     * @param ref
     * @param message
     */
-   private void sendRegularMessage(final MessageReference ref, final ServerMessage message)
+   private void sendStandardMessage(final MessageReference ref, final ServerMessage message)
    {
       if (availableCredits != null)
       {
@@ -605,45 +596,36 @@
       }
    }
 
-   
-   
-   
-   
-   
    // Inner classes
    // ------------------------------------------------------------------------
-   
-   
+
    /** Internal encapsulation of the logic on sending LargeMessages.
     *  This Inner class was created to avoid a bunch of loose properties about the current LargeMessage being sent*/
-   class LargeMessageControl
+   private class LargeMessageSender
    {
-      private volatile long sizePendingLargeMessage;
+      private long sizePendingLargeMessage;
 
       /** The current message being processed */
-      private volatile ServerLargeMessage pendingLargeMessage;
-      
+      private ServerLargeMessage pendingLargeMessage;
+
       /** The current position on the message being processed */
-      private volatile long positionPendingLargeMessage;
-      
-      private SessionSendChunkMessage readAheadChunk = null;
-      
-      public LargeMessageControl(ServerLargeMessage message)
+      private long positionPendingLargeMessage;
+
+      private SessionSendChunkMessage readAheadChunk;
+
+      public LargeMessageSender(final ServerLargeMessage message)
       {
          pendingLargeMessage = (ServerLargeMessage)message;
-         positionPendingLargeMessage = 0;
+
          sizePendingLargeMessage = pendingLargeMessage.getBodySize();
       }
-      
-      
+
       public boolean sendLargeMessage()
       {
-         
          lock.lock();
 
          try
          {
-            
             if (pendingLargeMessage == null)
             {
                return true;
@@ -657,18 +639,21 @@
             if (readAheadChunk != null)
             {
                int chunkLen = readAheadChunk.getBody().length;
+               
                positionPendingLargeMessage += chunkLen;
+               
                channel.send(readAheadChunk);
+               
                readAheadChunk = null;
+               
                if (availableCredits != null)
                {
                   availableCredits.addAndGet(-chunkLen);
                }
             }
-            
+
             while (positionPendingLargeMessage < sizePendingLargeMessage)
             {
-               
                if (availableCredits != null && availableCredits.get() <= 0)
                {
                   if (readAheadChunk == null)
@@ -677,41 +662,39 @@
                   }
                   return false;
                }
-               
+
                SessionSendChunkMessage chunk = createChunkSend();
-               
+
                int chunkLen = chunk.getBody().length;
 
                if (availableCredits != null)
                {
                   availableCredits.addAndGet(-chunkLen);
                }
-      
+
                channel.send(chunk);
-               
+
                positionPendingLargeMessage += chunkLen;
             }
-      
+
             pendingLargeMessage.releaseResources();
 
-            ServerConsumerImpl.this.largeMessage = null;
-      
+            ServerConsumerImpl.this.largeMessageSender = null;
+
             return true;
          }
          finally
          {
             lock.unlock();
          }
-         
-
       }
 
       private SessionSendChunkMessage createChunkSend()
       {
          SessionSendChunkMessage chunk;
-         
+
          int localChunkLen = 0;
-         
+
          if (positionPendingLargeMessage == 0)
          {
             int headerSize = pendingLargeMessage.getPropertiesEncodeSize();
@@ -724,7 +707,6 @@
             MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate((int)localChunkLen));
             pendingLargeMessage.encodeBody(bodyBuffer, 0, localChunkLen);
 
-
             chunk = new SessionSendChunkMessage(id,
                                                 headerBuffer.array(),
                                                 bodyBuffer.array(),
@@ -745,11 +727,8 @@
                                                 positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage,
                                                 false);
          }
-         
+
          return chunk;
-         
       }
-      
    }
-
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-11-21 17:28:00 UTC (rev 5418)
@@ -200,7 +200,7 @@
 
    private final SimpleString managementAddress;
 
-   private ServerLargeMessage largeMessage;
+   private volatile ServerLargeMessage largeMessage;
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -324,8 +324,7 @@
          }
          catch (Throwable error)
          {
-            log.warn(error.toString(), error);
-
+            log.error("Failed to delete large message file", error);
          }
       }
 
@@ -394,8 +393,7 @@
                                                           storageManager,
                                                           queueSettingsRepository,
                                                           postOffice,
-                                                          channel,
-                                                          pager,
+                                                          channel,                                                         
                                                           preCommitAcks);
 
          consumers.put(consumer.getID(), consumer);
@@ -2164,7 +2162,7 @@
       {
          if (packet.getHeader() != null)
          {
-            largeMessage = createLargeMessageStorage(packet.getTargetID(), packet.getMessageID(), packet.getHeader());
+            largeMessage = createLargeMessageStorage(packet.getMessageID(), packet.getHeader());
          }
 
          largeMessage.addBytes(packet.getBody());
@@ -2172,9 +2170,11 @@
          if (!packet.isContinues())
          {
             final ServerLargeMessage message = largeMessage;
+            
             largeMessage = null;
 
             message.complete();
+            
             send(message);
          }
 
@@ -2182,7 +2182,6 @@
          {
             response = new NullResponseMessage();
          }
-
       }
       catch (Exception e)
       {
@@ -2409,7 +2408,7 @@
    // Private
    // ----------------------------------------------------------------------------
 
-   private ServerLargeMessage createLargeMessageStorage(long producerID, long messageID, byte[] header) throws Exception
+   private ServerLargeMessage createLargeMessageStorage(long messageID, byte[] header) throws Exception
    {
       ServerLargeMessage largeMessage = storageManager.createLargeMessage();
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java	2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java	2008-11-21 17:28:00 UTC (rev 5418)
@@ -34,7 +34,7 @@
 import org.jboss.messaging.core.client.ClientProducer;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.FileClientMessage;
+import org.jboss.messaging.core.client.ClientFileMessage;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.message.impl.MessageImpl;
@@ -137,7 +137,7 @@
             for (int i = 0; i < numberOfMessages; i++)
             {
                ClientMessage message = session.createFileMessage(true);
-               ((FileClientMessage)message).setFile(tmpData);
+               ((ClientFileMessage)message).setFile(tmpData);
                message.putIntProperty(new SimpleString("counter-message"), i);
                long timeStart = System.currentTimeMillis();
                if (delayDelivery > 0)
@@ -223,7 +223,7 @@
             
             if (realFiles)
             {
-               assertTrue (message instanceof FileClientMessage);
+               assertTrue (message instanceof ClientFileMessage);
             }
 
             if (testTime)
@@ -250,9 +250,9 @@
 
             if (!testTime)
             {
-               if (message instanceof FileClientMessage)
+               if (message instanceof ClientFileMessage)
                {
-                  checkFileRead(((FileClientMessage)message).getFile(), numberOfIntegers);
+                  checkFileRead(((ClientFileMessage)message).getFile(), numberOfIntegers);
                }
                else
                {
@@ -298,10 +298,10 @@
 
    }
 
-   protected FileClientMessage createLargeClientMessage(final ClientSession session, final int numberOfIntegers) throws Exception
+   protected ClientFileMessage createLargeClientMessage(final ClientSession session, final int numberOfIntegers) throws Exception
    {
 
-      FileClientMessage clientMessage = session.createFileMessage(true);
+      ClientFileMessage clientMessage = session.createFileMessage(true);
 
       File tmpFile = createLargeFile(temporaryDir, "tmpUpload.data", numberOfIntegers);
 
@@ -374,17 +374,17 @@
 
       assertNotNull(clientMessage);
       
-      if (!(clientMessage instanceof FileClientMessage))
+      if (!(clientMessage instanceof ClientFileMessage))
       {
          System.out.println("Size = " + clientMessage.getBodySize());
       }
 
       
-      if (clientMessage instanceof FileClientMessage)
+      if (clientMessage instanceof ClientFileMessage)
       {
-         assertTrue(clientMessage instanceof FileClientMessage);
+         assertTrue(clientMessage instanceof ClientFileMessage);
    
-         FileClientMessage fileClientMessage = (FileClientMessage)clientMessage;
+         ClientFileMessage fileClientMessage = (ClientFileMessage)clientMessage;
    
          assertNotNull(fileClientMessage);
          File receivedFile = fileClientMessage.getFile();

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2008-11-21 17:28:00 UTC (rev 5418)
@@ -34,7 +34,7 @@
 import org.jboss.messaging.core.client.ClientProducer;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.FileClientMessage;
+import org.jboss.messaging.core.client.ClientFileMessage;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
 import org.jboss.messaging.core.client.impl.ClientSessionImpl;
 import org.jboss.messaging.core.config.Configuration;
@@ -166,7 +166,7 @@
 
          ClientProducer producer = session.createProducer(ADDRESS);
 
-         FileClientMessage clientLarge = createLargeClientMessage(session, numberOfIntegersBigMessage);
+         ClientFileMessage clientLarge = createLargeClientMessage(session, numberOfIntegersBigMessage);
 
          try
          {
@@ -523,7 +523,7 @@
             producer.send(message);
          }
 
-         FileClientMessage clientFile = createLargeClientMessage(session, numberOfIntegersBigMessage);
+         ClientFileMessage clientFile = createLargeClientMessage(session, numberOfIntegersBigMessage);
 
          producer.send(clientFile);
 




More information about the jboss-cvs-commits mailing list