[jboss-cvs] JBoss Messaging SVN: r5158 - in branches/Branch_Chunk_Clebert: src/main/org/jboss/messaging/core/config/impl and 13 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Oct 20 17:14:08 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-10-20 17:14:08 -0400 (Mon, 20 Oct 2008)
New Revision: 5158
Removed:
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PageLargeMessage.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PageLargeMessageImpl.java
Modified:
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/Configuration.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PageMessage.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PagingManager.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/StorageManager.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/settings/impl/QueueSettingsTest.java
Log:
Dealing with LargeMessages and Paging
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/Configuration.java 2008-10-20 14:14:24 UTC (rev 5157)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/Configuration.java 2008-10-20 21:14:08 UTC (rev 5158)
@@ -150,6 +150,10 @@
void setPagingMaxGlobalSizeBytes(long maxGlobalSize);
+ long getPagingDefaultSize();
+
+ void setPagingDefaultSize(long pageSize);
+
// Large Messages Properties ------------------------------------------------------------
String getLargeMessagesDirectory();
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-10-20 14:14:24 UTC (rev 5157)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-10-20 21:14:08 UTC (rev 5158)
@@ -59,6 +59,8 @@
public static final String DEFAULT_PAGING_DIR = "data/paging";
+ public static final long DEFAULT_DEFAULT_PAGE_SIZE = 10 * 1024 * 1024;
+
public static final String DEFAULT_LARGEMESSAGES_DIR = "data/largemessages";
public static final boolean DEFAULT_CREATE_JOURNAL_DIR = true;
@@ -112,6 +114,8 @@
// Paging related attributes ------------------------------------------------------------
protected long pagingMaxGlobalSize = -1;
+
+ protected long pagingDefaultSize = DEFAULT_DEFAULT_PAGE_SIZE;
protected String pagingDirectory = DEFAULT_PAGING_DIR;
@@ -407,7 +411,23 @@
pagingMaxGlobalSize = maxGlobalSize;
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.config.Configuration#getPagingDefaultSize()
+ */
+ public long getPagingDefaultSize()
+ {
+ return pagingDefaultSize;
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.config.Configuration#setPagingDefaultSize(long)
+ */
+ public void setPagingDefaultSize(long pageSize)
+ {
+ this.pagingDefaultSize = pageSize;
+ }
+
public String getLargeMessagesDirectory()
{
return largeMessagesDirectory;
@@ -449,7 +469,11 @@
cother.getJournalMinFiles() == getJournalMinFiles() &&
cother.getJournalType() == getJournalType() &&
cother.getScheduledThreadPoolMaxSize() == getScheduledThreadPoolMaxSize() &&
- cother.getSecurityInvalidationInterval() == getSecurityInvalidationInterval();
+ cother.getSecurityInvalidationInterval() == getSecurityInvalidationInterval() &&
+ cother.getPagingDefaultSize() == getPagingDefaultSize();
}
+
+
+
}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-10-20 14:14:24 UTC (rev 5157)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-10-20 21:14:08 UTC (rev 5158)
@@ -317,6 +317,8 @@
pagingDirectory = getString(e, "paging-directory", pagingDirectory);
pagingMaxGlobalSize = getLong(e, "paging-max-global-size-bytes", pagingMaxGlobalSize);
+
+ pagingDefaultSize = getLong(e, "paging-default-size", pagingDefaultSize);
createJournalDir = getBoolean(e, "create-journal-dir", createJournalDir);
Deleted: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PageLargeMessage.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PageLargeMessage.java 2008-10-20 14:14:24 UTC (rev 5157)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PageLargeMessage.java 2008-10-20 21:14:08 UTC (rev 5158)
@@ -1,42 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.paging;
-
-import org.jboss.messaging.core.journal.EncodingSupport;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-
-/**
- * A PageLargeMessage
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- * Created Oct 17, 2008 4:04:39 PM
- *
- *
- */
-public interface PageLargeMessage extends EncodingSupport
-{
- byte[] getBytes();
-
- MessagingBuffer getBuffer();
-}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PageMessage.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PageMessage.java 2008-10-20 14:14:24 UTC (rev 5157)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PageMessage.java 2008-10-20 21:14:08 UTC (rev 5158)
@@ -23,6 +23,7 @@
package org.jboss.messaging.core.paging;
import org.jboss.messaging.core.journal.EncodingSupport;
+import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.util.TypedProperties;
@@ -36,7 +37,7 @@
*/
public interface PageMessage extends EncodingSupport
{
- Object getMessage();
+ ServerMessage getMessage(StorageManager storageManager);
long getTransactionID();
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-10-20 14:14:24 UTC (rev 5157)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-10-20 21:14:08 UTC (rev 5158)
@@ -140,4 +140,9 @@
* */
void clearLastPageRecord(LastPageRecord lastRecord) throws Exception;
+ /**
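+ * @return the default page size, used when a queue's settings define no page size and as the watermark in global paging mode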
+ */
+ long getDefaultPageSize();
+
}
Deleted: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PageLargeMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PageLargeMessageImpl.java 2008-10-20 14:14:24 UTC (rev 5157)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PageLargeMessageImpl.java 2008-10-20 21:14:08 UTC (rev 5158)
@@ -1,98 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.paging.impl;
-
-import java.nio.ByteBuffer;
-
-import org.jboss.messaging.core.paging.PageLargeMessage;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-
-/**
- * <p>A PageLargeMessageImpl>/p>
- *
- * <p>A Paged Large Message needs to be instantiated later when the StorageManager instance is available.
- * This class will hold the bytes until it is possible to instantiate the LargeMessage properly</p>
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- * Created Oct 17, 2008 2:46:32 PM
- *
- *
- */
-public class PageLargeMessageImpl implements PageLargeMessage
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- final byte[] bytes;
-
- /**
- * @param bytes
- */
- public PageLargeMessageImpl(byte[] bytes)
- {
- super();
- this.bytes = bytes;
- }
-
- /**
- * @return the bytes
- */
- public byte[] getBytes()
- {
- return bytes;
- }
-
- public MessagingBuffer getBuffer()
- {
- return new ByteBufferWrapper(ByteBuffer.wrap(bytes));
- }
-
- /* (non-Javadoc)
- * @see org.jboss.messaging.core.journal.EncodingSupport#decode(org.jboss.messaging.core.remoting.spi.MessagingBuffer)
- */
- public void decode(MessagingBuffer buffer)
- {
- throw new IllegalStateException("Not supported");
- }
-
- /* (non-Javadoc)
- * @see org.jboss.messaging.core.journal.EncodingSupport#encode(org.jboss.messaging.core.remoting.spi.MessagingBuffer)
- */
- public void encode(MessagingBuffer buffer)
- {
- buffer.putBytes(bytes);
- }
-
- /* (non-Javadoc)
- * @see org.jboss.messaging.core.journal.EncodingSupport#getEncodeSize()
- */
- public int getEncodeSize()
- {
- return bytes.length;
- }
-
-}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java 2008-10-20 14:14:24 UTC (rev 5157)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java 2008-10-20 21:14:08 UTC (rev 5158)
@@ -22,8 +22,11 @@
package org.jboss.messaging.core.paging.impl;
-import org.jboss.messaging.core.journal.EncodingSupport;
+import java.nio.ByteBuffer;
+
import org.jboss.messaging.core.paging.PageMessage;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.server.ServerLargeMessage;
import org.jboss.messaging.core.server.ServerMessage;
@@ -52,7 +55,10 @@
// Public --------------------------------------------------------
- private EncodingSupport message;
+ /** Large messages will need to be instantiated lazily during getMessage when the StorageManager is available */
+ private byte[] largeMessageLazyData;
+
+ private ServerMessage message;
private long transactionID = -1;
@@ -89,8 +95,15 @@
this.properties = properties;
}
- public Object getMessage()
+ public ServerMessage getMessage(StorageManager storage)
{
+ if (this.largeMessageLazyData != null)
+ {
+ this.message = storage.createLargeMessageStorage();
+ MessagingBuffer buffer = new ByteBufferWrapper(ByteBuffer.wrap(largeMessageLazyData));
+ message.decode(buffer);
+ largeMessageLazyData = null;
+ }
return message;
}
@@ -116,12 +129,10 @@
{
int largeMessageHeaderSize = buffer.getInt();
- byte[] bytesLargeMessage = new byte[largeMessageHeaderSize];
+ this.largeMessageLazyData = new byte[largeMessageHeaderSize];
- buffer.getBytes(bytesLargeMessage);
+ buffer.getBytes(largeMessageLazyData);
- this.message = new PageLargeMessageImpl(bytesLargeMessage);
-
}
else
{
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-10-20 14:14:24 UTC (rev 5157)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-10-20 21:14:08 UTC (rev 5158)
@@ -41,7 +41,6 @@
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.MessageReference;
-import org.jboss.messaging.core.server.ServerLargeMessage;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -60,7 +59,6 @@
{
// Constants -----------------------------------------------------
- private static final long WATERMARK_GLOBAL_PAGE = QueueSettings.DEFAULT_PAGE_SIZE_BYTES;
// Attributes ----------------------------------------------------
@@ -82,6 +80,8 @@
private final StorageManager storageManager;
+ private final long defaultPageSize;
+
private PostOffice postOffice;
private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo> transactions = new ConcurrentHashMap<Long, PageTransactionInfo>();
@@ -96,8 +96,6 @@
private static final SimpleString SCHEDULED_DELIVERY_PROP = new SimpleString("JBM_SCHEDULED_DELIVERY_PROP");
- private static final SimpleString MESSAGE_ID_PROP = new SimpleString("JBM_MESSAGE_ID");
-
// This is just a debug tool method.
// During debugs you could make log.trace as log.info, and change the
// variable isTrace above
@@ -113,11 +111,13 @@
public PagingManagerImpl(final PagingStoreFactory pagingSPI,
final StorageManager storageManager,
final HierarchicalRepository<QueueSettings> queueSettingsRepository,
- final long maxGlobalSize)
+ final long maxGlobalSize,
+ final long defaultPageSize)
{
this.pagingSPI = pagingSPI;
this.queueSettingsRepository = queueSettingsRepository;
this.storageManager = storageManager;
+ this.defaultPageSize = defaultPageSize;
this.maxGlobalSize = maxGlobalSize;
}
@@ -214,18 +214,9 @@
for (PageMessage msg : data)
{
ServerMessage pagedMessage = null;
-
- if (msg.getMessage() instanceof ServerMessage)
- {
- pagedMessage = (ServerMessage) msg.getMessage();
- }
- else
- {
- PageLargeMessageImpl pageLargeMessage = (PageLargeMessageImpl) msg.getMessage();
- ServerLargeMessage message = this.storageManager.createLargeMessageStorage();
- message.decode(pageLargeMessage.getBuffer());
- }
-
+
+ pagedMessage = (ServerMessage)msg.getMessage(storageManager);
+
final long transactionIdDuringPaging = msg.getTransactionID();
if (transactionIdDuringPaging >= 0)
{
@@ -238,7 +229,7 @@
{
if (isTrace)
{
- trace("Transaction " + msg.getTransactionID() + " not found, ignoring message " + msg.getMessage());
+ trace("Transaction " + msg.getTransactionID() + " not found, ignoring message " + pagedMessage);
}
continue;
}
@@ -247,7 +238,7 @@
// before the commit arrived
if (!pageTransactionInfo.waitCompletion())
{
- trace("Rollback was called after prepare, ignoring message " + msg.getMessage());
+ trace("Rollback was called after prepare, ignoring message " + pagedMessage);
continue;
}
@@ -258,9 +249,9 @@
pageTransactionsToUpdate.add(pageTransactionInfo);
}
}
- Long scheduledDeliveryTime = (Long) msg.getProperties().getProperty(SCHEDULED_DELIVERY_PROP);
- //if this is a scheduled message we add it to the queue as just that
- if(scheduledDeliveryTime == null)
+ Long scheduledDeliveryTime = (Long)msg.getProperties().getProperty(SCHEDULED_DELIVERY_PROP);
+ // if this is a scheduled message we add it to the queue as just that
+ if (scheduledDeliveryTime == null)
{
refsToAdd.addAll(postOffice.route(pagedMessage));
}
@@ -277,10 +268,12 @@
if (pagedMessage.getDurableRefCount() != 0)
{
storageManager.storeMessageTransactional(depageTransactionID, pagedMessage);
- //write the scheduled message record if needed
- if(scheduledDeliveryTime != null)
+ // write the scheduled message record if needed
+ if (scheduledDeliveryTime != null)
{
- storageManager.storeMessageScheduledTransactional(depageTransactionID, pagedMessage, scheduledDeliveryTime);
+ storageManager.storeMessageScheduledTransactional(depageTransactionID,
+ pagedMessage,
+ scheduledDeliveryTime);
}
}
}
@@ -313,7 +306,8 @@
}
if (globalMode.get())
{
- return globalSize.get() < maxGlobalSize - WATERMARK_GLOBAL_PAGE && pagingStore.getMaxSizeBytes() <= 0 ||
+ // We use the Default Page Size when in global mode for the calculation of the Watermark
+ return globalSize.get() < maxGlobalSize - defaultPageSize && pagingStore.getMaxSizeBytes() <= 0 ||
pagingStore.getAddressSize() < pagingStore.getMaxSizeBytes();
}
else
@@ -325,6 +319,11 @@
}
+ public long getDefaultPageSize()
+ {
+ return defaultPageSize;
+ }
+
public void setLastPage(final LastPageRecord lastPage) throws Exception
{
trace("LastPage loaded was " + lastPage.getLastId() + " recordID = " + lastPage.getRecordId());
@@ -487,8 +486,22 @@
{
// When in Global mode, we use the default page size as the minimal
// watermark to start depage
- if (globalMode.get() && currentGlobalSize < maxGlobalSize - QueueSettings.DEFAULT_PAGE_SIZE_BYTES)
+
+ if (isTrace)
{
+ log.trace("globalMode.get = " + globalMode.get() +
+ " currentGlobalSize = " +
+ currentGlobalSize +
+ " defaultPageSize = " +
+ defaultPageSize +
+ " maxGlobalSize = " +
+ maxGlobalSize +
+ "maxGlobalSize - defaultPageSize = " +
+ (maxGlobalSize - defaultPageSize));
+ }
+
+ if (globalMode.get() && currentGlobalSize < maxGlobalSize - defaultPageSize)
+ {
startGlobalDepage();
}
else if (maxSize > 0 && addressSize < maxSize - pageSize)
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-10-20 14:14:24 UTC (rev 5157)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-10-20 21:14:08 UTC (rev 5158)
@@ -31,6 +31,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.logging.Logger;
@@ -72,7 +73,7 @@
private final boolean dropMessagesOnSize;
private boolean droppedMessages;
-
+
private final PagingManager pagingManager;
private final Executor executor;
@@ -113,7 +114,14 @@
this.fileFactory = fileFactory;
this.storeName = storeName;
maxSize = queueSettings.getMaxSizeBytes();
- pageSize = queueSettings.getPageSizeBytes();
+ if (queueSettings.getPageSizeBytes() != null)
+ {
+ this.pageSize = queueSettings.getPageSizeBytes();
+ }
+ else
+ {
+ this.pageSize = pagingManager.getDefaultPageSize();
+ }
dropMessagesOnSize = queueSettings.isDropMessagesWhenFull();
this.executor = executor;
this.pagingManager = pagingManager;
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2008-10-20 14:14:24 UTC (rev 5157)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2008-10-20 21:14:08 UTC (rev 5158)
@@ -70,9 +70,7 @@
void storeDeleteMessageTransactional(long txID, long queueID, long messageID) throws Exception;
/** Create an area that will get LargeMessage bytes on the server size*/
- ServerLargeMessage createLargeMessageStorage() throws Exception;
-
-
+ ServerLargeMessage createLargeMessageStorage();
/** Used to delete non-messaging data (such as PageTransaction and LasPage) */
void storeDeleteTransactional(long txID, long recordID) throws Exception;
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-10-20 14:14:24 UTC (rev 5157)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-10-20 21:14:08 UTC (rev 5158)
@@ -217,7 +217,7 @@
}
/** Create an area that will get LargeMessage bytes on the server size*/
- public ServerLargeMessage createLargeMessageStorage() throws Exception
+ public ServerLargeMessage createLargeMessageStorage()
{
return new JournalServerLargeMessageImpl(this);
}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2008-10-20 14:14:24 UTC (rev 5157)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2008-10-20 21:14:08 UTC (rev 5158)
@@ -150,7 +150,7 @@
/* (non-Javadoc)
* @see org.jboss.messaging.core.persistence.StorageManager#createLargeMessageStorage(long, int, int)
*/
- public ServerLargeMessage createLargeMessageStorage() throws Exception
+ public ServerLargeMessage createLargeMessageStorage()
{
return new NullStorageServerLargeMessageImpl();
}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java 2008-10-20 14:14:24 UTC (rev 5157)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java 2008-10-20 21:14:08 UTC (rev 5158)
@@ -93,17 +93,6 @@
{
// nothing to be done here.. we don really have a file on this Storage
}
- public int decrementRefCount()
- {
- int currentRefCount = super.decrementRefCount();
-
- if (currentRefCount == 0)
- {
- System.out.println("I would delete the file if I had one now");
- }
-
- return currentRefCount;
- }
// Package protected ---------------------------------------------
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-10-20 14:14:24 UTC (rev 5157)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-10-20 21:14:08 UTC (rev 5158)
@@ -491,7 +491,6 @@
private void doWrite(final Packet packet)
{
- //System.out.println(packet.getClass().getCanonicalName() + " size = " + packet.getPacketSize());
final MessagingBuffer buffer = transportConnection.createBuffer(packet.getPacketSize());
packet.encode(buffer);
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java 2008-10-20 14:14:24 UTC (rev 5157)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java 2008-10-20 21:14:08 UTC (rev 5158)
@@ -27,6 +27,7 @@
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+import org.jboss.messaging.util.DataConstants;
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -112,7 +113,12 @@
{
return requiresResponse;
}
-
+
+ public int getPacketSize()
+ {
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG + clientMessage.getEncodeSize() + DataConstants.BOOLEAN;
+ }
+
public void encodeBody(final MessagingBuffer buffer)
{
buffer.putLong(producerID);
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-10-20 14:14:24 UTC (rev 5157)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-10-20 21:14:08 UTC (rev 5158)
@@ -199,7 +199,8 @@
pagingManager = new PagingManagerImpl(storeFactory,
storageManager,
queueSettingsRepository,
- configuration.getPagingMaxGlobalSizeBytes());
+ configuration.getPagingMaxGlobalSizeBytes(),
+ configuration.getPagingDefaultSize());
storeFactory.setPagingManager(pagingManager);
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java 2008-10-20 14:14:24 UTC (rev 5157)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/settings/impl/QueueSettings.java 2008-10-20 21:14:08 UTC (rev 5158)
@@ -49,8 +49,6 @@
public static final Boolean DEFAULT_DROP_MESSAGES_WHEN_FULL = Boolean.FALSE;
- public static final Integer DEFAULT_PAGE_SIZE_BYTES = 10 * 1024 * 1024; // 10M Bytes
-
public static final Integer DEFAULT_MAX_DELIVERY_ATTEMPTS = 10;
public static final Integer DEFAULT_MESSAGE_COUNTER_HISTORY_DAY_LIMIT = 0;
@@ -97,7 +95,7 @@
public Integer getPageSizeBytes()
{
- return pageSizeBytes != null ? pageSizeBytes : DEFAULT_PAGE_SIZE_BYTES;
+ return pageSizeBytes;
}
public Boolean isDropMessagesWhenFull()
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-10-20 14:14:24 UTC (rev 5157)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-10-20 21:14:08 UTC (rev 5158)
@@ -28,6 +28,7 @@
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.HashMap;
import junit.framework.AssertionFailedError;
@@ -37,12 +38,15 @@
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ClientSessionFactory;
import org.jboss.messaging.core.client.FileClientMessage;
+import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.tests.util.ServiceTestBase;
import org.jboss.messaging.util.DataConstants;
import org.jboss.messaging.util.SimpleString;
@@ -138,30 +142,11 @@
session.createQueue(ADDRESS, queue[0], null, true, false);
session.createQueue(ADDRESS, queue[1], null, true, false);
- FileClientMessage clientFile = session.createFileMessage(true);
-
- File tmpFile = new File(temporaryDir + "/" + "tmpUpload.data");
-
- RandomAccessFile random = new RandomAccessFile(tmpFile, "rw");
- FileChannel channel = random.getChannel();
-
- ByteBuffer buffer = ByteBuffer.allocate(4);
-
+
int numberOfIntegers = 10000;
-
- for (int i = 0; i < numberOfIntegers; i++)
- {
- buffer.rewind();
- buffer.putInt(i);
- buffer.rewind();
- channel.write(buffer);
- }
-
- channel.close();
- random.close();
-
- clientFile.setFile(tmpFile);
-
+
+ FileClientMessage clientFile = createLargeClientMessage(session, numberOfIntegers);
+
ClientProducer producer = session.createProducer(ADDRESS);
producer.send(clientFile);
@@ -203,9 +188,196 @@
+ /** Runs the large-message paging scenario with realFiles = true and sendBlocking = false. */
public void testPageOnLargeMessage() throws Exception
{
- // TODO: Write a test with LargeMessages and paging
+ testPageOnLargeMessage(true, false);
+
}
+
+ /**
+ * Runs the large-message paging scenario with realFiles = false and sendBlocking = false.
+ * NOTE(review): the method name suggests realFiles = false selects the null persistence
+ * implementation - confirm against createService.
+ */
+ public void testPageOnLargeMessageNullPersistence() throws Exception
+ {
+ testPageOnLargeMessage(false, false);
+
+ }
+
+ /**
+ * Shared scenario for the paging tests with a large message: starts a messaging service with
+ * small paging limits, sends 100 regular messages followed by one file-backed message,
+ * optionally restarts the service, and then checks that everything can be consumed.
+ *
+ * @param realFiles passed as the first argument of createService; when true the service is also
+ * stopped and re-created between sending and receiving
+ * (presumably it selects file-based persistence - TODO confirm against createService)
+ * @param sendBlocking when true the session factory is set to block on non-persistent sends,
+ * persistent sends and acknowledgements
+ * @throws Exception if any step of the scenario fails
+ */
+ private void testPageOnLargeMessage(boolean realFiles, boolean sendBlocking) throws Exception
+ {
+
+ clearData();
+ Configuration config = createDefaultConfig();
+
+ // Small limits (20 KiB / 10 KiB); presumably chosen so the 100 messages sent below force paging
+ config.setPagingMaxGlobalSizeBytes(20 * 1024);
+ config.setPagingDefaultSize(10 * 1024);
+
+ messagingService = createService(realFiles, false, config, new HashMap<String, QueueSettings>());
+
+ messagingService.start();
+
+ // Number of ints in the body of each regular message
+ final int numberOfIntegers = 256;
+
+ // Number of ints written to the file behind the large message
+ final int numberOfIntegersBigMessage = 10000;
+
+ try
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ if (sendBlocking)
+ {
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
+ }
+
+ ClientSession session = sf.createSession(false, true, true, false);
+
+ session.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ // One backing buffer is allocated once and wrapped again for every message in the loop below
+ ByteBuffer ioBuffer = ByteBuffer.allocate(DataConstants.SIZE_INT * numberOfIntegers);
+
+ // printBuffer("body to be sent : " , body);
+
+ ClientMessage message = null;
+
+ // Body of the first message sent; used later as the expected content of every received message
+ MessagingBuffer body = null;
+
+ // Phase 1: send 100 persistent messages, each carrying the ints 1..numberOfIntegers
+ for (int i = 0; i < 100; i++)
+ {
+ MessagingBuffer bodyLocal = new ByteBufferWrapper(ioBuffer);
+
+ for (int j = 1; j <= numberOfIntegers; j++)
+ {
+ bodyLocal.putInt(j);
+ }
+ bodyLocal.flip();
+
+ if (i == 0)
+ {
+ body = bodyLocal;
+ }
+
+ message = session.createClientMessage(true);
+ message.setBody(bodyLocal);
+
+ producer.send(message);
+ }
+
+ // Phase 2: send one file-backed message after the regular ones
+ FileClientMessage clientFile = createLargeClientMessage(session, numberOfIntegersBigMessage);
+
+ producer.send(clientFile);
+
+ session.close();
+
+ // Phase 3 (realFiles only): stop the service and bring up a new one with the same paging settings
+ if (realFiles)
+ {
+ messagingService.stop();
+
+ // NOTE(review): debug output on stdout; consider the logger or removing it
+ System.out.println("Acceptor "+ InVMRegistry.instance.getAcceptor(0));
+ InVMRegistry.instance.clear();
+
+ config = createDefaultConfig();
+
+ config.setPagingMaxGlobalSizeBytes(20 * 1024);
+ config.setPagingDefaultSize(10 * 1024);
+
+ messagingService = createService(true, false, config, new HashMap<String, QueueSettings>());
+ messagingService.start();
+
+ sf = createInVMFactory();
+ }
+
+ session = sf.createSession(false, true, true, false);
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+// if (realFiles)
+// {
+// consumer.setLargeMessagesAsFiles(true);
+// consumer.setLargeMessagesDir(new File(clientLargeMessagesDir));
+// }
+
+ session.start();
+
+ // Phase 4: receive the 100 regular messages (10 s timeout each) and compare each body with the first one sent
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage message2 = consumer.receive(10000);
+
+ assertNotNull(message2);
+
+ message2.processed();
+
+ // NOTE(review): debug output on stdout; consider the logger or removing it
+ System.out.println("msg on client = " + message2.getMessageID());
+
+ // NOTE(review): redundant - message2 was already asserted non-null above
+ assertNotNull(message2);
+
+ try
+ {
+ assertEqualsByteArrays(body.limit(), body.array(), message2.getBody().array());
+ }
+ catch (AssertionFailedError e)
+ {
+ // Log the first 40 bytes of both buffers before re-throwing, to help diagnose the mismatch
+ log.info("Expected buffer:" + dumbBytesHex(body.array(), 40));
+ log.info("Arriving buffer:" + dumbBytesHex(message2.getBody().array(), 40));
+ throw e;
+ }
+ }
+
+ consumer.close();
+
+ session.close();
+
+ // Phase 5: read the large message back on a fresh session
+ session = sf.createSession(false, true, true, false);
+
+ readMessage(session, ADDRESS, numberOfIntegersBigMessage);
+
+ // printBuffer("message received : ", message2.getBody());
+
+
+ session.close();
+ }
+ finally
+ {
+ // Best-effort shutdown: a failure while stopping must not hide the test outcome.
+ // NOTE(review): sessions opened above are not closed here if an assertion fails mid-test.
+ try
+ {
+ messagingService.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+
+ }
+
+ /**
+ * Creates a file-backed client message whose file contains the integers
+ * 0 .. numberOfIntegers - 1, each written as a 4-byte int through a ByteBuffer.
+ *
+ * The data is written to "tmpUpload.data" under temporaryDir. Any previous content of that
+ * file is discarded, and the file is always closed, even when writing fails.
+ *
+ * @param session session used to create the file message
+ * @param numberOfIntegers how many consecutive integers to write to the file
+ * @return the file message, with its file set to the generated temporary file
+ * @throws Exception if the message cannot be created or the file cannot be written
+ */
+ private FileClientMessage createLargeClientMessage(ClientSession session, int numberOfIntegers) throws Exception
+ {
+
+ FileClientMessage clientMessage = session.createFileMessage(true);
+
+ File tmpFile = new File(temporaryDir + "/" + "tmpUpload.data");
+
+ RandomAccessFile random = new RandomAccessFile(tmpFile, "rw");
+ try
+ {
+ // "rw" does not truncate an existing file: drop leftovers from an earlier, larger upload
+ random.setLength(0);
+
+ FileChannel channel = random.getChannel();
+
+ ByteBuffer buffer = ByteBuffer.allocate(4);
+
+ for (int i = 0; i < numberOfIntegers; i++)
+ {
+ buffer.rewind();
+ buffer.putInt(i);
+ buffer.rewind();
+ channel.write(buffer);
+ }
+ }
+ finally
+ {
+ // Closing the RandomAccessFile also closes its associated channel
+ random.close();
+ }
+
+ clientMessage.setFile(tmpFile);
+
+ return clientMessage;
+ }
+
/**
* @param session
* @param queueToRead
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2008-10-20 14:14:24 UTC (rev 5157)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2008-10-20 21:14:08 UTC (rev 5158)
@@ -69,7 +69,8 @@
PagingManagerImpl managerImpl = new PagingManagerImpl(new PagingManagerFactoryNIO(journalDir),
null,
queueSettings,
- -1);
+ -1,
+ 1024 * 1024);
managerImpl.start();
PagingStore store = managerImpl.getPageStore(new SimpleString("simple-test"));
@@ -92,7 +93,7 @@
assertEquals(1, msgs.length);
- assertEqualsByteArrays(msg.getBody().array(), ((ServerMessage)msgs[0].getMessage()).getBody().array());
+ assertEqualsByteArrays(msg.getBody().array(), (msgs[0].getMessage(null)).getBody().array());
assertTrue(store.isPaging());
@@ -115,7 +116,8 @@
PagingManagerImpl managerImpl = new PagingManagerImpl(new PagingManagerFactoryNIO(journalDir),
null,
queueSettings,
- -1);
+ -1,
+ 1024 * 1024);
managerImpl.start();
ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(100));
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java 2008-10-20 14:14:24 UTC (rev 5157)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java 2008-10-20 21:14:08 UTC (rev 5158)
@@ -119,11 +119,11 @@
for (int i = 0; i < msgs.length; i++)
{
- assertEquals(i, ((ServerMessage)msgs[i].getMessage()).getMessageID());
+ assertEquals(i, (msgs[i].getMessage(null)).getMessageID());
- assertEquals(simpleDestination, ((ServerMessage)msgs[i].getMessage()).getDestination());
+ assertEquals(simpleDestination, (msgs[i].getMessage(null)).getDestination());
- assertEqualsByteArrays(buffers.get(i).array(), ((ServerMessage)msgs[i].getMessage()).getBody().array());
+ assertEqualsByteArrays(buffers.get(i).array(), (msgs[i].getMessage(null)).getBody().array());
}
impl.delete();
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-10-20 14:14:24 UTC (rev 5157)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-10-20 21:14:08 UTC (rev 5158)
@@ -72,7 +72,7 @@
queueSettings.setDefault(new QueueSettings());
PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
- PagingManagerImpl manager = new PagingManagerImpl(spi, null, queueSettings, -1);
+ PagingManagerImpl manager = new PagingManagerImpl(spi, null, queueSettings, -1, 1024 * 1024);
SimpleString destination = new SimpleString("some-destination");
@@ -124,7 +124,7 @@
public void testMultipleThreadsGetStore() throws Exception
{
PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
- final PagingManagerImpl manager = new PagingManagerImpl(spi, null, repoSettings, -1);
+ final PagingManagerImpl manager = new PagingManagerImpl(spi, null, repoSettings, -1, 1024 * 1024);
final SimpleString destination = new SimpleString("some-destination");
@@ -209,7 +209,7 @@
PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
PagingStore store = EasyMock.createNiceMock(PagingStore.class);
StorageManager storageManager = EasyMock.createStrictMock(StorageManager.class);
- PagingManagerImpl manager = new PagingManagerImpl(spi, storageManager, queueSettings, -1);
+ PagingManagerImpl manager = new PagingManagerImpl(spi, storageManager, queueSettings, -1, 1024 * 1024);
manager.setPostOffice(po);
ServerMessage message = EasyMock.createStrictMock(ServerMessage.class);
@@ -242,7 +242,7 @@
PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
PagingStore store = EasyMock.createNiceMock(PagingStore.class);
StorageManager storageManager = EasyMock.createStrictMock(StorageManager.class);
- PagingManagerImpl manager = new PagingManagerImpl(spi, storageManager, queueSettings, -1);
+ PagingManagerImpl manager = new PagingManagerImpl(spi, storageManager, queueSettings, -1, 1024 * 1024);
manager.setPostOffice(po);
ServerMessage message = EasyMock.createStrictMock(ServerMessage.class);
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-10-20 14:14:24 UTC (rev 5157)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-10-20 21:14:08 UTC (rev 5158)
@@ -22,6 +22,10 @@
package org.jboss.messaging.tests.unit.core.paging.impl;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.paging.Page;
import org.jboss.messaging.core.paging.PageMessage;
@@ -29,15 +33,10 @@
import org.jboss.messaging.core.paging.impl.PageMessageImpl;
import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
-import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
import org.jboss.messaging.util.SimpleString;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
/**
*
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
@@ -202,8 +201,8 @@
for (int i = 0; i < 10; i++)
{
- assertEquals(0, ((ServerMessage)msg[i].getMessage()).getMessageID());
- assertEqualsByteArrays(buffers.get(i).array(),((ServerMessage)msg[i].getMessage()).getBody().array());
+ assertEquals(0, (msg[i].getMessage(null)).getMessageID());
+ assertEqualsByteArrays(buffers.get(i).array(), (msg[i].getMessage(null)).getBody().array());
}
}
@@ -265,8 +264,8 @@
for (int i = 0; i < 5; i++)
{
- assertEquals(0, ((ServerMessage)msg[i].getMessage()).getMessageID());
- assertEqualsByteArrays(buffers.get(pageNr * 5 + i).array(), ((ServerMessage)msg[i].getMessage()).getBody().array());
+ assertEquals(0, (msg[i].getMessage(null)).getMessageID());
+ assertEqualsByteArrays(buffers.get(pageNr * 5 + i).array(), (msg[i].getMessage(null)).getBody().array());
}
}
@@ -308,9 +307,9 @@
assertEquals(1, msgs.length);
- assertEquals(0l, ((ServerMessage)msgs[0].getMessage()).getMessageID());
+ assertEquals(0l, (msgs[0].getMessage(null)).getMessageID());
- assertEqualsByteArrays(buffers.get(0).array(), ((ServerMessage)msgs[0].getMessage()).getBody().array());
+ assertEqualsByteArrays(buffers.get(0).array(), (msgs[0].getMessage(null)).getBody().array());
assertEquals(1, storeImpl.getNumberOfPages());
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-10-20 14:14:24 UTC (rev 5157)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-10-20 21:14:08 UTC (rev 5158)
@@ -229,17 +229,16 @@
for (PageMessage msg : msgs)
{
- ((ServerMessage)msg.getMessage()).getBody().rewind();
- long id = ((ServerMessage)msg.getMessage()).getBody().getLong();
- ((ServerMessage)msg.getMessage()).getBody().rewind();
+ (msg.getMessage(null)).getBody().rewind();
+ long id = (msg.getMessage(null)).getBody().getLong();
+ (msg.getMessage(null)).getBody().rewind();
PageMessageImpl msgWritten = buffers.remove(id);
buffers2.put(id, msg);
assertNotNull(msgWritten);
- assertEquals(((ServerMessage)msg.getMessage()).getDestination(),
- ((ServerMessage)msgWritten.getMessage()).getDestination());
- assertEqualsByteArrays(((ServerMessage)msgWritten.getMessage()).getBody().array(),
- ((ServerMessage)msg.getMessage()).getBody().array());
+ assertEquals((msg.getMessage(null)).getDestination(), (msgWritten.getMessage(null)).getDestination());
+ assertEqualsByteArrays((msgWritten.getMessage(null)).getBody().array(), (msg.getMessage(null)).getBody()
+ .array());
}
}
@@ -295,14 +294,13 @@
for (PageMessage msg : msgs)
{
- ((ServerMessage)msg.getMessage()).getBody().rewind();
- long id = ((ServerMessage)msg.getMessage()).getBody().getLong();
+ (msg.getMessage(null)).getBody().rewind();
+ long id = (msg.getMessage(null)).getBody().getLong();
PageMessage msgWritten = buffers2.remove(id);
assertNotNull(msgWritten);
- assertEquals(((ServerMessage)msg.getMessage()).getDestination(),
- ((ServerMessage)msgWritten.getMessage()).getDestination());
- assertEqualsByteArrays(((ServerMessage)msgWritten.getMessage()).getBody().array(),
- ((ServerMessage)msg.getMessage()).getBody().array());
+ assertEquals((msg.getMessage(null)).getDestination(), (msgWritten.getMessage(null)).getDestination());
+ assertEqualsByteArrays((msgWritten.getMessage(null)).getBody().array(), (msg.getMessage(null)).getBody()
+ .array());
}
}
@@ -311,10 +309,10 @@
lastPage.close();
assertEquals(1, lastMessages.length);
- ((ServerMessage)lastMessages[0].getMessage()).getBody().rewind();
- assertEquals(((ServerMessage)lastMessages[0].getMessage()).getBody().getLong(), lastMessageId);
- assertEqualsByteArrays(((ServerMessage)lastMessages[0].getMessage()).getBody().array(),
- ((ServerMessage)lastMsg.getMessage()).getBody().array());
+ (lastMessages[0].getMessage(null)).getBody().rewind();
+ assertEquals((lastMessages[0].getMessage(null)).getBody().getLong(), lastMessageId);
+ assertEqualsByteArrays((lastMessages[0].getMessage(null)).getBody().array(), (lastMsg.getMessage(null)).getBody()
+ .array());
assertEquals(0, buffers2.size());
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/settings/impl/QueueSettingsTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/settings/impl/QueueSettingsTest.java 2008-10-20 14:14:24 UTC (rev 5157)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/settings/impl/QueueSettingsTest.java 2008-10-20 21:14:08 UTC (rev 5158)
@@ -41,7 +41,7 @@
assertEquals(queueSettings.getExpiryQueue(), null);
assertEquals(queueSettings.getMaxDeliveryAttempts(), QueueSettings.DEFAULT_MAX_DELIVERY_ATTEMPTS);
assertEquals(queueSettings.getMaxSizeBytes(), QueueSettings.DEFAULT_MAX_SIZE_BYTES);
- assertEquals(queueSettings.getPageSizeBytes(), QueueSettings.DEFAULT_PAGE_SIZE_BYTES);
+ assertEquals(queueSettings.getPageSizeBytes(), null);
assertEquals(queueSettings.getMessageCounterHistoryDayLimit(), QueueSettings.DEFAULT_MESSAGE_COUNTER_HISTORY_DAY_LIMIT);
assertEquals(queueSettings.getRedeliveryDelay(), QueueSettings.DEFAULT_REDELIVER_DELAY);
More information about the jboss-cvs-commits
mailing list