[jboss-cvs] JBoss Messaging SVN: r5140 - in branches/Branch_Chunk_Clebert: src/main/org/jboss/messaging/core/journal/impl and 11 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Oct 17 17:34:39 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-10-17 17:34:38 -0400 (Fri, 17 Oct 2008)
New Revision: 5140

Added:
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PageLargeMessage.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PageLargeMessageImpl.java
Modified:
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PageMessage.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/StorageManager.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
   branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
   branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
   branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
   branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
   branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
   branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
Log:
LargeMessages paging and few other tweaks

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java	2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java	2008-10-17 21:34:38 UTC (rev 5140)
@@ -35,7 +35,7 @@
  */
 public interface SequentialFileFactory
 {
-   SequentialFile createSequentialFile(String fileName, int maxIO) throws Exception;
+   SequentialFile createSequentialFile(String fileName, int maxIO);
 
    List<String> listFiles(String extension) throws Exception;
 

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-10-17 21:34:38 UTC (rev 5140)
@@ -67,7 +67,7 @@
    // AIO using a single thread.
    private ExecutorService executor;
 
-   public AIOSequentialFile(final String journalDir, final String fileName, final int maxIO) throws Exception
+   public AIOSequentialFile(final String journalDir, final String fileName, final int maxIO)
    {
       this.journalDir = journalDir;
       this.fileName = fileName;

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	2008-10-17 21:34:38 UTC (rev 5140)
@@ -41,7 +41,7 @@
       super(journalDir);
    }
 
-   public SequentialFile createSequentialFile(final String fileName, final int maxIO) throws Exception
+   public SequentialFile createSequentialFile(final String fileName, final int maxIO)
    {
       return new AIOSequentialFile(journalDir, fileName, maxIO);
    }

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2008-10-17 21:34:38 UTC (rev 5140)
@@ -48,7 +48,7 @@
 
    private final String fileName;
 
-   private File file;
+   private final File file;
 
    private FileChannel channel;
 
@@ -61,6 +61,9 @@
       this.journalDir = journalDir;
 
       this.fileName = fileName;
+
+      this.file = new File(journalDir + "/" + fileName);
+
    }
 
    public int getAlignment()
@@ -78,15 +81,13 @@
       return fileName;
    }
    
-   public boolean isOpen()
+   public synchronized boolean isOpen()
    {
-      return file != null;
+      return channel != null;
    }
 
    public synchronized void open() throws Exception
    {
-      file = new File(journalDir + "/" + fileName);
-
       rfile = new RandomAccessFile(file, "rw");
 
       channel = rfile.getChannel();
@@ -131,15 +132,16 @@
       channel = null;
 
       rfile = null;
-
-      file = null;
    }
 
    public void delete() throws Exception
    {
+      if (isOpen())
+      {
+         close();
+      }
+
       file.delete();
-
-      close();
    }
 
    public int read(final ByteBuffer bytes) throws Exception

Added: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PageLargeMessage.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PageLargeMessage.java	                        (rev 0)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PageLargeMessage.java	2008-10-17 21:34:38 UTC (rev 5140)
@@ -0,0 +1,42 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.paging;
+
+import org.jboss.messaging.core.journal.EncodingSupport;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+/**
+ * A PageLargeMessage: an {@link EncodingSupport} that additionally offers two accessors,
+ * {@link #getBytes()} (a byte array) and {@link #getBuffer()} (a {@link MessagingBuffer}).
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Oct 17, 2008 4:04:39 PM
+ *
+ */
+public interface PageLargeMessage extends EncodingSupport
+{
+   byte[] getBytes();
+
+   MessagingBuffer getBuffer();
+}

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PageMessage.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PageMessage.java	2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PageMessage.java	2008-10-17 21:34:38 UTC (rev 5140)
@@ -36,7 +36,7 @@
  */
 public interface PageMessage extends EncodingSupport
 {
-   ServerMessage getMessage();
+   Object getMessage();
 
    long getTransactionID();
 

Added: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PageLargeMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PageLargeMessageImpl.java	                        (rev 0)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PageLargeMessageImpl.java	2008-10-17 21:34:38 UTC (rev 5140)
@@ -0,0 +1,98 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.paging.impl;
+
+import java.nio.ByteBuffer;
+
+import org.jboss.messaging.core.paging.PageLargeMessage;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+/**
+ * <p>A PageLargeMessageImpl</p>
+ *
+ * <p>A Paged Large Message needs to be instantiated later when the StorageManager instance is available.
+ * This class will hold the bytes until it is possible to instantiate the LargeMessage properly</p>
+ *
+ * <p>{@link #getBuffer()} wraps the held array in a new buffer on each call; {@code decode} always throws.</p>
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Oct 17, 2008 2:46:32 PM
+ */
+public class PageLargeMessageImpl implements PageLargeMessage
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   final byte[] bytes;
+
+   /**
+    * @param bytes the bytes to hold; the array is stored by reference, it is not copied
+    */
+   public PageLargeMessageImpl(byte[] bytes)
+   {
+      super();
+      this.bytes = bytes;
+   }
+
+   /**
+    * @return the same array instance that was passed to the constructor (not a copy)
+    */
+   public byte[] getBytes()
+   {
+      return bytes;
+   }
+
+   public MessagingBuffer getBuffer()
+   {
+      return new ByteBufferWrapper(ByteBuffer.wrap(bytes));
+   }
+
+   /* Decoding is not supported by this holder: the method always throws IllegalStateException.
+    * @see org.jboss.messaging.core.journal.EncodingSupport#decode(org.jboss.messaging.core.remoting.spi.MessagingBuffer)
+    */
+   public void decode(MessagingBuffer buffer)
+   {
+      throw new IllegalStateException("Not supported");
+   }
+
+   /* Writes the held bytes to the given buffer through putBytes.
+    * @see org.jboss.messaging.core.journal.EncodingSupport#encode(org.jboss.messaging.core.remoting.spi.MessagingBuffer)
+    */
+   public void encode(MessagingBuffer buffer)
+   {
+      buffer.putBytes(bytes);
+   }
+
+   /* The encoded size is the length of the held byte array.
+    * @see org.jboss.messaging.core.journal.EncodingSupport#getEncodeSize()
+    */
+   public int getEncodeSize()
+   {
+      return bytes.length;
+   }
+
+}

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java	2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java	2008-10-17 21:34:38 UTC (rev 5140)
@@ -22,8 +22,10 @@
 
 package org.jboss.messaging.core.paging.impl;
 
+import org.jboss.messaging.core.journal.EncodingSupport;
 import org.jboss.messaging.core.paging.PageMessage;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.ServerLargeMessage;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.impl.ServerMessageImpl;
 import org.jboss.messaging.util.DataConstants;
@@ -50,7 +52,7 @@
 
    // Public --------------------------------------------------------
 
-   private final ServerMessage message;
+   private EncodingSupport message;
 
    private long transactionID = -1;
 
@@ -87,7 +89,7 @@
       this.properties = properties;
    }
 
-   public ServerMessage getMessage()
+   public Object getMessage()
    {
       return message;
    }
@@ -107,20 +109,51 @@
    public void decode(final MessagingBuffer buffer)
    {
       transactionID = buffer.getLong();
-      message.decode(buffer);
+      
+      boolean isLargeMessage = buffer.getBoolean();
+      
+      if (isLargeMessage)
+      {
+         int largeMessageHeaderSize = buffer.getInt();
+         
+         byte[] bytesLargeMessage = new byte[largeMessageHeaderSize];
+         
+         buffer.getBytes(bytesLargeMessage);
+         
+         this.message = new PageLargeMessageImpl(bytesLargeMessage);
+         
+      }
+      else
+      {
+         message = new ServerMessageImpl();
+         message.decode(buffer);
+      }
+      
       properties.decode(buffer);
    }
 
    public void encode(final MessagingBuffer buffer)
    {
       buffer.putLong(transactionID);
-      message.encode(buffer);
+      buffer.putBoolean(message instanceof ServerLargeMessage);
+      if (message instanceof ServerLargeMessage)
+      {
+         buffer.putInt(message.getEncodeSize());
+         message.encode(buffer);
+      }
+      else
+      {
+         message.encode(buffer);
+      }
       properties.encode(buffer);
    }
 
    public int getEncodeSize()
    {
-      return DataConstants.SIZE_LONG  + message.getEncodeSize() + properties.getEncodeSize();
+      return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE +
+             (message instanceof ServerLargeMessage ? DataConstants.SIZE_INT : 0) +
+             message.getEncodeSize() +
+             properties.getEncodeSize();
    }
 
    // Package protected ---------------------------------------------

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-10-17 21:34:38 UTC (rev 5140)
@@ -22,6 +22,15 @@
 
 package org.jboss.messaging.core.paging.impl;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.paging.LastPageRecord;
 import org.jboss.messaging.core.paging.PageMessage;
@@ -32,21 +41,13 @@
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.ServerLargeMessage;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.util.SimpleString;
 import org.jboss.messaging.util.TypedProperties;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  *  <p>Look at the <a href="http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging">WIKI</a> for more information.</p>
  * 
@@ -95,6 +96,8 @@
 
    private static final SimpleString SCHEDULED_DELIVERY_PROP = new SimpleString("JBM_SCHEDULED_DELIVERY_PROP");
 
+   private static final SimpleString MESSAGE_ID_PROP = new SimpleString("JBM_MESSAGE_ID");
+
    // This is just a debug tool method.
    // During debugs you could make log.trace as log.info, and change the
    // variable isTrace above
@@ -210,6 +213,19 @@
 
       for (PageMessage msg : data)
       {
+         ServerMessage pagedMessage = null;
+         
+         if (msg.getMessage() instanceof ServerMessage)
+         {
+            pagedMessage = (ServerMessage) msg.getMessage(); 
+         }
+         else
+         {
+            PageLargeMessageImpl pageLargeMessage = (PageLargeMessageImpl) msg.getMessage();
+            ServerLargeMessage message = this.storageManager.createLargeMessageStorage();
+            message.decode(pageLargeMessage.getBuffer());
+         }
+         
          final long transactionIdDuringPaging = msg.getTransactionID();
          if (transactionIdDuringPaging >= 0)
          {
@@ -236,7 +252,7 @@
             }
 
             // / Update information about transactions
-            if (msg.getMessage().isDurable())
+            if (pagedMessage.isDurable())
             {
                pageTransactionInfo.decrement();
                pageTransactionsToUpdate.add(pageTransactionInfo);
@@ -246,11 +262,11 @@
          //if this is a scheduled message we add it to the queue as just that
          if(scheduledDeliveryTime == null)
          {
-            refsToAdd.addAll(postOffice.route(msg.getMessage()));
+            refsToAdd.addAll(postOffice.route(pagedMessage));
          }
          else
          {
-            List<MessageReference> refs = postOffice.route(msg.getMessage());
+            List<MessageReference> refs = postOffice.route(pagedMessage);
             for (MessageReference ref : refs)
             {
                ref.setScheduledDeliveryTime(scheduledDeliveryTime);
@@ -258,13 +274,13 @@
             scheduledRefsToAdd.addAll(refs);
          }
 
-         if (msg.getMessage().getDurableRefCount() != 0)
+         if (pagedMessage.getDurableRefCount() != 0)
          {
-            storageManager.storeMessageTransactional(depageTransactionID, msg.getMessage());
+            storageManager.storeMessageTransactional(depageTransactionID, pagedMessage);
             //write the scheduled message record if needed
             if(scheduledDeliveryTime != null)
             {
-               storageManager.storeMessageScheduledTransactional(depageTransactionID, msg.getMessage(), scheduledDeliveryTime);
+               storageManager.storeMessageScheduledTransactional(depageTransactionID, pagedMessage, scheduledDeliveryTime);
             }
          }
       }

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2008-10-17 21:34:38 UTC (rev 5140)
@@ -70,7 +70,7 @@
    void storeDeleteMessageTransactional(long txID, long queueID, long messageID) throws Exception;
    
    /** Create an area that will get LargeMessage bytes on the server size*/
-   ServerLargeMessage createLargeMessageStorage(final long messageID) throws Exception;
+   ServerLargeMessage createLargeMessageStorage() throws Exception;
    
    
 

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java	2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java	2008-10-17 21:34:38 UTC (rev 5140)
@@ -49,16 +49,17 @@
 
    // Attributes ----------------------------------------------------
 
-   final SequentialFile file;
+   private final JournalStorageManager storageManager;
 
+   private SequentialFile file;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public JournalServerLargeMessageImpl(final long id, final SequentialFile file)
+   public JournalServerLargeMessageImpl(JournalStorageManager storageManager)
    {
-      super(id);
-      this.file = file;
+      this.storageManager = storageManager;
    }
 
    // Public --------------------------------------------------------
@@ -68,6 +69,8 @@
     */
    public synchronized void addBytes(final byte[] bytes) throws Exception
    {
+      testFile();
+
       if (!file.isOpen())
       {
          file.open();
@@ -80,8 +83,10 @@
    }
 
    @Override
-   public synchronized void encodeBody(MessagingBuffer bufferOut, int start, int size)
+   public synchronized void encodeBody(final MessagingBuffer bufferOut, final int start, final int size)
    {
+      testFile();
+
       try
       {
          // This could maybe be optimized (maybe reading directly into bufferOut)
@@ -103,7 +108,6 @@
             bufferOut.putBytes(bufferRead.array(), 0, bytesRead);
          }
 
-         // releaseResources();
       }
       catch (Exception e)
       {
@@ -114,6 +118,8 @@
    @Override
    public synchronized int getBodySize()
    {
+      testFile();
+
       try
       {
          if (!file.isOpen())
@@ -130,14 +136,32 @@
       }
    }
 
-   
+   @Override
+   public synchronized int getEncodeSize()
+   {
+      return getPropertiesEncodeSize();
+   }
+
+   @Override
+   public void encode(final MessagingBuffer buffer)
+   {
+      encodeProperties(buffer);
+   }
+
+   @Override
+   public void decode(final MessagingBuffer buffer)
+   {
+      decodeProperties(buffer);
+   }
+
+   @Override
    public int decrementRefCount()
    {
       int currentRefCount = super.decrementRefCount();
-      
+
       if (currentRefCount == 0)
       {
-         log.info("Deleting file " + this.file + " as the usage was complete");
+         log.info("Deleting file " + file + " as the usage was complete");
 
          try
          {
@@ -148,16 +172,15 @@
             log.error(e.getMessage(), e);
          }
       }
-      
+
       return currentRefCount;
    }
 
-   
    public void deleteFile() throws MessagingException
    {
-      
+
       // TODO: This should use an executor somewhere...
-      //       We can't take the risk of blocking the queue when lots of ServerLargeMessages are being deleted
+      // We can't take the risk of blocking the queue when lots of ServerLargeMessages are being deleted
       try
       {
          file.delete();
@@ -187,6 +210,19 @@
 
    // Protected -----------------------------------------------------
 
+   protected void testFile()
+   {
+      if (file == null)
+      {
+         if (this.messageID <= 0)
+         {
+            throw new RuntimeException("MessageID not set on LargeMessage");
+         }
+
+         file = storageManager.createFileForLargeMessage(this.getMessageID());
+      }
+   }
+
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-10-17 21:34:38 UTC (rev 5140)
@@ -46,6 +46,7 @@
 import org.jboss.messaging.core.journal.Journal;
 import org.jboss.messaging.core.journal.PreparedTransactionInfo;
 import org.jboss.messaging.core.journal.RecordInfo;
+import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.journal.impl.AIOSequentialFileFactory;
 import org.jboss.messaging.core.journal.impl.JournalImpl;
@@ -216,10 +217,9 @@
    }
 
    /** Create an area that will get LargeMessage bytes on the server size*/
-   public ServerLargeMessage createLargeMessageStorage(final long messageID) throws Exception
+   public ServerLargeMessage createLargeMessageStorage() throws Exception
    {
-      return new JournalServerLargeMessageImpl(messageID, largeMessagesFactory.createSequentialFile(messageID + ".msg",
-                                                                                                    -1));
+      return new JournalServerLargeMessageImpl(this);
    }
 
    // Non transactional operations
@@ -255,7 +255,9 @@
 
    public void storeMessageScheduled(final ServerMessage message, final long scheduledDeliveryTime) throws Exception
    {
-      messageJournal.appendUpdateRecord(message.getMessageID(), SET_SCHEDULED_DELIVERY_TIME,  new ScheduledDeliveryEncoding(message.getMessageID(), scheduledDeliveryTime));
+      messageJournal.appendUpdateRecord(message.getMessageID(),
+                                        SET_SCHEDULED_DELIVERY_TIME,
+                                        new ScheduledDeliveryEncoding(message.getMessageID(), scheduledDeliveryTime));
    }
 
    // Transactional operations
@@ -322,9 +324,15 @@
       messageJournal.appendDeleteRecordTransactional(txID, recordID, null);
    }
 
-   public void storeMessageScheduledTransactional(final long txID,final ServerMessage message, final long scheduledDeliveryTime) throws Exception
+   public void storeMessageScheduledTransactional(final long txID,
+                                                  final ServerMessage message,
+                                                  final long scheduledDeliveryTime) throws Exception
    {
-      messageJournal.appendUpdateRecordTransactional(txID, message.getMessageID(), SET_SCHEDULED_DELIVERY_TIME,  new ScheduledDeliveryEncoding(message.getMessageID(), scheduledDeliveryTime));
+      messageJournal.appendUpdateRecordTransactional(txID,
+                                                     message.getMessageID(),
+                                                     SET_SCHEDULED_DELIVERY_TIME,
+                                                     new ScheduledDeliveryEncoding(message.getMessageID(),
+                                                                                   scheduledDeliveryTime));
    }
 
    public void storeDeleteMessageTransactional(final long txID, final long queueID, final long messageID) throws Exception
@@ -381,10 +389,10 @@
          {
             case ADD_LARGE_MESSAGE:
             {
-               ServerLargeMessage largeMessage = this.createLargeMessageStorage(record.id);
+               ServerLargeMessage largeMessage = this.createLargeMessageStorage();
 
                LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage);
-               
+
                messageEncoding.decode(buff);
 
                List<MessageReference> refs = postOffice.route(largeMessage);
@@ -495,7 +503,8 @@
                ScheduledDeliveryEncoding scheduledDeliveryEncoding = new ScheduledDeliveryEncoding(record.id);
                scheduledDeliveryEncoding.decode(buff);
                List<MessageReference> refs = routedRefs.get(record.id);
-               //for any references that have already been routed, we need to remove them from t he queue and re add them as scheduled
+               // for any references that have already been routed, we need to remove them from the queue and re add
+               // them as scheduled
                for (MessageReference ref : refs)
                {
                   ref.getQueue().removeReferenceWithID(ref.getMessage().getMessageID());
@@ -687,8 +696,7 @@
       return started;
    }
 
-   // Public
-   // -----------------------------------------------------------------------------------
+   // Public -----------------------------------------------------------------------------------
 
    public Journal getMessageJournal()
    {
@@ -700,9 +708,19 @@
       return bindingsJournal;
    }
 
-   // Private
-   // ----------------------------------------------------------------------------------
+   // Package protected ---------------------------------------------
 
+   /**
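+    * @param messageID id of the large message; the file name is {@code messageID + ".msg"}
+    * @return the SequentialFile obtained from largeMessagesFactory (requested with maxIO -1)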
+    */
+   SequentialFile createFileForLargeMessage(long messageID)
+   {
+      return largeMessagesFactory.createSequentialFile(messageID + ".msg", -1);
+   }
+
+   // Private ----------------------------------------------------------------------------------
+
    private void loadPreparedTransactions(final PostOffice postOffice,
                                          final Map<Long, Queue> queues,
                                          final ResourceManager resourceManager,
@@ -795,7 +813,8 @@
                   ScheduledDeliveryEncoding scheduledDeliveryEncoding = new ScheduledDeliveryEncoding(record.id);
                   scheduledDeliveryEncoding.decode(buff);
                   List<MessageReference> refs = routedRefs.get(record.id);
-                  //for any references that have already been routed, we need to remove them from the queue and re add them as scheduled
+                  // for any references that have already been routed, we need to remove them from the queue and re add
+                  // them as scheduled
                   for (MessageReference ref : refs)
                   {
                      ref.setScheduledDeliveryTime(scheduledDeliveryEncoding.getScheduledDeliveryTime());
@@ -996,7 +1015,7 @@
        */
       public void decode(final MessagingBuffer buffer)
       {
-         message.decodeProperties(buffer);
+         message.decode(buffer);
       }
 
       /* (non-Javadoc)
@@ -1004,7 +1023,7 @@
        */
       public void encode(final MessagingBuffer buffer)
       {
-         message.encodeProperties(buffer);
+         message.encode(buffer);
       }
 
       /* (non-Javadoc)
@@ -1012,7 +1031,7 @@
        */
       public int getEncodeSize()
       {
-         return message.getPropertiesEncodeSize();
+         return message.getEncodeSize();
       }
 
    }
@@ -1109,9 +1128,11 @@
          super(queueID);
       }
    }
+
    private static class ScheduledDeliveryEncoding implements EncodingSupport
    {
       long messageId;
+
       long scheduledDeliveryTime;
 
       private ScheduledDeliveryEncoding(long messageId, long scheduledDeliveryTime)
@@ -1145,4 +1166,5 @@
          return scheduledDeliveryTime;
       }
    }
+
 }

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2008-10-17 21:34:38 UTC (rev 5140)
@@ -150,9 +150,9 @@
    /* (non-Javadoc)
     * @see org.jboss.messaging.core.persistence.StorageManager#createLargeMessageStorage(long, int, int)
     */
-   public ServerLargeMessage createLargeMessageStorage(final long messageID) throws Exception
+   public ServerLargeMessage createLargeMessageStorage() throws Exception
    {
-      return new NullStorageServerLargeMessageImpl(messageID);
+      return new NullStorageServerLargeMessageImpl();
    }
 
 

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java	2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java	2008-10-17 21:34:38 UTC (rev 5140)
@@ -51,9 +51,9 @@
 
    // Public --------------------------------------------------------
 
-   public NullStorageServerLargeMessageImpl(final long messageID)
+   public NullStorageServerLargeMessageImpl()
    {
-      super(messageID);
+      super();
    }
 
    /* (non-Javadoc)

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-17 21:34:38 UTC (rev 5140)
@@ -1174,13 +1174,13 @@
     */
    public ServerLargeMessage createLargeMessageStorage(long producerID, long messageID, byte[] header) throws Exception
    {
-      ServerLargeMessage largeMessage = storageManager.createLargeMessageStorage(messageID);
+      ServerLargeMessage largeMessage = storageManager.createLargeMessageStorage();
       
       MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.wrap(header));
       
       largeMessage.decodeProperties(headerBuffer);
 
-      // decodeProperties will clean this, as the client didn send the ID originally
+      // the client didn't send the ID originally
       largeMessage.setMessageID(messageID);
       
       ServerProducer producer = producers.get(producerID);

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-10-17 21:34:38 UTC (rev 5140)
@@ -356,7 +356,6 @@
 
          ServerMessage message = ref.getMessage();
 
-         // Putting back the size on pagingManager, and reverting the counters
          if (message.isDurable() && queue.isDurable())
          {
             message.incrementDurableRefCount();

Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2008-10-17 21:34:38 UTC (rev 5140)
@@ -200,6 +200,11 @@
       }
 
    }
+   
+   public void testPageOnLargeMessage() throws Exception
+   {
+      // TODO: Write a test with LargeMessages and paging
+   }
 
    /**
     * @param session

Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2008-10-17 21:34:38 UTC (rev 5140)
@@ -92,7 +92,7 @@
 
       assertEquals(1, msgs.length);
 
-      assertEqualsByteArrays(msg.getBody().array(), msgs[0].getMessage().getBody().array());
+      assertEqualsByteArrays(msg.getBody().array(), ((ServerMessage)msgs[0].getMessage()).getBody().array());
 
       assertTrue(store.isPaging());
 

Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2008-10-17 21:34:38 UTC (rev 5140)
@@ -80,7 +80,7 @@
 
    // Public --------------------------------------------------------
 
-   public SequentialFile createSequentialFile(final String fileName, final int maxAIO) throws Exception
+   public SequentialFile createSequentialFile(final String fileName, final int maxAIO)
    {
       FakeSequentialFile sf = fileMap.get(fileName);
 

Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java	2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java	2008-10-17 21:34:38 UTC (rev 5140)
@@ -119,11 +119,11 @@
 
       for (int i = 0; i < msgs.length; i++)
       {
-         assertEquals(i, msgs[i].getMessage().getMessageID());
+         assertEquals(i, ((ServerMessage)msgs[i].getMessage()).getMessageID());
 
-         assertEquals(simpleDestination, msgs[i].getMessage().getDestination());
+         assertEquals(simpleDestination, ((ServerMessage)msgs[i].getMessage()).getDestination());
 
-         assertEqualsByteArrays(buffers.get(i).array(), msgs[i].getMessage().getBody().array());
+         assertEqualsByteArrays(buffers.get(i).array(), ((ServerMessage)msgs[i].getMessage()).getBody().array());
       }
 
       impl.delete();

Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2008-10-17 21:34:38 UTC (rev 5140)
@@ -29,6 +29,7 @@
 import org.jboss.messaging.core.paging.impl.PageMessageImpl;
 import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
 import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
+import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
 import org.jboss.messaging.util.SimpleString;
@@ -201,8 +202,8 @@
 
       for (int i = 0; i < 10; i++)
       {
-         assertEquals(0, msg[i].getMessage().getMessageID());
-         assertEqualsByteArrays(buffers.get(i).array(), msg[i].getMessage().getBody().array());
+         assertEquals(0, ((ServerMessage)msg[i].getMessage()).getMessageID());
+         assertEqualsByteArrays(buffers.get(i).array(),((ServerMessage)msg[i].getMessage()).getBody().array());
       }
 
    }
@@ -264,8 +265,8 @@
 
          for (int i = 0; i < 5; i++)
          {
-            assertEquals(0, msg[i].getMessage().getMessageID());
-            assertEqualsByteArrays(buffers.get(pageNr * 5 + i).array(), msg[i].getMessage().getBody().array());
+            assertEquals(0, ((ServerMessage)msg[i].getMessage()).getMessageID());
+            assertEqualsByteArrays(buffers.get(pageNr * 5 + i).array(), ((ServerMessage)msg[i].getMessage()).getBody().array());
          }
       }
 
@@ -307,9 +308,9 @@
 
       assertEquals(1, msgs.length);
 
-      assertEquals(0l, msgs[0].getMessage().getMessageID());
+      assertEquals(0l, ((ServerMessage)msgs[0].getMessage()).getMessageID());
 
-      assertEqualsByteArrays(buffers.get(0).array(), msgs[0].getMessage().getBody().array());
+      assertEqualsByteArrays(buffers.get(0).array(), ((ServerMessage)msgs[0].getMessage()).getBody().array());
 
       assertEquals(1, storeImpl.getNumberOfPages());
 

Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2008-10-17 21:34:38 UTC (rev 5140)
@@ -229,15 +229,17 @@
 
          for (PageMessage msg : msgs)
          {
-            msg.getMessage().getBody().rewind();
-            long id = msg.getMessage().getBody().getLong();
-            msg.getMessage().getBody().rewind();
+            ((ServerMessage)msg.getMessage()).getBody().rewind();
+            long id = ((ServerMessage)msg.getMessage()).getBody().getLong();
+            ((ServerMessage)msg.getMessage()).getBody().rewind();
 
             PageMessageImpl msgWritten = buffers.remove(id);
             buffers2.put(id, msg);
             assertNotNull(msgWritten);
-            assertEquals(msg.getMessage().getDestination(), msgWritten.getMessage().getDestination());
-            assertEqualsByteArrays(msgWritten.getMessage().getBody().array(), msg.getMessage().getBody().array());
+            assertEquals(((ServerMessage)msg.getMessage()).getDestination(),
+                         ((ServerMessage)msgWritten.getMessage()).getDestination());
+            assertEqualsByteArrays(((ServerMessage)msgWritten.getMessage()).getBody().array(),
+                                   ((ServerMessage)msg.getMessage()).getBody().array());
          }
       }
 
@@ -293,12 +295,14 @@
          for (PageMessage msg : msgs)
          {
 
-            msg.getMessage().getBody().rewind();
-            long id = msg.getMessage().getBody().getLong();
+            ((ServerMessage)msg.getMessage()).getBody().rewind();
+            long id = ((ServerMessage)msg.getMessage()).getBody().getLong();
             PageMessage msgWritten = buffers2.remove(id);
             assertNotNull(msgWritten);
-            assertEquals(msg.getMessage().getDestination(), msgWritten.getMessage().getDestination());
-            assertEqualsByteArrays(msgWritten.getMessage().getBody().array(), msg.getMessage().getBody().array());
+            assertEquals(((ServerMessage)msg.getMessage()).getDestination(),
+                         ((ServerMessage)msgWritten.getMessage()).getDestination());
+            assertEqualsByteArrays(((ServerMessage)msgWritten.getMessage()).getBody().array(),
+                                   ((ServerMessage)msg.getMessage()).getBody().array());
          }
       }
 
@@ -307,9 +311,10 @@
       lastPage.close();
       assertEquals(1, lastMessages.length);
 
-      lastMessages[0].getMessage().getBody().rewind();
-      assertEquals(lastMessages[0].getMessage().getBody().getLong(), lastMessageId);
-      assertEqualsByteArrays(lastMessages[0].getMessage().getBody().array(), lastMsg.getMessage().getBody().array());
+      ((ServerMessage)lastMessages[0].getMessage()).getBody().rewind();
+      assertEquals(((ServerMessage)lastMessages[0].getMessage()).getBody().getLong(), lastMessageId);
+      assertEqualsByteArrays(((ServerMessage)lastMessages[0].getMessage()).getBody().array(),
+                             ((ServerMessage)lastMsg.getMessage()).getBody().array());
 
       assertEquals(0, buffers2.size());
 




More information about the jboss-cvs-commits mailing list