JBoss hornetq SVN: r11902 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-13 12:04:05 -0500 (Tue, 13 Dec 2011)
New Revision: 11902
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
Log:
JBPAPP-7730 - Deadlock on testsuite (failover on large messages)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-12-13 13:14:08 UTC (rev 11901)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-12-13 17:04:05 UTC (rev 11902)
@@ -76,6 +76,13 @@
private final boolean browseOnly;
private final Executor sessionExecutor;
+
+ // For failover we can't send credits back
+ // while holding a lock or failover could dead lock eventually
+ // And we can't use the sessionExecutor as that's being used for message handlers
+ // for that reason we have a separate flowControlExecutor that's using the thread pool
+ // Which is a OrderedExecutor
+ private final Executor flowControlExecutor;
private final int clientWindowSize;
@@ -135,6 +142,7 @@
final int ackBatchSize,
final TokenBucketLimiter rateLimiter,
final Executor executor,
+ final Executor flowControlExecutor,
final Channel channel,
final SessionQueueQueryResponseMessage queueInfo,
final ClassLoader contextClassLoader)
@@ -162,6 +170,8 @@
this.queueInfo = queueInfo;
this.contextClassLoader = contextClassLoader;
+
+ this.flowControlExecutor = flowControlExecutor;
}
// ClientConsumer implementation
@@ -846,7 +856,13 @@
*/
private void sendCredits(final int credits)
{
- channel.send(new SessionConsumerFlowCreditMessage(id, credits));
+ flowControlExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ channel.send(new SessionConsumerFlowCreditMessage(id, credits));
+ }
+ });
}
private void waitForOnMessageToComplete(boolean waitForOnMessage)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-12-13 13:14:08 UTC (rev 11901)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-12-13 17:04:05 UTC (rev 11902)
@@ -830,6 +830,7 @@
connection,
response.getServerVersion(),
sessionChannel,
+ orderedExecutorFactory.getExecutor(),
orderedExecutorFactory.getExecutor());
synchronized (sessions)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-12-13 13:14:08 UTC (rev 11901)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-12-13 17:04:05 UTC (rev 11902)
@@ -129,6 +129,9 @@
private final boolean xa;
private final Executor executor;
+
+ // to be sent to consumers as consumers will need a separate consumer for flow control
+ private final Executor flowControlExecutor;
private volatile CoreRemotingConnection remotingConnection;
@@ -228,7 +231,8 @@
final CoreRemotingConnection remotingConnection,
final int version,
final Channel channel,
- final Executor executor) throws HornetQException
+ final Executor executor,
+ final Executor flowControlExecutor) throws HornetQException
{
this.sessionFactory = sessionFactory;
@@ -241,6 +245,8 @@
this.remotingConnection = remotingConnection;
this.executor = executor;
+
+ this.flowControlExecutor = flowControlExecutor;
this.xa = xa;
@@ -1795,6 +1801,7 @@
false)
: null,
executor,
+ flowControlExecutor,
channel,
queueInfo,
lookupTCCL());
13 years, 3 months
JBoss hornetq SVN: r11900 - in branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710: src/main/org/hornetq/core/config and 5 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-12 17:30:38 -0500 (Mon, 12 Dec 2011)
New Revision: 11900
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/config/common/schema/hornetq-configuration.xsd
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/config/Configuration.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/config/ConfigurationTest-full-config.xml
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
Log:
JBPAPP-7655 - The customer is facing an issue that is directly related to the way concurrent paging happens, with multiple pages being accessed at the same time. We are adding a direct buffer allocation on paging (and releasing it manually, which is what is causing issues with NIO), and we are also limiting the max concurrent IO on paging.
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/config/common/schema/hornetq-configuration.xsd 2011-12-12 22:03:12 UTC (rev 11899)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/config/common/schema/hornetq-configuration.xsd 2011-12-12 22:30:38 UTC (rev 11900)
@@ -146,6 +146,8 @@
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="create-bindings-dir" type="xsd:boolean">
</xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="page-max-concurrent-io" type="xsd:string">
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="journal-directory" type="xsd:string">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="create-journal-dir" type="xsd:boolean">
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/config/Configuration.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/config/Configuration.java 2011-12-12 22:03:12 UTC (rev 11899)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/config/Configuration.java 2011-12-12 22:30:38 UTC (rev 11900)
@@ -473,6 +473,15 @@
* Sets the file system directory used to store bindings.
*/
void setBindingsDirectory(String dir);
+
+ /** The max number of concurrent reads allowed on paging.
+ *
+ * Default = 5 */
+ int getPageMaxConcurrentIO();
+
+ /** The max number of concurrent reads allowed on paging.
+ * Default = 5 */
+ void setPageMaxConcurrentIO(int maxIO);
/**
* Returns the file system directory used to store journal log.
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2011-12-12 22:03:12 UTC (rev 11899)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2011-12-12 22:30:38 UTC (rev 11900)
@@ -88,6 +88,8 @@
public static final String DEFAULT_PAGING_DIR = "data/paging";
public static final String DEFAULT_LARGE_MESSAGES_DIR = "data/largemessages";
+
+ public static final int DEFAULT_MAX_CONCURRENT_PAGE_IO = 5;
public static final boolean DEFAULT_CREATE_JOURNAL_DIR = true;
@@ -255,6 +257,8 @@
protected String pagingDirectory = ConfigurationImpl.DEFAULT_PAGING_DIR;
// File related attributes -----------------------------------------------------------
+
+ protected int maxConcurrentPageIO = ConfigurationImpl.DEFAULT_MAX_CONCURRENT_PAGE_IO;
protected String largeMessagesDirectory = ConfigurationImpl.DEFAULT_LARGE_MESSAGES_DIR;
@@ -613,7 +617,25 @@
{
bindingsDirectory = dir;
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.config.Configuration#getPageMaxConcurrentIO()
+ */
+ public int getPageMaxConcurrentIO()
+ {
+ return maxConcurrentPageIO;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.config.Configuration#setPageMaxConcurrentIO(int)
+ */
+ public void setPageMaxConcurrentIO(int maxIO)
+ {
+ this.maxConcurrentPageIO = maxIO;
+ }
+
+
public String getJournalDirectory()
{
return journalDirectory;
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-12-12 22:03:12 UTC (rev 11899)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-12-12 22:30:38 UTC (rev 11900)
@@ -424,6 +424,12 @@
"journal-directory",
config.getJournalDirectory(),
Validators.NOT_NULL_OR_EMPTY));
+
+
+ config.setPageMaxConcurrentIO(XMLConfigurationUtil.getInteger(e,
+ "page-max-concurrent-io",
+ 5,
+ Validators.MINUS_ONE_OR_GT_ZERO));
config.setPagingDirectory(XMLConfigurationUtil.getString(e,
"paging-directory",
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-12 22:03:12 UTC (rev 11899)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-12 22:30:38 UTC (rev 11900)
@@ -31,6 +31,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
import javax.transaction.xa.Xid;
@@ -156,6 +157,8 @@
public static final byte PAGE_CURSOR_COUNTER_INC = 41;
private UUID persistentID;
+
+ private final Semaphore pageMaxConcurrentIO;
private final BatchingIDGenerator idGenerator;
@@ -318,6 +321,15 @@
largeMessagesFactory = new NIOSequentialFileFactory(largeMessagesDirectory, false);
perfBlastPages = config.getJournalPerfBlastPages();
+
+ if (config.getPageMaxConcurrentIO() != 1)
+ {
+ pageMaxConcurrentIO = new Semaphore(config.getPageMaxConcurrentIO());
+ }
+ else
+ {
+ pageMaxConcurrentIO = null;
+ }
}
public void clearContext()
@@ -1538,8 +1550,10 @@
*/
public void beforePageRead() throws Exception
{
- // TODO Auto-generated method stub
-
+ if (pageMaxConcurrentIO != null)
+ {
+ pageMaxConcurrentIO.acquire();
+ }
}
/* (non-Javadoc)
@@ -1547,8 +1561,10 @@
*/
public void afterPageRead() throws Exception
{
- // TODO Auto-generated method stub
-
+ if (pageMaxConcurrentIO != null)
+ {
+ pageMaxConcurrentIO.release();
+ }
}
/* (non-Javadoc)
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/config/ConfigurationTest-full-config.xml
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/config/ConfigurationTest-full-config.xml 2011-12-12 22:03:12 UTC (rev 11899)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/config/ConfigurationTest-full-config.xml 2011-12-12 22:30:38 UTC (rev 11900)
@@ -36,6 +36,7 @@
<create-bindings-dir>false</create-bindings-dir>
<journal-directory>somedir2</journal-directory>
<create-journal-dir>false</create-journal-dir>
+ <page-max-concurrent-io>17</page-max-concurrent-io>
<journal-type>NIO</journal-type>
<journal-compact-min-files>123</journal-compact-min-files>
<journal-compact-percentage>33</journal-compact-percentage>
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java 2011-12-12 22:03:12 UTC (rev 11899)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java 2011-12-12 22:30:38 UTC (rev 11900)
@@ -70,6 +70,8 @@
Assert.assertEquals("pagingdir", conf.getPagingDirectory());
Assert.assertEquals("somedir", conf.getBindingsDirectory());
Assert.assertEquals(false, conf.isCreateBindingsDir());
+
+ Assert.assertEquals(17, conf.getPageMaxConcurrentIO());
Assert.assertEquals("somedir2", conf.getJournalDirectory());
Assert.assertEquals(false, conf.isCreateJournalDir());
Assert.assertEquals(JournalType.NIO, conf.getJournalType());
13 years, 3 months
JBoss hornetq SVN: r11899 - in branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710: src/main/org/hornetq/core/journal/impl and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-12 17:03:12 -0500 (Mon, 12 Dec 2011)
New Revision: 11899
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/journal/SequentialFileFactory.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
JBPAPP-7655 - The customer is facing an issue that is directly related to the way concurrent paging happens, with multiple pages being accessed at the same time. We are adding a direct buffer allocation on paging (and releasing it manually, which is what is causing issues with NIO), and we are also limiting the max concurrent IO on paging. Adding this to JBPAPP-7710.
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/journal/SequentialFileFactory.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/journal/SequentialFileFactory.java 2011-12-12 21:36:13 UTC (rev 11898)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/journal/SequentialFileFactory.java 2011-12-12 22:03:12 UTC (rev 11899)
@@ -32,6 +32,14 @@
boolean isSupportsCallbacks();
+ /** used for cases where you need direct buffer outside of the journal context.
+ * This is because the native layer has a method that can be reused in certain cases like paging */
+ ByteBuffer allocateDirectBuffer(int size);
+
+ /** used for cases where you need direct buffer outside of the journal context.
+ * This is because the native layer has a method that can be reused in certain cases like paging */
+ void releaseDirectBuffer(ByteBuffer buffer);
+
/**
* Note: You need to release the buffer if is used for reading operations.
* You don't need to do it if using writing operations (AIO Buffer Lister will take of writing operations)
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2011-12-12 21:36:13 UTC (rev 11898)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2011-12-12 22:03:12 UTC (rev 11899)
@@ -93,7 +93,29 @@
{
return AsynchronousFileImpl.isLoaded();
}
+
+ public ByteBuffer allocateDirectBuffer(final int size)
+ {
+
+ int blocks = size / 512;
+ if (size % 512 != 0)
+ {
+ blocks ++;
+ }
+
+ // The buffer on AIO has to be a multiple of 512
+ ByteBuffer buffer = AsynchronousFileImpl.newBuffer(blocks * 512);
+
+ buffer.limit(size);
+ return buffer;
+ }
+
+ public void releaseDirectBuffer(final ByteBuffer buffer)
+ {
+ AsynchronousFileImpl.destroyBuffer(buffer);
+ }
+
public ByteBuffer newBuffer(int size)
{
if (size % 512 != 0)
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2011-12-12 21:36:13 UTC (rev 11898)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2011-12-12 22:03:12 UTC (rev 11899)
@@ -74,6 +74,17 @@
{
return timedBuffer != null;
}
+
+
+ public ByteBuffer allocateDirectBuffer(final int size)
+ {
+ return ByteBuffer.allocateDirect(size);
+ }
+
+ public void releaseDirectBuffer(ByteBuffer buffer)
+ {
+ // nothing we can do on this case. we can just have good faith on GC
+ }
public ByteBuffer newBuffer(final int size)
{
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-12 21:36:13 UTC (rev 11898)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-12 22:03:12 UTC (rev 11899)
@@ -1556,7 +1556,7 @@
*/
public ByteBuffer allocateDirectBuffer(int size)
{
- return journalFF.newBuffer(size);
+ return journalFF.allocateDirectBuffer(size);
}
/* (non-Javadoc)
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2011-12-12 21:36:13 UTC (rev 11898)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2011-12-12 22:03:12 UTC (rev 11899)
@@ -757,4 +757,19 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.SequentialFileFactory#newDirectBuffer(int)
+ */
+ public ByteBuffer allocateDirectBuffer(int size)
+ {
+ return ByteBuffer.allocateDirect(size);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.SequentialFileFactory#releaseDirectBuffer(java.nio.ByteBuffer)
+ */
+ public void releaseDirectBuffer(ByteBuffer buffer)
+ {
+ }
+
}
13 years, 3 months
JBoss hornetq SVN: r11898 - in branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710: src/main/org/hornetq/core/paging/impl and 4 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-12 16:36:13 -0500 (Mon, 12 Dec 2011)
New Revision: 11898
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/impl/PageImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/StorageManager.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
JBPAPP-7655 - The customer is facing an issue that is directly related to the way concurrent paging happens, with multiple pages being accessed at the same time. We are adding a direct buffer allocation on paging (and releasing it manually, which is what is causing issues with NIO), and we are also limiting the max concurrent IO on paging.
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-12-11 05:23:46 UTC (rev 11897)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-12-12 21:36:13 UTC (rev 11898)
@@ -183,6 +183,7 @@
{
page = pagingStore.createPage((int)pageId);
+ storageManager.beforePageRead();
page.open();
List<PagedMessage> pgdMessages = page.read(storageManager);
@@ -200,6 +201,7 @@
catch (Throwable ignored)
{
}
+ storageManager.afterPageRead();
cache.unlock();
}
}
@@ -451,8 +453,26 @@
// The page is not on cache any more
// We need to read the page-file before deleting it
// to make sure we remove any large-messages pending
- depagedPage.open();
- List<PagedMessage> pgdMessagesList = depagedPage.read(storageManager);
+ storageManager.beforePageRead();
+
+ List<PagedMessage> pgdMessagesList = null;
+ try
+ {
+ depagedPage.open();
+ pgdMessagesList = depagedPage.read(storageManager);
+ }
+ finally
+ {
+ try
+ {
+ depagedPage.close();
+ }
+ catch (Exception e)
+ {
+ }
+
+ storageManager.afterPageRead();
+ }
depagedPage.close();
pgdMessages = pgdMessagesList.toArray(new PagedMessage[pgdMessagesList.size()]);
}
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-12-11 05:23:46 UTC (rev 11897)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-12-12 21:36:13 UTC (rev 11898)
@@ -1290,7 +1290,11 @@
public void remove()
{
deliveredCount.incrementAndGet();
- PageSubscriptionImpl.this.getPageInfo(position).remove(position);
+ PageCursorInfo info = PageSubscriptionImpl.this.getPageInfo(position);
+ if (info != null)
+ {
+ info.remove(position);
+ }
}
/* (non-Javadoc)
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/impl/PageImpl.java 2011-12-11 05:23:46 UTC (rev 11897)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/impl/PageImpl.java 2011-12-12 21:36:13 UTC (rev 11898)
@@ -107,68 +107,77 @@
public List<PagedMessage> read(StorageManager storage) throws Exception
{
- if (isDebug)
- {
- log.debug("reading page " + this.pageId + " on address = " + storeName);
- }
-
+ if (isDebug)
+ {
+ log.debug("reading page " + this.pageId + " on address = " + storeName);
+ }
+
ArrayList<PagedMessage> messages = new ArrayList<PagedMessage>();
size.set((int)file.size());
// Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467
- ByteBuffer buffer2 = ByteBuffer.allocateDirect(size.get());
-
- file.position(0);
- file.read(buffer2);
+ ByteBuffer directBuffer = storage.allocateDirectBuffer((int)file.size());
- buffer2.rewind();
+ try
+ {
- HornetQBuffer fileBuffer = HornetQBuffers.wrappedBuffer(buffer2);
- fileBuffer.writerIndex(fileBuffer.capacity());
+ file.position(0);
+ file.read(directBuffer);
- while (fileBuffer.readable())
- {
- final int position = fileBuffer.readerIndex();
+ directBuffer.rewind();
- byte byteRead = fileBuffer.readByte();
+ HornetQBuffer fileBuffer = HornetQBuffers.wrappedBuffer(directBuffer);
+ fileBuffer.writerIndex(fileBuffer.capacity());
- if (byteRead == PageImpl.START_BYTE)
+ while (fileBuffer.readable())
{
- if (fileBuffer.readerIndex() + DataConstants.SIZE_INT < fileBuffer.capacity())
+ final int position = fileBuffer.readerIndex();
+
+ byte byteRead = fileBuffer.readByte();
+
+ if (byteRead == PageImpl.START_BYTE)
{
- int messageSize = fileBuffer.readInt();
- int oldPos = fileBuffer.readerIndex();
- if (fileBuffer.readerIndex() + messageSize < fileBuffer.capacity() && fileBuffer.getByte(oldPos + messageSize) == PageImpl.END_BYTE)
+ if (fileBuffer.readerIndex() + DataConstants.SIZE_INT < fileBuffer.capacity())
{
- PagedMessage msg = new PagedMessageImpl();
- msg.decode(fileBuffer);
- byte b = fileBuffer.readByte();
- if (b != PageImpl.END_BYTE)
+ int messageSize = fileBuffer.readInt();
+ int oldPos = fileBuffer.readerIndex();
+ if (fileBuffer.readerIndex() + messageSize < fileBuffer.capacity() && fileBuffer.getByte(oldPos + messageSize) == PageImpl.END_BYTE)
{
- // Sanity Check: This would only happen if there is a bug on decode or any internal code, as this
- // constraint was already checked
- throw new IllegalStateException("Internal error, it wasn't possible to locate END_BYTE " + b);
+ PagedMessage msg = new PagedMessageImpl();
+ msg.decode(fileBuffer);
+ byte b = fileBuffer.readByte();
+ if (b != PageImpl.END_BYTE)
+ {
+ // Sanity Check: This would only happen if there is a bug on decode or any internal code, as
+ // this
+ // constraint was already checked
+ throw new IllegalStateException("Internal error, it wasn't possible to locate END_BYTE " + b);
+ }
+ msg.initMessage(storage);
+ if (isTrace)
+ {
+ log.trace("Reading message " + msg + " on pageId=" + this.pageId + " for address=" + storeName);
+ }
+ messages.add(msg);
}
- msg.initMessage(storage);
- if (isTrace)
+ else
{
- log.trace("Reading message " + msg + " on pageId=" + this.pageId + " for address=" + storeName);
+ markFileAsSuspect(position, messages.size());
+ break;
}
- messages.add(msg);
}
- else
- {
- markFileAsSuspect(position, messages.size());
- break;
- }
}
+ else
+ {
+ markFileAsSuspect(position, messages.size());
+ break;
+ }
}
- else
- {
- markFileAsSuspect(position, messages.size());
- break;
- }
}
+ finally
+ {
+ storage.freeDirectuffer(directBuffer);
+ }
numberOfMessages.set(messages.size());
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/StorageManager.java 2011-12-11 05:23:46 UTC (rev 11897)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/StorageManager.java 2011-12-12 21:36:13 UTC (rev 11898)
@@ -13,6 +13,7 @@
package org.hornetq.core.persistence;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -88,6 +89,34 @@
* in case of the pools are full
* @throws Exception */
void waitOnOperations() throws Exception;
+
+ /**
+ * We need a safeguard in place to avoid too much concurrent IO happening on Paging,
+ * otherwise the system may become irrensponsive if too many destinations are reading all the same time.
+ * This is called before we read, so we can limit concurrent reads
+ * @throws Exception
+ */
+ void beforePageRead() throws Exception;
+
+ /**
+ * We need a safeguard in place to avoid too much concurrent IO happening on Paging,
+ * otherwise the system may become irrensponsive if too many destinations are reading all the same time.
+ * This is called after we read, so we can limit concurrent reads
+ * @throws Exception
+ */
+ void afterPageRead() throws Exception;
+
+
+ /** AIO has an optimized buffer which has a method to release it
+ instead of the way NIO will release data based on GC.
+ These methods will use that buffer if the inner method supports it */
+ ByteBuffer allocateDirectBuffer(int size);
+
+ /** AIO has an optimized buffer which has a method to release it
+ instead of the way NIO will release data based on GC.
+ These methods will use that buffer if the inner method supports it */
+ void freeDirectuffer(ByteBuffer buffer);
+
void clearContext();
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-11 05:23:46 UTC (rev 11897)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-12 21:36:13 UTC (rev 11898)
@@ -166,6 +166,8 @@
private final Journal bindingsJournal;
private final SequentialFileFactory largeMessagesFactory;
+
+ private SequentialFileFactory journalFF = null;
private volatile boolean started;
@@ -261,8 +263,6 @@
syncTransactional = config.isJournalSyncTransactional();
- SequentialFileFactory journalFF = null;
-
if (config.getJournalType() == JournalType.ASYNCIO)
{
JournalStorageManager.log.info("Using AIO Journal");
@@ -1532,6 +1532,41 @@
return info;
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#startPageRead()
+ */
+ public void beforePageRead() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#finishPageRead()
+ */
+ public void afterPageRead() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#allocateDirectBuffer(long)
+ */
+ public ByteBuffer allocateDirectBuffer(int size)
+ {
+ return journalFF.newBuffer(size);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#freeDirectuffer(java.nio.ByteBuffer)
+ */
+ public void freeDirectuffer(ByteBuffer buffer)
+ {
+ journalFF.releaseBuffer(buffer);
+ }
+
// Public -----------------------------------------------------------------------------------
public Journal getMessageJournal()
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-12-11 05:23:46 UTC (rev 11897)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-12-12 21:36:13 UTC (rev 11898)
@@ -13,6 +13,7 @@
package org.hornetq.core.persistence.impl.nullpm;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -569,4 +570,36 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#startPageRead()
+ */
+ public void beforePageRead() throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#finishPageRead()
+ */
+ public void afterPageRead() throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#allocateDirectBuffer(int)
+ */
+ public ByteBuffer allocateDirectBuffer(int size)
+ {
+ return ByteBuffer.allocateDirect(size);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#freeDirectuffer(java.nio.ByteBuffer)
+ */
+ public void freeDirectuffer(ByteBuffer buffer)
+ {
+ // nothing to be done.. just wait for GC
+ }
+
}
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-12-11 05:23:46 UTC (rev 11897)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-12-12 21:36:13 UTC (rev 11898)
@@ -13,6 +13,7 @@
package org.hornetq.tests.unit.core.paging.impl;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -1680,6 +1681,41 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#startPageRead()
+ */
+ public void beforePageRead() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#finishPageRead()
+ */
+ public void afterPageRead() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#allocateDirectBuffer(int)
+ */
+ public ByteBuffer allocateDirectBuffer(int size)
+ {
+ return ByteBuffer.allocateDirect(size);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#freeDirectuffer(java.nio.ByteBuffer)
+ */
+ public void freeDirectuffer(ByteBuffer buffer)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory
13 years, 3 months
JBoss hornetq SVN: r11897 - branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-11 00:23:46 -0500 (Sun, 11 Dec 2011)
New Revision: 11897
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
Log:
JBPAPP-7710 - Back porting fixes from 2.2.8 into 2.2.5 _JBPAPP_7242 as the Customer is having issues
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-12-11 04:35:32 UTC (rev 11896)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-12-11 05:23:46 UTC (rev 11897)
@@ -596,7 +596,18 @@
{
return;
}
- currentLargeMessageController.addPacket(chunk);
+ if (currentLargeMessageController == null)
+ {
+ if (log.isTraceEnabled())
+ {
+ log.trace("Sending back credits for largeController = null " + chunk.getPacketSize());
+ }
+ flowControl(chunk.getPacketSize(), false);
+ }
+ else
+ {
+ currentLargeMessageController.addPacket(chunk);
+ }
}
public void clear(boolean waitForOnMessage) throws HornetQException
13 years, 3 months
JBoss hornetq SVN: r11896 - branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-10 23:35:32 -0500 (Sat, 10 Dec 2011)
New Revision: 11896
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
JBPAPP-7710 - Back porting fixes from 2.2.8 into 2.2.5 _JBPAPP_7242 as the Customer is having issues
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-12-11 04:07:09 UTC (rev 11895)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-12-11 04:35:32 UTC (rev 11896)
@@ -951,7 +951,6 @@
public synchronized void cancel(final MessageReference reference, final long timeBase) throws Exception
{
- deliveringCount.decrementAndGet();
if (checkRedelivery(reference, timeBase))
{
if (!scheduledDeliveryHandler.checkAndSchedule(reference, false))
13 years, 3 months
JBoss hornetq SVN: r11895 - branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/config/common.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-10 23:07:09 -0500 (Sat, 10 Dec 2011)
New Revision: 11895
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/config/common/hornetq-version.properties
Log:
changing version id
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/config/common/hornetq-version.properties
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/config/common/hornetq-version.properties 2011-12-11 01:58:20 UTC (rev 11894)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/config/common/hornetq-version.properties 2011-12-11 04:07:09 UTC (rev 11895)
@@ -1,4 +1,4 @@
-hornetq.version.versionName=HQ_2_2_5_GA_EAP_JBPAPP-7242
+hornetq.version.versionName=HQ_2_2_5_GA_EAP_JBPAPP-7710
hornetq.version.majorVersion=2
hornetq.version.minorVersion=2
hornetq.version.microVersion=5
13 years, 3 months
JBoss hornetq SVN: r11894 - in branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710: src/main/org/hornetq/core/management/impl and 9 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-10 20:58:20 -0500 (Sat, 10 Dec 2011)
New Revision: 11894
Added:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/mac-build.sh
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/MultipleConsumerTest.java
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/impl/PageImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/Queue.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/util/JMSTestBase.java
Log:
JBPAPP-7710 - Back porting fixes from 2.2.8 into 2.2.5 _JBPAPP_7242 as the Customer is having issues
Added: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/mac-build.sh
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/mac-build.sh (rev 0)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/mac-build.sh 2011-12-11 01:58:20 UTC (rev 11894)
@@ -0,0 +1,2 @@
+#you need to define this variable on mac
+./build.sh -Djdk5.home=/System/Library/Java/JavaVirtualMachines/1.6.0.jdk/Contents/Home/ "$@"
Property changes on: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/mac-build.sh
___________________________________________________________________
Added: svn:executable
+ *
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2011-12-11 01:43:04 UTC (rev 11893)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2011-12-11 01:58:20 UTC (rev 11894)
@@ -400,7 +400,7 @@
{
Filter filter = FilterImpl.createFilter(filterStr);
List<Map<String, Object>> messages = new ArrayList<Map<String, Object>>();
- queue.blockOnExecutorFuture();
+ queue.flushExecutor();
LinkedListIterator<MessageReference> iterator = queue.iterator();
try
{
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/impl/PageImpl.java 2011-12-11 01:43:04 UTC (rev 11893)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/impl/PageImpl.java 2011-12-11 01:58:20 UTC (rev 11894)
@@ -107,10 +107,10 @@
public List<PagedMessage> read(StorageManager storage) throws Exception
{
- if (isDebug)
- {
- log.debug("reading page " + this.pageId + " on address = " + storeName);
- }
+ if (isDebug)
+ {
+ log.debug("reading page " + this.pageId + " on address = " + storeName);
+ }
ArrayList<PagedMessage> messages = new ArrayList<PagedMessage>();
@@ -212,7 +212,10 @@
public void open() throws Exception
{
- file.open();
+ if (!file.isOpen())
+ {
+ file.open();
+ }
size.set((int)file.size());
file.position(0);
}
@@ -307,6 +310,21 @@
{
return otherPage.getPageId() - this.pageId;
}
+
+ public void finalize()
+ {
+ try
+ {
+ if (file != null && file.isOpen())
+ {
+ file.close();
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
/* (non-Javadoc)
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-12-11 01:43:04 UTC (rev 11893)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-12-11 01:58:20 UTC (rev 11894)
@@ -730,6 +730,12 @@
{
return notificationLock;
}
+
+ // For tests
+ public AddressManager getAddressManager()
+ {
+ return addressManager;
+ }
public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception
{
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/Queue.java 2011-12-11 01:43:04 UTC (rev 11893)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/Queue.java 2011-12-11 01:58:20 UTC (rev 11894)
@@ -167,7 +167,7 @@
void resetAllIterators();
- boolean blockOnExecutorFuture();
+ boolean flushExecutor();
void close() throws Exception;
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-12-11 01:43:04 UTC (rev 11893)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-12-11 01:58:20 UTC (rev 11894)
@@ -24,11 +24,14 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
@@ -428,7 +431,7 @@
{
// We must block on the executor to ensure any async deliveries have completed or we might get out of order
// deliveries
- if (blockOnExecutorFuture())
+ if (flushExecutor())
{
// Go into direct delivery mode
directDeliver = true;
@@ -449,7 +452,7 @@
directDeliver = false;
- executor.execute(concurrentPoller);
+ getExecutor().execute(concurrentPoller);
}
public void forceDelivery()
@@ -458,14 +461,14 @@
{
if (isTrace)
{
- log.trace("Force delivery scheduling depage");
+ log.trace("Force delivery scheduling depage");
}
scheduleDepage();
}
if (isTrace)
{
- log.trace("Force delivery deliverying async");
+ log.trace("Force delivery deliverying async");
}
deliverAsync();
@@ -473,7 +476,13 @@
public void deliverAsync()
{
- getExecutor().execute(deliverRunner);
+ try
+ {
+ getExecutor().execute(deliverRunner);
+ }
+ catch (RejectedExecutionException ignored)
+ {
+ }
}
public void close() throws Exception
@@ -483,7 +492,20 @@
checkQueueSizeFuture.cancel(false);
}
- cancelRedistributor();
+ getExecutor().execute(new Runnable(){
+ public void run()
+ {
+ try
+ {
+ cancelRedistributor();
+ }
+ catch (Exception e)
+ {
+ // nothing that could be done anyway.. just logging
+ log.warn(e.getMessage(), e);
+ }
+ }
+ });
}
public Executor getExecutor()
@@ -504,14 +526,14 @@
{
deliverAsync();
- blockOnExecutorFuture();
+ flushExecutor();
}
- public boolean blockOnExecutorFuture()
+ public boolean flushExecutor()
{
Future future = new Future();
- executor.execute(future);
+ getExecutor().execute(future);
boolean ok = future.await(10000);
@@ -780,9 +802,31 @@
public long getMessageCount()
{
- blockOnExecutorFuture();
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicLong count = new AtomicLong(0);
- return getInstantMessageCount();
+ getExecutor().execute(new Runnable()
+ {
+ public void run()
+ {
+ count.set(getInstantMessageCount());
+ latch.countDown();
+ }
+ });
+
+ try
+ {
+ if (!latch.await(10, TimeUnit.SECONDS))
+ {
+ throw new IllegalStateException("Timed out on waiting for MessageCount");
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+
+ return count.get();
}
public long getInstantMessageCount()
@@ -907,6 +951,7 @@
public synchronized void cancel(final MessageReference reference, final long timeBase) throws Exception
{
+ deliveringCount.decrementAndGet();
if (checkRedelivery(reference, timeBase))
{
if (!scheduledDeliveryHandler.checkAndSchedule(reference, false))
@@ -924,7 +969,7 @@
{
if (isTrace)
{
- log.trace("moving expired reference " + ref + " to address = " + expiryAddress + " from queue=" + this.getName(), new Exception ("trace"));
+ log.trace("moving expired reference " + ref + " to address = " + expiryAddress + " from queue=" + this.getName());
}
move(expiryAddress, ref, true, false);
}
@@ -950,9 +995,31 @@
public long getMessagesAdded()
{
- blockOnExecutorFuture();
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicLong count = new AtomicLong(0);
- return getInstantMessagesAdded();
+ getExecutor().execute(new Runnable()
+ {
+ public void run()
+ {
+ count.set(getInstantMessagesAdded());
+ latch.countDown();
+ }
+ });
+
+ try
+ {
+ if (!latch.await(10, TimeUnit.SECONDS))
+ {
+ throw new IllegalStateException("Timed out on waiting for MessagesAdded");
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+
+ return count.get();
}
public long getInstantMessagesAdded()
@@ -1137,28 +1204,43 @@
}
}
- public synchronized void expireReferences() throws Exception
+ public void expireReferences() throws Exception
{
- LinkedListIterator<MessageReference> iter = iterator();
-
- try
- {
- while (iter.hasNext())
+ getExecutor().execute(new Runnable(){
+ public void run()
{
- MessageReference ref = iter.next();
- if (ref.getMessage().isExpired())
+ synchronized (QueueImpl.this)
{
- deliveringCount.incrementAndGet();
- expire(ref);
- iter.remove();
- refRemoved(ref);
+ LinkedListIterator<MessageReference> iter = iterator();
+
+ try
+ {
+ while (iter.hasNext())
+ {
+ MessageReference ref = iter.next();
+ try
+ {
+ if (ref.getMessage().isExpired())
+ {
+ deliveringCount.incrementAndGet();
+ expire(ref);
+ iter.remove();
+ refRemoved(ref);
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn("Error expiring reference " + ref, e);
+ }
+ }
+ }
+ finally
+ {
+ iter.close();
+ }
}
}
- }
- finally
- {
- iter.close();
- }
+ });
}
public synchronized boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception
@@ -1486,7 +1568,7 @@
@Override
public String toString()
{
- return "QueueImpl[name=" + name.toString() + "]@" + Integer.toHexString(System.identityHashCode(this));
+ return "QueueImpl[name=" + name.toString() + ", postOffice=" + this.postOffice + "]@" + Integer.toHexString(System.identityHashCode(this));
}
// Private
@@ -1530,166 +1612,168 @@
// This method will deliver as many messages as possible until all consumers are busy or there are no more matching
// or available messages
- private synchronized void deliver()
+ private void deliver()
{
- if (paused || consumerList.isEmpty())
+ synchronized (this)
{
- return;
- }
-
- if (log.isDebugEnabled())
- {
- log.debug(this + " doing deliver. messageReferences=" + messageReferences.size());
- }
+ if (paused || consumerList.isEmpty())
+ {
+ return;
+ }
- int busyCount = 0;
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + " doing deliver. messageReferences=" + messageReferences.size());
+ }
- int nullRefCount = 0;
+ int busyCount = 0;
- int size = consumerList.size();
+ int nullRefCount = 0;
- int endPos = pos == size - 1 ? 0 : size - 1;
+ int size = consumerList.size();
- int numRefs = messageReferences.size();
+ int endPos = pos == size - 1 ? 0 : size - 1;
- int handled = 0;
-
- long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
+ int numRefs = messageReferences.size();
- while (handled < numRefs)
- {
- if (handled == MAX_DELIVERIES_IN_LOOP)
- {
- // Schedule another one - we do this to prevent a single thread getting caught up in this loop for too long
+ int handled = 0;
- deliverAsync();
+ long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
- return;
- }
-
- if (System.currentTimeMillis() > timeout)
+ while (handled < numRefs)
{
- if (isTrace)
+ if (handled == MAX_DELIVERIES_IN_LOOP)
{
- log.trace("delivery has been running for too long. Scheduling another delivery task now");
- }
-
- deliverAsync();
-
- return;
- }
-
+ // Schedule another one - we do this to prevent a single thread getting caught up in this loop for too
+ // long
- ConsumerHolder holder = consumerList.get(pos);
+ deliverAsync();
- Consumer consumer = holder.consumer;
+ return;
+ }
- if (holder.iter == null)
- {
- holder.iter = messageReferences.iterator();
- }
-
- MessageReference ref;
-
- if (holder.iter.hasNext())
- {
- ref = holder.iter.next();
- }
- else
- {
- ref = null;
- }
-
-
- if (ref == null)
- {
- nullRefCount++;
- }
- else
- {
- if (checkExpired(ref))
+ if (System.currentTimeMillis() > timeout)
{
if (isTrace)
{
- log.trace("Reference " + ref + " being expired");
+ log.trace("delivery has been running for too long. Scheduling another delivery task now");
}
- holder.iter.remove();
- refRemoved(ref);
-
- handled++;
+ deliverAsync();
- continue;
+ return;
}
- Consumer groupConsumer = null;
-
- if (isTrace)
+ ConsumerHolder holder = consumerList.get(pos);
+
+ Consumer consumer = holder.consumer;
+
+ if (holder.iter == null)
{
- log.trace("Queue " + this.getName() + " is delivering reference " + ref);
+ holder.iter = messageReferences.iterator();
}
- // If a group id is set, then this overrides the consumer chosen round-robin
+ MessageReference ref;
- SimpleString groupID = ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
+ if (holder.iter.hasNext())
+ {
+ ref = holder.iter.next();
+ }
+ else
+ {
+ ref = null;
+ }
- if (groupID != null)
+ if (ref == null)
{
- groupConsumer = groups.get(groupID);
+ nullRefCount++;
+ }
+ else
+ {
+ if (checkExpired(ref))
+ {
+ if (isTrace)
+ {
+ log.trace("Reference " + ref + " being expired");
+ }
+ holder.iter.remove();
- if (groupConsumer != null)
+ refRemoved(ref);
+
+ handled++;
+
+ continue;
+ }
+
+ Consumer groupConsumer = null;
+
+ if (isTrace)
{
- consumer = groupConsumer;
+ log.trace("Queue " + this.getName() + " is delivering reference " + ref);
}
- }
- HandleStatus status = handle(ref, consumer);
+ // If a group id is set, then this overrides the consumer chosen round-robin
- if (status == HandleStatus.HANDLED)
- {
- holder.iter.remove();
+ SimpleString groupID = ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
- refRemoved(ref);
+ if (groupID != null)
+ {
+ groupConsumer = groups.get(groupID);
- if (groupID != null && groupConsumer == null)
+ if (groupConsumer != null)
+ {
+ consumer = groupConsumer;
+ }
+ }
+
+ HandleStatus status = handle(ref, consumer);
+
+ if (status == HandleStatus.HANDLED)
{
- groups.put(groupID, consumer);
+ holder.iter.remove();
+
+ refRemoved(ref);
+
+ if (groupID != null && groupConsumer == null)
+ {
+ groups.put(groupID, consumer);
+ }
+
+ handled++;
}
+ else if (status == HandleStatus.BUSY)
+ {
+ holder.iter.repeat();
- handled++;
+ busyCount++;
+ }
+ else if (status == HandleStatus.NO_MATCH)
+ {
+ }
}
- else if (status == HandleStatus.BUSY)
- {
- holder.iter.repeat();
- busyCount++;
- }
- else if (status == HandleStatus.NO_MATCH)
+ if (pos == endPos)
{
- }
- }
+ // Round robin'd all
- if (pos == endPos)
- {
- // Round robin'd all
-
- if (nullRefCount + busyCount == size)
- {
- if (log.isDebugEnabled())
+ if (nullRefCount + busyCount == size)
{
- log.debug(this + "::All the consumers were busy, giving up now");
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + "::All the consumers were busy, giving up now");
+ }
+ break;
}
- break;
+
+ nullRefCount = busyCount = 0;
}
- nullRefCount = busyCount = 0;
- }
+ pos++;
- pos++;
-
- if (pos == size)
- {
- pos = 0;
+ if (pos == size)
+ {
+ pos = 0;
+ }
}
}
@@ -1735,7 +1819,7 @@
}
}
- private synchronized void depage()
+ private void depage()
{
depagePending = false;
@@ -1860,7 +1944,7 @@
}
reference.setScheduledDeliveryTime(timeBase + redeliveryDelay);
- if (message.isDurable() && durable)
+ if (!reference.isPaged() && message.isDurable() && durable)
{
storageManager.updateScheduledDeliveryTime(reference);
}
@@ -2173,7 +2257,7 @@
}
catch (Exception e)
{
- QueueImpl.log.warn("Unable to decrement reference counting", e);
+ QueueImpl.log.warn("Unable to decrement reference counting", e);
}
}
@@ -2244,6 +2328,10 @@
for (MessageReference ref : refsToAck)
{
+ if (log.isTraceEnabled())
+ {
+ log.trace("rolling back " + ref);
+ }
try
{
if (ref.getQueue().checkRedelivery(ref, timeBase))
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-12-11 01:43:04 UTC (rev 11893)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-12-11 01:58:20 UTC (rev 11894)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ClientConsumerImpl;
@@ -61,7 +62,7 @@
// Constants ------------------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(ServerConsumerImpl.class);
-
+
private static boolean isTrace = log.isTraceEnabled();
// Static ---------------------------------------------------------------------------------------
@@ -85,14 +86,12 @@
private boolean started;
private volatile LargeMessageDeliverer largeMessageDeliverer = null;
-
+
public String debug()
{
return toString() + "::Delivering " + this.deliveringRefs.size();
}
- private boolean largeMessageInDelivery;
-
/**
* if we are a browse only consumer we don't need to worry about acknowledgemenets or being started/stopeed by the session.
*/
@@ -117,7 +116,7 @@
private final Binding binding;
private boolean transferring = false;
-
+
/* As well as consumer credit based flow control, we also tap into TCP flow control (assuming transport is using TCP)
* This is useful in the case where consumer-window-size = -1, but we don't want to OOM by sending messages ad infinitum to the Netty
* write queue when the TCP buffer is full, e.g. the client is slow or has died.
@@ -165,11 +164,11 @@
minLargeMessageSize = session.getMinLargeMessageSize();
this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
-
+
this.callback.addReadyListener(this);
this.creationTime = System.currentTimeMillis();
-
+
if (browseOnly)
{
browserDeliverer = new BrowserDeliverer(messageQueue.iterator());
@@ -187,7 +186,7 @@
{
return id;
}
-
+
public boolean isBrowseOnly()
{
return browseOnly;
@@ -197,12 +196,12 @@
{
return creationTime;
}
-
+
public String getConnectionID()
{
return this.session.getConnectionID().toString();
}
-
+
public String getSessionID()
{
return this.session.getName();
@@ -212,20 +211,23 @@
{
if (availableCredits != null && availableCredits.get() <= 0)
{
- if (log.isDebugEnabled() )
+ if (log.isDebugEnabled())
{
- log.debug(this + " is busy for the lack of credits!!!");
+ log.debug(this + " is busy for the lack of credits. Current credits = " +
+ availableCredits +
+ " Can't receive reference " +
+ ref);
}
-
+
return HandleStatus.BUSY;
}
-
-// TODO - https://jira.jboss.org/browse/HORNETQ-533
-// if (!writeReady.get())
-// {
-// return HandleStatus.BUSY;
-// }
-
+
+ // TODO - https://jira.jboss.org/browse/HORNETQ-533
+ // if (!writeReady.get())
+ // {
+ // return HandleStatus.BUSY;
+ // }
+
synchronized (lock)
{
// If the consumer is stopped then we don't accept the message, it
@@ -238,11 +240,23 @@
// If there is a pendingLargeMessage we can't take another message
// This has to be checked inside the lock as the set to null is done inside the lock
- if (largeMessageInDelivery)
+ if (largeMessageDeliverer != null)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + " is busy delivering large message " +
+ largeMessageDeliverer +
+ ", can't deliver reference " +
+ ref);
+ }
return HandleStatus.BUSY;
}
+ if (log.isTraceEnabled())
+ {
+ log.trace("Handling reference " + ref);
+ }
+
final ServerMessage message = ref.getMessage();
if (filter != null && !filter.match(message))
@@ -265,7 +279,9 @@
// the updateDeliveryCount would still be updated after c
if (strictUpdateDeliveryCount && !ref.isPaged())
{
- if (ref.getMessage().isDurable() && ref.getQueue().isDurable() && !ref.getQueue().isInternalQueue())
+ if (ref.getMessage().isDurable() && ref.getQueue().isDurable() &&
+ !ref.getQueue().isInternalQueue() &&
+ !ref.isPaged())
{
storageManager.updateDeliveryCount(ref);
}
@@ -306,7 +322,7 @@
public void close(final boolean failed) throws Exception
{
callback.removeReadyListener(this);
-
+
setStarted(false);
if (largeMessageDeliverer != null)
@@ -352,8 +368,8 @@
props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
- props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filter == null ? null
- : filter.getFilterString());
+ props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING,
+ filter == null ? null : filter.getFilterString());
props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
@@ -374,39 +390,71 @@
{
promptDelivery();
- Future future = new Future();
+ // JBPAPP-6030 - Using the executor to avoid distributed dead locks
+ messageQueue.getExecutor().execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ // We execute this on the same executor to make sure the force delivery message is written after
+ // any delivery is completed
- messageQueue.getExecutor().execute(future);
+ synchronized (lock)
+ {
+ if (transferring)
+ {
+ // Case it's transferring (reattach), we will retry later
+ messageQueue.getExecutor().execute(new Runnable()
+ {
+ public void run()
+ {
+ forceDelivery(sequence);
+ }
+ });
+ }
+ else
+ {
+ ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
+
+ forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
+ forcedDeliveryMessage.setAddress(messageQueue.getName());
+
+ callback.sendMessage(forcedDeliveryMessage, id, 0);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ ServerConsumerImpl.log.error("Failed to send forced delivery message", e);
+ }
+ }
+ });
- boolean ok = future.await(10000);
+ }
- if (!ok)
- {
- log.warn("Timed out waiting for executor");
- }
+ public LinkedList<MessageReference> cancelRefs(final boolean failed,
+ final boolean lastConsumedAsDelivered,
+ final Transaction tx) throws Exception
+ {
+ boolean performACK = lastConsumedAsDelivered;
try
{
- // We execute this on the same executor to make sure the force delivery message is written after
- // any delivery is completed
-
- ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
-
- forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
- forcedDeliveryMessage.setAddress(messageQueue.getName());
-
- callback.sendMessage(forcedDeliveryMessage, id, 0);
+ if (largeMessageDeliverer != null)
+ {
+ largeMessageDeliverer.finish();
+ }
}
- catch (Exception e)
+ catch (Throwable e)
{
- ServerConsumerImpl.log.error("Failed to send forced delivery message", e);
+ log.warn("Error on resetting large message deliver - " + largeMessageDeliverer, e);
}
- }
+ finally
+ {
+ largeMessageDeliverer = null;
+ }
- public LinkedList<MessageReference> cancelRefs(final boolean failed, final boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
- {
- boolean performACK = lastConsumedAsDelivered;
-
LinkedList<MessageReference> refs = new LinkedList<MessageReference>();
if (!deliveringRefs.isEmpty())
@@ -427,8 +475,9 @@
{
if (!failed)
{
- //We don't decrement delivery count if the client failed, since there's a possibility that refs were actually delivered but we just didn't get any acks for them
- //before failure
+ // We don't decrement delivery count if the client failed, since there's a possibility that refs
+ // were actually delivered but we just didn't get any acks for them
+ // before failure
ref.decrementDeliveryCount();
}
@@ -461,21 +510,6 @@
synchronized (lock)
{
this.transferring = transferring;
-
- if (transferring)
- {
- // Now we must wait for any large message delivery to finish
- while (largeMessageInDelivery)
- {
- try
- {
- Thread.sleep(1);
- }
- catch (InterruptedException ignore)
- {
- }
- }
- }
}
// Outside the lock
@@ -504,18 +538,23 @@
}
public void receiveCredits(final int credits) throws Exception
- {
+ {
if (credits == -1)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + ":: FlowControl::Received disable flow control message");
+ }
// No flow control
availableCredits = null;
-
- //There may be messages already in the queue
+
+ // There may be messages already in the queue
promptDelivery();
}
else if (credits == 0)
{
- //reset, used on slow consumers
+ // reset, used on slow consumers
+ log.debug(this + ":: FlowControl::Received reset flow control message");
availableCredits.set(0);
}
else
@@ -524,16 +563,17 @@
if (log.isDebugEnabled())
{
- log.debug(this + "::Received " + credits +
- " credits, previous value = " +
- previous +
- " currentValue = " +
- availableCredits.get());
+ log.debug(this + "::FlowControl::Received " +
+ credits +
+ " credits, previous value = " +
+ previous +
+ " currentValue = " +
+ availableCredits.get());
}
if (previous <= 0 && previous + credits > 0)
{
- if (log.isTraceEnabled() )
+ if (log.isTraceEnabled())
{
log.trace(this + "::calling promptDelivery from receiving credits");
}
@@ -547,59 +587,103 @@
return messageQueue;
}
- public void acknowledge(final boolean autoCommitAcks, final Transaction tx, final long messageID) throws Exception
+ public void acknowledge(final boolean autoCommitAcks, Transaction tx, final long messageID) throws Exception
{
if (browseOnly)
{
return;
}
-
+
// Acknowledge acknowledges all refs delivered by the consumer up to and including the one explicitly
// acknowledged
-
- MessageReference ref;
- do
+
+ // We use a transaction here as if the message is not found, we should rollback anything done
+ // This could eventually happen on retries during transactions, and we need to make sure we don't ACK things we are not supposed to acknowledge
+
+ boolean startedTransaction = false;
+
+ if (tx == null || autoCommitAcks)
{
- ref = deliveringRefs.poll();
-
- if (ref == null)
+ startedTransaction = true;
+ tx = new TransactionImpl(storageManager);
+ }
+
+ try
+ {
+
+ MessageReference ref;
+ do
{
- throw new IllegalStateException(System.identityHashCode(this) + " Could not find reference on consumerID=" +
- id +
- ", messageId = " +
- messageID +
- " queue = " +
- messageQueue.getName() +
- " closed = " +
- closed);
+ ref = deliveringRefs.poll();
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("ACKing ref " + ref + " on " + this);
+ }
+
+ if (ref == null)
+ {
+
+ HornetQException e = new HornetQException(HornetQException.ILLEGAL_STATE, "Could not find reference on consumerID=" +
+ id +
+ ", messageId = " +
+ messageID +
+ " queue = " +
+ messageQueue.getName());
+ throw e;
+ }
+
+ ref.getQueue().acknowledge(tx, ref);
}
-
- if (autoCommitAcks || tx == null)
+ while (ref.getMessage().getMessageID() != messageID);
+
+ if (startedTransaction)
{
- ref.getQueue().acknowledge(ref);
+ tx.commit();
}
+ }
+ catch (HornetQException e)
+ {
+ if (startedTransaction)
+ {
+ tx.rollback();
+ }
else
{
- ref.getQueue().acknowledge(tx, ref);
+ tx.markAsRollbackOnly(e);
}
+ throw e;
}
- while (ref.getMessage().getMessageID() != messageID);
+ catch (Throwable e)
+ {
+ log.error(e.getMessage(), e);
+ HornetQException hqex = new HornetQException(HornetQException.ILLEGAL_STATE, e.getMessage());
+ if (startedTransaction)
+ {
+ tx.rollback();
+ }
+ else
+ {
+ tx.markAsRollbackOnly(hqex);
+ }
+ throw hqex;
+ }
}
-
+
public void individualAcknowledge(final boolean autoCommitAcks, final Transaction tx, final long messageID) throws Exception
{
if (browseOnly)
{
return;
}
-
+
MessageReference ref = removeReferenceByID(messageID);
-
+
if (ref == null)
{
throw new IllegalStateException("Cannot find ref to ack " + messageID);
}
-
+
if (autoCommitAcks)
{
ref.getQueue().acknowledge(ref);
@@ -639,13 +723,13 @@
return ref;
}
-
+
public void readyForWriting(final boolean ready)
{
if (ready)
{
writeReady.set(true);
-
+
promptDelivery();
}
else
@@ -664,28 +748,30 @@
private void promptDelivery()
{
- synchronized (lock)
+ // largeMessageDeliverer is always set inside a lock
+ // if we don't acquire a lock, we will have NPE eventually
+ if (largeMessageDeliverer != null)
{
- // largeMessageDeliverer is aways set inside a lock
- // if we don't acquire a lock, we will have NPE eventually
- if (largeMessageDeliverer != null)
- {
- resumeLargeMessage();
- }
- else
- {
- if (browseOnly)
- {
- messageQueue.getExecutor().execute(browserDeliverer);
- }
- else
- {
- messageQueue.forceDelivery();
- }
- }
+ resumeLargeMessage();
}
+ else
+ {
+ forceDelivery();
+ }
}
+ private void forceDelivery()
+ {
+ if (browseOnly)
+ {
+ messageQueue.getExecutor().execute(browserDeliverer);
+ }
+ else
+ {
+ messageQueue.deliverAsync();
+ }
+ }
+
private void resumeLargeMessage()
{
messageQueue.getExecutor().execute(resumeLargeMessageRunnable);
@@ -693,8 +779,6 @@
private void deliverLargeMessage(final MessageReference ref, final ServerMessage message) throws Exception
{
- largeMessageInDelivery = true;
-
final LargeMessageDeliverer localDeliverer = new LargeMessageDeliverer((LargeServerMessage)message, ref);
// it doesn't need lock because deliverLargeMessage is already inside the lock()
@@ -713,6 +797,14 @@
if (availableCredits != null)
{
availableCredits.addAndGet(-packetSize);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::delivery standard taking " +
+ packetSize +
+ " from credits, available now is " +
+ availableCredits);
+ }
}
}
@@ -729,16 +821,7 @@
{
if (largeMessageDeliverer == null || largeMessageDeliverer.deliver())
{
- if (browseOnly)
- {
- messageQueue.getExecutor().execute(browserDeliverer);
- }
- else
- {
- // prompt Delivery only if chunk was finished
-
- messageQueue.deliverAsync();
- }
+ forceDelivery();
}
}
catch (Exception e)
@@ -786,6 +869,12 @@
if (availableCredits != null && availableCredits.get() <= 0)
{
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" +
+ availableCredits);
+ }
+
return false;
}
@@ -794,7 +883,7 @@
context = largeMessage.getBodyEncoder();
sizePendingLargeMessage = context.getLargeBodySize();
-
+
context.open();
sentInitialPacket = true;
@@ -807,6 +896,15 @@
if (availableCredits != null)
{
availableCredits.addAndGet(-packetSize);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::" +
+ " deliver initialpackage with " +
+ packetSize +
+ " delivered, available now = " +
+ availableCredits);
+ }
}
// Execute the rest of the large message on a different thread so as not to tie up the delivery thread
@@ -822,7 +920,8 @@
{
if (ServerConsumerImpl.isTrace)
{
- log.trace("deliverLargeMessage: Leaving loop of send LargeMessage because of credits");
+ log.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" +
+ availableCredits);
}
return false;
@@ -845,16 +944,17 @@
int chunkLen = body.length;
- if (ServerConsumerImpl.isTrace)
- {
- log.trace("deliverLargeMessage: Sending " + packetSize +
- " availableCredits now is " +
- availableCredits);
- }
-
if (availableCredits != null)
{
availableCredits.addAndGet(-packetSize);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" +
+ packetSize +
+ " available now=" +
+ availableCredits);
+ }
}
positionPendingLargeMessage += chunkLen;
@@ -903,8 +1003,6 @@
largeMessageDeliverer = null;
- largeMessageInDelivery = false;
-
largeMessage = null;
}
}
@@ -920,7 +1018,7 @@
}
private final LinkedListIterator<MessageReference> iterator;
-
+
public synchronized void close()
{
iterator.close();
Added: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/MultipleConsumerTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/MultipleConsumerTest.java (rev 0)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/MultipleConsumerTest.java 2011-12-11 01:58:20 UTC (rev 11894)
@@ -0,0 +1,460 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import java.util.LinkedList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.core.postoffice.AddressManager;
+import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.QueueBinding;
+import org.hornetq.core.postoffice.impl.PostOfficeImpl;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.JMSTestBase;
+
+/**
+ * A MultipleConsumerTest
+ *
+ * @author clebert
+ *
+ *
+ */
+public class MultipleConsumerTest extends JMSTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+ private static final int TIMEOUT_ON_WAIT = 5000;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ /**
+ * Always returns {@code true}.
+ * NOTE(review): presumably the hook JMSTestBase consults to start the server with persistence enabled - confirm.
+ */
+ protected boolean usePersistence()
+ {
+ return true;
+ }
+
+ volatile boolean running = true;
+
+ private static final long WAIT_ON_SEND = 0;
+
+ CountDownLatch errorLatch = new CountDownLatch(1);
+
+ AtomicInteger numberOfErrors = new AtomicInteger(0);
+
+ /**
+ * Records a failure raised by one of the worker threads.
+ * <p>
+ * The stack trace is printed, the error counter is incremented and the error latch is released.
+ *
+ * @param e the failure to report
+ */
+ public void error(Throwable e)
+ {
+ System.err.println("Error at " + Thread.currentThread().getName());
+ e.printStackTrace();
+ // Count the failure before releasing the latch, so whoever waits on the latch already sees the new count
+ numberOfErrors.incrementAndGet();
+ errorLatch.countDown();
+ }
+
+ /**
+ * Creates the client-side topic object for the given destination.
+ *
+ * @param destinationID index of the destination, used to derive the topic name
+ * @return the topic produced by HornetQJMSClient.createTopic for that name
+ */
+ public Topic createSampleTopic(int destinationID)
+ {
+ final String topicName = createTopicName(destinationID);
+ return HornetQJMSClient.createTopic(topicName);
+ }
+
+ /**
+ * Builds the name of the input topic of a destination.
+ *
+ * @param destinationID index of the destination
+ * @return "topic-input" immediately followed by the index (no separator in between)
+ */
+ private String createTopicName(int destinationID)
+ {
+ return new StringBuilder("topic-input").append(destinationID).toString();
+ }
+
+ /**
+ * Creates the client-side queue object for the given destination.
+ *
+ * @param destinationID index of the destination, used to derive the queue name
+ * @return the queue produced by HornetQJMSClient.createQueue for that name
+ */
+ public Queue createSampleQueue(int destinationID)
+ {
+ final String queueName = createQueueName(destinationID);
+ return HornetQJMSClient.createQueue(queueName);
+ }
+
+ /**
+ * Builds the name of the output queue of a destination.
+ *
+ * @param destinationID index of the destination
+ * @return "queue-output-" followed by the index
+ */
+ private String createQueueName(int destinationID)
+ {
+ return new StringBuilder("queue-output-").append(destinationID).toString();
+ }
+
+ /**
+ * Background thread that prints the statistics of every bound queue once per second while the test
+ * is running (it plays the role of a management client polling the server).
+ */
+ public class Counter extends Thread
+ {
+ public Counter()
+ {
+ super("Counter-Thread-Simulating-Management");
+ }
+
+ public void run()
+ {
+ try
+ {
+ PostOfficeImpl postOffice = (PostOfficeImpl)server.getPostOffice();
+ AddressManager addressManager = postOffice.getAddressManager();
+
+ // The list of queues is collected once, before the polling loop starts
+ LinkedList<org.hornetq.core.server.Queue> monitored = new LinkedList<org.hornetq.core.server.Queue>();
+ for (Binding candidate : addressManager.getBindings().values())
+ {
+ if (!(candidate instanceof QueueBinding))
+ {
+ continue;
+ }
+ QueueBinding queueBinding = (QueueBinding)candidate;
+ monitored.add(queueBinding.getQueue());
+ }
+
+ while (running)
+ {
+ Thread.sleep(1000);
+ for (org.hornetq.core.server.Queue current : monitored)
+ {
+ StringBuilder report = new StringBuilder("Queue ");
+ report.append(current);
+ report.append(" has ").append(current.getInstantMessageCount());
+ report.append(" with ").append(current.getMessagesAdded());
+ report.append(" with ").append(current.getConsumerCount());
+ report.append(" consumers");
+ System.out.println(report.toString());
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ error(e);
+ }
+ }
+ }
+
+ // It will produce to a destination
+ public class ProducerThread extends Thread
+ {
+ Connection conn;
+
+ Session sess;
+
+ Topic topic;
+
+ MessageProducer prod;
+
+ public ProducerThread(Connection conn, int destinationID) throws Exception
+ {
+ this.conn = conn;
+ this.sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+ this.topic = createSampleTopic(destinationID);
+ this.prod = sess.createProducer(topic);
+ }
+
+ public void run()
+ {
+ try
+ {
+ while (running)
+ {
+ BytesMessage msg = sess.createBytesMessage();
+ msg.writeBytes(new byte[1024]);
+ prod.send(msg);
+ sess.commit();
+ Thread.sleep(WAIT_ON_SEND);
+ }
+ }
+ catch (Exception e)
+ {
+ error(e);
+ }
+ }
+ }
+
+ // It will bridge from one subscription and send to a queue
+ public class BridgeSubscriberThread extends Thread
+ {
+ Session session;
+
+ MessageProducer prod;
+
+ MessageConsumer cons;
+
+ Topic topic;
+
+ Queue outputQueue;
+
+ public BridgeSubscriberThread(Connection masterConn, int destinationID) throws Exception
+ {
+ super("Bridge_destination=" + destinationID);
+ topic = createSampleTopic(destinationID);
+ outputQueue = createSampleQueue(destinationID);
+ session = masterConn.createSession(true, Session.SESSION_TRANSACTED);
+ cons = session.createDurableSubscriber(topic, "bridge-on-" + destinationID);
+
+ prod = session.createProducer(outputQueue);
+ prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+ }
+
+ public void run()
+ {
+ try
+ {
+
+ int i = 0;
+ while (running)
+ {
+ Message msg = cons.receive(TIMEOUT_ON_WAIT);
+
+ if (msg == null)
+ {
+ System.err.println("couldn't receive a message within TIMEOUT_ON_WAIT miliseconds on " + topic);
+ error(new RuntimeException("Couldn't receive message"));
+ }
+ else
+ {
+ if (i++ % 100 == 0)
+ {
+ System.out.println(Thread.currentThread().getName() + " received " + i);
+ }
+ prod.send(msg);
+ session.commit();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ error(e);
+ }
+ }
+ }
+
+ // It will read from a destination, and pretend it finished processing it
+ public class ProcessorThread extends Thread
+ {
+
+ Connection conn;
+
+ Session session;
+
+ MessageConsumer cons;
+
+ Destination dest;
+
+ final long waitOnEachConsume;
+
+ public ProcessorThread(Connection conn,
+ Session sess,
+ Destination dest,
+ MessageConsumer cons,
+ long waitOnEachConsume) throws Exception
+ {
+ super("Processor on " + dest);
+ this.conn = conn;
+ this.session = sess;
+ this.dest = dest;
+ this.cons = cons;
+ this.waitOnEachConsume = waitOnEachConsume;
+ }
+
+ public void run()
+ {
+ int i = 0;
+ try
+ {
+ while (running)
+ {
+ if (waitOnEachConsume != 0)
+ {
+ Thread.sleep(waitOnEachConsume);
+ }
+ Message msg = cons.receive(TIMEOUT_ON_WAIT);
+
+ if (i++ % 100 == 0)
+ {
+ System.out.println(Thread.currentThread().getName() + " processed " + i);
+ }
+ if (msg == null)
+ {
+ System.err.println("couldn't receive a message on processor within TIMEOUT_ON_WAIT miliseconds on " + dest);
+ error(new RuntimeException("Couldn't receive message"));
+ }
+ else
+ {
+ session.commit();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ error(e);
+ }
+ }
+ }
+
+ // This test requires to be manually tested
+ // At the end this test is throwing an OME for some issue on the test itself.
+ // As long as you see the message "Finished" the test is considered successful!
+ //
+ // Scenario: for each destination there is a producer on the topic, a slow external durable subscriber,
+ // a bridge forwarding from the topic to an output queue and a consumer on that output queue.
+ // The worker threads are stopped and every connection (the producers' ones included) is closed in a
+ // finally block, so a failed run does not leave threads or connections behind.
+ public void _testMultipleConsumers() throws Throwable
+ {
+
+ AddressSettings set = new AddressSettings();
+ set.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ set.setPageSizeBytes(10 * 1024);
+ set.setMaxSizeBytes(100 * 1024);
+ server.getAddressSettingsRepository().addMatch("#", set);
+
+ LinkedList<Connection> connections = new LinkedList<Connection>();
+
+ LinkedList<Thread> consumerThreads = new LinkedList<Thread>();
+
+ LinkedList<Thread> producerThreads = new LinkedList<Thread>();
+
+ Counter managerThread = new Counter();
+
+ try
+ {
+ int nDestinations = 100;
+
+ for (int i = 0; i < nDestinations; i++)
+ {
+ createTopic(createTopicName(i));
+ createQueue(createQueueName(i));
+ }
+
+ // start a few simulated external consumers on the topic (1 external subscription)
+ for (int i = 0; i < nDestinations; i++)
+ {
+ Connection conn = cf.createConnection();
+ // registered right away so it gets closed even if the rest of the setup fails
+ connections.add(conn);
+ conn.setClientID("external-consumer-" + i);
+ conn.start();
+ Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+ Topic topic = createSampleTopic(i);
+ MessageConsumer cons = sess.createDurableSubscriber(topic, "ex-" + i);
+ ProcessorThread proc = new ProcessorThread(conn, sess, topic, cons, 100L);
+ consumerThreads.add(proc);
+ }
+
+ // one consumer per output queue
+ for (int i = 0; i < nDestinations; i++)
+ {
+ Connection conncons = cf.createConnection();
+ connections.add(conncons);
+ conncons.setClientID("output-queue" + i);
+ conncons.start();
+ Session sesscons = conncons.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = createSampleQueue(i);
+ MessageConsumer cons = sesscons.createConsumer(queue);
+ ProcessorThread proc = new ProcessorThread(conncons, sesscons, queue, cons, 0L);
+ consumerThreads.add(proc);
+ }
+
+ Connection masterConn = cf.createConnection();
+ connections.add(masterConn);
+ masterConn.setClientID("master-conn");
+ masterConn.start();
+
+ // start the bridges themselves
+ for (int i = 0; i < nDestinations; i++)
+ {
+ BridgeSubscriberThread subs = new BridgeSubscriberThread(masterConn, i);
+ consumerThreads.add(subs);
+ }
+
+ // The producers
+ for (int i = 0; i < nDestinations; i++)
+ {
+ Connection prodConn = cf.createConnection();
+ // producer connections have to be tracked as well, otherwise they are never closed
+ connections.add(prodConn);
+ ProducerThread prod = new ProducerThread(prodConn, i);
+ producerThreads.add(prod);
+ }
+
+ for (Thread t : producerThreads)
+ {
+ t.start();
+ }
+
+ // Waiting some time before we start the consumers. To make sure it's paging
+ Thread.sleep(20000);
+
+ System.out.println("starting consumers now");
+
+ for (Thread t : consumerThreads)
+ {
+ t.start();
+ }
+
+ managerThread.start();
+
+ // Returns as soon as a worker reports an error, or after 20 minutes without any
+ errorLatch.await(20, TimeUnit.MINUTES);
+
+ assertEquals(0, numberOfErrors.get());
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ throw e;
+ }
+ finally
+ {
+ // Stop the workers first so nobody is using a connection while it is being closed
+ running = false;
+
+ // join() returns immediately for threads that were never started
+ for (Thread t : consumerThreads)
+ {
+ t.join();
+ }
+
+ for (Thread t : producerThreads)
+ {
+ t.join();
+ }
+
+ managerThread.join();
+
+ for (Connection conn : connections)
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (Exception closeFailure)
+ {
+ // Best-effort cleanup: report it, keep closing the others and don't hide the test outcome
+ closeFailure.printStackTrace();
+ }
+ }
+ }
+
+ System.out.println("Finished!!!!");
+ }
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java 2011-12-11 01:43:04 UTC (rev 11893)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java 2011-12-11 01:58:20 UTC (rev 11894)
@@ -134,7 +134,7 @@
prod.send(msg);
}
- queue.blockOnExecutorFuture();
+ queue.flushExecutor();
//Consumer is not started so should go queued
assertFalse(queue.isDirectDeliver());
@@ -157,7 +157,7 @@
prod.send(msg);
- queue.blockOnExecutorFuture();
+ queue.flushExecutor();
assertTrue(queue.isDirectDeliver());
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-12-11 01:43:04 UTC (rev 11893)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-12-11 01:58:20 UTC (rev 11894)
@@ -82,7 +82,7 @@
}
- public boolean blockOnExecutorFuture()
+ public boolean flushExecutor()
{
return true;
}
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java 2011-12-11 01:43:04 UTC (rev 11893)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java 2011-12-11 01:58:20 UTC (rev 11894)
@@ -617,7 +617,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.server.Queue#blockOnExecutorFuture()
*/
- public boolean blockOnExecutorFuture()
+ public boolean flushExecutor()
{
// TODO Auto-generated method stub
return false;
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/util/JMSTestBase.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/util/JMSTestBase.java 2011-12-11 01:43:04 UTC (rev 11893)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/util/JMSTestBase.java 2011-12-11 01:58:20 UTC (rev 11894)
@@ -214,7 +214,7 @@
HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
- HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ 100 * 1024, // HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE,
HornetQClient.DEFAULT_PRODUCER_MAX_RATE,
HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE,
13 years, 3 months
JBoss hornetq SVN: r11893 - branches/one-offs.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-10 20:43:04 -0500 (Sat, 10 Dec 2011)
New Revision: 11893
Added:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/
Log:
Creating this branch of JBPAPP_7242 - JBPAPP-7710
13 years, 3 months