[jboss-cvs] JBoss Messaging SVN: r5403 - in trunk: src/main/org/jboss/messaging/core/paging/impl and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Nov 20 14:10:37 EST 2008
Author: timfox
Date: 2008-11-20 14:10:37 -0500 (Thu, 20 Nov 2008)
New Revision: 5403
Added:
trunk/src/main/org/jboss/messaging/core/paging/PagedMessage.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagedMessageImpl.java
Removed:
trunk/src/main/org/jboss/messaging/core/paging/PageMessage.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java
Modified:
trunk/src/main/org/jboss/messaging/core/paging/Page.java
trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
Log:
More tweaks to paging
Modified: trunk/src/main/org/jboss/messaging/core/paging/Page.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/Page.java 2008-11-20 18:34:03 UTC (rev 5402)
+++ trunk/src/main/org/jboss/messaging/core/paging/Page.java 2008-11-20 19:10:37 UTC (rev 5403)
@@ -34,9 +34,9 @@
{
int getPageId();
- void write(PageMessage message) throws Exception;
+ void write(PagedMessage message) throws Exception;
- PageMessage[] read() throws Exception;
+ PagedMessage[] read() throws Exception;
int getSize();
Deleted: trunk/src/main/org/jboss/messaging/core/paging/PageMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PageMessage.java 2008-11-20 18:34:03 UTC (rev 5402)
+++ trunk/src/main/org/jboss/messaging/core/paging/PageMessage.java 2008-11-20 19:10:37 UTC (rev 5403)
@@ -1,42 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.paging;
-
-import org.jboss.messaging.core.journal.EncodingSupport;
-import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.server.ServerMessage;
-
-/**
- *
- * The record taken by Page.
- * We can't just record the ServerMessage as we need other information (such as the TransactionID used during paging)
- *
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- *
- */
-public interface PageMessage extends EncodingSupport
-{
- ServerMessage getMessage(StorageManager storageManager);
-
- long getTransactionID();
-}
Copied: trunk/src/main/org/jboss/messaging/core/paging/PagedMessage.java (from rev 5399, trunk/src/main/org/jboss/messaging/core/paging/PageMessage.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagedMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagedMessage.java 2008-11-20 19:10:37 UTC (rev 5403)
@@ -0,0 +1,43 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.paging;
+
+import org.jboss.messaging.core.journal.EncodingSupport;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.server.ServerMessage;
+
+/**
+ *
+ * A Paged message
+ *
+ * We can't just record the ServerMessage as we need other information (such as the TransactionID used during paging)
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public interface PagedMessage extends EncodingSupport
+{
+ ServerMessage getMessage(StorageManager storageManager);
+
+ long getTransactionID();
+}
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-11-20 18:34:03 UTC (rev 5402)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-11-20 19:10:37 UTC (rev 5403)
@@ -80,7 +80,7 @@
* @param pagingStoreImpl
* @return false if the listener can't handle more pages
*/
- boolean onDepage(int pageId, SimpleString destination, PagingStore pagingStoreImpl, PageMessage[] data) throws Exception;
+ boolean onDepage(int pageId, SimpleString destination, PagingStore pagingStoreImpl, PagedMessage[] data) throws Exception;
/**
* To be used by transactions only.
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-11-20 18:34:03 UTC (rev 5402)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-11-20 19:10:37 UTC (rev 5403)
@@ -67,7 +67,7 @@
public boolean readPage() throws Exception;
- boolean page(PageMessage message) throws Exception;
+ boolean page(PagedMessage message) throws Exception;
/**
* Remove the first page from the Writing Queue.
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java 2008-11-20 18:34:03 UTC (rev 5402)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java 2008-11-20 19:10:37 UTC (rev 5403)
@@ -29,13 +29,11 @@
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
-import org.jboss.messaging.core.journal.IOCallback;
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.paging.Page;
-import org.jboss.messaging.core.paging.PageMessage;
+import org.jboss.messaging.core.paging.PagedMessage;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
-import org.jboss.messaging.util.VariableLatch;
/**
*
@@ -62,8 +60,6 @@
private final SequentialFileFactory fileFactory;
- private final PagingCallback callback;
-
private final AtomicInteger size = new AtomicInteger(0);
// Static --------------------------------------------------------
@@ -75,14 +71,6 @@
this.pageId = pageId;
this.file = file;
fileFactory = factory;
- if (factory.isSupportsCallbacks())
- {
- callback = new PagingCallback();
- }
- else
- {
- callback = null;
- }
}
// Public --------------------------------------------------------
@@ -94,9 +82,9 @@
return pageId;
}
- public PageMessage[] read() throws Exception
+ public PagedMessage[] read() throws Exception
{
- ArrayList<PageMessage> messages = new ArrayList<PageMessage>();
+ ArrayList<PagedMessage> messages = new ArrayList<PagedMessage>();
ByteBuffer buffer = fileFactory.newBuffer((int)file.size());
file.position(0);
@@ -118,7 +106,7 @@
int oldPos = buffer.position();
if (buffer.position() + messageSize < buffer.limit() && buffer.get(oldPos + messageSize) == END_BYTE)
{
- PageMessage msg = instantiateObject();
+ PagedMessage msg = new PagedMessageImpl();
msg.decode(messageBuffer);
messages.add(msg);
}
@@ -136,10 +124,10 @@
numberOfMessages.set(messages.size());
- return messages.toArray(instantiateArray(messages.size()));
+ return messages.toArray(new PagedMessage[messages.size()]);
}
- public void write(final PageMessage message) throws Exception
+ public void write(final PagedMessage message) throws Exception
{
ByteBuffer buffer = fileFactory.newBuffer(message.getEncodeSize() + SIZE_RECORD);
buffer.put(START_BYTE);
@@ -148,31 +136,15 @@
buffer.put(END_BYTE);
buffer.rewind();
- if (callback != null)
- {
- callback.countUp();
- file.write(buffer, callback);
- }
- else
- {
- file.write(buffer, false);
- }
+ file.write(buffer, false);
numberOfMessages.incrementAndGet();
size.addAndGet(buffer.limit());
-
}
public void sync() throws Exception
{
- if (callback != null)
- {
- callback.waitCompletion();
- }
- else
- {
- file.sync();
- }
+ file.sync();
}
public void open() throws Exception
@@ -206,53 +178,7 @@
// Protected -----------------------------------------------------
- protected PageMessage instantiateObject()
- {
- return new PageMessageImpl();
- }
-
- protected PageMessage[] instantiateArray(final int size)
- {
- return new PageMessage[size];
- }
-
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
-
- private static class PagingCallback implements IOCallback
- {
- private final VariableLatch countLatch = new VariableLatch();
-
- private volatile String errorMessage = null;
-
- private volatile int errorCode = 0;
-
- public void countUp()
- {
- countLatch.up();
- }
-
- public void done()
- {
- countLatch.down();
- }
-
- public void waitCompletion() throws InterruptedException
- {
- countLatch.waitCompletion();
-
- if (errorMessage != null)
- {
- throw new IllegalStateException("Error on Callback: " + errorCode + " - " + errorMessage);
- }
- }
-
- public void onError(final int errorCode, final String errorMessage)
- {
- this.errorMessage = errorMessage;
- this.errorCode = errorCode;
- countLatch.down();
- }
- }
}
Deleted: trunk/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java 2008-11-20 18:34:03 UTC (rev 5402)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java 2008-11-20 19:10:37 UTC (rev 5403)
@@ -1,146 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.paging.impl;
-
-import static org.jboss.messaging.util.DataConstants.SIZE_BYTE;
-import static org.jboss.messaging.util.DataConstants.SIZE_INT;
-import static org.jboss.messaging.util.DataConstants.SIZE_LONG;
-
-import java.nio.ByteBuffer;
-
-import org.jboss.messaging.core.paging.PageMessage;
-import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.core.server.ServerLargeMessage;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.impl.ServerMessageImpl;
-
-/**
- *
- * This class is used to encapsulate ServerMessage and TransactionID on Paging
- *
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- * @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
- *
- */
-public class PageMessageImpl implements PageMessage
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- /** Large messages will need to be instatiated lazily during getMessage when the StorageManager is available */
- private byte[] largeMessageLazyData;
-
- private ServerMessage message;
-
- private long transactionID = -1;
-
- public PageMessageImpl(final ServerMessage message, final long transactionID)
- {
- this.message = message;
- this.transactionID = transactionID;
- }
-
- public PageMessageImpl(final ServerMessage message)
- {
- this.message = message;
- }
-
- public PageMessageImpl()
- {
- this(new ServerMessageImpl());
- }
-
- public ServerMessage getMessage(final StorageManager storage)
- {
- if (this.largeMessageLazyData != null)
- {
- this.message = storage.createLargeMessageStorage();
- MessagingBuffer buffer = new ByteBufferWrapper(ByteBuffer.wrap(largeMessageLazyData));
- message.decode(buffer);
- largeMessageLazyData = null;
- }
- return message;
- }
-
- public long getTransactionID()
- {
- return transactionID;
- }
-
- // EncodingSupport implementation --------------------------------
-
- public void decode(final MessagingBuffer buffer)
- {
- transactionID = buffer.getLong();
-
- boolean isLargeMessage = buffer.getBoolean();
-
- if (isLargeMessage)
- {
- int largeMessageHeaderSize = buffer.getInt();
-
- this.largeMessageLazyData = new byte[largeMessageHeaderSize];
-
- buffer.getBytes(largeMessageLazyData);
-
- }
- else
- {
- buffer.getInt(); // This value is only used on LargeMessages for now
- message = new ServerMessageImpl();
- message.decode(buffer);
- }
-
- }
-
- public void encode(final MessagingBuffer buffer)
- {
- buffer.putLong(transactionID);
- buffer.putBoolean(message instanceof ServerLargeMessage);
- buffer.putInt(message.getEncodeSize());
- message.encode(buffer);
- }
-
- public int getEncodeSize()
- {
- return SIZE_LONG + SIZE_BYTE + SIZE_INT + message.getEncodeSize();
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java 2008-11-20 18:34:03 UTC (rev 5402)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java 2008-11-20 19:10:37 UTC (rev 5403)
@@ -30,7 +30,6 @@
import org.jboss.messaging.core.paging.PageTransactionInfo;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.util.DataConstants;
/**
*
@@ -161,6 +160,7 @@
public void markIncomplete()
{
complete = false;
+
countDownCompleted = new CountDownLatch(1);
}
Copied: trunk/src/main/org/jboss/messaging/core/paging/impl/PagedMessageImpl.java (from rev 5401, trunk/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagedMessageImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagedMessageImpl.java 2008-11-20 19:10:37 UTC (rev 5403)
@@ -0,0 +1,150 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.paging.impl;
+
+import static org.jboss.messaging.util.DataConstants.SIZE_BYTE;
+import static org.jboss.messaging.util.DataConstants.SIZE_INT;
+import static org.jboss.messaging.util.DataConstants.SIZE_LONG;
+
+import java.nio.ByteBuffer;
+
+import org.jboss.messaging.core.paging.PagedMessage;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.ServerLargeMessage;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+
+/**
+ *
+ * This class represents a paged message
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ *
+ */
+public class PagedMessageImpl implements PagedMessage
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ /** Large messages will need to be instantiated lazily during getMessage when the StorageManager is available */
+ private byte[] largeMessageLazyData;
+
+ private ServerMessage message;
+
+ private long transactionID = -1;
+
+ public PagedMessageImpl(final ServerMessage message, final long transactionID)
+ {
+ this.message = message;
+ this.transactionID = transactionID;
+ }
+
+ public PagedMessageImpl(final ServerMessage message)
+ {
+ this.message = message;
+ }
+
+ public PagedMessageImpl()
+ {
+ this(new ServerMessageImpl());
+ }
+
+ public ServerMessage getMessage(final StorageManager storage)
+ {
+ if (largeMessageLazyData != null)
+ {
+ message = storage.createLargeMessage();
+ MessagingBuffer buffer = new ByteBufferWrapper(ByteBuffer.wrap(largeMessageLazyData));
+ message.decode(buffer);
+ largeMessageLazyData = null;
+ }
+ return message;
+ }
+
+ public long getTransactionID()
+ {
+ return transactionID;
+ }
+
+ // EncodingSupport implementation --------------------------------
+
+ public void decode(final MessagingBuffer buffer)
+ {
+ transactionID = buffer.getLong();
+
+ boolean isLargeMessage = buffer.getBoolean();
+
+ if (isLargeMessage)
+ {
+ int largeMessageHeaderSize = buffer.getInt();
+
+ largeMessageLazyData = new byte[largeMessageHeaderSize];
+
+ buffer.getBytes(largeMessageLazyData);
+ }
+ else
+ {
+ buffer.getInt(); // This value is only used on LargeMessages for now
+
+ message = new ServerMessageImpl();
+
+ message.decode(buffer);
+ }
+
+ }
+
+ public void encode(final MessagingBuffer buffer)
+ {
+ buffer.putLong(transactionID);
+
+ buffer.putBoolean(message instanceof ServerLargeMessage);
+
+ buffer.putInt(message.getEncodeSize());
+
+ message.encode(buffer);
+ }
+
+ public int getEncodeSize()
+ {
+ return SIZE_LONG + SIZE_BYTE + SIZE_INT + message.getEncodeSize();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-11-20 18:34:03 UTC (rev 5402)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-11-20 19:10:37 UTC (rev 5403)
@@ -33,7 +33,7 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.paging.LastPageRecord;
-import org.jboss.messaging.core.paging.PageMessage;
+import org.jboss.messaging.core.paging.PagedMessage;
import org.jboss.messaging.core.paging.PageTransactionInfo;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.PagingStore;
@@ -151,8 +151,10 @@
{
store = oldStore;
}
-
- store.start();
+ else
+ {
+ store.start();
+ }
}
return store;
@@ -181,23 +183,25 @@
public void clearLastPageRecord(final LastPageRecord lastRecord) throws Exception
{
trace("Clearing lastRecord information " + lastRecord.getLastId());
+
storageManager.storeDelete(lastRecord.getRecordId());
}
/**
- * This method will remove files from the page system and add them into the journal, doing it transactionally
+ * This method will remove files from the page system and route them, doing it transactionally
*
* A Transaction will be opened only if persistent messages are used.
+ *
* If persistent messages are also used, it will update eventual PageTransactions
*/
public boolean onDepage(final int pageId,
final SimpleString destination,
final PagingStore pagingStore,
- final PageMessage[] data) throws Exception
+ final PagedMessage[] data) throws Exception
{
trace("Depaging....");
- // / Depage has to be done atomically, in case of failure it should be
+ // Depage has to be done atomically, in case of failure it should be
// back to where it was
final long depageTransactionID = storageManager.generateUniqueID();
@@ -225,13 +229,14 @@
final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
- for (PageMessage msg : data)
+ for (PagedMessage msg : data)
{
ServerMessage pagedMessage = null;
pagedMessage = (ServerMessage)msg.getMessage(storageManager);
final long transactionIdDuringPaging = msg.getTransactionID();
+
if (transactionIdDuringPaging >= 0)
{
final PageTransactionInfo pageTransactionInfo = transactions.get(transactionIdDuringPaging);
@@ -256,7 +261,7 @@
continue;
}
- // / Update information about transactions
+ // Update information about transactions
if (pagedMessage.isDurable())
{
pageTransactionInfo.decrement();
@@ -319,6 +324,7 @@
public void setLastPage(final LastPageRecord lastPage) throws Exception
{
trace("LastPage loaded was " + lastPage.getLastId() + " recordID = " + lastPage.getRecordId());
+
getPageStore(lastPage.getDestination()).setLastRecord(lastPage);
}
@@ -339,12 +345,12 @@
public boolean page(final ServerMessage message, final long transactionId) throws Exception
{
- return getPageStore(message.getDestination()).page(new PageMessageImpl(message, transactionId));
+ return getPageStore(message.getDestination()).page(new PagedMessageImpl(message, transactionId));
}
public boolean page(final ServerMessage message) throws Exception
{
- return getPageStore(message.getDestination()).page(new PageMessageImpl(message));
+ return getPageStore(message.getDestination()).page(new PagedMessageImpl(message));
}
public void addTransaction(final PageTransactionInfo pageTransaction)
@@ -368,13 +374,23 @@
return started;
}
- public void start() throws Exception
+ public synchronized void start() throws Exception
{
+ if (started)
+ {
+ return;
+ }
+
started = true;
}
- public void stop() throws Exception
+ public synchronized void stop() throws Exception
{
+ if (!started)
+ {
+ return;
+ }
+
started = false;
pagingSPI.stop();
@@ -413,6 +429,7 @@
if (!store.isDroppedMessage())
{
store.setDroppedMessage(true);
+
log.warn("Messages are being dropped on adress " + store.getStoreName());
}
@@ -536,7 +553,6 @@
if (globalSize.get() < maxGlobalSize && started)
{
-
globalMode.set(false);
// Clearing possible messages still in page-mode
for (PagingStore store : stores.values())
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-11-20 18:34:03 UTC (rev 5402)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-11-20 19:10:37 UTC (rev 5403)
@@ -38,7 +38,7 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.paging.LastPageRecord;
import org.jboss.messaging.core.paging.Page;
-import org.jboss.messaging.core.paging.PageMessage;
+import org.jboss.messaging.core.paging.PagedMessage;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -212,7 +212,7 @@
return false;
}
page.open();
- PageMessage messages[] = page.read();
+ PagedMessage messages[] = page.read();
boolean addressNotFull = pagingManager.onDepage(page.getPageId(), storeName, PagingStoreImpl.this, messages);
page.delete();
@@ -233,14 +233,12 @@
try
{
-
if (numberOfPages == 0)
{
return null;
}
else
{
-
numberOfPages--;
final Page returnPage;
@@ -289,7 +287,7 @@
}
- public boolean page(final PageMessage message) throws Exception
+ public boolean page(final PagedMessage message) throws Exception
{
// Max-size is set, but reject is activated, what means.. never page on
// this address
Modified: trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2008-11-20 18:34:03 UTC (rev 5402)
+++ trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2008-11-20 19:10:37 UTC (rev 5403)
@@ -75,8 +75,7 @@
void storeDeleteMessageTransactional(long txID, long queueID, long messageID) throws Exception;
- /** Create an area that will get LargeMessage bytes on the server size*/
- ServerLargeMessage createLargeMessageStorage();
+ ServerLargeMessage createLargeMessage();
void prepare(long txID, Xid xid) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-11-20 18:34:03 UTC (rev 5402)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-11-20 19:10:37 UTC (rev 5403)
@@ -227,8 +227,7 @@
return idGenerator.generateID();
}
- /** Create an area that will get LargeMessage bytes on the server size*/
- public ServerLargeMessage createLargeMessageStorage()
+ public ServerLargeMessage createLargeMessage()
{
return new JournalLargeMessageImpl(this);
}
@@ -401,7 +400,7 @@
{
case ADD_LARGE_MESSAGE:
{
- ServerLargeMessage largeMessage = this.createLargeMessageStorage();
+ ServerLargeMessage largeMessage = this.createLargeMessage();
LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage);
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2008-11-20 18:34:03 UTC (rev 5402)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2008-11-20 19:10:37 UTC (rev 5403)
@@ -153,7 +153,7 @@
/* (non-Javadoc)
* @see org.jboss.messaging.core.persistence.StorageManager#createLargeMessageStorage(long, int, int)
*/
- public ServerLargeMessage createLargeMessageStorage()
+ public ServerLargeMessage createLargeMessage()
{
return new NullStorageLargeMessageImpl();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java 2008-11-20 18:34:03 UTC (rev 5402)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java 2008-11-20 19:10:37 UTC (rev 5403)
@@ -47,8 +47,6 @@
int incrementDurableRefCount();
- int getRefCount();
-
ServerMessage copy();
int getMemoryEstimate();
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java 2008-11-20 18:34:03 UTC (rev 5402)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java 2008-11-20 19:10:37 UTC (rev 5403)
@@ -118,11 +118,6 @@
return refCount.decrementAndGet();
}
- public int getRefCount()
- {
- return refCount.get();
- }
-
public int getMemoryEstimate()
{
// This is just an estimate...
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-11-20 18:34:03 UTC (rev 5402)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-11-20 19:10:37 UTC (rev 5403)
@@ -2412,7 +2412,7 @@
private ServerLargeMessage createLargeMessageStorage(long producerID, long messageID, byte[] header) throws Exception
{
- ServerLargeMessage largeMessage = storageManager.createLargeMessageStorage();
+ ServerLargeMessage largeMessage = storageManager.createLargeMessage();
MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.wrap(header));
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java 2008-11-20 18:34:03 UTC (rev 5402)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java 2008-11-20 19:10:37 UTC (rev 5403)
@@ -49,18 +49,6 @@
// Public --------------------------------------------------------
- public void testPageWithAIO() throws Exception
- {
- if (!AsynchronousFileImpl.isLoaded())
- {
- fail(String.format("libAIO is not loaded on %s %s %s",
- System.getProperty("os.name"),
- System.getProperty("os.arch"),
- System.getProperty("os.version")));
- }
- testAdd(new AIOSequentialFileFactory(journalDir), 1000);
- }
-
public void testPageWithNIO() throws Exception
{
testAdd(new NIOSequentialFileFactory(journalDir), 1000);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2008-11-20 18:34:03 UTC (rev 5402)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2008-11-20 19:10:37 UTC (rev 5403)
@@ -26,9 +26,9 @@
import java.nio.ByteBuffer;
import org.jboss.messaging.core.paging.Page;
-import org.jboss.messaging.core.paging.PageMessage;
+import org.jboss.messaging.core.paging.PagedMessage;
import org.jboss.messaging.core.paging.PagingStore;
-import org.jboss.messaging.core.paging.impl.PageMessageImpl;
+import org.jboss.messaging.core.paging.impl.PagedMessageImpl;
import org.jboss.messaging.core.paging.impl.PagingStoreFactoryNIO;
import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
@@ -77,17 +77,17 @@
ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(10));
- assertFalse(store.page(new PageMessageImpl(msg)));
+ assertFalse(store.page(new PagedMessageImpl(msg)));
store.startPaging();
- assertTrue(store.page(new PageMessageImpl(msg)));
+ assertTrue(store.page(new PagedMessageImpl(msg)));
Page page = store.depage();
page.open();
- PageMessage msgs[] = page.read();
+ PagedMessage msgs[] = page.read();
page.close();
@@ -99,7 +99,7 @@
assertNull(store.depage());
- assertFalse(store.page(new PageMessageImpl(msg)));
+ assertFalse(store.page(new PagedMessageImpl(msg)));
}
public void testPagingManagerAddressFull() throws Exception
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java 2008-11-20 18:34:03 UTC (rev 5402)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java 2008-11-20 19:10:37 UTC (rev 5403)
@@ -31,7 +31,6 @@
*/
public class PageImplTest extends PageImplTestBase
{
-
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
@@ -42,11 +41,6 @@
// Public --------------------------------------------------------
- public void testPageFakeWithCallbacks() throws Exception
- {
- testAdd(new FakeSequentialFileFactory(512, true), 10);
- }
-
public void testPageFakeWithoutCallbacks() throws Exception
{
testAdd(new FakeSequentialFileFactory(1, false), 10);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java 2008-11-20 18:34:03 UTC (rev 5402)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java 2008-11-20 19:10:37 UTC (rev 5403)
@@ -27,9 +27,9 @@
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
-import org.jboss.messaging.core.paging.PageMessage;
+import org.jboss.messaging.core.paging.PagedMessage;
import org.jboss.messaging.core.paging.impl.PageImpl;
-import org.jboss.messaging.core.paging.impl.PageMessageImpl;
+import org.jboss.messaging.core.paging.impl.PagedMessageImpl;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
@@ -99,7 +99,7 @@
msg.setDestination(simpleDestination);
- impl.write(new PageMessageImpl(msg));
+ impl.write(new PagedMessageImpl(msg));
assertEquals(i + 1, impl.getNumberOfMessages());
}
@@ -111,7 +111,7 @@
file.open();
impl = new PageImpl(factory, file, 10);
- PageMessage msgs[] = impl.read();
+ PagedMessage msgs[] = impl.read();
assertEquals(numberOfElements, msgs.length);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-11-20 18:34:03 UTC (rev 5402)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-11-20 19:10:37 UTC (rev 5403)
@@ -27,10 +27,10 @@
import org.easymock.EasyMock;
import org.jboss.messaging.core.paging.LastPageRecord;
-import org.jboss.messaging.core.paging.PageMessage;
+import org.jboss.messaging.core.paging.PagedMessage;
import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.paging.PagingStoreFactory;
-import org.jboss.messaging.core.paging.impl.PageMessageImpl;
+import org.jboss.messaging.core.paging.impl.PagedMessageImpl;
import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
@@ -88,9 +88,9 @@
EasyMock.expect(queue.addLast(ref)).andReturn(null);
EasyMock.replay(spi, store, message, storageManager, po, ref, queue);
SimpleString queueName = new SimpleString("aq");
- PageMessageImpl pageMessage = new PageMessageImpl(message);
+ PagedMessageImpl pageMessage = new PagedMessageImpl(message);
- manager.onDepage(0, queueName, store, new PageMessage[] {pageMessage} );
+ manager.onDepage(0, queueName, store, new PagedMessage[] {pageMessage} );
EasyMock.verify(spi, store, message, storageManager, po, ref, queue);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-11-20 18:34:03 UTC (rev 5402)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-11-20 19:10:37 UTC (rev 5403)
@@ -28,9 +28,9 @@
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.paging.Page;
-import org.jboss.messaging.core.paging.PageMessage;
+import org.jboss.messaging.core.paging.PagedMessage;
import org.jboss.messaging.core.paging.PagingStore;
-import org.jboss.messaging.core.paging.impl.PageMessageImpl;
+import org.jboss.messaging.core.paging.impl.PagedMessageImpl;
import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -95,7 +95,7 @@
buffers.add(buffer);
SimpleString destination = new SimpleString("test");
- PageMessageImpl msg = createMessage(destination, buffer);
+ PagedMessageImpl msg = createMessage(destination, buffer);
assertTrue(storeImpl.isPaging());
@@ -137,7 +137,7 @@
buffers.add(buffer);
- PageMessageImpl msg = createMessage(destination, buffer);
+ PagedMessageImpl msg = createMessage(destination, buffer);
assertTrue(storeImpl.page(msg));
}
@@ -150,7 +150,7 @@
page.open();
- PageMessage msg[] = page.read();
+ PagedMessage msg[] = page.read();
assertEquals(10, msg.length);
assertEquals(1, storeImpl.getNumberOfPages());
@@ -203,7 +203,7 @@
storeImpl.forceAnotherPage();
}
- PageMessageImpl msg = createMessage(destination, buffer);
+ PagedMessageImpl msg = createMessage(destination, buffer);
assertTrue(storeImpl.page(msg));
}
@@ -218,7 +218,7 @@
page.open();
- PageMessage msg[] = page.read();
+ PagedMessage msg[] = page.read();
page.close();
@@ -235,7 +235,7 @@
assertTrue(storeImpl.isPaging());
- PageMessageImpl msg = createMessage(destination, buffers.get(0));
+ PagedMessageImpl msg = createMessage(destination, buffers.get(0));
assertTrue(storeImpl.page(msg));
@@ -265,7 +265,7 @@
page.open();
- PageMessage msgs[] = page.read();
+ PagedMessage msgs[] = page.read();
assertEquals(1, msgs.length);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-11-20 18:34:03 UTC (rev 5402)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-11-20 19:10:37 UTC (rev 5403)
@@ -35,8 +35,8 @@
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.paging.Page;
-import org.jboss.messaging.core.paging.PageMessage;
-import org.jboss.messaging.core.paging.impl.PageMessageImpl;
+import org.jboss.messaging.core.paging.PagedMessage;
+import org.jboss.messaging.core.paging.impl.PagedMessageImpl;
import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
@@ -95,7 +95,7 @@
final CountDownLatch latchStart = new CountDownLatch(numberOfThreads);
- final ConcurrentHashMap<Long, PageMessageImpl> buffers = new ConcurrentHashMap<Long, PageMessageImpl>();
+ final ConcurrentHashMap<Long, PagedMessageImpl> buffers = new ConcurrentHashMap<Long, PagedMessageImpl>();
final ArrayList<Page> readPages = new ArrayList<Page>();
@@ -133,7 +133,7 @@
while (true)
{
long id = messageIdGenerator.incrementAndGet();
- PageMessageImpl msg = createMessage(destination, createRandomBuffer(id, 5));
+ PagedMessageImpl msg = createMessage(destination, createRandomBuffer(id, 5));
if (storeImpl.page(msg))
{
buffers.put(id, msg);
@@ -219,21 +219,21 @@
System.out.println("Reading " + buffers.size() + " messages, " + readPages.size() + " pages");
- final ConcurrentHashMap<Long, PageMessage> buffers2 = new ConcurrentHashMap<Long, PageMessage>();
+ final ConcurrentHashMap<Long, PagedMessage> buffers2 = new ConcurrentHashMap<Long, PagedMessage>();
for (Page page : readPages)
{
page.open();
- PageMessage msgs[] = page.read();
+ PagedMessage msgs[] = page.read();
page.close();
- for (PageMessage msg : msgs)
+ for (PagedMessage msg : msgs)
{
(msg.getMessage(null)).getBody().rewind();
long id = (msg.getMessage(null)).getBody().getLong();
(msg.getMessage(null)).getBody().rewind();
- PageMessageImpl msgWritten = buffers.remove(id);
+ PagedMessageImpl msgWritten = buffers.remove(id);
buffers2.put(id, msg);
assertNotNull(msgWritten);
assertEquals((msg.getMessage(null)).getDestination(), (msgWritten.getMessage(null)).getDestination());
@@ -269,7 +269,7 @@
assertEquals(numberOfPages, storeImpl2.getNumberOfPages());
long lastMessageId = messageIdGenerator.incrementAndGet();
- PageMessage lastMsg = createMessage(destination, createRandomBuffer(lastMessageId, 5));
+ PagedMessage lastMsg = createMessage(destination, createRandomBuffer(lastMessageId, 5));
storeImpl2.page(lastMsg);
buffers2.put(lastMessageId, lastMsg);
@@ -287,16 +287,16 @@
page.open();
- PageMessage[] msgs = page.read();
+ PagedMessage[] msgs = page.read();
page.close();
- for (PageMessage msg : msgs)
+ for (PagedMessage msg : msgs)
{
(msg.getMessage(null)).getBody().rewind();
long id = (msg.getMessage(null)).getBody().getLong();
- PageMessage msgWritten = buffers2.remove(id);
+ PagedMessage msgWritten = buffers2.remove(id);
assertNotNull(msgWritten);
assertEquals((msg.getMessage(null)).getDestination(), (msgWritten.getMessage(null)).getDestination());
assertEqualsByteArrays((msgWritten.getMessage(null)).getBody().array(), (msg.getMessage(null)).getBody()
@@ -305,7 +305,7 @@
}
lastPage.open();
- PageMessage lastMessages[] = lastPage.read();
+ PagedMessage lastMessages[] = lastPage.read();
lastPage.close();
assertEquals(1, lastMessages.length);
@@ -318,7 +318,7 @@
}
- protected PageMessageImpl createMessage(final SimpleString destination, final ByteBuffer buffer)
+ protected PagedMessageImpl createMessage(final SimpleString destination, final ByteBuffer buffer)
{
ServerMessage msg = new ServerMessageImpl((byte)1,
true,
@@ -328,7 +328,7 @@
new ByteBufferWrapper(buffer));
msg.setDestination(destination);
- return new PageMessageImpl(msg);
+ return new PagedMessageImpl(msg);
}
protected ByteBuffer createRandomBuffer(final long id, final int size)
More information about the jboss-cvs-commits mailing list