[jboss-cvs] JBoss Messaging SVN: r5093 - in branches/Branch_Chunk_Clebert: src/main/org/jboss/messaging/core/config and 20 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Oct 8 23:09:56 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-10-08 23:09:55 -0400 (Wed, 08 Oct 2008)
New Revision: 5093

Added:
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerLargeMessage.java
   branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
Modified:
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/Configuration.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/SequentialFile.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/Message.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/StorageManager.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/Packet.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerProducer.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerSession.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/MessagingServiceImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/base/IntegrationTestBase.java
   branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/remoting/DestroyConsumerTest.java
   branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java
   branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java
   branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java
   branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java
Log:
1st commit

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -50,10 +50,6 @@
    {      
       super();
       
-      this.session = session;
-      
-      this.consumerID = consumerID;
-      
       this.deliveryCount = deliveryCount;
    }
    

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -1,32 +1,46 @@
 /*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
- * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
- * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
- * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
- * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
- * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  */
 
 package org.jboss.messaging.core.client.impl;
 
+import java.nio.ByteBuffer;
+import java.util.concurrent.Semaphore;
+
 import org.jboss.messaging.core.client.AcknowledgementHandler;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.message.impl.MessageImpl;
 import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionScheduledSendMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.util.SimpleString;
 import org.jboss.messaging.util.TokenBucketLimiter;
 
-import java.util.concurrent.Semaphore;
-
 /**
  * The client-side Producer connectionFactory class.
  * 
@@ -43,8 +57,12 @@
 
    // Attributes -----------------------------------------------------------------------------------
 
-   private final boolean trace = log.isTraceEnabled();
+   // TODO This is temporary, make this better
+   private static final int BIG_PACKAGE_SIZE = 10 * 1024;
 
+   // TODO This is temporary, make this better
+   public static final int CHUNK_SIZE = 10 * 1024;
+
    private final SimpleString address;
 
    private final long id;
@@ -150,46 +168,46 @@
    public void sendManagement(final ClientMessage msg) throws MessagingException
    {
       checkClosed();
-      
+
       if (address != null)
       {
          msg.setDestination(address);
       }
       else
       {
-         msg.setDestination(this.address);
+         msg.setDestination(address);
       }
-      
+
       if (rateLimiter != null)
       {
          // Rate flow control
-                  
+
          rateLimiter.limit();
       }
-      
+
       boolean sendBlocking = msg.isDurable() ? blockOnPersistentSend : blockOnNonPersistentSend;
-      
+
       SessionSendManagementMessage message = new SessionSendManagementMessage(id, msg, false);
-      
+
       if (sendBlocking)
-      {        
+      {
          channel.sendBlocking(message);
       }
       else
       {
          channel.send(message);
-      }      
-      
-      //We only flow control with non-anonymous producers
+      }
+
+      // We only flow control with non-anonymous producers
       if (address == null && creditFlowControl)
       {
          try
          {
-            availableCredits.acquire(message.getClientMessage().getEncodeSize());
+            availableCredits.acquire(msg.getEncodeSize());
          }
          catch (InterruptedException e)
-         {           
-         }         
+         {
+         }
       }
    }
 
@@ -305,40 +323,88 @@
          rateLimiter.limit();
       }
 
-      if(autoGroupId != null)
+      if (autoGroupId != null)
       {
          msg.putStringProperty(MessageImpl.GROUP_ID, autoGroupId);
       }
 
       boolean sendBlocking = msg.isDurable() ? blockOnPersistentSend : blockOnNonPersistentSend;
 
-      SessionSendMessage message;
-      //check to see if this message need to be scheduled.
-      if(scheduledDeliveryTime <= 0)
+
+      if (msg.getEncodeSize() > BIG_PACKAGE_SIZE)
       {
-         message = new SessionSendMessage(id, msg, sendBlocking);
+         int headerSize = msg.getPropertiesEncodeSize();
+
+         if (headerSize > BIG_PACKAGE_SIZE)
+         {
+            throw new MessagingException(MessagingException.ILLEGAL_STATE,
+                                         "Header size is too big, use the messageBody for large data");
+         }
+
+         MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(headerSize));
+         msg.encodeProperties(headerBuffer);
+
+         final int bodySize = msg.getBodyEncodeSize();
+
+         int bodyLength = BIG_PACKAGE_SIZE - headerSize;
+
+         MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(bodyLength));
+
+         msg.encodeBody(bodyBuffer, 0, bodyLength);
+
+         SessionSendChunkMessage chunk = new SessionSendChunkMessage(id,
+                                                                     headerBuffer.array(),
+                                                                     bodyBuffer.array(),
+                                                                     true,
+                                                                     true);
+
+         channel.sendBlocking(chunk);
+
+         for (int pos = bodyLength; pos < bodySize;)
+         {
+            bodyLength = Math.min(bodySize - pos, CHUNK_SIZE);
+            bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(bodyLength));
+
+            msg.encodeBody(bodyBuffer, pos, bodyLength);
+
+            chunk = new SessionSendChunkMessage(id, null, bodyBuffer.array(), pos + bodyLength < bodySize, true);
+
+            channel.sendBlocking(chunk);
+
+            pos += bodyLength;
+         }
+
       }
       else
       {
-         message = new SessionScheduledSendMessage(id, msg, sendBlocking, scheduledDeliveryTime);
-      }
+         SessionSendMessage message;
 
+         //check to see if this message need to be scheduled.
+         if(scheduledDeliveryTime <= 0)
+         {
+            message = new SessionSendMessage(id, msg, sendBlocking);
+         }
+         else
+         {
+            message = new SessionScheduledSendMessage(id, msg, sendBlocking, scheduledDeliveryTime);
+         }
 
-      if (sendBlocking)
-      {
-         channel.sendBlocking(message);
+         if (sendBlocking)
+         {
+            channel.sendBlocking(message);
+         }
+         else
+         {
+            channel.send(message);
+         }
       }
-      else
-      {
-         channel.send(message);
-      }
 
       // We only flow control with non-anonymous producers
       if (address == null && creditFlowControl)
       {
          try
          {
-            availableCredits.acquire(message.getClientMessage().getEncodeSize());
+            availableCredits.acquire(msg.getEncodeSize());
          }
          catch (InterruptedException e)
          {

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/Configuration.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/Configuration.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -37,8 +37,7 @@
  */
 public interface Configuration extends Serializable
 {
-   // General attributes
-   // -------------------------------------------------------------------
+   // General attributes -------------------------------------------------------------------
 
    boolean isClustered();
 
@@ -92,9 +91,10 @@
 
    void setBackupConnectorConfiguration(TransportConfiguration config);
 
-   // Journal related attributes
-   // ------------------------------------------------------------
+   boolean isWildcardRoutingEnabled();
 
+   // Journal related attributes ------------------------------------------------------------
+
    String getBindingsDirectory();
 
    void setBindingsDirectory(String dir);
@@ -103,10 +103,6 @@
 
    void setJournalDirectory(String dir);
 
-   String getPagingDirectory();
-
-   void setPagingDirectory(String dir);
-
    JournalType getJournalType();
 
    void setJournalType(JournalType type);
@@ -143,9 +139,20 @@
 
    void setCreateJournalDir(boolean create);
 
+   
+   // Paging Properties --------------------------------------------------------------------
+   
+   String getPagingDirectory();
+
+   void setPagingDirectory(String dir);
+
    long getPagingMaxGlobalSizeBytes();
 
    void setPagingMaxGlobalSizeBytes(long maxGlobalSize);
-
-   boolean isWildcardRoutingEnabled();
+   
+   // Large Messages Properties ------------------------------------------------------------
+   
+   String getLargeMessagesDirectory();
+   
+   void setLargeMessagesDirectory(String directory);
 }

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -58,6 +58,8 @@
    public static final String DEFAULT_JOURNAL_DIR = "data/journal";
 
    public static final String DEFAULT_PAGING_DIR = "data/paging";
+   
+   public static final String DEFAULT_LARGEMESSAGES_DIR = "data/largemessages";
 
    public static final boolean DEFAULT_CREATE_JOURNAL_DIR = true;
 
@@ -105,14 +107,19 @@
 
    protected TransportConfiguration backupConnectorConfig;
 
-   // Paging related attributes
+   protected boolean wildcardRoutingEnabled = DEFAULT_WILDCARD_ROUTING_ENABLED;
 
+   // Paging related attributes ------------------------------------------------------------
+
    protected long pagingMaxGlobalSize = -1;
 
    protected String pagingDirectory = DEFAULT_PAGING_DIR;
+   
 
-   // Journal related attributes
+   // File related attributes -----------------------------------------------------------
 
+   protected String largeMessagesDirectory = DEFAULT_LARGEMESSAGES_DIR;
+   
    protected String bindingsDirectory = DEFAULT_BINDINGS_DIRECTORY;
 
    protected boolean createBindingsDir = DEFAULT_CREATE_BINDINGS_DIR;
@@ -135,8 +142,6 @@
 
    protected int journalBufferReuseSize = DEFAULT_JOURNAL_REUSE_BUFFER_SIZE;
 
-   protected boolean wildcardRoutingEnabled = DEFAULT_WILDCARD_ROUTING_ENABLED;
-
    public boolean isClustered()
    {
       return clustered;
@@ -401,6 +406,18 @@
    {
       pagingMaxGlobalSize = maxGlobalSize;
    }
+   
+   
+   public String getLargeMessagesDirectory()
+   {
+      return largeMessagesDirectory;
+   }
+   
+   public void setLargeMessagesDirectory(final String directory)
+   {
+      this.largeMessagesDirectory = directory;
+   }
+   
 
    @Override
    public boolean equals(final Object other)
@@ -424,6 +441,7 @@
              cother.isRequireDestinations() == isRequireDestinations() &&
              cother.isSecurityEnabled() == isSecurityEnabled() &&
              cother.isWildcardRoutingEnabled() == isWildcardRoutingEnabled() &&
+             cother.getLargeMessagesDirectory().equals(getLargeMessagesDirectory()) &&
              cother.getBindingsDirectory().equals(getBindingsDirectory()) &&
              cother.getJournalDirectory().equals(getJournalDirectory()) &&
              cother.getJournalFileSize() == getJournalFileSize() &&

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -306,6 +306,8 @@
 
       // Persistence config
 
+      largeMessagesDirectory = getString(e, "large-messages-directory", largeMessagesDirectory);
+      
       bindingsDirectory = getString(e, "bindings-directory", bindingsDirectory);
 
       createBindingsDir = getBoolean(e, "create-bindings-dir", createBindingsDir);

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -38,6 +38,8 @@
     * Creates the file if it doesn't already exist, then opens it
     */
    void open() throws Exception;
+   
+   boolean isOpen();
 
    /**
     * For certain operations (like loading) we don't need open the file with full maxIO

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -74,6 +74,11 @@
       this.maxIO = maxIO;
    }
 
+   public boolean isOpen() 
+   {
+      return opened;
+   }
+   
    public int getAlignment() throws Exception
    {
       checkOpened();

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -77,6 +77,11 @@
    {
       return fileName;
    }
+   
+   public boolean isOpen()
+   {
+      return file != null;
+   }
 
    public synchronized void open() throws Exception
    {

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/Message.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/Message.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/Message.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -74,6 +74,26 @@
    
    void decode(MessagingBuffer buffer);
    
+   
+   int getPropertiesEncodeSize();
+   
+   void encodeProperties(MessagingBuffer buffer);
+   
+   void decodeProperties(MessagingBuffer buffer);
+   
+   
+   int getBodyEncodeSize();
+   
+   
+   // Used on Message chunk
+   void encodeBody(MessagingBuffer buffer, int start, int size);
+   
+   void encodeBody(MessagingBuffer buffer);
+   
+   void decodeBody(MessagingBuffer buffer);
+   
+   
+   
    // Properties
    // ------------------------------------------------------------------
    

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -18,19 +18,26 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.message.impl;
 
+import static org.jboss.messaging.util.DataConstants.SIZE_BOOLEAN;
+import static org.jboss.messaging.util.DataConstants.SIZE_BYTE;
+import static org.jboss.messaging.util.DataConstants.SIZE_INT;
+import static org.jboss.messaging.util.DataConstants.SIZE_LONG;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.message.Message;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import static org.jboss.messaging.util.DataConstants.*;
+import org.jboss.messaging.core.server.ServerLargeMessage;
 import org.jboss.messaging.util.SimpleString;
 import org.jboss.messaging.util.TypedProperties;
 
-import java.util.Set;
-
 /**
  * A concrete implementation of a message
  *
@@ -61,7 +68,7 @@
    // Attributes ----------------------------------------------------
 
    protected long messageID;
-   
+
    private SimpleString destination;
 
    private byte type;
@@ -83,7 +90,7 @@
 
    protected MessageImpl()
    {
-      this.properties = new TypedProperties();
+      properties = new TypedProperties();
    }
 
    /**
@@ -95,8 +102,12 @@
     * @param priority
     * @param body
     */
-   protected MessageImpl(final byte type, final boolean durable, final long expiration,
-                         final long timestamp, final byte priority, MessagingBuffer body)
+   protected MessageImpl(final byte type,
+                         final boolean durable,
+                         final long expiration,
+                         final long timestamp,
+                         final byte priority,
+                         final MessagingBuffer body)
    {
       this();
       this.type = type;
@@ -113,17 +124,17 @@
    protected MessageImpl(final MessageImpl other)
    {
       this();
-      this.messageID = other.messageID;
-      this.destination = other.destination;
-      this.type = other.type;
-      this.durable = other.durable;
-      this.expiration = other.expiration;
-      this.timestamp = other.timestamp;
-      this.priority = other.priority;
-      this.properties = new TypedProperties(other.properties);
-      this.body = other.body;
+      messageID = other.messageID;
+      destination = other.destination;
+      type = other.type;
+      durable = other.durable;
+      expiration = other.expiration;
+      timestamp = other.timestamp;
+      priority = other.priority;
+      properties = new TypedProperties(other.properties);
+      body = other.body;
    }
-   
+
    protected MessageImpl(final long messageID)
    {
       this();
@@ -132,34 +143,69 @@
 
    // Message implementation ----------------------------------------
 
-   public void encode(MessagingBuffer buff)
+   public void encode(final MessagingBuffer buffer)
    {
-      buff.putLong(messageID);
-      buff.putSimpleString(destination);
-      buff.putByte(type);
-      buff.putBoolean(durable);
-      buff.putLong(expiration);
-      buff.putLong(timestamp);
-      buff.putByte(priority);
-      properties.encode(buff);
-      buff.putInt(body.limit());    
-      buff.putBytes(body.array(), 0, body.limit());
+      encodeProperties(buffer);
+      encodeBody(buffer);
    }
 
    public int getEncodeSize()
    {
-      return SIZE_LONG + /* Destination */ SimpleString.sizeofString(destination) +
-      /* Type */ SIZE_BYTE +
-      /* Durable */ SIZE_BOOLEAN +
-      /* Expiration */ SIZE_LONG +
-      /* Timestamp */ SIZE_LONG +
-      /* Priority */ SIZE_BYTE +
-      /* PropertySize and Properties */ properties.getEncodeSize() +
-      /* BodySize and Body */ SIZE_INT + body.limit();
+      return getPropertiesEncodeSize() + getBodyEncodeSize();
    }
 
+   public int getPropertiesEncodeSize()
+   {
+      return SIZE_LONG + /* Destination */SimpleString.sizeofString(destination) +
+      /* Type */SIZE_BYTE +
+      /* Durable */SIZE_BOOLEAN +
+      /* Expiration */SIZE_LONG +
+      /* Timestamp */SIZE_LONG +
+      /* Priority */SIZE_BYTE +
+      /* PropertySize and Properties */properties.getEncodeSize();
+   }
+
+   public int getBodyEncodeSize()
+   {
+      return /* BodySize and Body */SIZE_INT + body.limit();
+   }
+
+   
+   public void encodeProperties(MessagingBuffer buffer)
+   {
+      buffer.putLong(messageID);
+      buffer.putSimpleString(destination);
+      buffer.putByte(type);
+      buffer.putBoolean(durable);
+      buffer.putLong(expiration);
+      buffer.putLong(timestamp);
+      buffer.putByte(priority);
+      properties.encode(buffer);
+   }
+
+
+   public void encodeBody(MessagingBuffer buffer)
+   {
+      buffer.putInt(body.limit());
+      buffer.putBytes(body.array(), 0, body.limit());
+   }
+   
+   // Used on Message chunk
+   public void encodeBody(MessagingBuffer buffer, int start, int size)
+   {
+      buffer.putBytes(body.array(), start, size);
+   }
+   
+
    public void decode(final MessagingBuffer buffer)
    {
+      decodeProperties(buffer);
+
+      decodeBody(buffer);
+   }
+
+   public void decodeProperties(final MessagingBuffer buffer)
+   {
       messageID = buffer.getLong();
       destination = buffer.getSimpleString();
       type = buffer.getByte();
@@ -167,32 +213,33 @@
       expiration = buffer.getLong();
       timestamp = buffer.getLong();
       priority = buffer.getByte();
+      properties.decode(buffer);
+   }
 
-      properties.decode(buffer);
+   public void decodeBody(final MessagingBuffer buffer)
+   {
       int len = buffer.getInt();
-
-      //TODO - this can be optimised
+      // TODO - this can be optimised
       byte[] bytes = new byte[len];
       buffer.getBytes(bytes);
-      body = buffer.createNewBuffer(len);
-      body.putBytes(bytes);      
+      this.body = new ByteBufferWrapper(ByteBuffer.wrap(bytes));;
    }
-   
+
    public long getMessageID()
    {
       return messageID;
    }
-   
+
    public SimpleString getDestination()
    {
       return destination;
    }
-   
-   public void setDestination(SimpleString destination)
+
+   public void setDestination(final SimpleString destination)
    {
       this.destination = destination;
    }
-   
+
    public byte getType()
    {
       return type;
@@ -202,7 +249,7 @@
    {
       return durable;
    }
-   
+
    public void setDurable(final boolean durable)
    {
       this.durable = durable;
@@ -222,12 +269,12 @@
    {
       return timestamp;
    }
-   
+
    public void setTimestamp(final long timestamp)
    {
       this.timestamp = timestamp;
    }
- 
+
    public byte getPriority()
    {
       return priority;
@@ -237,105 +284,105 @@
    {
       this.priority = priority;
    }
-     
+
    public boolean isExpired()
    {
       if (expiration == 0)
       {
          return false;
       }
-      
+
       return System.currentTimeMillis() - expiration >= 0;
    }
-   
-   // Properties 
+
+   // Properties
    // ---------------------------------------------------------------------------------------
-   
+
    public void putBooleanProperty(final SimpleString key, final boolean value)
    {
       properties.putBooleanProperty(key, value);
    }
-            
+
    public void putByteProperty(final SimpleString key, final byte value)
    {
       properties.putByteProperty(key, value);
    }
-   
+
    public void putBytesProperty(final SimpleString key, final byte[] value)
    {
       properties.putBytesProperty(key, value);
    }
-   
+
    public void putShortProperty(final SimpleString key, final short value)
    {
       properties.putShortProperty(key, value);
    }
-   
+
    public void putIntProperty(final SimpleString key, final int value)
    {
       properties.putIntProperty(key, value);
    }
-   
+
    public void putLongProperty(final SimpleString key, final long value)
    {
       properties.putLongProperty(key, value);
    }
-   
+
    public void putFloatProperty(final SimpleString key, final float value)
    {
       properties.putFloatProperty(key, value);
    }
-   
+
    public void putDoubleProperty(final SimpleString key, final double value)
    {
       properties.putDoubleProperty(key, value);
    }
-   
+
    public void putStringProperty(final SimpleString key, final SimpleString value)
    {
       properties.putStringProperty(key, value);
    }
-   
+
    public Object getProperty(final SimpleString key)
    {
       return properties.getProperty(key);
-   }  
-   
+   }
+
    public Object removeProperty(final SimpleString key)
    {
       return properties.removeProperty(key);
    }
-   
+
    public boolean containsProperty(final SimpleString key)
    {
       return properties.containsProperty(key);
    }
-   
+
    public Set<SimpleString> getPropertyNames()
    {
       return properties.getPropertyNames();
    }
-   
+
    // Body
    // -------------------------------------------------------------------------------------
-   
+
    public MessagingBuffer getBody()
    {
       return body;
    }
-   
+
    public void setBody(final MessagingBuffer body)
    {
       this.body = body;
    }
-      
+
    // Public --------------------------------------------------------
-   
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------
 
-   // Inner classes -------------------------------------------------  
+   // Inner classes -------------------------------------------------
 }

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -30,6 +30,7 @@
 import org.jboss.messaging.core.server.MessagingComponent;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.QueueFactory;
+import org.jboss.messaging.core.server.ServerLargeMessage;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.transaction.ResourceManager;
 import org.jboss.messaging.util.SimpleString;
@@ -67,6 +68,11 @@
    void storeMessageScheduledTransactional(final long txID,final ServerMessage message, final long scheduledDeliveryTime) throws Exception;
 
    void storeDeleteMessageTransactional(long txID, long queueID, long messageID) throws Exception;
+   
+   /** Create an area that will get LargeMessage bytes on the server size*/
+   ServerLargeMessage createLargeMessageStorage(final long messageID) throws Exception;
+   
+   
 
    /** Used to delete non-messaging data (such as PageTransaction and LasPage) */
    void storeDeleteTransactional(long txID, long recordID) throws Exception;

Added: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java	                        (rev 0)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -0,0 +1,155 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.persistence.impl.journal;
+
+import java.nio.ByteBuffer;
+
+import org.jboss.messaging.core.client.impl.ClientProducerImpl;
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.ServerLargeMessage;
+import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+
+/**
+ * A ServerLargeMessageImpl
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created 30-Sep-08 12:02:45 PM
+ *
+ *
+ */
+public class JournalServerLargeMessageImpl extends ServerMessageImpl implements ServerLargeMessage
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   final SequentialFile file;
+
+   ByteBuffer headersBuffer;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public JournalServerLargeMessageImpl(final long id, final SequentialFile file)
+   {
+      super(id);
+      this.file = file;
+   }
+
+   // Public --------------------------------------------------------
+
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.server.ServerLargeMessage#addBytes(byte[])
+    */
+   public void addBytes(final byte[] bytes) throws Exception
+   {
+      if (!file.isOpen())
+      {
+         file.open();
+      }
+
+      file.write(ByteBuffer.wrap(bytes), false);
+
+   }
+
+   @Override
+   public void encodeBody(final MessagingBuffer bufferOut)
+   {
+      try
+      {
+         ByteBuffer bufferRead = ByteBuffer.allocate(ClientProducerImpl.CHUNK_SIZE);
+         if (!file.isOpen())
+         {
+            file.open();
+         }
+
+         int bytesRead = 0;
+         file.position(0);
+         do
+         {
+            bufferRead.clear();
+            bytesRead = file.read(bufferRead);
+            bufferRead.flip();
+
+            if (bytesRead > 0)
+            {
+               bufferOut.putBytes(bufferRead.array(), 0, bytesRead);
+            }
+         }
+         while (bytesRead == ClientProducerImpl.CHUNK_SIZE);
+
+         releaseResources();
+      }
+      catch (Exception e)
+      {
+         throw new RuntimeException(e.getMessage(), e);
+      }
+   }
+
+   @Override
+   public int getBodyEncodeSize()
+   {
+      try
+      {
+         if (!file.isOpen())
+         {
+            file.open();
+         }
+
+         return (int)file.size();
+      }
+
+      catch (Exception e)
+      {
+         throw new RuntimeException("Can't get the file size on " + file.getFileName());
+      }
+   }
+
+   @Override
+   public int getMemoryEstimate()
+   {
+      // The body won't be on memory (aways on-file), so we don't consider this for paging
+      return super.getPropertiesEncodeSize();
+   }
+
+   public void releaseResources() throws Exception
+   {
+      if (file.isOpen())
+      {
+         file.close();
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -22,7 +22,24 @@
 
 package org.jboss.messaging.core.persistence.impl.journal;
 
+import static org.jboss.messaging.util.DataConstants.SIZE_BYTE;
+import static org.jboss.messaging.util.DataConstants.SIZE_INT;
+import static org.jboss.messaging.util.DataConstants.SIZE_LONG;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.transaction.xa.Xid;
+
 import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.filter.impl.FilterImpl;
 import org.jboss.messaging.core.journal.EncodingSupport;
@@ -50,6 +67,7 @@
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.QueueFactory;
+import org.jboss.messaging.core.server.ServerLargeMessage;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.impl.ServerMessageImpl;
 import org.jboss.messaging.core.transaction.ResourceManager;
@@ -59,17 +77,6 @@
 import org.jboss.messaging.util.SimpleString;
 import org.jboss.messaging.util.TimeAndCounterIDGenerator;
 
-import javax.transaction.xa.Xid;
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * 
  * A JournalStorageManager
@@ -83,12 +90,6 @@
 {
    private static final Logger log = Logger.getLogger(JournalStorageManager.class);
 
-   private static final int SIZE_LONG = 8;
-
-   private static final int SIZE_INT = 4;
-
-   private static final int SIZE_BYTE = 1;
-
    // Bindings journal record type
 
    public static final byte BINDING_RECORD = 21;
@@ -100,6 +101,8 @@
 
    // Message journal record types
 
+   public static final byte ADD_LARGE_MESSAGE = 30;
+
    public static final byte ADD_MESSAGE = 31;
 
    public static final byte ACKNOWLEDGE_REF = 32;
@@ -112,7 +115,7 @@
 
    public static final byte SET_SCHEDULED_DELIVERY_TIME = 44;
 
-   //This will produce a unique id **for this node only**
+   // This will produce a unique id **for this node only**
    private final IDGenerator idGenerator = new TimeAndCounterIDGenerator();
 
    private final AtomicLong bindingIDSequence = new AtomicLong(0);
@@ -121,6 +124,8 @@
 
    private final Journal bindingsJournal;
 
+   private final SequentialFileFactory largeMessagesFactory;
+
    private final ConcurrentMap<SimpleString, Long> destinationIDMap = new ConcurrentHashMap<SimpleString, Long>();
 
    private volatile boolean started;
@@ -191,13 +196,18 @@
                                        "jbm",
                                        config.getJournalMaxAIO(),
                                        config.getJournalBufferReuseSize());
+
+      largeMessagesFactory = new NIOSequentialFileFactory(config.getLargeMessagesDirectory());
    }
 
    /* This constructor is only used for testing */
-   public JournalStorageManager(final Journal messageJournal, final Journal bindingsJournal)
+   public JournalStorageManager(final Journal messageJournal,
+                                final Journal bindingsJournal,
+                                final SequentialFileFactory largeMessagesFactory)
    {
       this.messageJournal = messageJournal;
       this.bindingsJournal = bindingsJournal;
+      this.largeMessagesFactory = largeMessagesFactory;
    }
 
    public long generateUniqueID()
@@ -205,11 +215,32 @@
       return idGenerator.generateID();
    }
 
+   /** Create an area that will get LargeMessage bytes on the server size*/
+   public ServerLargeMessage createLargeMessageStorage(final long messageID) throws Exception
+   {
+      return new JournalServerLargeMessageImpl(messageID, largeMessagesFactory.createSequentialFile(messageID + ".msg",
+                                                                                                    -1));
+   }
+
    // Non transactional operations
 
    public void storeMessage(final ServerMessage message) throws Exception
    {
-      messageJournal.appendAddRecord(message.getMessageID(), ADD_MESSAGE, message);
+      if (message.getMessageID() <= 0)
+      {
+         throw new MessagingException(MessagingException.ILLEGAL_STATE, "MessageId was not assigned to Message");
+      }
+
+      if (message instanceof ServerLargeMessage)
+      {
+         messageJournal.appendAddRecord(message.getMessageID(),
+                                        ADD_LARGE_MESSAGE,
+                                        new LargeMessageEncoding((ServerLargeMessage)message));
+      }
+      else
+      {
+         messageJournal.appendAddRecord(message.getMessageID(), ADD_MESSAGE, message);
+      }
    }
 
    public void storeAcknowledge(final long queueID, final long messageID) throws Exception
@@ -231,7 +262,23 @@
 
    public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception
    {
-      messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), ADD_MESSAGE, message);
+      if (message.getMessageID() <= 0)
+      {
+         throw new MessagingException(MessagingException.ILLEGAL_STATE, "MessageId was not assigned to Message");
+      }
+
+      if (message instanceof ServerLargeMessage)
+      {
+         messageJournal.appendAddRecordTransactional(txID,
+                                                     message.getMessageID(),
+                                                     ADD_LARGE_MESSAGE,
+                                                     new LargeMessageEncoding(((ServerLargeMessage)message)));
+      }
+      else
+      {
+         messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), ADD_MESSAGE, message);
+      }
+
    }
 
    public void storePageTransaction(final long txID, final PageTransactionInfo pageTransaction) throws Exception
@@ -332,6 +379,23 @@
 
          switch (recordType)
          {
+            case ADD_LARGE_MESSAGE:
+            {
+               ServerLargeMessage largeMessage = this.createLargeMessageStorage(record.id);
+
+               LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage);
+               
+               messageEncoding.decode(buff);
+
+               List<MessageReference> refs = postOffice.route(largeMessage);
+
+               for (MessageReference ref : refs)
+               {
+                  ref.getQueue().addLast(ref);
+               }
+
+               break;
+            }
             case ADD_MESSAGE:
             {
                ServerMessage message = new ServerMessageImpl(record.id);
@@ -883,7 +947,7 @@
       public int getEncodeSize()
       {
          return SimpleString.sizeofString(queueName) + SimpleString.sizeofString(address) + 1 + // HasFilter?
-                ((filter != null) ? SimpleString.sizeofString(filter) : 0);
+                (filter != null ? SimpleString.sizeofString(filter) : 0);
       }
    }
 
@@ -917,6 +981,42 @@
 
    }
 
+   private static class LargeMessageEncoding implements EncodingSupport
+   {
+
+      private final ServerLargeMessage message;
+
+      public LargeMessageEncoding(ServerLargeMessage message)
+      {
+         this.message = message;
+      }
+
+      /* (non-Javadoc)
+       * @see org.jboss.messaging.core.journal.EncodingSupport#decode(org.jboss.messaging.core.remoting.spi.MessagingBuffer)
+       */
+      public void decode(final MessagingBuffer buffer)
+      {
+         message.decodeProperties(buffer);
+      }
+
+      /* (non-Javadoc)
+       * @see org.jboss.messaging.core.journal.EncodingSupport#encode(org.jboss.messaging.core.remoting.spi.MessagingBuffer)
+       */
+      public void encode(final MessagingBuffer buffer)
+      {
+         message.encodeProperties(buffer);
+      }
+
+      /* (non-Javadoc)
+       * @see org.jboss.messaging.core.journal.EncodingSupport#getEncodeSize()
+       */
+      public int getEncodeSize()
+      {
+         return message.getPropertiesEncodeSize();
+      }
+
+   }
+
    private static class DeliveryCountUpdateEncoding implements EncodingSupport
    {
       long queueID;

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -31,6 +31,7 @@
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.QueueFactory;
+import org.jboss.messaging.core.server.ServerLargeMessage;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.transaction.ResourceManager;
 import org.jboss.messaging.util.IDGenerator;
@@ -145,7 +146,16 @@
    public void updateDeliveryCount(MessageReference ref) throws Exception
 	{
 	}
+   
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.persistence.StorageManager#createLargeMessageStorage(long, int, int)
+    */
+   public ServerLargeMessage createLargeMessageStorage(final long messageID) throws Exception
+   {
+      return new NullStorageServerLargeMessageImpl(messageID);
+   }
 
+
 	public long generateUniqueID()
 	{
 	   //FIXME - this needs to use Howard's ID generator from JBM 1.4

Added: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java	                        (rev 0)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -0,0 +1,97 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.persistence.impl.nullpm;
+
+import java.nio.ByteBuffer;
+
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.ServerLargeMessage;
+import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+
+/**
+ * A NullStorageServerLargeMessageImpl
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created 30-Sep-08 1:51:42 PM
+ *
+ *
+ */
+public class NullStorageServerLargeMessageImpl extends ServerMessageImpl implements ServerLargeMessage
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public NullStorageServerLargeMessageImpl(final long messageID)
+   {
+      super(messageID);
+   }
+
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.server.ServerLargeMessage#release()
+    */
+   public void releaseResources() throws Exception
+   {
+   }
+
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.server.ServerLargeMessage#addBytes(byte[])
+    */
+   public synchronized void addBytes(final byte[] bytes)
+   {
+      MessagingBuffer buffer = this.getBody();
+      
+      if (buffer != null)
+      {
+         ByteBuffer newBuffer = ByteBuffer.allocate(buffer.limit() + bytes.length);
+         newBuffer.put(buffer.array());
+         buffer = new ByteBufferWrapper(newBuffer);
+         this.setBody(buffer);
+      }
+      else
+      {
+         buffer = new ByteBufferWrapper(ByteBuffer.allocate(bytes.length));
+         this.setBody(buffer);
+      }
+      
+      buffer.putBytes(bytes);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/Packet.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/Packet.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/Packet.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -45,6 +45,8 @@
       
    void decode(MessagingBuffer buffer);
    
+   int getPacketSize();
+   
    boolean isRequiresConfirmations();
    
    boolean isReplicateBlocking();

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -12,21 +12,6 @@
 
 package org.jboss.messaging.core.remoting.impl;
 
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.ChannelHandler;
-import org.jboss.messaging.core.remoting.FailureListener;
-import org.jboss.messaging.core.remoting.Interceptor;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.ResponseNotifier;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.DuplicablePacket;
-import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
@@ -44,6 +29,7 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_HASNEXTMESSAGE_RESP;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_NEXTMESSAGE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_RESET;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CHUNK_SEND;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
@@ -84,6 +70,37 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.FailureListener;
+import org.jboss.messaging.core.remoting.Interceptor;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.ResponseNotifier;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.DuplicablePacket;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
 import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
@@ -116,6 +133,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionScheduledSendMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
@@ -138,21 +156,6 @@
 import org.jboss.messaging.util.OrderedExecutorFactory;
 import org.jboss.messaging.util.SimpleIDGenerator;
 
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
 /**
  * @author <a href="tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -488,8 +491,8 @@
 
    private void doWrite(final Packet packet)
    {
-      final MessagingBuffer buffer = transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
-
+      //System.out.println(packet.getClass().getCanonicalName() + " size = " + packet.getPacketSize());
+      final MessagingBuffer buffer = transportConnection.createBuffer(packet.getPacketSize());
       packet.encode(buffer);
 
       transportConnection.write(buffer);
@@ -793,6 +796,11 @@
             packet = new SessionSendManagementMessage();
             break;
          }
+         case SESS_CHUNK_SEND:
+         {
+            packet = new SessionSendChunkMessage();
+            break;
+         }
          default:
          {
             throw new IllegalArgumentException("Invalid type: " + packetType);

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -28,7 +28,9 @@
 
    private static final Logger log = Logger.getLogger(PacketImpl.class);
 
-   public static final int INITIAL_BUFFER_SIZE = 1024;
+   public static final int DEFAULT_PACKET_SIZE = 1024;
+   
+   protected static final int BASIC_PACKET_SIZE = DataConstants.SIZE_INT + DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG;
 
    private long channelID;
 
@@ -159,6 +161,8 @@
 
    public static final byte SESS_SCHEDULED_SEND = 91;
 
+   public static final byte SESS_CHUNK_SEND = 95;
+
    // Static --------------------------------------------------------
 
    public PacketImpl(final byte type)
@@ -168,6 +172,12 @@
 
    // Public --------------------------------------------------------
 
+   
+   public int getPacketSize()
+   {
+      return DEFAULT_PACKET_SIZE;
+   }
+   
    public byte getType()
    {
       return type;

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -27,6 +27,7 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.util.DataConstants;
 
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -95,6 +96,12 @@
       return deliveryCount;
    }
    
+   
+   public int getPacketSize()
+   {
+      return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT + serverMessage.getEncodeSize();
+   }
+   
    public void encodeBody(final MessagingBuffer buffer)
    {
       buffer.putLong(consumerID);

Added: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java	                        (rev 0)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -0,0 +1,192 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.util.DataConstants;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ */
+public class SessionSendChunkMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private long targetID;
+
+   private byte[] header;
+
+   private byte[] body;
+
+   private boolean continues;
+
+   private long messageID = 0;
+
+   private boolean requiresResponse;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public SessionSendChunkMessage(final long targetID,
+                                  final byte[] header,
+                                  final byte[] body,
+                                  final boolean continues,
+                                  final boolean requiresResponse)
+   {
+      super(SESS_CHUNK_SEND);
+      this.targetID = targetID;
+      this.header = header;
+      this.body = body;
+      this.continues = continues;
+      this.requiresResponse = requiresResponse;
+   }
+
+   public SessionSendChunkMessage()
+   {
+      super(SESS_CHUNK_SEND);
+   }
+
+   // Public --------------------------------------------------------
+
+   public long getTargetID()
+   {
+      return targetID;
+   }
+
+   public boolean isRequiresResponse()
+   {
+      return requiresResponse;
+   }
+
+   public byte[] getHeader()
+   {
+      return header;
+   }
+
+   public byte[] getBody()
+   {
+      return body;
+   }
+
+   public long getMessageID()
+   {
+      return messageID;
+   }
+
+   public void setMessageID(final long messageId)
+   {
+      messageID = messageId;
+   }
+
+   public boolean isContinues()
+   {
+      return continues;
+   }
+
+   @Override
+   public int getPacketSize()
+   {
+      return DEFAULT_PACKET_SIZE + DataConstants.SIZE_LONG /* TargetID */+
+             DataConstants.SIZE_INT /* HeaderLength */+
+             (header != null ? header.length : 0) /* Header bytes */+
+             DataConstants.SIZE_INT /* BodyLength */+
+             body.length /* Body bytes */+
+             DataConstants.SIZE_BOOLEAN /* hasContinuations */+
+             DataConstants.SIZE_BOOLEAN /* requiresResponse */+
+             DataConstants.SIZE_BOOLEAN /* has MessageId */+
+             (messageID > 0 ? DataConstants.SIZE_LONG : 0);
+   }
+
+   @Override
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      buffer.putLong(targetID);
+
+      if (header != null)
+      {
+         buffer.putInt(header.length);
+         buffer.putBytes(header);
+      }
+      else
+      {
+         buffer.putInt(0);
+      }
+
+      buffer.putInt(body.length);
+      buffer.putBytes(body);
+
+      buffer.putBoolean(continues);
+
+      buffer.putBoolean(requiresResponse);
+
+      buffer.putBoolean(messageID > 0);
+
+      if (messageID > 0)
+      {
+         buffer.putLong(messageID);
+      }
+   }
+
+   @Override
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      buffer.putLong(targetID);
+
+      final int headerLength = buffer.getInt();
+
+      if (headerLength > 0)
+      {
+         header = new byte[headerLength];
+         buffer.getBytes(header);
+      }
+
+      final int bodyLength = buffer.getInt();
+
+      body = new byte[bodyLength];
+      buffer.getBytes(body);
+
+      continues = buffer.getBoolean();
+
+      requiresResponse = buffer.getBoolean();
+
+      final boolean hasMessageID = buffer.getBoolean();
+
+      if (hasMessageID)
+      {
+         messageID = buffer.getLong();
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerLargeMessage.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerLargeMessage.java	                        (rev 0)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerLargeMessage.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server;
+
+/**
+ * A LargeMessage
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created 30-Sep-08 10:58:04 AM
+ *
+ *
+ */
+public interface ServerLargeMessage extends ServerMessage
+{
+   void addBytes(byte[] bytes) throws Exception;
+
+   /** Close the files if opened */
+   void releaseResources() throws Exception;
+
+}

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerProducer.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerProducer.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerProducer.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -18,11 +18,10 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.server;
 
-
 /**
  * 
  * A ServerProducer
@@ -39,6 +38,12 @@
 	
 	void send(ServerMessage msg) throws Exception;
 
+   /** Current LargeMessage being sent in chunks */
+   ServerLargeMessage getCurrentChunk();
+   
+   /** Current LargeMessage being sent in chunks */
+   void setCurrentChunk(ServerLargeMessage message);
+
    void sendScheduled(ServerMessage message, long scheduledDeliveryTime) throws Exception;
 	
 	void requestAndSendCredits() throws Exception;

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -145,6 +145,12 @@
 
    void sendProducerMessage(long producerID, ServerMessage message) throws Exception;
 
+   ServerLargeMessage getCurrentLargeMessage(long producerID);
+   
+   ServerLargeMessage createLargeMessageStorage(long producerID, byte[] header) throws Exception;
+   
+   void clearCurrentLargeMessage(long producerID);
+   
    void sendScheduledProducerMessage(long producerID, ServerMessage serverMessage, long scheduledDeliveryTime) throws Exception;
 
    boolean browserHasNextMessage(long browserID) throws Exception;

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/MessagingServiceImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/MessagingServiceImpl.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/MessagingServiceImpl.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -24,6 +24,7 @@
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.journal.Journal;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.journal.impl.JournalImpl;
 import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
 import org.jboss.messaging.core.management.ManagementService;
@@ -78,7 +79,7 @@
       return new MessagingServiceImpl(server, storageManager, remotingService);
    }
 
-   public static MessagingServiceImpl newNioStorageMessagingServer(final Configuration config, String journalDir, String bindingsDir)
+   public static MessagingServiceImpl newNioStorageMessagingServer(final Configuration config, String journalDir, String bindingsDir, String largeMessagesDir)
    {
       NIOSequentialFileFactory sequentialFileFactory = new NIOSequentialFileFactory(journalDir);
       NIOSequentialFileFactory sequentialFileFactory2 = new NIOSequentialFileFactory(bindingsDir);
@@ -92,8 +93,10 @@
 	   		config.getJournalMinFiles(), config.isJournalSyncTransactional(),
 	   		config.isJournalSyncNonTransactional(), sequentialFileFactory,
 	   		"jbm-bindings", "jbm", config.getJournalMaxAIO(), 0);
+      
+      SequentialFileFactory largeMessagesFactory = new NIOSequentialFileFactory(largeMessagesDir);
 
-      StorageManager storageManager = new JournalStorageManager(msgs, bindings);
+      StorageManager storageManager = new JournalStorageManager(msgs, bindings, largeMessagesFactory);
 
       RemotingService remotingService = new RemotingServiceImpl(config);
 

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -18,50 +18,52 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.server.impl;
 
-import org.jboss.messaging.core.logging.Logger;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.postoffice.FlowController;
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerFlowCreditMessage;
+import org.jboss.messaging.core.server.ServerLargeMessage;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.ServerProducer;
 import org.jboss.messaging.core.server.ServerSession;
 import org.jboss.messaging.util.SimpleString;
 
-import java.util.concurrent.atomic.AtomicInteger;
-
 /**
  * 
  * A ServerProducerImpl
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  * @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
  *
  */
 public class ServerProducerImpl implements ServerProducer
 {
-	private static final Logger log = Logger.getLogger(ServerProducerImpl.class);
-	
-	private final long id;
-	
-	private final ServerSession session;
-	
-	private final SimpleString address;
-	
-	private final FlowController flowController;
-	
-	private final int windowSize;
-	
-	private volatile boolean waiting;
-	
-   private AtomicInteger creditsToSend = new AtomicInteger(0);
-   
+   private final long id;
+
+   private final ServerSession session;
+
+   private final SimpleString address;
+
+   private final FlowController flowController;
+
+   private final int windowSize;
+
+   private volatile boolean waiting;
+
+   private final AtomicInteger creditsToSend = new AtomicInteger(0);
+
    private final Channel channel;
-     	
+
+   private ServerLargeMessage currentlargeMessage;
+
 	// Constructors ----------------------------------------------------------------
 	
 	public ServerProducerImpl(final long id, final ServerSession session,
@@ -109,35 +111,61 @@
       session.sendScheduled(message, scheduledDeliveryTime);
    }
 
+   public ServerLargeMessage getCurrentChunk()
+   {
+      return currentlargeMessage;
+   }
+
+   public void setCurrentChunk(final ServerLargeMessage message)
+   {
+      currentlargeMessage = message;
+   }
+
+
+   public void sendChunk(final byte[] message, final int position, final int totalSize) throws Exception
+   {
+      currentlargeMessage.addBytes(message);
+      if (position + message.length > totalSize)
+      {
+         throw new MessagingException(MessagingException.ILLEGAL_STATE, "Invalid ChunkSize sent over the wire");
+      }
+      else if (position + message.length == totalSize)
+      {
+         System.out.println("Sending complete message now");
+         send(currentlargeMessage);
+         this.currentlargeMessage = null;
+      }
+   }
+
    public void requestAndSendCredits() throws Exception
-	{	 
-	   if (!waiting)
-	   {
-	      flowController.requestAndSendCredits(this, creditsToSend.get());
-	   }
-	}
+   {
+      if (!waiting)
+      {
+         flowController.requestAndSendCredits(this, creditsToSend.get());
+      }
+   }
 
-	public void sendCredits(final int credits) throws Exception
-	{
-	   creditsToSend.addAndGet(-credits);
-	   
-		Packet packet = new SessionProducerFlowCreditMessage(id, credits);
-		
-		channel.send( packet);	
-	}
-	
-	public void setWaiting(final boolean waiting)
-	{
-		this.waiting = waiting;
-	}
-	
-	public boolean isWaiting()
-	{
-		return waiting;
-	}
+   public void sendCredits(final int credits) throws Exception
+   {
+      creditsToSend.addAndGet(-credits);
 
+      Packet packet = new SessionProducerFlowCreditMessage(id, credits);
 
+      channel.send(packet);
+   }
 
+   public void setWaiting(final boolean waiting)
+   {
+      this.waiting = waiting;
+   }
+
+   public boolean isWaiting()
+   {
+      return waiting;
+   }
+
+
+
    private void doFlowControl(final ServerMessage message) throws Exception
    {
       if (this.address != null)

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -34,12 +34,14 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.core.security.CheckType;
 import org.jboss.messaging.core.security.SecurityStore;
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerConsumer;
+import org.jboss.messaging.core.server.ServerLargeMessage;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.ServerProducer;
 import org.jboss.messaging.core.server.ServerSession;
@@ -86,8 +88,6 @@
 
    // Attributes ----------------------------------------------------------------------------
 
-   private final boolean trace = log.isTraceEnabled();
-
    private final long id;
 
    private final String username;
@@ -1092,7 +1092,7 @@
       int initialCredits = flowController == null ? -1 : flowController.getInitialCredits(windowToUse, producer);
 
       SimpleString groupId = null;
-      if(autoGroupId)
+      if (autoGroupId)
       {
          groupId = simpleStringIdGenerator.generateID();
       }
@@ -1169,6 +1169,44 @@
       producers.get(producerID).send(message);
    }
 
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.server.ServerSession#createLargeMessage(long, int, byte[])
+    */
+   public ServerLargeMessage createLargeMessageStorage(long producerID, byte[] header) throws Exception
+   {
+      ServerLargeMessage largeMessage = storageManager.createLargeMessageStorage(storageManager.generateUniqueID());
+      
+      MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.wrap(header));
+      
+      largeMessage.decodeProperties(headerBuffer);
+      
+      ServerProducer producer = producers.get(producerID);
+       
+      producer.setCurrentChunk(largeMessage);
+      
+      return largeMessage;
+   }
+
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.server.ServerSession#getCurrentLargeMessage(long)
+    */
+   public ServerLargeMessage getCurrentLargeMessage(long producerID)
+   {
+      ServerProducer producer = producers.get(producerID);
+      
+      return producer.getCurrentChunk();
+   }
+
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.server.ServerSession#sendCurrentLargeMessage(long)
+    */
+   public void clearCurrentLargeMessage(long producerID)
+   {
+      ServerProducer producer = producers.get(producerID);
+      
+      producer.setCurrentChunk(null);      
+   }
+
    public void sendScheduledProducerMessage(final long producerID, final ServerMessage message, final long scheduledDeliveryTime) throws Exception
    {
        producers.get(producerID).sendScheduled(message, scheduledDeliveryTime);  

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -26,6 +26,7 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_HASNEXTMESSAGE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_NEXTMESSAGE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_RESET;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CHUNK_SEND;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
@@ -76,6 +77,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionScheduledSendMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
@@ -91,6 +93,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
+import org.jboss.messaging.core.server.ServerLargeMessage;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.ServerSession;
 
@@ -149,7 +152,20 @@
             send.getServerMessage().setMessageID(id);
          }
       }
+      else
+      if (type == SESS_CHUNK_SEND)
+      {
+         SessionSendChunkMessage send = (SessionSendChunkMessage)packet;
 
+         if (send.getHeader() != null && send.getMessageID() <= 0L)
+         {
+            // must generate message id here, so we know they are in sync
+            long id = storageManager.generateUniqueID();
+            
+            send.setMessageID(id);
+         }
+      }
+
       Packet response = null;
 
       try
@@ -207,7 +223,10 @@
             case SESS_CREATEPRODUCER:
             {
                SessionCreateProducerMessage request = (SessionCreateProducerMessage)packet;
-               response = session.createProducer(request.getAddress(), request.getWindowSize(), request.getMaxRate(), request.isAutoGroupId());
+               response = session.createProducer(request.getAddress(),
+                                                 request.getWindowSize(),
+                                                 request.getMaxRate(),
+                                                 request.isAutoGroupId());
                break;
             }
             case SESS_PROCESSED:
@@ -229,7 +248,7 @@
             case SESS_ROLLBACK:
             {
                session.rollback();
-               //Rollback response is handled in the rollback() method
+               // Rollback response is handled in the rollback() method
                break;
             }
             case SESS_XA_COMMIT:
@@ -370,6 +389,37 @@
                }
                break;
             }
+            case SESS_CHUNK_SEND:
+            {
+               SessionSendChunkMessage message = (SessionSendChunkMessage)packet;
+               
+               ServerLargeMessage largeMessage = null;
+               
+               if (message.getHeader() != null)
+               {
+                  largeMessage = session.createLargeMessageStorage(message.getTargetID(), message.getHeader());
+                  largeMessage.setMessageID(message.getMessageID());
+               }
+               else
+               {
+                  largeMessage = session.getCurrentLargeMessage(message.getTargetID());
+               }
+               
+
+               largeMessage.addBytes(message.getBody());
+               
+               
+               if (!message.isContinues())
+               {
+                  session.sendProducerMessage(message.getTargetID(), largeMessage);
+               }
+               
+               if (message.isRequiresResponse())
+               {
+                  response = new NullResponseMessage(false);
+               }
+               break;
+            }
             case SESS_SCHEDULED_SEND:
             {
                SessionScheduledSendMessage message = (SessionScheduledSendMessage)packet;

Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/base/IntegrationTestBase.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/base/IntegrationTestBase.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/base/IntegrationTestBase.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -1,183 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.tests.integration.base;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
-import org.jboss.messaging.core.remoting.impl.netty.NettyAcceptorFactory;
-import org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.jms.client.JBossBytesMessage;
-import org.jboss.messaging.jms.client.JBossTextMessage;
-import org.jboss.messaging.tests.util.UnitTestCase;
-
-/**
- * 
- * Base class with basic utilities on starting up a basic server
- * 
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- *
- */
-public class IntegrationTestBase extends UnitTestCase
-{
-   
-   // Constants -----------------------------------------------------
-   
-   // Attributes ----------------------------------------------------
-   
-   protected static final String INVM_ACCEPTOR_FACTORY = InVMAcceptorFactory.class.getCanonicalName();
-   protected static final String INVM_CONNECTOR_FACTORY = InVMConnectorFactory.class.getCanonicalName();
-   
-   protected static final String NETTY_ACCEPTOR_FACTORY = NettyAcceptorFactory.class.getCanonicalName();
-   protected static final String NETTY_CONNECTOR_FACTORY = NettyConnectorFactory.class.getCanonicalName();
-   
-   protected String journalDir = System.getProperty("java.io.tmpdir", "/tmp") + "/integration-test/journal";
-   protected String bindingsDir = System.getProperty("java.io.tmpdir", "/tmp") + "/integration-test/bindings";
-   protected String pageDir = System.getProperty("java.io.tmpdir", "/tmp") + "/integration-test/page";
-   protected MessagingService messagingService;
-
-   
-   // Static --------------------------------------------------------
-   
-   // Constructors --------------------------------------------------
-   
-   // Public --------------------------------------------------------
-   
-   // Package protected ---------------------------------------------
-   
-   // Protected -----------------------------------------------------
-   
-   protected void clearData()
-   {
-      File file = new File(journalDir);
-      File file2 = new File(bindingsDir);
-      File file3 = new File(pageDir);
-      deleteDirectory(file);
-      file.mkdirs();
-      deleteDirectory(file2);
-      file2.mkdirs();
-      deleteDirectory(file3);
-      file3.mkdirs();
-   }
-
-
-   protected MessagingService createService(boolean realFiles, boolean netty, Configuration configuration, Map<String, QueueSettings> settings)
-   {
-      TransportConfiguration transportConfig = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
-      configuration.getAcceptorConfigurations().add(transportConfig);
-      
-      if (netty)
-      {
-         configuration.getAcceptorConfigurations().add(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY));
-      }
-      
-      MessagingService service;
-      
-      if (realFiles)
-      {
-         service = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
-      }
-      else
-      {
-         service = MessagingServiceImpl.newNullStorageMessagingServer(configuration);
-      }
-         
-      
-      for (Map.Entry<String, QueueSettings> setting: settings.entrySet())
-      {
-         service.getServer().getQueueSettingsRepository().addMatch(setting.getKey(), setting.getValue());
-      }
-   
-      
-      return service;
-   }
-
-   protected MessagingService createService(boolean realFiles)
-   {
-      return createService(realFiles, false, createDefaultConfig(), new HashMap<String, QueueSettings>());
-   }
-
-
-   protected Configuration createDefaultConfig()
-   {
-      Configuration configuration = new ConfigurationImpl();
-      configuration.setSecurityEnabled(false);
-      configuration.setJournalMinFiles(2);
-      configuration.setPagingDirectory(pageDir);
-      
-      return configuration;
-   }
-
-
-   protected ClientSessionFactory createInVMFactory()
-   {
-      return new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
-   }
-   
-   protected ClientSessionFactory createNettyFactory()
-   {
-      return new ClientSessionFactoryImpl(new TransportConfiguration(NETTY_CONNECTOR_FACTORY));
-   }
-   
-   protected ClientMessage createTextMessage(ClientSession session, String s)
-   {
-      return createTextMessage(session, s, true);
-   }
-
-   protected ClientMessage createTextMessage(ClientSession session, String s, boolean durable)
-   {
-      ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, durable, 0, System.currentTimeMillis(), (byte) 1);
-      message.getBody().putString(s);
-      message.getBody().flip();
-      return message;
-   }
-
-   protected ClientMessage createBytesMessage(ClientSession session, byte[] b, boolean durable)
-   {
-      ClientMessage message = session.createClientMessage(JBossBytesMessage.TYPE, durable, 0, System.currentTimeMillis(), (byte) 1);
-      message.getBody().putBytes(b);
-      message.getBody().flip();
-      return message;
-   }
-
-   
-   
-   // Private -------------------------------------------------------
-   
-   // Inner classes -------------------------------------------------
-   
-}

Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/remoting/DestroyConsumerTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/remoting/DestroyConsumerTest.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/remoting/DestroyConsumerTest.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -32,10 +32,10 @@
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.server.MessagingService;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.tests.integration.base.IntegrationTestBase;
+import org.jboss.messaging.tests.util.ServiceTestBase;
 import org.jboss.messaging.util.SimpleString;
 
-public class DestroyConsumerTest extends IntegrationTestBase
+public class DestroyConsumerTest extends ServiceTestBase
 {
    
    // Constants -----------------------------------------------------

Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -55,6 +55,7 @@
 
    private String journalDir = System.getProperty("java.io.tmpdir", "/tmp") + "/xa-recovery-test/journal";
    private String bindingsDir = System.getProperty("java.io.tmpdir", "/tmp") + "/xa-recovery-test/bindings";
+   private String largeMessagesDir = System.getProperty("java.io.tmpdir", "/tmp") + "/xa-recovery-test/largemsg";
    private String pageDir = System.getProperty("java.io.tmpdir", "/tmp") + "/xa-recovery-test/page";
    private MessagingService messagingService;
    private ClientSession clientSession;
@@ -83,7 +84,7 @@
 
       TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
       configuration.getAcceptorConfigurations().add(transportConfig);
-      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, largeMessagesDir);
       //start the server
       messagingService.start();
       //then we create a client as normal
@@ -1188,7 +1189,7 @@
       clientSession = null;
       messagingService.stop();
       messagingService = null;
-      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, largeMessagesDir);
       
       addSettings();
       

Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/stress/paging/MultipleDestinationPagingTest.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -23,7 +23,7 @@
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.server.MessagingService;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.tests.integration.base.IntegrationTestBase;
+import org.jboss.messaging.tests.util.ServiceTestBase;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -31,7 +31,7 @@
  * 
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  */
-public class MultipleDestinationPagingTest extends IntegrationTestBase
+public class MultipleDestinationPagingTest extends ServiceTestBase
 {
 
    // Constants -----------------------------------------------------

Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/timing/core/client/ScheduledMessageTest.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -103,7 +103,7 @@
 
       TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
       configuration.getAcceptorConfigurations().add(transportConfig);
-      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
       //start the server
       messagingService.start();
       //then we create a client as normal
@@ -124,7 +124,7 @@
       session.close();
       messagingService.stop();
       messagingService = null;
-      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
       messagingService.start();
 
       sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
@@ -148,7 +148,7 @@
        Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
       TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
       configuration.getAcceptorConfigurations().add(transportConfig);
-      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
       //start the server
       messagingService.start();
       //then we create a client as normal
@@ -171,7 +171,7 @@
       session.close();
       messagingService.stop();
       messagingService = null;
-      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
       messagingService.start();
 
       sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
@@ -196,7 +196,7 @@
       TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
       configuration.getAcceptorConfigurations().add(transportConfig);
       configuration.setPagingMaxGlobalSizeBytes(0);
-      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
       //start the server
       messagingService.start();
       //then we create a client as normal

Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java	2008-10-09 03:05:32 UTC (rev 5092)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -76,7 +76,7 @@
       Journal messageJournal = EasyMock.createStrictMock(Journal.class);
       Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
 
-      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
 
       ServerMessage msg = EasyMock.createStrictMock(ServerMessage.class);
       long msgID = 1021092;
@@ -92,7 +92,7 @@
       Journal messageJournal = EasyMock.createStrictMock(Journal.class);
       Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
 
-      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
 
       final long queueID = 1210981;
       final long messageID = 101921092;
@@ -110,7 +110,7 @@
       Journal messageJournal = EasyMock.createStrictMock(Journal.class);
       Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
 
-      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
 
       final long messageID = 101921092;
 
@@ -125,7 +125,7 @@
       Journal messageJournal = EasyMock.createStrictMock(Journal.class);
       Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
 
-      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
 
       ServerMessage msg = EasyMock.createStrictMock(ServerMessage.class);
       long msgID = 1021092;
@@ -142,7 +142,7 @@
       Journal messageJournal = EasyMock.createStrictMock(Journal.class);
       Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
 
-      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
 
       final long queueID = 1210981;
       final long messageID = 101921092;
@@ -162,7 +162,7 @@
       Journal messageJournal = EasyMock.createStrictMock(Journal.class);
       Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
 
-      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
 
       final long messageID = 101921092;
       final long txID = 1209373;
@@ -178,7 +178,7 @@
       Journal messageJournal = EasyMock.createStrictMock(Journal.class);
       Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
 
-      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
 
       final long txID = 1209373;
 
@@ -196,7 +196,7 @@
       Journal messageJournal = EasyMock.createStrictMock(Journal.class);
       Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
 
-      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
 
       final long txID = 1209373;
 
@@ -211,7 +211,7 @@
       Journal messageJournal = EasyMock.createStrictMock(Journal.class);
       Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
 
-      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
 
       final long txID = 1209373;
 
@@ -226,7 +226,7 @@
       Journal messageJournal = EasyMock.createStrictMock(Journal.class);
       Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
 
-      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
 
       final long msgID = 120912901;
       final long queueID = 1283743;
@@ -254,7 +254,7 @@
       Journal messageJournal = EasyMock.createStrictMock(Journal.class);
       Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
 
-      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
 
       messageJournal.load((List<RecordInfo>)EasyMock.anyObject(), (List<PreparedTransactionInfo>)EasyMock.anyObject());
 
@@ -422,7 +422,7 @@
       Journal messageJournal = EasyMock.createStrictMock(Journal.class);
       Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
 
-      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
 
       Queue queue = EasyMock.createStrictMock(Queue.class);
       SimpleString queueName = new SimpleString("saiohsiudh");
@@ -493,7 +493,7 @@
       Journal messageJournal = EasyMock.createStrictMock(Journal.class);
       Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
 
-      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
 
       Binding binding = EasyMock.createStrictMock(Binding.class);
       Queue queue = EasyMock.createStrictMock(Queue.class);
@@ -515,7 +515,7 @@
       Journal messageJournal = EasyMock.createStrictMock(Journal.class);
       Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
 
-      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
 
       Binding binding = EasyMock.createStrictMock(Binding.class);
       Queue queue = EasyMock.createStrictMock(Queue.class);
@@ -543,7 +543,7 @@
       Journal messageJournal = EasyMock.createStrictMock(Journal.class);
       Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
 
-      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
 
       SimpleString dest = new SimpleString("oaskokas");
 
@@ -667,7 +667,7 @@
       Journal messageJournal = EasyMock.createStrictMock(Journal.class);
       Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
 
-      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
 
       bindingsJournal.load((List<RecordInfo>)EasyMock.anyObject(), (List<PreparedTransactionInfo>)EasyMock.anyObject());
 
@@ -734,7 +734,7 @@
       Journal messageJournal = EasyMock.createStrictMock(Journal.class);
       Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
 
-      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
 
       assertFalse(jsm.isStarted());
       bindingsJournal.start();
@@ -788,7 +788,7 @@
       Journal messageJournal = EasyMock.createStrictMock(Journal.class);
       Journal bindingsJournal = EasyMock.createStrictMock(Journal.class);
 
-      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal);
+      JournalStorageManager jsm = new JournalStorageManager(messageJournal, bindingsJournal, null);
 
       long id = jsm.generateUniqueID();
 

Added: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java	                        (rev 0)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java	2008-10-09 03:09:55 UTC (rev 5093)
@@ -0,0 +1,216 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.util;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
+import org.jboss.messaging.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.jms.client.JBossBytesMessage;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+
+/**
+ * 
+ * Base class with basic utilities on starting up a basic server
+ * 
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class ServiceTestBase extends UnitTestCase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   protected boolean realFiles = false;
+
+   protected static final String INVM_ACCEPTOR_FACTORY = InVMAcceptorFactory.class.getCanonicalName();
+
+   protected static final String INVM_CONNECTOR_FACTORY = InVMConnectorFactory.class.getCanonicalName();
+
+   protected static final String NETTY_ACCEPTOR_FACTORY = NettyAcceptorFactory.class.getCanonicalName();
+
+   protected static final String NETTY_CONNECTOR_FACTORY = NettyConnectorFactory.class.getCanonicalName();
+
+   protected String journalDir = System.getProperty("java.io.tmpdir", "/tmp") + "/unit-test/journal";
+
+   protected String bindingsDir = System.getProperty("java.io.tmpdir", "/tmp") + "/unit-test/bindings";
+
+   protected String pageDir = System.getProperty("java.io.tmpdir", "/tmp") + "/unit-test/page";
+
+   protected String largeMessagesDir = System.getProperty("java.io.tmpdir", "/tmp") + "/unit-test/large-msg";
+
+   protected MessagingService messagingService;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      if (realFiles)
+      {
+         clearData();
+      }
+      messagingService = createService(realFiles);
+      messagingService.start();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      messagingService.stop();
+   }
+
+   protected void clearData()
+   {
+      File file = new File(journalDir);
+      File file2 = new File(bindingsDir);
+      File file3 = new File(pageDir);
+      File file4 = new File(largeMessagesDir);
+      deleteDirectory(file);
+      file.mkdirs();
+      deleteDirectory(file2);
+      file2.mkdirs();
+      deleteDirectory(file3);
+      file3.mkdirs();
+      deleteDirectory(file4);
+      file4.mkdirs();
+   }
+
+   protected MessagingService createService(final boolean realFiles,
+                                            final boolean netty,
+                                            final Configuration configuration,
+                                            final Map<String, QueueSettings> settings)
+   {
+      TransportConfiguration transportConfig = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+
+      if (netty)
+      {
+         configuration.getAcceptorConfigurations().add(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY));
+      }
+
+      MessagingService service;
+
+      if (realFiles)
+      {
+         service = MessagingServiceImpl.newNioStorageMessagingServer(configuration,
+                                                                     journalDir,
+                                                                     bindingsDir,
+                                                                     largeMessagesDir);
+      }
+      else
+      {
+         service = MessagingServiceImpl.newNullStorageMessagingServer(configuration);
+      }
+
+      for (Map.Entry<String, QueueSettings> setting : settings.entrySet())
+      {
+         service.getServer().getQueueSettingsRepository().addMatch(setting.getKey(), setting.getValue());
+      }
+
+      return service;
+   }
+
+   protected MessagingService createService(final boolean realFiles)
+   {
+      return createService(realFiles, false, createDefaultConfig(), new HashMap<String, QueueSettings>());
+   }
+
+   protected Configuration createDefaultConfig()
+   {
+      Configuration configuration = new ConfigurationImpl();
+      configuration.setSecurityEnabled(false);
+      configuration.setJournalMinFiles(2);
+      configuration.setPagingDirectory(pageDir);
+
+      return configuration;
+   }
+
+   protected ClientSessionFactory createInVMFactory()
+   {
+      return new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+   }
+
+   protected ClientSessionFactory createNettyFactory()
+   {
+      return new ClientSessionFactoryImpl(new TransportConfiguration(NETTY_CONNECTOR_FACTORY));
+   }
+
+   protected ClientMessage createTextMessage(final ClientSession session, final String s)
+   {
+      return createTextMessage(session, s, true);
+   }
+
+   protected ClientMessage createTextMessage(final ClientSession session, final String s, final boolean durable)
+   {
+      ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+                                                          durable,
+                                                          0,
+                                                          System.currentTimeMillis(),
+                                                          (byte)1);
+      message.getBody().putString(s);
+      message.getBody().flip();
+      return message;
+   }
+
+   protected ClientMessage createBytesMessage(final ClientSession session, final byte[] b, final boolean durable)
+   {
+      ClientMessage message = session.createClientMessage(JBossBytesMessage.TYPE,
+                                                          durable,
+                                                          0,
+                                                          System.currentTimeMillis(),
+                                                          (byte)1);
+      message.getBody().putBytes(b);
+      message.getBody().flip();
+      return message;
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}




More information about the jboss-cvs-commits mailing list