[jboss-cvs] JBoss Messaging SVN: r5119 - in branches/Branch_Chunk_Clebert: src/main/org/jboss/messaging/core/journal/impl and 8 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Oct 15 20:06:24 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-10-15 20:06:24 -0400 (Wed, 15 Oct 2008)
New Revision: 5119
Modified:
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PagingManager.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerLargeMessage.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerMessage.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerMessageImplTest.java
Log:
Implementing some reference-counting on deleting files
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-10-16 00:06:24 UTC (rev 5119)
@@ -13,7 +13,6 @@
package org.jboss.messaging.core.client.impl;
import java.io.File;
-import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Executor;
@@ -23,7 +22,6 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-10-16 00:06:24 UTC (rev 5119)
@@ -68,7 +68,7 @@
*
* <p>A JournalImpl</p
*
- * <p>WIKI Page: <a href="http://wiki.jboss.org/auth/wiki/JBossMessaging2Journal"> http://wiki.jboss.org/auth/wiki/JBossMessaging2Journal</a></p>
+ * <p>WIKI Page: <a href="http://wiki.jboss.org/wiki/JBossMessaging2Journal"> http://wiki.jboss.org/wiki/JBossMessaging2Journal</a></p>
*
*
* <p>Look at {@link JournalImpl#load(LoadManager)} for the file layout
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-10-16 00:06:24 UTC (rev 5119)
@@ -32,7 +32,7 @@
/**
*
- * <p>Look at the <a href="http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging">WIKI</a> for more information.</p>
+ * <p>Look at the <a href="http://wiki.jboss.org/wiki/JBossMessaging2Paging">WIKI</a> for more information.</p>
*
<PRE>
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java 2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java 2008-10-16 00:06:24 UTC (rev 5119)
@@ -24,7 +24,9 @@
import java.nio.ByteBuffer;
+import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.server.ServerLargeMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
@@ -43,6 +45,8 @@
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(JournalServerLargeMessageImpl.class);
+
// Attributes ----------------------------------------------------
final SequentialFile file;
@@ -68,7 +72,7 @@
{
file.open();
}
-
+
file.position(file.size());
file.write(ByteBuffer.wrap(bytes), false);
@@ -91,8 +95,7 @@
file.position(start);
bytesRead = file.read(bufferRead);
-
-
+
bufferRead.flip();
if (bytesRead > 0)
@@ -100,7 +103,7 @@
bufferOut.putBytes(bufferRead.array(), 0, bytesRead);
}
- //releaseResources();
+ // releaseResources();
}
catch (Exception e)
{
@@ -127,6 +130,44 @@
}
}
+
+ public int decrementRefCount()
+ {
+ int currentRefCount = super.decrementRefCount();
+
+ if (currentRefCount == 0)
+ {
+ log.info("Deleting file " + this.file + " as the usage was complete");
+
+ try
+ {
+ deleteFile();
+ }
+ catch (Exception e)
+ {
+ log.error(e.getMessage(), e);
+ }
+ }
+
+ return currentRefCount;
+ }
+
+
+ public void deleteFile() throws MessagingException
+ {
+
+ // TODO: This should use an executor somewhere...
+ // We can't take the risk of blocking the queue when lots of ServerLargeMessages are being deleted
+ try
+ {
+ file.delete();
+ }
+ catch (Exception e)
+ {
+ throw new MessagingException(MessagingException.INTERNAL_ERROR, e.getMessage(), e);
+ }
+ }
+
@Override
public synchronized int getMemoryEstimate()
{
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java 2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java 2008-10-16 00:06:24 UTC (rev 5119)
@@ -86,6 +86,25 @@
buffer.putBytes(bytes);
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.server.ServerLargeMessage#deleteFile()
+ */
+ public void deleteFile() throws Exception
+ {
+ // nothing to be done here.. we don really have a file on this Storage
+ }
+ public int decrementRefCount()
+ {
+ int currentRefCount = super.decrementRefCount();
+
+ if (currentRefCount == 0)
+ {
+ System.out.println("I would delete the file if I had one now");
+ }
+
+ return currentRefCount;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerLargeMessage.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerLargeMessage.java 2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerLargeMessage.java 2008-10-16 00:06:24 UTC (rev 5119)
@@ -37,5 +37,7 @@
/** Close the files if opened */
void releaseResources() throws Exception;
+
+ void deleteFile() throws Exception;
}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerMessage.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerMessage.java 2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/ServerMessage.java 2008-10-16 00:06:24 UTC (rev 5119)
@@ -41,12 +41,12 @@
int decrementDurableRefCount();
- int incrementReference(boolean durable);
-
int getDurableRefCount();
int decrementRefCount();
+ int incrementDurableRefCount();
+
int getRefCount();
ServerMessage copy();
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-16 00:06:24 UTC (rev 5119)
@@ -16,6 +16,7 @@
import org.jboss.messaging.core.list.PriorityLinkedList;
import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.FlowController;
@@ -104,6 +105,8 @@
private AtomicBoolean waitingToDeliver = new AtomicBoolean(false);
private final Runnable deliverRunner = new DeliverRunner();
+
+ private final PagingManager pagingManager;
private volatile boolean backup;
@@ -135,6 +138,8 @@
this.scheduledExecutor = scheduledExecutor;
this.postOffice = postOffice;
+
+ this.pagingManager = postOffice.getPagingManager();
direct = true;
}
@@ -421,6 +426,12 @@
deliveringCount.decrementAndGet();
sizeBytes.addAndGet(-ref.getMessage().getEncodeSize());
+
+
+ if (ref.getMessage().decrementRefCount() == 0)
+ {
+ pagingManager.messageDone(ref.getMessage());
+ }
// if (flowController != null)
// {
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java 2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java 2008-10-16 00:06:24 UTC (rev 5119)
@@ -108,16 +108,11 @@
return durableRefCount.decrementAndGet();
}
- public int incrementReference(final boolean durable)
+ public int incrementDurableRefCount()
{
- if (durable)
- {
- durableRefCount.incrementAndGet();
- }
-
- return refCount.incrementAndGet();
+ return durableRefCount.incrementAndGet();
}
-
+
public int decrementRefCount()
{
return refCount.decrementAndGet();
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java 2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java 2008-10-16 00:06:24 UTC (rev 5119)
@@ -22,16 +22,13 @@
package org.jboss.messaging.core.server.impl;
-import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
-import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.postoffice.FlowController;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerFlowCreditMessage;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.server.ServerLargeMessage;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.ServerProducer;
@@ -49,6 +46,12 @@
*/
public class ServerProducerImpl implements ServerProducer
{
+
+ // Static -----------------------------------------------------------------------
+ private static final Logger log = Logger.getLogger(ServerProducerImpl.class);
+
+ // Attributes--------------------------------------------------------------------
+
private final long id;
private final ServerSession session;
@@ -65,7 +68,7 @@
private final Channel channel;
- private ServerLargeMessage currentlargeMessage;
+ private ServerLargeMessage currentLargeMessage;
// Constructors ----------------------------------------------------------------
@@ -97,6 +100,19 @@
public void close() throws Exception
{
+ if (currentLargeMessage != null)
+ {
+ try
+ {
+ currentLargeMessage.deleteFile();
+ }
+ catch (Throwable error)
+ {
+ log.warn(error.toString(), error);
+
+ }
+ }
+
session.removeProducer(this);
}
@@ -116,12 +132,12 @@
public ServerLargeMessage getCurrentChunk()
{
- return currentlargeMessage;
+ return currentLargeMessage;
}
public void setCurrentChunk(final ServerLargeMessage message)
{
- currentlargeMessage = message;
+ currentLargeMessage = message;
}
public void requestAndSendCredits() throws Exception
@@ -151,8 +167,6 @@
return waiting;
}
-
-
private void doFlowControl(final ServerMessage message) throws Exception
{
if (this.address != null)
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-16 00:06:24 UTC (rev 5119)
@@ -1353,11 +1353,6 @@
Queue queue = ref.getQueue();
- if (message.decrementRefCount() == 0)
- {
- pager.messageDone(message);
- }
-
if (message.isDurable() && queue.isDurable())
{
int count = message.decrementDurableRefCount();
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-10-16 00:06:24 UTC (rev 5119)
@@ -410,6 +410,7 @@
if (!message.isContinues())
{
+ session.clearCurrentLargeMessage(message.getTargetID());
session.sendProducerMessage(message.getTargetID(), largeMessage);
}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-10-16 00:06:24 UTC (rev 5119)
@@ -195,14 +195,6 @@
ServerMessage message = acknowledgement.getMessage();
- if (message.decrementRefCount() == 0)
- {
- if (pagingManager != null)
- {
- pagingManager.messageDone(message);
- }
- }
-
if (message.isDurable())
{
Queue queue = acknowledgement.getQueue();
@@ -365,9 +357,9 @@
ServerMessage message = ref.getMessage();
// Putting back the size on pagingManager, and reverting the counters
- if (message.incrementReference(message.isDurable() && queue.isDurable()) == 1)
+ if (message.isDurable() && queue.isDurable())
{
- pagingManager.addSize(message);
+ message.incrementDurableRefCount();
}
LinkedList<MessageReference> list = queueMap.get(queue);
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-10-16 00:06:24 UTC (rev 5119)
@@ -111,6 +111,16 @@
{
testInternal(true, false, 4, false);
}
+
+ public void testTwoBindingsOneAckAndrestart() throws Exception
+ {
+ // TODO: Write a test where there are two bindings.. one is ACKed, the other is not, the server is restarted
+ // The other binding is acked... and we must delete the file.
+
+ /// Play with the scenario over XA also.
+
+ // Validate Message counters
+ }
public void testInternal(final boolean realFiles,
final boolean useFile,
@@ -178,7 +188,7 @@
}
- //validateCopy(message);
+ validateCopy(message);
producer.send(message);
@@ -207,6 +217,8 @@
session.start();
ClientMessage message2 = consumer.receive(0);
+
+ message2.processed();
assertNotNull(message2);
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerMessageImplTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerMessageImplTest.java 2008-10-15 22:05:30 UTC (rev 5118)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerMessageImplTest.java 2008-10-16 00:06:24 UTC (rev 5119)
@@ -134,10 +134,10 @@
assertEquals(2, msg.getDurableRefCount());
- msg.incrementReference(true);
+ msg.incrementDurableRefCount();
assertEquals(3, msg.getDurableRefCount());
- msg.incrementReference(true);
+ msg.incrementDurableRefCount();
assertEquals(4, msg.getDurableRefCount());
msg.decrementDurableRefCount();
More information about the jboss-cvs-commits
mailing list