Author: clebert.suconic(a)jboss.com
Date: 2009-11-05 14:01:30 -0500 (Thu, 05 Nov 2009)
New Revision: 8229
Added:
trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
Removed:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
tweaks
Copied:
trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java (from
rev 8213,
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java)
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
(rev 0)
+++
trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java 2009-11-05
19:01:30 UTC (rev 8229)
@@ -0,0 +1,400 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.persistence.impl.journal;
+
+import static org.hornetq.utils.DataConstants.SIZE_INT;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.core.server.LargeServerMessage;
+import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.message.LargeMessageEncodingContext;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+
+/**
+ * A JournalLargeServerMessage
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
+ *
+ * Created 30-Sep-08 12:02:45 PM
+ *
+ *
+ */
+public class FileLargeServerMessage extends ServerMessageImpl implements
LargeServerMessage
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(FileLargeServerMessage.class);
+
+ private static boolean isTrace = log.isTraceEnabled();
+
+ // Attributes ----------------------------------------------------
+
+ private final JournalStorageManager storageManager;
+
+ private LargeServerMessage linkMessage;
+
+ // We should only use the NIO implementation on the Journal
+ private SequentialFile file;
+
+ private long bodySize = -1;
+
+ private final AtomicInteger delayDeletionCount = new AtomicInteger(0);
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public FileLargeServerMessage(final JournalStorageManager storageManager)
+ {
+ this.storageManager = storageManager;
+ }
+
+ /**
+ * Copy constructor
+ * @param copy
+ * @param fileCopy
+ */
+ private FileLargeServerMessage(final FileLargeServerMessage copy,
+ final SequentialFile fileCopy,
+ final long newID)
+ {
+ super(copy);
+ this.linkMessage = copy;
+ storageManager = copy.storageManager;
+ file = fileCopy;
+ bodySize = copy.bodySize;
+ setMessageID(newID);
+ }
+
+ // Public --------------------------------------------------------
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.LargeServerMessage#addBytes(byte[])
+ */
+ public synchronized void addBytes(final byte[] bytes) throws Exception
+ {
+ validateFile();
+
+ if (!file.isOpen())
+ {
+ file.open();
+ }
+
+ storageManager.addBytesToLargeMessage(file, this.getMessageID(), bytes);
+
+ bodySize += bytes.length;
+ }
+
+ public void encodeBody(final HornetQBuffer bufferOut, LargeMessageEncodingContext
context, int size)
+ {
+ try
+ {
+ // This could maybe be optimized (maybe reading directly into bufferOut)
+ ByteBuffer bufferRead = ByteBuffer.allocate(size);
+
+ int bytesRead = context.write(bufferRead);
+
+ bufferRead.flip();
+
+ if (bytesRead > 0)
+ {
+ bufferOut.writeBytes(bufferRead.array(), 0, bytesRead);
+ }
+
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public synchronized int getBodySize()
+ {
+ try
+ {
+ validateFile();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ return (int)Math.min(bodySize, Integer.MAX_VALUE);
+ }
+
+ @Override
+ public synchronized long getLargeBodySize()
+ {
+ try
+ {
+ validateFile();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ return bodySize;
+ }
+
+ @Override
+ public synchronized int getEncodeSize()
+ {
+ return getHeadersAndPropertiesEncodeSize();
+ }
+
+ @Override
+ public void encode(final HornetQBuffer buffer)
+ {
+ encodeHeadersAndProperties(buffer);
+ }
+
+ @Override
+ public void decode(final HornetQBuffer buffer)
+ {
+ file = null;
+ decodeHeadersAndProperties(buffer);
+ }
+
+ public synchronized void incrementDelayDeletionCount()
+ {
+ this.delayDeletionCount.incrementAndGet();
+ }
+
+ public synchronized void decrementDelayDeletionCount() throws Exception
+ {
+ int count = this.delayDeletionCount.decrementAndGet();
+
+ if (count == 0)
+ {
+ checkDelete();
+ }
+ }
+
+ public LargeMessageEncodingContext createNewContext()
+ {
+ return new DecodingContext();
+ }
+
+ private void checkDelete() throws Exception
+ {
+ if (getRefCount() <= 0)
+ {
+ if (linkMessage != null)
+ {
+ // This file is linked to another message, deleting the reference where it
belongs on this case
+ linkMessage.decrementDelayDeletionCount();
+ }
+ else
+ {
+ if (isTrace)
+ {
+ log.trace("Deleting file " + file + " as the usage was
complete");
+ }
+
+ try
+ {
+ deleteFile();
+ }
+ catch (Exception e)
+ {
+ log.error(e.getMessage(), e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public synchronized int decrementRefCount(PagingStore pagingStore, MessageReference
reference) throws Exception
+ {
+ int currentRefCount = super.decrementRefCount(pagingStore, reference);
+
+ // We use <= as this could be used by load.
+ // because of a failure, no references were loaded, so we have 0... and we still
need to delete the associated
+ // files
+ if (delayDeletionCount.get() <= 0)
+ {
+ checkDelete();
+ }
+
+ return currentRefCount;
+ }
+
+ @Override
+ public boolean isLargeMessage()
+ {
+ return true;
+ }
+
+ public synchronized void deleteFile() throws Exception
+ {
+ validateFile();
+ releaseResources();
+ storageManager.deleteFile(file);
+ }
+
+ public boolean isFileExists() throws Exception
+ {
+ SequentialFile localfile = storageManager.createFileForLargeMessage(getMessageID(),
durable);
+ return localfile.exists();
+ }
+
+ // We cache this
+ private volatile int memoryEstimate = -1;
+
+ @Override
+ public synchronized int getMemoryEstimate()
+ {
+ if (memoryEstimate == -1)
+ {
+ // The body won't be on memory (aways on-file), so we don't consider
this for paging
+ memoryEstimate = getHeadersAndPropertiesEncodeSize() + SIZE_INT +
getEncodeSize() + (16 + 4) * 2 + 1;
+ }
+
+ return memoryEstimate;
+ }
+
+ public synchronized void releaseResources()
+ {
+ if (file != null && file.isOpen())
+ {
+ try
+ {
+ file.close();
+ }
+ catch (Exception e)
+ {
+ log.error(e.getMessage(), e);
+ }
+ }
+ }
+
+ @Override
+ public synchronized ServerMessage copy(final long newID) throws Exception
+ {
+ incrementDelayDeletionCount();
+
+ long idToUse = messageID;
+
+ if (linkMessage != null)
+ {
+ idToUse = linkMessage.getMessageID();
+ }
+
+ SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse,
durable);
+
+ ServerMessage newMessage = new FileLargeServerMessage(linkMessage == null ? this
+ :
(FileLargeServerMessage)linkMessage,
+ newfile,
+ newID);
+
+ return newMessage;
+ }
+
+ public SequentialFile getFile()
+ {
+ return file;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void finalize() throws Throwable
+ {
+ releaseResources();
+ super.finalize();
+ }
+
+ // Private -------------------------------------------------------
+
+ private synchronized void validateFile() throws Exception
+ {
+ if (file == null)
+ {
+ if (messageID <= 0)
+ {
+ throw new RuntimeException("MessageID not set on LargeMessage");
+ }
+
+ file = storageManager.createFileForLargeMessage(getMessageID(), durable);
+
+ file.open();
+
+ bodySize = file.size();
+
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.server.LargeServerMessage#setLinkedMessage(org.hornetq.core.server.LargeServerMessage)
+ */
+ public void setLinkedMessage(LargeServerMessage message)
+ {
+ if (file != null)
+ {
+ // Sanity check.. it shouldn't happen
+ throw new IllegalStateException("LargeMessage file was already set");
+ }
+
+ this.linkMessage = message;
+
+ file = storageManager.createFileForLargeMessage(message.getMessageID(), durable);
+ try
+ {
+ file.open();
+ this.bodySize = file.size();
+ file.close();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("could not setup linked file", e);
+ }
+ }
+
+ // Inner classes -------------------------------------------------
+
+ class DecodingContext implements LargeMessageEncodingContext
+ {
+ private SequentialFile cFile;
+
+ public void open() throws Exception
+ {
+ cFile = file.copy();
+ cFile.open();
+ }
+
+ public void close() throws Exception
+ {
+ cFile.close();
+ }
+
+ public int write(ByteBuffer bufferRead) throws Exception
+ {
+ return cFile.read(bufferRead);
+ }
+
+ public int write(HornetQBuffer bufferOut, int size)
+ {
+ return -1;
+ }
+ }
+}
Deleted:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java
===================================================================
---
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java 2009-11-05
17:57:42 UTC (rev 8228)
+++
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java 2009-11-05
19:01:30 UTC (rev 8229)
@@ -1,403 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.persistence.impl.journal;
-
-import static org.hornetq.utils.DataConstants.SIZE_INT;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.hornetq.core.journal.SequentialFile;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.paging.PagingStore;
-import org.hornetq.core.remoting.spi.HornetQBuffer;
-import org.hornetq.core.server.LargeServerMessage;
-import org.hornetq.core.server.MessageReference;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.message.LargeMessageEncodingContext;
-import org.hornetq.core.server.impl.ServerMessageImpl;
-
-/**
- * A JournalLargeServerMessage
- *
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
- *
- * Created 30-Sep-08 12:02:45 PM
- *
- *
- */
-
-
-//FIXME - this class should be renamed to just large message
-public class JournalLargeServerMessage extends ServerMessageImpl implements
LargeServerMessage
-{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(JournalLargeServerMessage.class);
-
- private static boolean isTrace = log.isTraceEnabled();
-
- // Attributes ----------------------------------------------------
-
- private final JournalStorageManager storageManager;
-
- private LargeServerMessage linkMessage;
-
- // We should only use the NIO implementation on the Journal
- private SequentialFile file;
-
- private long bodySize = -1;
-
- private final AtomicInteger delayDeletionCount = new AtomicInteger(0);
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public JournalLargeServerMessage(final JournalStorageManager storageManager)
- {
- this.storageManager = storageManager;
- }
-
- /**
- * Copy constructor
- * @param copy
- * @param fileCopy
- */
- private JournalLargeServerMessage(final JournalLargeServerMessage copy,
- final SequentialFile fileCopy,
- final long newID)
- {
- super(copy);
- this.linkMessage = copy;
- storageManager = copy.storageManager;
- file = fileCopy;
- bodySize = copy.bodySize;
- setMessageID(newID);
- }
-
- // Public --------------------------------------------------------
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.LargeServerMessage#addBytes(byte[])
- */
- public synchronized void addBytes(final byte[] bytes) throws Exception
- {
- validateFile();
-
- if (!file.isOpen())
- {
- file.open();
- }
-
- storageManager.addBytesToLargeMessage(file, this.getMessageID(), bytes);
-
- bodySize += bytes.length;
- }
-
- public void encodeBody(final HornetQBuffer bufferOut, LargeMessageEncodingContext
context, int size)
- {
- try
- {
- // This could maybe be optimized (maybe reading directly into bufferOut)
- ByteBuffer bufferRead = ByteBuffer.allocate(size);
-
- int bytesRead = context.write(bufferRead);
-
- bufferRead.flip();
-
- if (bytesRead > 0)
- {
- bufferOut.writeBytes(bufferRead.array(), 0, bytesRead);
- }
-
- }
- catch (Exception e)
- {
- throw new RuntimeException(e.getMessage(), e);
- }
- }
-
- @Override
- public synchronized int getBodySize()
- {
- try
- {
- validateFile();
- }
- catch (Exception e)
- {
- throw new RuntimeException(e.getMessage(), e);
- }
- return (int)Math.min(bodySize, Integer.MAX_VALUE);
- }
-
- @Override
- public synchronized long getLargeBodySize()
- {
- try
- {
- validateFile();
- }
- catch (Exception e)
- {
- throw new RuntimeException(e.getMessage(), e);
- }
- return bodySize;
- }
-
- @Override
- public synchronized int getEncodeSize()
- {
- return getHeadersAndPropertiesEncodeSize();
- }
-
- @Override
- public void encode(final HornetQBuffer buffer)
- {
- encodeHeadersAndProperties(buffer);
- }
-
- @Override
- public void decode(final HornetQBuffer buffer)
- {
- file = null;
- decodeHeadersAndProperties(buffer);
- }
-
- public synchronized void incrementDelayDeletionCount()
- {
- this.delayDeletionCount.incrementAndGet();
- }
-
- public synchronized void decrementDelayDeletionCount() throws Exception
- {
- int count = this.delayDeletionCount.decrementAndGet();
-
- if (count == 0)
- {
- checkDelete();
- }
- }
-
- public LargeMessageEncodingContext createNewContext()
- {
- return new DecodingContext();
- }
-
- private void checkDelete() throws Exception
- {
- if (getRefCount() <= 0)
- {
- if (linkMessage != null)
- {
- // This file is linked to another message, deleting the reference where it
belongs on this case
- linkMessage.decrementDelayDeletionCount();
- }
- else
- {
- if (isTrace)
- {
- log.trace("Deleting file " + file + " as the usage was
complete");
- }
-
- try
- {
- deleteFile();
- }
- catch (Exception e)
- {
- log.error(e.getMessage(), e);
- }
- }
- }
- }
-
- @Override
- public synchronized int decrementRefCount(PagingStore pagingStore, MessageReference
reference) throws Exception
- {
- int currentRefCount = super.decrementRefCount(pagingStore, reference);
-
- // We use <= as this could be used by load.
- // because of a failure, no references were loaded, so we have 0... and we still
need to delete the associated
- // files
- if (delayDeletionCount.get() <= 0)
- {
- checkDelete();
- }
-
- return currentRefCount;
- }
-
- @Override
- public boolean isLargeMessage()
- {
- return true;
- }
-
- public synchronized void deleteFile() throws Exception
- {
- validateFile();
- releaseResources();
- storageManager.deleteFile(file);
- }
-
- public boolean isFileExists() throws Exception
- {
- SequentialFile localfile = storageManager.createFileForLargeMessage(getMessageID(),
durable);
- return localfile.exists();
- }
-
- // We cache this
- private volatile int memoryEstimate = -1;
-
- @Override
- public synchronized int getMemoryEstimate()
- {
- if (memoryEstimate == -1)
- {
- // The body won't be on memory (aways on-file), so we don't consider
this for paging
- memoryEstimate = getHeadersAndPropertiesEncodeSize() + SIZE_INT +
getEncodeSize() + (16 + 4) * 2 + 1;
- }
-
- return memoryEstimate;
- }
-
- public synchronized void releaseResources()
- {
- if (file != null && file.isOpen())
- {
- try
- {
- file.close();
- }
- catch (Exception e)
- {
- log.error(e.getMessage(), e);
- }
- }
- }
-
- @Override
- public synchronized ServerMessage copy(final long newID) throws Exception
- {
- incrementDelayDeletionCount();
-
- long idToUse = messageID;
-
- if (linkMessage != null)
- {
- idToUse = linkMessage.getMessageID();
- }
-
- SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse,
durable);
-
- ServerMessage newMessage = new JournalLargeServerMessage(linkMessage == null ?
this
- :
(JournalLargeServerMessage)linkMessage,
- newfile,
- newID);
-
- return newMessage;
- }
-
- public SequentialFile getFile()
- {
- return file;
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- @Override
- protected void finalize() throws Throwable
- {
- releaseResources();
- super.finalize();
- }
-
- // Private -------------------------------------------------------
-
- private synchronized void validateFile() throws Exception
- {
- if (file == null)
- {
- if (messageID <= 0)
- {
- throw new RuntimeException("MessageID not set on LargeMessage");
- }
-
- file = storageManager.createFileForLargeMessage(getMessageID(), durable);
-
- file.open();
-
- bodySize = file.size();
-
- }
- }
-
- /* (non-Javadoc)
- * @see
org.hornetq.core.server.LargeServerMessage#setLinkedMessage(org.hornetq.core.server.LargeServerMessage)
- */
- public void setLinkedMessage(LargeServerMessage message)
- {
- if (file != null)
- {
- // Sanity check.. it shouldn't happen
- throw new IllegalStateException("LargeMessage file was already set");
- }
-
- this.linkMessage = message;
-
- file = storageManager.createFileForLargeMessage(message.getMessageID(), durable);
- try
- {
- file.open();
- this.bodySize = file.size();
- file.close();
- }
- catch (Exception e)
- {
- throw new RuntimeException("could not setup linked file", e);
- }
- }
-
- // Inner classes -------------------------------------------------
-
- class DecodingContext implements LargeMessageEncodingContext
- {
- private SequentialFile cFile;
-
- public void open() throws Exception
- {
- cFile = file.copy();
- cFile.open();
- }
-
- public void close() throws Exception
- {
- cFile.close();
- }
-
- public int write(ByteBuffer bufferRead) throws Exception
- {
- return cFile.read(bufferRead);
- }
-
- public int write(HornetQBuffer bufferOut, int size)
- {
- return -1;
- }
- }
-}
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-05
17:57:42 UTC (rev 8228)
+++
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-05
19:01:30 UTC (rev 8229)
@@ -398,7 +398,7 @@
public LargeServerMessage createLargeMessage()
{
- return new JournalLargeServerMessage(this);
+ return new FileLargeServerMessage(this);
}
public void addBytesToLargeMessage(SequentialFile file, long messageId, final byte[]
bytes) throws Exception
@@ -420,7 +420,7 @@
replicator.largeMessageBegin(id);
}
- JournalLargeServerMessage largeMessage =
(JournalLargeServerMessage)createLargeMessage();
+ FileLargeServerMessage largeMessage =
(FileLargeServerMessage)createLargeMessage();
HornetQBuffer headerBuffer = ChannelBuffers.wrappedBuffer(header);