[jboss-cvs] JBoss Messaging SVN: r5114 - in branches/Branch_Chunk_Clebert: src/main/org/jboss/messaging/core/client/impl and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Oct 14 18:36:10 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-10-14 18:36:10 -0400 (Tue, 14 Oct 2008)
New Revision: 5114

Added:
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/FileClientMessage.java
Modified:
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/ClientConsumer.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java
   branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
   branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
Log:
Treating largeMessages as files on client side also

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/ClientConsumer.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/ClientConsumer.java	2008-10-14 15:45:22 UTC (rev 5113)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/ClientConsumer.java	2008-10-14 22:36:10 UTC (rev 5114)
@@ -18,10 +18,12 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.client;
 
+import java.io.File;
+
 import org.jboss.messaging.core.exception.MessagingException;
 
 /**
@@ -30,20 +32,29 @@
  * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
  */
 public interface ClientConsumer
-{      
-	ClientMessage receive() throws MessagingException;
-	
+{
+   ClientMessage receive() throws MessagingException;
+
    ClientMessage receive(long timeout) throws MessagingException;
-   
+
    ClientMessage receiveImmediate() throws MessagingException;
-   
+
    MessageHandler getMessageHandler() throws MessagingException;
 
    void setMessageHandler(MessageHandler handler) throws MessagingException;
-   
+
    void close() throws MessagingException;
-   
-   boolean isClosed();   
-   
-   boolean isDirect();   
+
+   boolean isClosed();
+
+   boolean isDirect();
+
+   boolean isLargeMessagesAsFiles();
+
+   void setLargeMessagesAsFiles(boolean bvalue);
+
+   File getLargeMessagesDir();
+
+   void setLargeMessagesDir(File directory);
+
 }

Added: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/FileClientMessage.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/FileClientMessage.java	                        (rev 0)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/FileClientMessage.java	2008-10-14 22:36:10 UTC (rev 5114)
@@ -0,0 +1,48 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.client;
+
+import java.io.File;
+import java.nio.channels.FileChannel;
+
+/**
+ * A FileClientMessage
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created Oct 14, 2008 3:21:15 PM
+ *
+ *
+ */
+public interface FileClientMessage
+{
+   File getFile();
+
+   void setFile(File file);
+   
+   FileChannel getChannel() throws Exception;
+   
+   void closeChannel() throws Exception;  
+   
+
+}

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-10-14 15:45:22 UTC (rev 5113)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-10-14 22:36:10 UTC (rev 5114)
@@ -12,6 +12,8 @@
 
 package org.jboss.messaging.core.client.impl;
 
+import java.io.File;
+import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.concurrent.Executor;
@@ -21,8 +23,10 @@
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.util.Future;
 
 /**
@@ -63,6 +67,11 @@
 
    private final Runner runner = new Runner();
 
+   private boolean largeMessagesAsFiles = false;
+   
+   private File largeMessagesDir;
+
+   
    private volatile Thread receiverThread;
 
    private volatile Thread onMessageThread;
@@ -100,6 +109,8 @@
       this.clientWindowSize = clientWindowSize;
 
       this.direct = direct;
+      
+      this.largeMessagesDir = new File(System.getProperty("user.dir") + File.separator + "jbm-large-messages");
    }
 
    // ClientConsumer implementation
@@ -187,6 +198,29 @@
       }
    }
 
+   
+   public ClientMessage createLargeMessage(MessagingBuffer propertiesBuffer) throws Exception
+   {
+      if (isLargeMessagesAsFiles())
+      {
+         if (!this.largeMessagesDir.exists())
+         {
+            largeMessagesDir.mkdirs();
+         }
+         
+         FileClientMessageImpl message = new FileClientMessageImpl();
+         message.decodeProperties(propertiesBuffer);
+         message.setFile(new File(this.largeMessagesDir, message.getMessageID() + "-" + this.getID() + ".jbm"));
+         return message;
+      }
+      else
+      {
+         ClientMessageImpl message = new ClientMessageImpl();
+         message.decodeProperties(propertiesBuffer);
+         return message;
+      }
+   }
+   
    public ClientMessage receive() throws MessagingException
    {
       return receive(0);
@@ -255,6 +289,27 @@
       return direct;
    }
 
+
+   public boolean isLargeMessagesAsFiles()
+   {
+      return largeMessagesAsFiles;
+   }
+
+   public void setLargeMessagesAsFiles(boolean bvalue)
+   {
+      this.largeMessagesAsFiles = bvalue;
+   }
+
+   public File getLargeMessagesDir()
+   {
+      return this.largeMessagesDir;
+   }
+
+   public void setLargeMessagesDir(File directory)
+   {
+      this.largeMessagesDir = directory;
+   }
+   
    // ClientConsumerInternal implementation
    // --------------------------------------------------------------
 

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2008-10-14 15:45:22 UTC (rev 5113)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2008-10-14 22:36:10 UTC (rev 5114)
@@ -24,6 +24,7 @@
 
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 
 /**
  * 
@@ -51,4 +52,6 @@
    void cleanUp() throws Exception;
    
    void failover();
+   
+   ClientMessage createLargeMessage(MessagingBuffer propertiesBuffer) throws Exception;
 }

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java	2008-10-14 15:45:22 UTC (rev 5113)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java	2008-10-14 22:36:10 UTC (rev 5114)
@@ -72,7 +72,6 @@
       super((byte) 0, durable, 0, System.currentTimeMillis(), (byte)4, body);
    }
    
-   /* Only used in testing */
    public ClientMessageImpl()
    {      
    }

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-10-14 15:45:22 UTC (rev 5113)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-10-14 22:36:10 UTC (rev 5114)
@@ -705,7 +705,14 @@
          consumer.handleMessage(message);
       }
    }
+   
+   public ClientMessage createLargeMessage(final long consumerID, final MessagingBuffer header) throws Exception
+   {
+      ClientConsumerInternal consumer = consumers.get(consumerID);
 
+      return consumer.createLargeMessage(header);
+   }
+   
    public void receiveProducerCredits(final long producerID, final int credits) throws Exception
    {
       ClientProducerInternal producer = producers.get(producerID);

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java	2008-10-14 15:45:22 UTC (rev 5113)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java	2008-10-14 22:36:10 UTC (rev 5114)
@@ -20,6 +20,7 @@
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -59,5 +60,7 @@
 
    void handleReceiveMessage(long consumerID, ClientMessage message) throws Exception;
 
-   void handleFailover(final RemotingConnection backupConnection);
+   void handleFailover(RemotingConnection backupConnection);
+
+   ClientMessage createLargeMessage(long consumerID, MessagingBuffer header) throws Exception;
 }

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2008-10-14 15:45:22 UTC (rev 5113)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java	2008-10-14 22:36:10 UTC (rev 5114)
@@ -23,15 +23,21 @@
 package org.jboss.messaging.core.client.impl;
 
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CHUNK_SEND;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVETOKENS;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CHUNK_SEND;
 
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.FileClientMessage;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.ChannelHandler;
 import org.jboss.messaging.core.remoting.Packet;
@@ -54,50 +60,55 @@
    private static final Logger log = Logger.getLogger(ClientSessionPacketHandler.class);
 
    private final ClientSessionInternal clientSession;
-   
+
    private Map<Long, ClientMessage> currentChunk = new ConcurrentHashMap<Long, ClientMessage>();
 
    public ClientSessionPacketHandler(final ClientSessionInternal clientSesssion)
-   {     
+   {
       this.clientSession = clientSesssion;
    }
-      
+
    public void handlePacket(final Packet packet)
    {
       byte type = packet.getType();
-       
+
       try
       {
          switch (type)
          {
             case SESS_RECEIVETOKENS:
             {
-               SessionProducerFlowCreditMessage message = (SessionProducerFlowCreditMessage) packet;
-   
+               SessionProducerFlowCreditMessage message = (SessionProducerFlowCreditMessage)packet;
+
                clientSession.receiveProducerCredits(message.getProducerID(), message.getTokens());
-               
+
                break;
             }
             case SESS_CHUNK_SEND:
             {
                System.out.println("received a chunk");
-               SessionSendChunkMessage chunk = (SessionSendChunkMessage) packet;
-               
+               SessionSendChunkMessage chunk = (SessionSendChunkMessage)packet;
+
                ClientMessage currentChunkMessage = null;
-               
+
                if (chunk.getHeader() != null)
                {
 
                   MessagingBuffer header = new ByteBufferWrapper(ByteBuffer.wrap(chunk.getHeader()));
 
-                  currentChunkMessage = new ClientMessageImpl();
-                  
-                  currentChunkMessage.decodeProperties(header);
-                  
-                  MessagingBuffer initialBody = new ByteBufferWrapper(ByteBuffer.wrap(chunk.getBody()));
-                  
-                  currentChunkMessage.setBody(initialBody);
-                  
+                  currentChunkMessage = clientSession.createLargeMessage(chunk.getTargetID(), header);
+
+                  if (currentChunkMessage instanceof FileClientMessage)
+                  {
+                     FileClientMessage fileMessage = (FileClientMessage)currentChunkMessage;
+                     addBytesBody(fileMessage, chunk.getBody());
+                  }
+                  else
+                  {
+                     MessagingBuffer initialBody = new ByteBufferWrapper(ByteBuffer.wrap(chunk.getBody()));
+                     currentChunkMessage.setBody(initialBody);
+                  }
+
                   currentChunk.put(chunk.getTargetID(), currentChunkMessage);
                }
                else
@@ -105,41 +116,52 @@
                   ByteBuffer body = ByteBuffer.wrap(chunk.getBody());
 
                   currentChunkMessage = currentChunk.get(chunk.getMessageID());
-                  
-                  MessagingBuffer currentBody = currentChunkMessage.getBody();
-                  
-                  MessagingBuffer newBody = new ByteBufferWrapper(ByteBuffer.allocate(currentBody.limit() + body.limit()));
-                  
-                  newBody.putBytes(currentBody.array());
-                  newBody.putBytes(body.array());
-                  
-                  currentChunkMessage.setBody(newBody);
+
+                  if (currentChunkMessage instanceof FileClientMessage)
+                  {
+                     FileClientMessage fileMessage = (FileClientMessage)currentChunkMessage;
+                     addBytesBody(fileMessage, chunk.getBody());
+                  }
+                  else
+                  {
+                     MessagingBuffer currentBody = currentChunkMessage.getBody();
+
+                     MessagingBuffer newBody = new ByteBufferWrapper(ByteBuffer.allocate(currentBody.limit() + body.limit()));
+
+                     newBody.putBytes(currentBody.array());
+                     newBody.putBytes(body.array());
+
+                     currentChunkMessage.setBody(newBody);
+                  }
                }
-               
+
                if (!chunk.isContinues())
                {
+                  if (currentChunkMessage instanceof FileClientMessage)
+                  {
+                     ((FileClientMessage)currentChunkMessage).closeChannel();
+                  }
                   clientSession.handleReceiveMessage(chunk.getTargetID(), currentChunkMessage);
                }
-               
-               
+
                break;
             }
             case SESS_RECEIVE_MSG:
             {
-               SessionReceiveMessage message = (SessionReceiveMessage) packet;
-      
+               SessionReceiveMessage message = (SessionReceiveMessage)packet;
+
                clientSession.handleReceiveMessage(message.getConsumerID(), message.getClientMessage());
-               
+
                break;
             }
             case EXCEPTION:
             {
-               //TODO - we can provide a means for async exceptions to get back to to client
-               //For now we just log it
+               // TODO - we can provide a means for async exceptions to get back to to client
+               // For now we just log it
                MessagingExceptionMessage mem = (MessagingExceptionMessage)packet;
-               
+
                log.error("Received exception asynchronously from server", mem.getException());
-               
+
                break;
             }
             default:
@@ -153,4 +175,16 @@
          log.error("Failed to handle packet", e);
       }
    }
+
+   /**
+    * @param fileMessage
+    * @param body
+    * @throws FileNotFoundException
+    * @throws IOException
+    */
+   private void addBytesBody(FileClientMessage fileMessage, byte[] body) throws Exception
+   {
+      FileChannel channel = fileMessage.getChannel();
+      channel.write(ByteBuffer.wrap(body));
+   }
 }
\ No newline at end of file

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java	2008-10-14 15:45:22 UTC (rev 5113)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java	2008-10-14 22:36:10 UTC (rev 5114)
@@ -20,9 +20,19 @@
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  */
 
-
 package org.jboss.messaging.core.client.impl;
 
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.jboss.messaging.core.client.FileClientMessage;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
 /**
  * A FileClientMessageImpl
  *
@@ -32,9 +42,29 @@
  *
  *
  */
-public class FileClientMessageImpl extends ClientMessageImpl
+public class FileClientMessageImpl extends ClientMessageImpl implements FileClientMessage
 {
 
+   File file;
+
+   FileChannel currentChannel;
+
+   /**
+    * 
+    */
+   public FileClientMessageImpl()
+   {
+      super();
+   }
+
+   /**
+    * @param deliveryCount
+    */
+   public FileClientMessageImpl(final int deliveryCount)
+   {
+      super(deliveryCount);
+   }
+
    // Constants -----------------------------------------------------
 
    // Attributes ----------------------------------------------------
@@ -45,6 +75,103 @@
 
    // Public --------------------------------------------------------
 
+   /**
+    * @return the file
+    */
+   public File getFile()
+   {
+      return file;
+   }
+
+   /**
+    * @param file the file to set
+    */
+   public void setFile(final File file)
+   {
+      this.file = file;
+   }
+
+   @Override
+   public MessagingBuffer getBody()
+   {
+      FileChannel channel = null;
+      try
+      {
+         // We open a new channel on getBody.
+         // for a better performance, users should be using the channel, instead of reading the file
+         channel = newChannel();
+
+         ByteBuffer buffer = ByteBuffer.allocate((int)channel.size());
+
+         channel.position(0);
+         channel.read(buffer);
+
+         return new ByteBufferWrapper(buffer);
+      }
+      catch (Exception e)
+      {
+         throw new RuntimeException(e);
+      }
+      finally
+      {
+         try
+         {
+            channel.close();
+         }
+         catch (Throwable ignored)
+         {
+
+         }
+      }
+   }
+
+   /**
+    * @return
+    * @throws FileNotFoundException
+    * @throws IOException
+    */
+   private FileChannel newChannel() throws FileNotFoundException, IOException
+   {
+      RandomAccessFile randomFile = new RandomAccessFile(getFile(), "rw");
+      randomFile.seek(0);
+
+      FileChannel channel = randomFile.getChannel();
+      return channel;
+   }
+
+   @Override
+   public void setBody(final MessagingBuffer body)
+   {
+
+      throw new RuntimeException("Not supported");
+   }
+
+   public synchronized FileChannel getChannel() throws Exception
+   {
+      if (currentChannel == null)
+      {
+         currentChannel = newChannel();
+      }
+
+      return currentChannel;
+   }
+
+   public void closeChannel() throws Exception
+   {
+      if (currentChannel != null)
+      {
+         currentChannel.close();
+         currentChannel = null;
+      }
+
+   }
+
+   @Override
+   public int getBodySize()
+   {
+      return (int)file.length();
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2008-10-14 15:45:22 UTC (rev 5113)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2008-10-14 22:36:10 UTC (rev 5114)
@@ -22,6 +22,7 @@
 
 package org.jboss.messaging.tests.integration.chunkmessage;
 
+import java.io.File;
 import java.nio.ByteBuffer;
 
 import org.jboss.messaging.core.client.ClientConsumer;
@@ -59,91 +60,117 @@
 
    // Public --------------------------------------------------------
 
-   public void testMessageChunk() throws Exception
+   public void testMessageChunkNullPersistence() throws Exception
    {
-      ClientSessionFactory sf = createInVMFactory();
+      testInternal(false);
+   }
 
-      ClientSession session = sf.createSession(false, true, true, false);
+   public void testMessageChunkFilePersistence() throws Exception
+   {
+      testInternal(true);
+   }
 
-      session.createQueue(ADDRESS, ADDRESS, null, true, false);
+   public void testInternal(final boolean realFiles) throws Exception
+   {
 
-      ClientProducer producer = session.createProducer(ADDRESS);
-      
-      MessagingBuffer body = new ByteBufferWrapper(ByteBuffer.allocate(DataConstants.SIZE_INT * 150000));
-      
-      for (int i = 0; i < 15000; i++)
+      clearData();
+
+      messagingService = createService(realFiles);
+      messagingService.start();
+
+      try
       {
-         body.putInt(i);
-      }
-      body.flip();
-      
-      //printBuffer("body to be sent : " , body);
-      
-      ClientMessage message = session.createClientMessage(true); 
+         ClientSessionFactory sf = createInVMFactory();
 
-      message.setBody(body);
-      producer.send(message);
-      
-      session.close();
+         ClientSession session = sf.createSession(false, true, true, false);
 
-      if (this.realFiles)
+         session.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         MessagingBuffer body = new ByteBufferWrapper(ByteBuffer.allocate(DataConstants.SIZE_INT * 150000));
+
+         for (int i = 0; i < 15000; i++)
+         {
+            body.putInt(i);
+         }
+         body.flip();
+
+         // printBuffer("body to be sent : " , body);
+
+         ClientMessage message = session.createClientMessage(true);
+
+         message.setBody(body);
+         producer.send(message);
+
+         session.close();
+
+         if (realFiles)
+         {
+            messagingService.stop();
+
+            messagingService = createService(realFiles);
+            messagingService.start();
+
+            sf = createInVMFactory();
+         }
+
+         session = sf.createSession(false, true, true, false);
+
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+         if (realFiles)
+         {
+            consumer.setLargeMessagesAsFiles(true);
+            consumer.setLargeMessagesDir(new File(clientLargeMessagesDir));
+         }
+
+         session.start();
+
+         ClientMessage message2 = consumer.receive(5000);
+
+         assertNotNull(message2);
+
+         System.out.println("msg on client = " + message2.getMessageID());
+
+         // printBuffer("message received : ", message2.getBody());
+
+         assertEqualsByteArrays(body.limit(), body.array(), message2.getBody().array());
+
+         session.close();
+      }
+      finally
       {
          messagingService.stop();
-         
-         messagingService = createService(realFiles);
-         messagingService.start();
-   
-         sf = createInVMFactory();
       }
-
-      session = sf.createSession(false, true, true, false);
-
-      ClientConsumer consumer = session.createConsumer(ADDRESS);
-      
-      session.start();
-      
-      ClientMessage message2 = consumer.receive(5000);
-      
-      assertNotNull(message2);
-      
-      System.out.println("msg on client = " + message2.getMessageID());
-      
-      //printBuffer("message received : ", message2.getBody());
-      
-      assertEqualsByteArrays(body.limit(), body.array(), message2.getBody().array());
-      
-      session.close();
    }
 
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
 
+   @Override
    protected void setUp() throws Exception
    {
-      this.realFiles = true;
-      super.setUp();
    }
 
+   @Override
    protected void tearDown() throws Exception
    {
-      super.tearDown();
    }
-   
-   
+
    // Private -------------------------------------------------------
-   
-   public static void printBuffer(String msg, MessagingBuffer buffer)
+
+   public static void printBuffer(final String msg, final MessagingBuffer buffer)
    {
-      
+
       buffer.rewind();
-      
+
       int size = buffer.limit();
-    
+
       System.out.print(msg);
-    
-      
-      for (int i = 0; i < size; i ++)
+
+      for (int i = 0; i < size; i++)
       {
          System.out.print(String.format("%1$X", buffer.getByte()));
          if (i % 40 != 0 || i == 0)
@@ -158,7 +185,6 @@
       }
       buffer.rewind();
 
-      
    }
 
    // Inner classes -------------------------------------------------

Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java	2008-10-14 15:45:22 UTC (rev 5113)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java	2008-10-14 22:36:10 UTC (rev 5114)
@@ -57,8 +57,6 @@
 
    // Attributes ----------------------------------------------------
 
-   protected boolean realFiles = false;
-
    protected static final String INVM_ACCEPTOR_FACTORY = InVMAcceptorFactory.class.getCanonicalName();
 
    protected static final String INVM_CONNECTOR_FACTORY = InVMConnectorFactory.class.getCanonicalName();
@@ -74,6 +72,8 @@
    protected String pageDir = System.getProperty("java.io.tmpdir", "/tmp") + "/unit-test/page";
 
    protected String largeMessagesDir = System.getProperty("java.io.tmpdir", "/tmp") + "/unit-test/large-msg";
+   
+   protected String clientLargeMessagesDir = System.getProperty("java.io.tmpdir", "/tmp") + "/unit-test/client-large-msg";
 
    protected MessagingService messagingService;
 
@@ -87,29 +87,14 @@
 
    // Protected -----------------------------------------------------
 
-   @Override
-   protected void setUp() throws Exception
-   {
-      if (realFiles)
-      {
-         clearData();
-      }
-      messagingService = createService(realFiles);
-      messagingService.start();
-   }
 
-   @Override
-   protected void tearDown() throws Exception
-   {
-      messagingService.stop();
-   }
-
    protected void clearData()
    {
       File file = new File(journalDir);
       File file2 = new File(bindingsDir);
       File file3 = new File(pageDir);
       File file4 = new File(largeMessagesDir);
+      File file5 = new File(clientLargeMessagesDir);
       deleteDirectory(file);
       file.mkdirs();
       deleteDirectory(file2);
@@ -118,6 +103,8 @@
       file3.mkdirs();
       deleteDirectory(file4);
       file4.mkdirs();
+      deleteDirectory(file5);
+      file5.mkdirs();
    }
 
    protected MessagingService createService(final boolean realFiles,
@@ -165,6 +152,7 @@
       Configuration configuration = new ConfigurationImpl();
       configuration.setSecurityEnabled(false);
       configuration.setJournalMinFiles(2);
+      configuration.setJournalFileSize(100*1024);
       configuration.setPagingDirectory(pageDir);
 
       return configuration;




More information about the jboss-cvs-commits mailing list