JBoss hornetq SVN: r11430 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/cluster/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-26 22:59:26 -0400 (Mon, 26 Sep 2011)
New Revision: 11430
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
Log:
fixing a test
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-09-26 16:16:31 UTC (rev 11429)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-09-27 02:59:26 UTC (rev 11430)
@@ -459,6 +459,11 @@
for (PagedMessage msg : messages)
{
pageCache.addLiveMessage(msg);
+ if (msg.getMessage().isLargeMessage())
+ {
+ // We have to do this since addLIveMessage will increment an extra one
+ ((LargeServerMessage)msg.getMessage()).decrementDelayDeletionCount();
+ }
}
currentPage.setLiveCache(pageCache);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-09-26 16:16:31 UTC (rev 11429)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-09-27 02:59:26 UTC (rev 11430)
@@ -69,6 +69,7 @@
* A ClusterConnectionImpl
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author Clebert Suconic
*
* Created 21 Jan 2009 14:43:05
*
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-09-26 16:16:31 UTC (rev 11429)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-09-27 02:59:26 UTC (rev 11430)
@@ -285,6 +285,8 @@
producer.send(clientFile);
}
session.commit();
+
+ validateNoFilesOnLargeDir(10);
for (int h = 0; h < 5; h++)
{
@@ -307,8 +309,6 @@
for (int i = 0; i < 10; i++)
{
ClientMessage clientMessage = cons.receive(5000);
-
- System.out.println("msg " + clientMessage);
assertNotNull(clientMessage);
for (int countByte = 0; countByte < messageSize; countByte++)
{
@@ -324,8 +324,11 @@
{
session.rollback();
}
+
+ session.close();
+ sf.close();
}
-
+
server.stop(false);
server.start();
13 years, 3 months
JBoss hornetq SVN: r11428 - in branches/HORNETQ-720_Replication: hornetq-core/src/test/java/org/hornetq/tests/util and 2 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-26 08:49:52 -0400 (Mon, 26 Sep 2011)
New Revision: 11428
Removed:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyFileStorageDiscoveryClusterWithBackupFailoverTest.java
Modified:
branches/HORNETQ-720_Replication/
branches/HORNETQ-720_Replication/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithConfiguredAdminUserTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithDefaultConfigurationTest.java
Log:
merge from trunk
Property changes on: branches/HORNETQ-720_Replication
___________________________________________________________________
Modified: svn:mergeinfo
- /trunk:10878-11402
+ /trunk:10878-11427
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java 2011-09-26 12:10:11 UTC (rev 11427)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java 2011-09-26 12:49:52 UTC (rev 11428)
@@ -99,7 +99,7 @@
public static final String NETTY_CONNECTOR_FACTORY = NettyConnectorFactory.class.getCanonicalName();
- protected static final String CLUSTER_PASSWORD = "HornetQ";
+ protected static final String CLUSTER_PASSWORD = "UnitTestsClusterPassword";
// Attributes ----------------------------------------------------
Deleted: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyFileStorageDiscoveryClusterWithBackupFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyFileStorageDiscoveryClusterWithBackupFailoverTest.java 2011-09-26 12:10:11 UTC (rev 11427)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyFileStorageDiscoveryClusterWithBackupFailoverTest.java 2011-09-26 12:49:52 UTC (rev 11428)
@@ -1,39 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.hornetq.tests.integration.cluster.failover;
-
-/**
- * A NettyFileStorageDiscoveryClusterWithBackupFailoverTest
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- *
- */
-public class NettyFileStorageDiscoveryClusterWithBackupFailoverTest extends DiscoveryClusterWithBackupFailoverTest
-{
- @Override
- protected boolean isNetty()
- {
- return true;
- }
-}
\ No newline at end of file
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithConfiguredAdminUserTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithConfiguredAdminUserTest.java 2011-09-26 12:10:11 UTC (rev 11427)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithConfiguredAdminUserTest.java 2011-09-26 12:49:52 UTC (rev 11428)
@@ -24,6 +24,7 @@
import org.hornetq.core.server.HornetQServers;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.spi.core.security.HornetQSecurityManagerImpl;
+import org.hornetq.tests.util.UnitTestCase;
/**
* A SecurityManagementTest
@@ -58,9 +59,7 @@
*/
public void testSendManagementMessageWithClusterAdminUser() throws Exception
{
- doSendManagementMessage(ConfigurationImpl.DEFAULT_CLUSTER_USER,
- ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD,
- true);
+ doSendManagementMessage(ConfigurationImpl.DEFAULT_CLUSTER_USER, UnitTestCase.CLUSTER_PASSWORD, true);
}
public void testSendManagementMessageWithAdminRole() throws Exception
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithDefaultConfigurationTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithDefaultConfigurationTest.java 2011-09-26 12:10:11 UTC (rev 11427)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithDefaultConfigurationTest.java 2011-09-26 12:49:52 UTC (rev 11428)
@@ -63,6 +63,7 @@
protected HornetQServer setupAndStartHornetQServer() throws Exception
{
Configuration conf = createBasicConfig();
+ conf.setClusterPassword(ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
conf.setSecurityEnabled(true);
conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
HornetQServer server = HornetQServers.newHornetQServer(conf, false);
13 years, 3 months
JBoss hornetq SVN: r11427 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-26 08:10:11 -0400 (Mon, 26 Sep 2011)
New Revision: 11427
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithConfiguredAdminUserTest.java
Log:
Fix test using default password, set it to use UnitTestCase.CLUSTER_PASSWORD
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithConfiguredAdminUserTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithConfiguredAdminUserTest.java 2011-09-26 12:06:13 UTC (rev 11426)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithConfiguredAdminUserTest.java 2011-09-26 12:10:11 UTC (rev 11427)
@@ -24,6 +24,7 @@
import org.hornetq.core.server.HornetQServers;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.spi.core.security.HornetQSecurityManagerImpl;
+import org.hornetq.tests.util.UnitTestCase;
/**
* A SecurityManagementTest
@@ -58,9 +59,7 @@
*/
public void testSendManagementMessageWithClusterAdminUser() throws Exception
{
- doSendManagementMessage(ConfigurationImpl.DEFAULT_CLUSTER_USER,
- ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD,
- true);
+ doSendManagementMessage(ConfigurationImpl.DEFAULT_CLUSTER_USER, UnitTestCase.CLUSTER_PASSWORD, true);
}
public void testSendManagementMessageWithAdminRole() throws Exception
13 years, 3 months
JBoss hornetq SVN: r11426 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/management and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-26 08:06:13 -0400 (Mon, 26 Sep 2011)
New Revision: 11426
Modified:
trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithDefaultConfigurationTest.java
Log:
Fix test relying on default cluster password
Modified: trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java 2011-09-26 12:00:16 UTC (rev 11425)
+++ trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java 2011-09-26 12:06:13 UTC (rev 11426)
@@ -99,7 +99,7 @@
public static final String NETTY_CONNECTOR_FACTORY = NettyConnectorFactory.class.getCanonicalName();
- protected static final String CLUSTER_PASSWORD = "HornetQ";
+ protected static final String CLUSTER_PASSWORD = "UnitTestsClusterPassword";
// Attributes ----------------------------------------------------
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithDefaultConfigurationTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithDefaultConfigurationTest.java 2011-09-26 12:00:16 UTC (rev 11425)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithDefaultConfigurationTest.java 2011-09-26 12:06:13 UTC (rev 11426)
@@ -63,6 +63,7 @@
protected HornetQServer setupAndStartHornetQServer() throws Exception
{
Configuration conf = createBasicConfig();
+ conf.setClusterPassword(ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
conf.setSecurityEnabled(true);
conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
HornetQServer server = HornetQServers.newHornetQServer(conf, false);
13 years, 3 months
JBoss hornetq SVN: r11425 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-26 08:00:16 -0400 (Mon, 26 Sep 2011)
New Revision: 11425
Removed:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyFileStorageDiscoveryClusterWithBackupFailoverTest.java
Log:
Delete NettyFileStorageDiscoveryClusterWithBackupFailoverTest as it is functionally equivalent to NettyDiscoveryClusterWithBackupFailoverTest
Deleted: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyFileStorageDiscoveryClusterWithBackupFailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyFileStorageDiscoveryClusterWithBackupFailoverTest.java 2011-09-26 10:41:01 UTC (rev 11424)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyFileStorageDiscoveryClusterWithBackupFailoverTest.java 2011-09-26 12:00:16 UTC (rev 11425)
@@ -1,39 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.hornetq.tests.integration.cluster.failover;
-
-/**
- * A NettyFileStorageDiscoveryClusterWithBackupFailoverTest
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- *
- */
-public class NettyFileStorageDiscoveryClusterWithBackupFailoverTest extends DiscoveryClusterWithBackupFailoverTest
-{
- @Override
- protected boolean isNetty()
- {
- return true;
- }
-}
\ No newline at end of file
13 years, 3 months
JBoss hornetq SVN: r11424 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/persistence and 7 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-26 06:41:01 -0400 (Mon, 26 Sep 2011)
New Revision: 11424
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/NIOSequentialFile.java
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
HORNETQ-720 Always read using buffers from corresponding SequentialFileFactory
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-09-26 10:39:02 UTC (rev 11423)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-09-26 10:41:01 UTC (rev 11424)
@@ -1140,7 +1140,7 @@
{
continue;
}
- replicator.syncPages(sFile, id, getAddress());
+ replicator.syncPages(fileFactory, sFile, id, getAddress());
}
}
finally
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-09-26 10:39:02 UTC (rev 11423)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-09-26 10:41:01 UTC (rev 11424)
@@ -32,6 +32,7 @@
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.config.PersistedAddressSetting;
import org.hornetq.core.persistence.config.PersistedRoles;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.replication.ReplicationManager;
@@ -224,10 +225,11 @@
long storePageCounterInc(long queueID, int add) throws Exception;
/**
+ * @param journalContent
* @return {@code true} if the underlying {@link SequentialFileFactory} has callback support.
* @see SequentialFileFactory#isSupportsCallbacks()
*/
- boolean hasCallbackSupport();
+ boolean hasCallbackSupport(JournalContent journalContent);
/**
* @return the bindings journal
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-26 10:39:02 UTC (rev 11423)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-26 10:41:01 UTC (rev 11424)
@@ -183,9 +183,9 @@
}
private Journal messageJournal;
-
+ private final SequentialFileFactory messageJournalFileFactory;
private Journal bindingsJournal;
-
+ private final SequentialFileFactory bindingsJournalFileFactory;
private final SequentialFileFactory largeMessagesFactory;
private volatile boolean started;
@@ -220,8 +220,6 @@
private final Map<SimpleString, PersistedAddressSetting> mapPersistedAddressSettings = new ConcurrentHashMap<SimpleString, PersistedAddressSetting>();
- private final boolean hasCallbackSupport;
-
public JournalStorageManager(final Configuration config,
final ExecutorFactory executorFactory,
final ReplicationManager replicator)
@@ -248,13 +246,13 @@
journalDir = config.getJournalDirectory();
- SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
+ bindingsJournalFileFactory = new NIOSequentialFileFactory(bindingsDir);
Journal localBindings = new JournalImpl(1024 * 1024,
2,
config.getJournalCompactMinFiles(),
config.getJournalCompactPercentage(),
- bindingsFF,
+ bindingsJournalFileFactory,
"hornetq-bindings",
"bindings",
1);
@@ -279,13 +277,12 @@
syncTransactional = config.isJournalSyncTransactional();
- SequentialFileFactory journalFF = null;
-
if (config.getJournalType() == JournalType.ASYNCIO)
{
JournalStorageManager.log.info("Using AIO Journal");
- journalFF = new AIOSequentialFileFactory(journalDir,
+ messageJournalFileFactory =
+ new AIOSequentialFileFactory(journalDir,
config.getJournalBufferSize_AIO(),
config.getJournalBufferTimeout_AIO(),
config.isLogJournalWriteRate());
@@ -293,7 +290,8 @@
else if (config.getJournalType() == JournalType.NIO)
{
JournalStorageManager.log.info("Using NIO Journal");
- journalFF = new NIOSequentialFileFactory(journalDir,
+ messageJournalFileFactory =
+ new NIOSequentialFileFactory(journalDir,
true,
config.getJournalBufferSize_NIO(),
config.getJournalBufferTimeout_NIO(),
@@ -303,7 +301,6 @@
{
throw new IllegalArgumentException("Unsupported journal type " + config.getJournalType());
}
- hasCallbackSupport = journalFF.isSupportsCallbacks();
idGenerator = new BatchingIDGenerator(0, JournalStorageManager.CHECKPOINT_BATCH_SIZE, bindingsJournal);
@@ -311,7 +308,7 @@
config.getJournalMinFiles(),
config.getJournalCompactMinFiles(),
config.getJournalCompactPercentage(),
- journalFF,
+ messageJournalFileFactory,
"hornetq-data",
"hq",
config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO()
@@ -409,8 +406,8 @@
storageManagerLock.writeLock().unlock();
}
- sendJournalFile(messageFiles, JournalContent.MESSAGES);
- sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
+ sendJournalFile(messageJournalFileFactory, messageFiles, JournalContent.MESSAGES);
+ sendJournalFile(bindingsJournalFileFactory, bindingsFiles, JournalContent.BINDINGS);
sendLargeMessageFiles(largeMessageFilesToSync);
sendPagesToBackup(pageFilesToSync, pagingManager);
@@ -474,7 +471,7 @@
SequentialFile seqFile = largeMessagesFactory.createSequentialFile(fileName, 1);
if (!seqFile.exists())
continue;
- replicator.syncLargeMessageFile(seqFile, size, getLargeMessageIdFromFilename(fileName));
+ replicator.syncLargeMessageFile(largeMessagesFactory, seqFile, size, getLargeMessageIdFromFilename(fileName));
}
}
@@ -507,11 +504,12 @@
/**
* Send an entire journal file to a replicating backup server.
*/
- private void sendJournalFile(JournalFile[] journalFiles, JournalContent type) throws Exception
+ private void
+ sendJournalFile(SequentialFileFactory factory, JournalFile[] journalFiles, JournalContent type) throws Exception
{
for (JournalFile jf : journalFiles)
{
- replicator.syncJournalFile(jf, type);
+ replicator.syncJournalFile(factory, jf, type);
jf.setCanReclaim(true);
}
}
@@ -3890,9 +3888,11 @@
journal.stop();
}
- public boolean hasCallbackSupport()
+ public boolean hasCallbackSupport(JournalContent journalContent)
{
- return hasCallbackSupport;
+ if (journalContent == JournalContent.MESSAGES)
+ return messageJournalFileFactory.isSupportsCallbacks();
+ return bindingsJournalFileFactory.isSupportsCallbacks();
}
@Override
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-09-26 10:39:02 UTC (rev 11423)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-09-26 10:41:01 UTC (rev 11424)
@@ -37,6 +37,7 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.config.PersistedAddressSetting;
import org.hornetq.core.persistence.config.PersistedRoles;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.replication.ReplicationManager;
@@ -573,7 +574,7 @@
}
@Override
- public boolean hasCallbackSupport()
+ public boolean hasCallbackSupport(JournalContent content)
{
return false;
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-09-26 10:39:02 UTC (rev 11423)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-09-26 10:41:01 UTC (rev 11424)
@@ -20,6 +20,7 @@
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.persistence.OperationContext;
@@ -92,7 +93,7 @@
* @throws HornetQException
* @throws Exception
*/
- void syncJournalFile(JournalFile jf, JournalContent type) throws Exception;
+ void syncJournalFile(SequentialFileFactory factory, JournalFile jf, JournalContent type) throws Exception;
/**
* Reserve the following fileIDs in the backup server.
@@ -113,7 +114,8 @@
* @param seqFile
* @throws Exception
*/
- void syncLargeMessageFile(SequentialFile seqFile, long size, long id) throws Exception;
+ void syncLargeMessageFile(SequentialFileFactory fctr, SequentialFile seqFile, long size, long id)
+ throws Exception;
/**
* @param file
@@ -121,5 +123,5 @@
* @param pageStore
* @throws Exception
*/
- void syncPages(SequentialFile file, long id, SimpleString pageStore) throws Exception;
+ void syncPages(SequentialFileFactory factory, SequentialFile file, long id, SimpleString pageStore) throws Exception;
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-26 10:39:02 UTC (rev 11423)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-26 10:41:01 UTC (rev 11424)
@@ -492,7 +492,7 @@
{
sf.open(1, false);
}
- sf.writeDirect(ByteBuffer.wrap(data), true);
+ sf.writeDirect(data);
}
/**
@@ -519,7 +519,7 @@
Map<Long, JournalFile> mapToFill = filesReservedForSync.get(packet.getJournalContentType());
JournalFile current = journal.createFilesForBackupSync(packet.getFileIds(), mapToFill);
registerJournal(packet.getJournalContentType().typeByte,
- new FileWrapperJournal(current, storage.hasCallbackSupport()));
+ new FileWrapperJournal(current, storage.hasCallbackSupport(packet.getJournalContentType())));
}
private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet)
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-09-26 10:39:02 UTC (rev 11423)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-09-26 10:41:01 UTC (rev 11424)
@@ -26,6 +26,7 @@
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagedMessage;
@@ -462,26 +463,28 @@
}
@Override
- public void syncJournalFile(JournalFile jf, JournalContent content) throws Exception
+ public void syncJournalFile(SequentialFileFactory factory, JournalFile jf, JournalContent content) throws Exception
{
if (enabled)
{
SequentialFile file = jf.getFile().copy();
log.info("Replication: sending " + jf + " (size=" + file.size() + ") to backup. " + file);
- sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE);
+ sendLargeFile(content, null, jf.getFileID(), file, factory, Long.MAX_VALUE);
}
}
@Override
- public void syncLargeMessageFile(SequentialFile file, long size, long id) throws Exception
+ public void
+ syncLargeMessageFile(SequentialFileFactory factory, SequentialFile file, long size, long id) throws Exception
{
- sendLargeFile(null, null, id, file, size);
+ sendLargeFile(null, null, id, file, factory, size);
}
@Override
- public void syncPages(SequentialFile file, long id, SimpleString queueName) throws Exception
+ public void
+ syncPages(SequentialFileFactory factory, SequentialFile file, long id, SimpleString queueName) throws Exception
{
- sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
+ sendLargeFile(null, queueName, id, file, factory, Long.MAX_VALUE);
}
/**
@@ -493,7 +496,11 @@
* @param maxBytesToSend maximum number of bytes to read and send from the file
* @throws Exception
*/
- private void sendLargeFile(JournalContent content, SimpleString pageStore, final long id, SequentialFile file,
+ private void sendLargeFile(JournalContent content,
+ SimpleString pageStore,
+ final long id,
+ SequentialFile file,
+ SequentialFileFactory factory,
long maxBytesToSend)
throws Exception
{
@@ -501,11 +508,13 @@
return;
if (!file.isOpen())
{
- file.open(1, false);
+ file.open();
}
- final ByteBuffer buffer = ByteBuffer.allocate(1 << 17);
- while (true)
+ final ByteBuffer buffer = factory.newBuffer(1 << 17);
+ try
{
+ while (true)
+ {
buffer.rewind();
int bytesRead = file.read(buffer);
int toSend = bytesRead;
@@ -528,7 +537,12 @@
sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, bytesRead, buffer));
if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
break;
+ }
}
+ finally
+ {
+ factory.releaseBuffer(buffer);
+ }
}
@Override
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java 2011-09-26 10:39:02 UTC (rev 11423)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java 2011-09-26 10:41:01 UTC (rev 11424)
@@ -20,12 +20,10 @@
import org.hornetq.core.journal.impl.TimedBuffer;
/**
- *
* A SequentialFile
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
- *
*/
public interface SequentialFile
{
@@ -65,19 +63,50 @@
void write(EncodingSupport bytes, boolean sync) throws Exception;
- /** Write directly to the file without using any buffer */
+ /**
+ * Write directly to the file without using any buffer
+ * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
+ * NIO). To be safe, use a buffer from the corresponding
+ * {@link SequentialFileFactory#newBuffer(int)}.
+ */
void writeDirect(ByteBuffer bytes, boolean sync, IOAsyncTask callback);
- /** Write directly to the file without using any buffer */
+ /**
+ * Write directly to the file without using any buffer
+ * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
+ * NIO). To be safe, use a buffer from the corresponding
+ * {@link SequentialFileFactory#newBuffer(int)}.
+ */
void writeDirect(ByteBuffer bytes, boolean sync) throws Exception;
- /** Write directly to the file.
- * This is used by compacting and other places where we write a big buffer in a single shot.
- * writeInternal should always block until the entire write is sync on disk */
+ /**
+ * Write directly to the file. This is used by compacting and other places where we write a big
+ * buffer in a single shot. writeInternal should always block until the entire write is sync on
+ * disk.
+ * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
+ * NIO). To be safe, use a buffer from the corresponding
+ * {@link SequentialFileFactory#newBuffer(int)}.
+ */
void writeInternal(ByteBuffer bytes) throws Exception;
+ /**
+ * Wraps the bytes using a buffer from the internal {@link SequentialFileFactory} and writes it
+ * directly.
+ */
+ void writeDirect(byte[] bytes) throws Exception;
+
+ /**
+ * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
+ * NIO). To be safe, use a buffer from the corresponding
+ * {@link SequentialFileFactory#newBuffer(int)}.
+ */
int read(ByteBuffer bytes, IOAsyncTask callback) throws Exception;
+ /**
+ * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
+ * NIO). To be safe, use a buffer from the corresponding
+ * {@link SequentialFileFactory#newBuffer(int)}.
+ */
int read(ByteBuffer bytes) throws Exception;
void position(long pos) throws Exception;
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java 2011-09-26 10:39:02 UTC (rev 11423)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java 2011-09-26 10:41:01 UTC (rev 11424)
@@ -14,10 +14,10 @@
package org.hornetq.core.journal.impl;
import java.io.File;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.core.asyncio.AsynchronousFile;
import org.hornetq.core.asyncio.BufferCallback;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
@@ -286,7 +286,7 @@
aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
}
- public void writeInternal(final ByteBuffer bytes) throws Exception
+ public void writeInternal(final ByteBuffer bytes) throws HornetQException
{
final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2011-09-26 10:39:02 UTC (rev 11423)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2011-09-26 10:41:01 UTC (rev 11424)
@@ -234,6 +234,20 @@
}
}
+ @Override
+ public void writeDirect(byte[] data) throws Exception
+ {
+ ByteBuffer buffer = factory.wrapBuffer(data);
+ try
+ {
+ writeDirect(buffer, true);
+ }
+ finally
+ {
+ factory.releaseBuffer(buffer);
+ }
+ }
+
public void write(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback) throws Exception
{
if (timedBuffer != null)
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/NIOSequentialFile.java 2011-09-26 10:39:02 UTC (rev 11423)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/NIOSequentialFile.java 2011-09-26 10:41:01 UTC (rev 11424)
@@ -338,7 +338,9 @@
* @throws IOException
* @throws Exception
*/
- private void doInternalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws Exception
+ private
+ void
+ doInternalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws IOException
{
channel.write(bytes);
Modified: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2011-09-26 10:39:02 UTC (rev 11423)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2011-09-26 10:41:01 UTC (rev 11424)
@@ -31,9 +31,9 @@
import org.hornetq.core.logging.Logger;
/**
- *
+ *
* A FakeSequentialFileFactory
- *
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*
@@ -313,8 +313,6 @@
public boolean isOpen()
{
- // log.debug("is open" + System.identityHashCode(this) +" open is now "
- // + open);
return open;
}
@@ -499,7 +497,7 @@
{
writeDirect(bytes, sync, null);
}
-
+
/* (non-Javadoc)
* @see org.hornetq.core.journal.SequentialFile#writeInternal(java.nio.ByteBuffer)
*/
@@ -508,8 +506,8 @@
writeDirect(bytes, true);
}
-
+
private void checkAndResize(final int size)
{
int oldpos = data == null ? 0 : data.position();
@@ -681,9 +679,15 @@
public void copyTo(SequentialFile newFileName)
{
// TODO Auto-generated method stub
-
}
+ @Override
+ public void writeDirect(byte[] bytes) throws Exception
+ {
+ ByteBuffer buffer = newBuffer(bytes.length);
+ HornetQBuffer outbuffer = HornetQBuffers.wrappedBuffer(buffer);
+ write(outbuffer, true);
+ }
}
/* (non-Javadoc)
13 years, 3 months
JBoss hornetq SVN: r11423 - in branches/HORNETQ-720_Replication/tests: stress-tests/src/test/java/org/hornetq/tests/stress/paging and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-26 06:39:02 -0400 (Mon, 26 Sep 2011)
New Revision: 11423
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
branches/HORNETQ-720_Replication/tests/stress-tests/src/test/java/org/hornetq/tests/stress/paging/PageCursorStressTest.java
Log:
Avoid noise on std.out
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java 2011-09-26 10:38:04 UTC (rev 11422)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java 2011-09-26 10:39:02 UTC (rev 11423)
@@ -1549,7 +1549,7 @@
for (int i = 0; i < 50; i++)
{
- System.out.println("Sending " + i);
+ // System.out.println("Sending " + i);
ClientMessage message = sessionNonTX.createMessage(true);
message.getBodyBuffer().writeBytes(body);
message.putIntProperty(new SimpleString("id"), i);
@@ -1559,7 +1559,7 @@
if (i % 2 == 0)
{
- System.out.println("Sending 20 msgs to make it page");
+ // System.out.println("Sending 20 msgs to make it page");
for (int j = 0; j < 20; j++)
{
ClientMessage msgSend = sessionNonTX.createMessage(true);
@@ -1571,7 +1571,7 @@
}
else
{
- System.out.println("Consuming 20 msgs to make it page");
+ // System.out.println("Consuming 20 msgs to make it page");
ClientConsumer consumer = sessionNonTX.createConsumer(PagingTest.ADDRESS);
for (int j = 0; j < 20; j++)
{
@@ -1611,12 +1611,9 @@
Integer messageID = (Integer)message.getObjectProperty(new SimpleString("id"));
- // System.out.println(messageID);
- Assert.assertNotNull(messageID);
- Assert.assertEquals("message received out of order", i, messageID.intValue());
+ Assert.assertNotNull("MessageID", messageID);
+ Assert.assertEquals("message received out of order " + messageID, i, messageID.intValue());
- System.out.println("MessageID = " + messageID);
-
message.acknowledge();
}
@@ -1877,8 +1874,7 @@
for (int i = 0; i < numberOfMessages; i++)
{
ClientMessage msg = consumer.receive(5000);
- assertNotNull(msg);
- System.out.println("Received " + i);
+ assertNotNull(String.valueOf(i), msg);
assertEquals(i, msg.getIntProperty("count").intValue());
msg.acknowledge();
}
@@ -2197,13 +2193,12 @@
session.start();
for (int i = 0; i < numberOfMessages; i++)
{
- System.out.println("Received " + i);
if (i == 55)
{
System.out.println("i = 55");
}
ClientMessage msg = consumer.receive(5000);
- Assert.assertNotNull(msg);
+ Assert.assertNotNull("Received " + i, msg);
msg.acknowledge();
session.commit();
}
@@ -2296,10 +2291,9 @@
// 347 = I just picked any odd number, not rounded, to make sure it's not at the beggining of any page
for (int i = 0; i < 347; i++)
{
- System.out.println("Received " + i);
ClientMessage msg = consumer.receive(5000);
assertEquals(i, msg.getIntProperty("id").intValue());
- Assert.assertNotNull(msg);
+ Assert.assertNotNull("Received " + i, msg);
msg.acknowledge();
session.commit();
}
@@ -2329,10 +2323,9 @@
session.start();
for (int i = 347; i < numberOfMessages; i++)
{
- System.out.println("Received " + i);
ClientMessage msg = consumer.receive(5000);
assertEquals(i, msg.getIntProperty("id").intValue());
- Assert.assertNotNull(msg);
+ Assert.assertNotNull("Received " + i, msg);
msg.acknowledge();
session.commit();
}
Modified: branches/HORNETQ-720_Replication/tests/stress-tests/src/test/java/org/hornetq/tests/stress/paging/PageCursorStressTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/stress-tests/src/test/java/org/hornetq/tests/stress/paging/PageCursorStressTest.java 2011-09-26 10:38:04 UTC (rev 11422)
+++ branches/HORNETQ-720_Replication/tests/stress-tests/src/test/java/org/hornetq/tests/stress/paging/PageCursorStressTest.java 2011-09-26 10:39:02 UTC (rev 11423)
@@ -67,12 +67,12 @@
// Attributes ----------------------------------------------------
- private SimpleString ADDRESS = new SimpleString("test-add");
+ private final SimpleString ADDRESS = new SimpleString("test-add");
private HornetQServer server;
private Queue queue;
-
+
private List<Queue> queueList;
private static final int PAGE_MAX = -1;
@@ -120,7 +120,7 @@
final int NUM_MESSAGES = 100;
PageSubscription cursor = lookupPageStore(ADDRESS).getCursorProvider().getSubscription(queue.getID());
-
+
Iterator<PagedReference> iterEmpty = cursor.iterator();
int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
@@ -325,9 +325,8 @@
for (int i = firstPageSize; i < NUM_MESSAGES; i++)
{
- System.out.println("Received " + i);
PagedReference msg = iterator.next();
- assertNotNull(msg);
+ assertNotNull("Received " + i, msg);
assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
cursor.ack(msg);
@@ -378,7 +377,7 @@
cursor.ack(msg);
}
}
-
+
server.getStorageManager().waitOnOperations();
server.stop();
@@ -498,7 +497,7 @@
.getSubscription(queue.getID());
System.out.println("Cursor: " + cursor);
-
+
RoutingContextImpl ctx = generateCTX();
LinkedListIterator<PagedReference> iterator = cursor.iterator();
@@ -527,7 +526,7 @@
}
OperationContextImpl.clearContext();
-
+
ctx = generateCTX();
pageStore = lookupPageStore(ADDRESS);
@@ -616,8 +615,8 @@
assertFalse(lookupPageStore(ADDRESS).isPaging());
}
-
+
public void testConsumeLivePageMultiThread() throws Exception
{
final PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
@@ -625,9 +624,9 @@
pageStore.startPaging();
final int NUM_TX = 100;
-
+
final int MSGS_TX = 100;
-
+
final int TOTAL_MSG = NUM_TX * MSGS_TX;
final int messageSize = 1024;
@@ -641,31 +640,32 @@
.getSubscription(queue.getID());
System.out.println("Cursor: " + cursor);
-
+
final StorageManager storage = this.server.getStorageManager();
-
+
final AtomicInteger exceptions = new AtomicInteger(0);
-
+
Thread t1 = new Thread()
{
+ @Override
public void run()
{
try
{
int count = 0;
-
+
for (int txCount = 0; txCount < NUM_TX; txCount++)
{
-
+
Transaction tx = null;
-
+
if (txCount % 2 == 0)
{
tx = new TransactionImpl(storage);
}
RoutingContext ctx = generateCTX(tx);
-
+
for (int i = 0 ; i < MSGS_TX; i++)
{
//System.out.println("Sending " + count);
@@ -678,12 +678,12 @@
Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
}
-
+
if (tx != null)
{
tx.commit();
}
-
+
}
}
catch (Throwable e)
@@ -693,12 +693,12 @@
}
}
};
-
+
t1.start();
-
-
+
+
LinkedListIterator<PagedReference> iterator = cursor.iterator();
-
+
for (int i = 0 ; i < TOTAL_MSG; i++ )
{
assertEquals(0, exceptions.get());
@@ -716,32 +716,32 @@
}
}
assertNotNull(ref);
-
+
ref.acknowledge();
assertNotNull(ref);
-
+
System.out.println("Consuming " + ref.getMessage().getIntProperty("key"));
//assertEquals(i, ref.getMessage().getIntProperty("key").intValue());
}
assertEquals(0, exceptions.get());
}
-
+
private RoutingContextImpl generateCTX()
{
return generateCTX(null);
}
-
+
private RoutingContextImpl generateCTX(Transaction tx)
{
RoutingContextImpl ctx = new RoutingContextImpl(tx);
ctx.addQueue(ADDRESS, queue);
-
+
for (Queue q : this.queueList)
{
ctx.addQueue(ADDRESS, q);
}
-
+
return ctx;
}
@@ -813,11 +813,11 @@
assertNull(iterator.next());
cursor.printDebug();
-
+
txCommit.commit();
txRollback.rollback();
-
+
storage.waitOnOperations();
// Second:after pgtxCommit was done
@@ -830,7 +830,7 @@
}
assertNull(iterator.next());
-
+
server.getStorageManager().waitOnOperations();
server.stop();
@@ -888,9 +888,9 @@
}
assertNull(iterator.next());
-
+
txLazy.commit();
-
+
storage.waitOnOperations();
for (int i = 0; i < 100; i++)
@@ -955,7 +955,7 @@
// We can't proceed until the operation has finished
server.getStorageManager().waitOnOperations();
-
+
PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
msg.initMessage(server.getStorageManager());
int initialKey = msg.getMessage().getIntProperty("key").intValue();
@@ -1000,7 +1000,7 @@
assertEquals(1, lookupPageStore(ADDRESS).getNumberOfPages());
}
-
+
private int tstProperty(ServerMessage msg)
{
return msg.getIntProperty("key").intValue();
@@ -1020,52 +1020,52 @@
PageSubscription cursor = cursorProvider.getSubscription(queue.getID());
LinkedListIterator<PagedReference> iter = cursor.iterator();
-
+
LinkedListIterator<PagedReference> iter2 = cursor.iterator();
-
+
assertTrue(iter.hasNext());
-
+
PagedReference msg1 = iter.next();
-
+
PagedReference msg2 = iter2.next();
-
+
assertEquals(tstProperty(msg1.getMessage()), tstProperty(msg2.getMessage()));
-
+
System.out.println("property = " + tstProperty(msg1.getMessage()));
msg1 = iter.next();
-
+
assertEquals(1, tstProperty(msg1.getMessage()));
-
+
iter.remove();
-
+
msg2 = iter2.next();
-
+
assertEquals(2, tstProperty(msg2.getMessage()));
-
+
iter2.repeat();
-
+
msg2 = iter2.next();
-
+
assertEquals(2, tstProperty(msg2.getMessage()));
-
+
iter2.repeat();
-
+
assertEquals(2, tstProperty(msg2.getMessage()));
-
+
msg1 = iter.next();
-
+
assertEquals(2, tstProperty(msg1.getMessage()));
-
+
iter.repeat();
-
+
msg1 = iter.next();
-
+
assertEquals(2, tstProperty(msg1.getMessage()));
-
+
assertTrue(iter2.hasNext());
-
-
+
+
}
private int addMessages(final int numMessages, final int messageSize) throws Exception
@@ -1083,7 +1083,7 @@
PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
pageStore.startPaging();
-
+
RoutingContext ctx = generateCTX();
for (int i = start; i < start + numMessages; i++)
@@ -1118,6 +1118,7 @@
// Protected -----------------------------------------------------
+ @Override
protected void tearDown() throws Exception
{
server.stop();
@@ -1127,6 +1128,7 @@
super.tearDown();
}
+ @Override
protected void setUp() throws Exception
{
super.setUp();
@@ -1134,7 +1136,7 @@
System.out.println("Tmp:" + getTemporaryDir());
queueList = new ArrayList<Queue>();
-
+
createServer();
}
@@ -1152,7 +1154,7 @@
server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
server.start();
-
+
queueList.clear();
try
@@ -1174,11 +1176,11 @@
long id = server.getStorageManager().generateUniqueID();
FakeQueue queue = new FakeQueue(new SimpleString(filter.toString()), id);
queueList.add(queue);
-
+
PageSubscription subs = lookupCursorProvider().createSubscription(id, filter, false);
-
+
queue.setPageSubscription(subs);
-
+
return subs;
}
@@ -1207,9 +1209,9 @@
final int NUM_MESSAGES,
final int messageSize) throws Exception
{
-
+
TransactionImpl txImpl = new TransactionImpl(pgParameter, null, storage);
-
+
RoutingContext ctx = generateCTX(txImpl);
for (int i = start; i < start + NUM_MESSAGES; i++)
@@ -1220,7 +1222,7 @@
msg.putIntProperty("key", i);
pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS));
}
-
+
return txImpl;
}
13 years, 3 months
JBoss hornetq SVN: r11422 - in branches/HORNETQ-720_Replication: hornetq-journal/src/main/java/org/hornetq/core/journal/impl and 1 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-26 06:38:04 -0400 (Mon, 26 Sep 2011)
New Revision: 11422
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/NIOSequentialFile.java
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Throw IOException instead of Exception
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java 2011-09-26 10:37:00 UTC (rev 11421)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java 2011-09-26 10:38:04 UTC (rev 11422)
@@ -13,6 +13,7 @@
package org.hornetq.core.journal;
+import java.io.IOException;
import java.nio.ByteBuffer;
import org.hornetq.api.core.HornetQBuffer;
@@ -87,7 +88,7 @@
void waitForClose() throws Exception;
- void sync() throws Exception;
+ void sync() throws IOException;
long size() throws Exception;
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java 2011-09-26 10:37:00 UTC (rev 11421)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java 2011-09-26 10:38:04 UTC (rev 11422)
@@ -14,6 +14,7 @@
package org.hornetq.core.journal.impl;
import java.io.File;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
@@ -230,7 +231,7 @@
return bytesRead;
}
- public void sync() throws Exception
+ public void sync()
{
throw new UnsupportedOperationException("This method is not supported on AIO");
}
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/NIOSequentialFile.java 2011-09-26 10:37:00 UTC (rev 11421)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/NIOSequentialFile.java 2011-09-26 10:38:04 UTC (rev 11422)
@@ -29,9 +29,9 @@
import org.hornetq.core.logging.Logger;
/**
- *
+ *
* A NIOSequentialFile
- *
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*
@@ -205,7 +205,7 @@
}
- public void sync() throws Exception
+ public void sync() throws IOException
{
if (channel != null)
{
Modified: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2011-09-26 10:37:00 UTC (rev 11421)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2011-09-26 10:38:04 UTC (rev 11422)
@@ -13,6 +13,7 @@
package org.hornetq.tests.unit.core.journal.impl.fakes;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -474,7 +475,7 @@
}
- public void sync() throws Exception
+ public void sync() throws IOException
{
if (supportsCallback)
{
13 years, 3 months
JBoss hornetq SVN: r11421 - branches/HORNETQ-720_Replication.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-26 06:37:00 -0400 (Mon, 26 Sep 2011)
New Revision: 11421
Modified:
branches/HORNETQ-720_Replication/pom.xml
Log:
Fork the process for each test class, and time-it out after 10 min
Modified: branches/HORNETQ-720_Replication/pom.xml
===================================================================
--- branches/HORNETQ-720_Replication/pom.xml 2011-09-26 10:36:41 UTC (rev 11420)
+++ branches/HORNETQ-720_Replication/pom.xml 2011-09-26 10:37:00 UTC (rev 11421)
@@ -475,6 +475,8 @@
<testFailureIgnore>true</testFailureIgnore>
<runOrder>alphabetical</runOrder>
<redirectTestOutputToFile>false</redirectTestOutputToFile>
+ <forkMode>always</forkMode>
+ <forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds>
</configuration>
</plugin>
<plugin>
13 years, 3 months