[jboss-cvs] JBoss Messaging SVN: r5119 - in branches/Branch_Chunk_Clebert: src/main/org/jboss/messaging/core/journal/impl and 8 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Oct 15 20:06:24 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-10-15 20:06:24 -0400 (Wed, 15 Oct 2008)
New Revision: 5119

Modified:
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PagingManager.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerLargeMessage.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerMessage.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
   branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
   branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerMessageImplTest.java
Log:
Implementing some reference-counting on deleting files

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-10-16 00:06:24 UTC (rev 5119)
@@ -13,7 +13,6 @@
 package org.jboss.messaging.core.client.impl;
 
 import java.io.File;
-import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.concurrent.Executor;
@@ -23,7 +22,6 @@
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-10-16 00:06:24 UTC (rev 5119)
@@ -68,7 +68,7 @@
  * 
  * <p>A JournalImpl</p
  * 
- * <p>WIKI Page: <a href="http://wiki.jboss.org/auth/wiki/JBossMessaging2Journal"> http://wiki.jboss.org/auth/wiki/JBossMessaging2Journal</a></p>
+ * <p>WIKI Page: <a href="http://wiki.jboss.org/wiki/JBossMessaging2Journal"> http://wiki.jboss.org/wiki/JBossMessaging2Journal</a></p>
  * 
  * 
  * <p>Look at {@link JournalImpl#load(LoadManager)} for the file layout

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-10-16 00:06:24 UTC (rev 5119)
@@ -32,7 +32,7 @@
 
 /**
  * 
- * <p>Look at the <a href="http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging">WIKI</a> for more information.</p>
+ * <p>Look at the <a href="http://wiki.jboss.org/wiki/JBossMessaging2Paging">WIKI</a> for more information.</p>
  * 
 <PRE>
 

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java	2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java	2008-10-16 00:06:24 UTC (rev 5119)
@@ -24,7 +24,9 @@
 
 import java.nio.ByteBuffer;
 
+import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.core.server.ServerLargeMessage;
 import org.jboss.messaging.core.server.impl.ServerMessageImpl;
@@ -43,6 +45,8 @@
 
    // Constants -----------------------------------------------------
 
+   private static final Logger log = Logger.getLogger(JournalServerLargeMessageImpl.class);
+
    // Attributes ----------------------------------------------------
 
    final SequentialFile file;
@@ -68,7 +72,7 @@
       {
          file.open();
       }
-      
+
       file.position(file.size());
 
       file.write(ByteBuffer.wrap(bytes), false);
@@ -91,8 +95,7 @@
          file.position(start);
 
          bytesRead = file.read(bufferRead);
-         
-         
+
          bufferRead.flip();
 
          if (bytesRead > 0)
@@ -100,7 +103,7 @@
             bufferOut.putBytes(bufferRead.array(), 0, bytesRead);
          }
 
-         //releaseResources();
+         // releaseResources();
       }
       catch (Exception e)
       {
@@ -127,6 +130,44 @@
       }
    }
 
+   
+   public int decrementRefCount()
+   {
+      int currentRefCount = super.decrementRefCount();
+      
+      if (currentRefCount == 0)
+      {
+         log.info("Deleting file " + this.file + " as the usage was complete");
+
+         try
+         {
+            deleteFile();
+         }
+         catch (Exception e)
+         {
+            log.error(e.getMessage(), e);
+         }
+      }
+      
+      return currentRefCount;
+   }
+
+   
+   public void deleteFile() throws MessagingException
+   {
+      
+      // TODO: This should use an executor somewhere...
+      //       We can't take the risk of blocking the queue when lots of ServerLargeMessages are being deleted
+      try
+      {
+         file.delete();
+      }
+      catch (Exception e)
+      {
+         throw new MessagingException(MessagingException.INTERNAL_ERROR, e.getMessage(), e);
+      }
+   }
+
    @Override
    public synchronized int getMemoryEstimate()
    {

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java	2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java	2008-10-16 00:06:24 UTC (rev 5119)
@@ -86,6 +86,25 @@
       buffer.putBytes(bytes);
    }
 
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.server.ServerLargeMessage#deleteFile()
+    */
+   public void deleteFile() throws Exception
+   {
+      // nothing to be done here.. we don really have a file on this Storage
+   }
+   public int decrementRefCount()
+   {
+      int currentRefCount = super.decrementRefCount();
+      
+      if (currentRefCount == 0)
+      {
+         System.out.println("I would delete the file if I had one now");
+      }
+      
+      return currentRefCount;
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerLargeMessage.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerLargeMessage.java	2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerLargeMessage.java	2008-10-16 00:06:24 UTC (rev 5119)
@@ -37,5 +37,7 @@
 
    /** Close the files if opened */
    void releaseResources() throws Exception;
+   
+   void deleteFile() throws Exception;
 
 }

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerMessage.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerMessage.java	2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerMessage.java	2008-10-16 00:06:24 UTC (rev 5119)
@@ -41,12 +41,12 @@
 
    int decrementDurableRefCount();
    
-   int incrementReference(boolean durable);
-   
    int getDurableRefCount();
    
    int decrementRefCount();
    
+   int incrementDurableRefCount();
+   
    int getRefCount();
    
    ServerMessage copy();

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-16 00:06:24 UTC (rev 5119)
@@ -16,6 +16,7 @@
 import org.jboss.messaging.core.list.PriorityLinkedList;
 import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.FlowController;
@@ -104,6 +105,8 @@
    private AtomicBoolean waitingToDeliver = new AtomicBoolean(false);
 
    private final Runnable deliverRunner = new DeliverRunner();
+   
+   private final PagingManager pagingManager;
 
    private volatile boolean backup;
 
@@ -135,6 +138,8 @@
       this.scheduledExecutor = scheduledExecutor;
 
       this.postOffice = postOffice;
+      
+      this.pagingManager = postOffice.getPagingManager();
 
       direct = true;
    }
@@ -421,6 +426,12 @@
       deliveringCount.decrementAndGet();
 
       sizeBytes.addAndGet(-ref.getMessage().getEncodeSize());
+      
+      
+      if (ref.getMessage().decrementRefCount() == 0)
+      {
+         pagingManager.messageDone(ref.getMessage());
+      }
 
       // if (flowController != null)
       // {

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java	2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java	2008-10-16 00:06:24 UTC (rev 5119)
@@ -108,16 +108,11 @@
       return durableRefCount.decrementAndGet();
    }
 
-   public int incrementReference(final boolean durable)
+   public int incrementDurableRefCount()
    {
-      if (durable)
-      {
-         durableRefCount.incrementAndGet();
-      }
-
-      return refCount.incrementAndGet();
+      return durableRefCount.incrementAndGet();
    }
-
+   
    public int decrementRefCount()
    {
       return refCount.decrementAndGet();

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java	2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java	2008-10-16 00:06:24 UTC (rev 5119)
@@ -22,16 +22,13 @@
 
 package org.jboss.messaging.core.server.impl;
 
-import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.postoffice.FlowController;
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerFlowCreditMessage;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.core.server.ServerLargeMessage;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.ServerProducer;
@@ -49,6 +46,12 @@
  */
 public class ServerProducerImpl implements ServerProducer
 {
+
+   // Static -----------------------------------------------------------------------
+   private static final Logger log = Logger.getLogger(ServerProducerImpl.class);
+
+   // Attributes--------------------------------------------------------------------
+   
    private final long id;
 
    private final ServerSession session;
@@ -65,7 +68,7 @@
 
    private final Channel channel;
 
-   private ServerLargeMessage currentlargeMessage;
+   private ServerLargeMessage currentLargeMessage;
 
 	// Constructors ----------------------------------------------------------------
 	
@@ -97,6 +100,19 @@
 	
 	public void close() throws Exception
 	{
+	   if (currentLargeMessage != null)
+	   {
+	      try
+	      {
+	         currentLargeMessage.deleteFile();
+	      }
+	      catch (Throwable error)
+	      {
+	         log.warn(error.toString(), error);
+	         
+	      }
+	   }
+	   
 		session.removeProducer(this);
 	}
 	
@@ -116,12 +132,12 @@
 
    public ServerLargeMessage getCurrentChunk()
    {
-      return currentlargeMessage;
+      return currentLargeMessage;
    }
 
    public void setCurrentChunk(final ServerLargeMessage message)
    {
-      currentlargeMessage = message;
+      currentLargeMessage = message;
    }
 
    public void requestAndSendCredits() throws Exception
@@ -151,8 +167,6 @@
       return waiting;
    }
 
-
-
    private void doFlowControl(final ServerMessage message) throws Exception
    {
       if (this.address != null)

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-16 00:06:24 UTC (rev 5119)
@@ -1353,11 +1353,6 @@
 
       Queue queue = ref.getQueue();
 
-      if (message.decrementRefCount() == 0)
-      {
-         pager.messageDone(message);
-      }
-
       if (message.isDurable() && queue.isDurable())
       {
          int count = message.decrementDurableRefCount();

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-10-16 00:06:24 UTC (rev 5119)
@@ -410,6 +410,7 @@
                
                if (!message.isContinues())
                {
+                  session.clearCurrentLargeMessage(message.getTargetID());
                   session.sendProducerMessage(message.getTargetID(), largeMessage);
                }
                

Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-10-16 00:06:24 UTC (rev 5119)
@@ -195,14 +195,6 @@
 
       ServerMessage message = acknowledgement.getMessage();
 
-      if (message.decrementRefCount() == 0)
-      {
-         if (pagingManager != null)
-         {
-            pagingManager.messageDone(message);
-         }
-      }
-
       if (message.isDurable())
       {
          Queue queue = acknowledgement.getQueue();
@@ -365,9 +357,9 @@
          ServerMessage message = ref.getMessage();
 
          // Putting back the size on pagingManager, and reverting the counters
-         if (message.incrementReference(message.isDurable() && queue.isDurable()) == 1)
+         if (message.isDurable() && queue.isDurable())
          {
-            pagingManager.addSize(message);
+            message.incrementDurableRefCount();
          }
 
          LinkedList<MessageReference> list = queueMap.get(queue);

Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2008-10-16 00:06:24 UTC (rev 5119)
@@ -111,6 +111,16 @@
    {
       testInternal(true, false, 4, false);
    }
+   
+   public void testTwoBindingsOneAckAndrestart() throws Exception
+   {
+      // TODO: Write a test where there are two bindings.. one is ACKed, the other is not, the server is restarted
+      //       The other binding is acked... and we must delete the file.
+      
+      ///      Play with the scenario over XA also.
+      
+      // Validate Message counters
+   }
 
    public void testInternal(final boolean realFiles,
                             final boolean useFile,
@@ -178,7 +188,7 @@
 
          }
 
-         //validateCopy(message);
+         validateCopy(message);
 
          producer.send(message);
 
@@ -207,6 +217,8 @@
          session.start();
 
          ClientMessage message2 = consumer.receive(0);
+         
+         message2.processed();
 
          assertNotNull(message2);
 

Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerMessageImplTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerMessageImplTest.java	2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerMessageImplTest.java	2008-10-16 00:06:24 UTC (rev 5119)
@@ -134,10 +134,10 @@
       
       assertEquals(2, msg.getDurableRefCount());
       
-      msg.incrementReference(true);
+      msg.incrementDurableRefCount();
       assertEquals(3, msg.getDurableRefCount());
       
-      msg.incrementReference(true);
+      msg.incrementDurableRefCount();
       assertEquals(4, msg.getDurableRefCount());
       
       msg.decrementDurableRefCount();




More information about the jboss-cvs-commits mailing list