JBoss hornetq SVN: r11370 - in branches/Branch_2_2_EAP_cluster4: src/main/org/hornetq/core/server/cluster/impl and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-19 19:47:54 -0400 (Mon, 19 Sep 2011)
New Revision: 11370
Modified:
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
Log:
Fixing tests
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-09-19 21:54:41 UTC (rev 11369)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-09-19 23:47:54 UTC (rev 11370)
@@ -52,7 +52,7 @@
void deploy() throws Exception;
- void deployBridge(BridgeConfiguration config) throws Exception;
+ void deployBridge(BridgeConfiguration config, boolean start) throws Exception;
void destroyBridge(String name) throws Exception;
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-09-19 21:54:41 UTC (rev 11369)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-09-19 23:47:54 UTC (rev 11370)
@@ -211,18 +211,10 @@
for (BridgeConfiguration config : configuration.getBridgeConfigurations())
{
- deployBridge(config);
+ deployBridge(config, !backup);
}
- for (Bridge bridge : bridges.values())
- {
- if (!backup)
- {
- bridge.start();
- }
- }
-
started = true;
}
@@ -379,7 +371,7 @@
this.clusterLocators.remove(serverLocator);
}
- public synchronized void deployBridge(final BridgeConfiguration config) throws Exception
+ public synchronized void deployBridge(final BridgeConfiguration config, final boolean start) throws Exception
{
if (config.getName() == null)
{
@@ -507,6 +499,11 @@
bridges.put(config.getName(), bridge);
managementService.registerBridge(bridge, config);
+
+ if (start)
+ {
+ bridge.start();
+ }
}
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-19 21:54:41 UTC (rev 11369)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-19 23:47:54 UTC (rev 11370)
@@ -1085,7 +1085,7 @@
{
if (clusterManager != null)
{
- clusterManager.deployBridge(config);
+ clusterManager.deployBridge(config, true);
}
}
Modified: branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2011-09-19 21:54:41 UTC (rev 11369)
+++ branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2011-09-19 23:47:54 UTC (rev 11370)
@@ -12,16 +12,9 @@
*/
package org.hornetq.tests.integration.cluster.failover;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import junit.framework.Assert;
-
-import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
-import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
/**
@@ -167,6 +160,7 @@
closeSessionFactory(0);
Thread.sleep(1000);
+
servers[0].stop(true);
waitForServerRestart(2);
@@ -213,16 +207,4 @@
abstract boolean isSharedServer();
- private void fail(final RemotingConnection conn, final CountDownLatch latch) throws InterruptedException
- {
- // Simulate failure on connection
- conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
-
- // Wait to be informed of failure
-
- boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
-
- Assert.assertTrue(ok);
- }
-
}
13 years, 3 months
JBoss hornetq SVN: r11369 - branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-19 17:54:41 -0400 (Mon, 19 Sep 2011)
New Revision: 11369
Modified:
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
fixing tests
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-09-19 15:34:05 UTC (rev 11368)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-09-19 21:54:41 UTC (rev 11369)
@@ -1554,7 +1554,7 @@
{
log.debug(ClusterConnectionImpl.this + "Creating a serverLocator for " + Arrays.toString(tcConfigs));
}
- ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, tcConfigs);
+ ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology?topology:null, true, tcConfigs);
locator.setClusterConnection(true);
return locator;
}
@@ -1586,7 +1586,7 @@
public ServerLocatorInternal createServerLocator(boolean includeTopology)
{
- ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, dg);
+ ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology?topology:null, true, dg);
return locator;
}
13 years, 3 months
JBoss hornetq SVN: r11368 - branches/HORNETQ-720_Replication.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-19 11:34:05 -0400 (Mon, 19 Sep 2011)
New Revision: 11368
Modified:
branches/HORNETQ-720_Replication/.gitignore
Log:
Update .gitignore file
Modified: branches/HORNETQ-720_Replication/.gitignore
===================================================================
--- branches/HORNETQ-720_Replication/.gitignore 2011-09-19 15:33:50 UTC (rev 11367)
+++ branches/HORNETQ-720_Replication/.gitignore 2011-09-19 15:34:05 UTC (rev 11368)
@@ -2,6 +2,9 @@
tests/jms-tests/data
org.eclipse.jdt.core.prefs
org.eclipse.jdt.ui.prefs
+org.eclipse.m2e.core.prefs
+*/.settings/org.eclipse.*
+*/*/.settings/org.eclipse.*
!etc/org.eclipse.jdt.ui.prefs
!etc/org.eclipse.jdt.core.prefs
org.maven.ide.eclipse.prefs
13 years, 3 months
JBoss hornetq SVN: r11367 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-19 11:33:50 -0400 (Mon, 19 Sep 2011)
New Revision: 11367
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
Log:
delete unused method
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-19 15:32:57 UTC (rev 11366)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-19 15:33:50 UTC (rev 11367)
@@ -107,29 +107,6 @@
bodySize += bytes.length;
}
- public void encodeBody(final HornetQBuffer bufferOut, final BodyEncoder context, final int size)
- {
- try
- {
- // This could maybe be optimized (maybe reading directly into bufferOut)
- ByteBuffer bufferRead = ByteBuffer.allocate(size);
-
- int bytesRead = context.encode(bufferRead);
-
- bufferRead.flip();
-
- if (bytesRead > 0)
- {
- bufferOut.writeBytes(bufferRead.array(), 0, bytesRead);
- }
-
- }
- catch (Exception e)
- {
- throw new RuntimeException(e.getMessage(), e);
- }
- }
-
@Override
public synchronized int getEncodeSize()
{
13 years, 3 months
JBoss hornetq SVN: r11366 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core: persistence and 4 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-19 11:32:57 -0400 (Mon, 19 Sep 2011)
New Revision: 11366
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
HORNETQ-720 Add addToPage method to the StorageManager interface,
so that we use the storageManager lock to control the addition of data to pages.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-09-19 15:31:38 UTC (rev 11365)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-09-19 15:32:57 UTC (rev 11366)
@@ -386,28 +386,20 @@
public synchronized void stop() throws Exception
{
- lock(-1);
- try
+ if (running)
{
- if (running)
- {
- running = false;
+ running = false;
- cursorProvider.stop();
+ cursorProvider.stop();
- flushExecutors();
+ flushExecutors();
- if (currentPage != null)
- {
- currentPage.close();
- currentPage = null;
- }
+ if (currentPage != null)
+ {
+ currentPage.close();
+ currentPage = null;
}
}
- finally
- {
- unlock();
- }
}
/** Wait any pending runnable to finish its execution. */
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-09-19 15:31:38 UTC (rev 11365)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-09-19 15:32:57 UTC (rev 11366)
@@ -39,6 +39,8 @@
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RouteContextList;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.transaction.ResourceManager;
@@ -244,4 +246,13 @@
*/
void startReplication(ReplicationManager replicationManager, PagingManager pagingManager) throws Exception;
+ /**
+ * Adds message to page if we are paging.
+ * @return whether we added the message to a page or not.
+ */
+ boolean addToPage(PagingManager pagingManager,
+ SimpleString address,
+ ServerMessage message,
+ RoutingContext ctx,
+ RouteContextList listCtx) throws Exception;
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-19 15:31:38 UTC (rev 11365)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-19 15:32:57 UTC (rev 11366)
@@ -86,6 +86,8 @@
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RouteContextList;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.server.impl.ServerMessageImpl;
@@ -162,7 +164,7 @@
public enum JournalContent
{
- BINDINGS((byte)0), MESSAGES((byte)1);
+ BINDINGS((byte)0), MESSAGES((byte)1);
public final byte typeByte;
@@ -220,11 +222,6 @@
private final boolean hasCallbackSupport;
- public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory)
- {
- this(config, executorFactory, null);
- }
-
public JournalStorageManager(final Configuration config,
final ExecutorFactory executorFactory,
final ReplicationManager replicator)
@@ -3295,7 +3292,7 @@
public static final class CursorAckRecordEncoding implements EncodingSupport
{
- public CursorAckRecordEncoding(final long queueID, final PagePosition position)
+ private CursorAckRecordEncoding(final long queueID, final PagePosition position)
{
this.queueID = queueID;
this.position = position;
@@ -3629,7 +3626,7 @@
* @param buffer
* @return
*/
- protected static PersistedRoles newSecurityRecord(long id, HornetQBuffer buffer)
+ private static PersistedRoles newSecurityRecord(long id, HornetQBuffer buffer)
{
PersistedRoles roles = new PersistedRoles();
roles.decode(buffer);
@@ -3642,7 +3639,7 @@
* @param buffer
* @return
*/
- protected static PersistedAddressSetting newAddressEncoding(long id, HornetQBuffer buffer)
+ private static PersistedAddressSetting newAddressEncoding(long id, HornetQBuffer buffer)
{
PersistedAddressSetting setting = new PersistedAddressSetting();
setting.decode(buffer);
@@ -3688,7 +3685,7 @@
* @param journal
* @throws Exception
*/
- protected static void describeJournal(SequentialFileFactory fileFactory, JournalImpl journal) throws Exception
+ private static void describeJournal(SequentialFileFactory fileFactory, JournalImpl journal) throws Exception
{
List<JournalFile> files = journal.orderFiles();
@@ -3896,4 +3893,22 @@
return hasCallbackSupport;
}
+ @Override
+ public boolean addToPage(PagingManager pagingManager,
+ SimpleString address,
+ ServerMessage message,
+ RoutingContext ctx,
+ RouteContextList listCtx) throws Exception
+ {
+ readLock();
+ try
+ {
+ PagingStore store = pagingManager.getPageStore(address);
+ return store.page(message, ctx, listCtx);
+ }
+ finally
+ {
+ readUnLock();
+ }
+ }
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-09-19 15:31:38 UTC (rev 11365)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-09-19 15:32:57 UTC (rev 11366)
@@ -43,6 +43,8 @@
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RouteContextList;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.transaction.ResourceManager;
@@ -594,4 +596,14 @@
// no-op
}
+ @Override
+ public boolean addToPage(PagingManager manager,
+ SimpleString address,
+ ServerMessage message,
+ RoutingContext ctx,
+ RouteContextList listCtx) throws Exception
+ {
+ return false;
+ }
+
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-09-19 15:31:38 UTC (rev 11365)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-09-19 15:32:57 UTC (rev 11366)
@@ -583,7 +583,7 @@
return;
}
-
+
if (message.hasInternalProperties())
{
// We need to perform some cleanup on internal properties,
@@ -679,7 +679,7 @@
// arrived the target node
// as described on https://issues.jboss.org/browse/JBPAPP-6130
ServerMessage copyRedistribute = message.copy(storageManager.generateUniqueID());
-
+
Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddress());
boolean res = false;
@@ -810,7 +810,7 @@
/**
* @param message
*/
- protected void cleanupInternalPropertiesBeforeRouting(final ServerMessage message)
+ private void cleanupInternalPropertiesBeforeRouting(final ServerMessage message)
{
for (SimpleString name : message.getPropertyNames())
{
@@ -845,13 +845,14 @@
private class PageDelivery extends TransactionOperationAbstract
{
- private Set<Queue> queues = new HashSet<Queue>();
+ private final Set<Queue> queues = new HashSet<Queue>();
public void addQueues(List<Queue> queueList)
{
queues.addAll(queueList);
}
+ @Override
public void afterCommit(Transaction tx)
{
// We need to try delivering async after paging, or nothing may start a delivery after paging since nothing is
@@ -866,6 +867,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.transaction.TransactionOperation#getRelatedMessageReferences()
*/
+ @Override
public List<MessageReference> getRelatedMessageReferences()
{
return Collections.emptyList();
@@ -881,11 +883,8 @@
for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet())
{
- PagingStore store = pagingManager.getPageStore(entry.getKey());
-
- if (store.page(message, context, entry.getValue()))
+ if (storageManager.addToPage(pagingManager, entry.getKey(), message, context, entry.getValue()))
{
-
// We need to kick delivery so the Queues may check for the cursors case they are empty
schedulePageDelivery(tx, entry);
continue;
@@ -1064,7 +1063,7 @@
warnMessage.append("Duplicate message detected through the bridge - message will not be routed. Message information:\n");
warnMessage.append(message.toString());
PostOfficeImpl.log.warn(warnMessage.toString());
-
+
if (context.getTransaction() != null)
{
context.getTransaction().markAsRollbackOnly(new HornetQException(HornetQException.DUPLICATE_ID_REJECTED, warnMessage.toString()));
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-19 15:31:38 UTC (rev 11365)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-19 15:32:57 UTC (rev 11366)
@@ -1304,10 +1304,7 @@
addressSettingsRepository);
}
- /**
- * This method is protected as it may be used as a hook for creating a custom storage manager (on tests for instance)
- */
- protected StorageManager createStorageManager()
+ private StorageManager createStorageManager()
{
if (configuration.isPersistenceEnabled())
{
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-19 15:31:38 UTC (rev 11365)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-19 15:32:57 UTC (rev 11366)
@@ -107,7 +107,7 @@
private final boolean strictUpdateDeliveryCount;
- private RemotingConnection remotingConnection;
+ private final RemotingConnection remotingConnection;
private final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<Long, ServerConsumer>();
@@ -151,9 +151,9 @@
private OperationContext sessionContext;
// Session's usage should be by definition single threaded, hence it's not needed to use a concurrentHashMap here
- private Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<SimpleString, Pair<UUID, AtomicLong>>();
+ private final Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<SimpleString, Pair<UUID, AtomicLong>>();
- private long creationTime = System.currentTimeMillis();
+ private final long creationTime = System.currentTimeMillis();
// Constructors ---------------------------------------------------------------------------------
@@ -448,6 +448,7 @@
run();
}
+ @Override
public String toString()
{
return "Temporary Cleaner for queue " + bindingName;
13 years, 3 months
JBoss hornetq SVN: r11365 - in branches/HORNETQ-720_Replication/tests: integration-tests/src/test/java/org/hornetq/tests/integration/persistence and 2 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-19 11:31:38 -0400 (Mon, 19 Sep 2011)
New Revision: 11365
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/persistence/RestartSMTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
Log:
delete constructor used only in tests
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2011-09-19 15:30:13 UTC (rev 11364)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2011-09-19 15:31:38 UTC (rev 11365)
@@ -54,9 +54,9 @@
import org.hornetq.utils.SimpleIDGenerator;
/**
- *
+ *
* A JournalImplTestBase
- *
+ *
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*
*/
@@ -131,11 +131,11 @@
Assert.assertFalse(iterNewFiles.hasNext());
}
-
+
// public void testRepeat() throws Exception
// {
// int i = 0 ;
-//
+//
// while (true)
// {
// System.out.println("#test (" + (i++) + ")");
@@ -1746,16 +1746,16 @@
final AtomicLong seqGenerator = new AtomicLong(1);
final ExecutorService executor = Executors.newCachedThreadPool();
-
+
OrderedExecutorFactory factory = new OrderedExecutorFactory(executor);
-
+
final ExecutorService deleteExecutor = Executors.newCachedThreadPool();
- final JournalStorageManager storage = new JournalStorageManager(config, factory);
+ final JournalStorageManager storage = new JournalStorageManager(config, factory, null);
storage.start();
storage.loadInternalOnly();
-
+
((JournalImpl)storage.getMessageJournal()).setAutoReclaim(false);
final LinkedList<Long> survivingMsgs = new LinkedList<Long>();
@@ -1785,9 +1785,9 @@
storage.storeMessageTransactional(tx, message);
}
ServerMessageImpl message = new ServerMessageImpl(seqGenerator.incrementAndGet(), 100);
-
+
survivingMsgs.add(message.getMessageID());
-
+
// This one will stay here forever
storage.storeMessage(message);
@@ -1874,9 +1874,9 @@
executor.shutdown();
assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
-
+
deleteExecutor.shutdown();
-
+
assertTrue(deleteExecutor.awaitTermination(30, TimeUnit.SECONDS));
storage.stop();
@@ -1894,6 +1894,7 @@
file.mkdir();
}
+ @Override
protected void tearDown() throws Exception
{
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java 2011-09-19 15:30:13 UTC (rev 11364)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java 2011-09-19 15:31:38 UTC (rev 11365)
@@ -54,20 +54,20 @@
public void testDeleteMessagesOnStartup() throws Exception
{
createStorage();
-
+
Queue theQueue = new FakeQueue(new SimpleString(""));
HashMap<Long, Queue> queues = new HashMap<Long, Queue>();
queues.put(100l, theQueue);
-
+
ServerMessage msg = new ServerMessageImpl(1, 100);
-
+
journal.storeMessage(msg);
for (int i = 2; i < 100; i++)
{
journal.storeMessage(new ServerMessageImpl(i, 100));
}
-
+
journal.storeReference(100, 1, true);
journal.stop();
@@ -75,21 +75,23 @@
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
-
+
journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null, null);
assertEquals(98, deletedMessage.size());
-
+
for (Long messageID : deletedMessage)
{
assertTrue("messageID = " + messageID, messageID.longValue() >= 2 && messageID <= 99);
}
}
+ @Override
protected JournalStorageManager createJournalStorageManager(Configuration configuration)
{
- return new JournalStorageManager(configuration, execFactory)
+ return new JournalStorageManager(configuration, execFactory, null)
{
+ @Override
public void deleteMessage(final long messageID) throws Exception
{
System.out.println("message : " + messageID);
@@ -107,6 +109,6 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
-
+
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/persistence/RestartSMTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/persistence/RestartSMTest.java 2011-09-19 15:30:13 UTC (rev 11364)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/persistence/RestartSMTest.java 2011-09-19 15:31:38 UTC (rev 11365)
@@ -27,7 +27,6 @@
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.postoffice.PostOffice;
-import org.hornetq.core.server.JournalType;
import org.hornetq.core.server.Queue;
import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.hornetq.tests.util.ServiceTestBase;
@@ -38,7 +37,7 @@
* A DeleteMessagesRestartTest
*
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
+ *
* Created Mar 2, 2009 10:14:38 AM
*
*
@@ -88,7 +87,7 @@
PostOffice postOffice = new FakePostOffice();
- final JournalStorageManager journal = new JournalStorageManager(configuration, execFactory);
+ final JournalStorageManager journal = new JournalStorageManager(configuration, execFactory, null);
try
{
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java 2011-09-19 15:30:13 UTC (rev 11364)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java 2011-09-19 15:31:38 UTC (rev 11365)
@@ -49,9 +49,9 @@
protected ExecutorService executor;
protected ExecutorFactory execFactory;
-
+
protected JournalStorageManager journal;
-
+
protected JMSStorageManager jmsJournal;
@@ -72,7 +72,7 @@
execFactory = new OrderedExecutorFactory(executor);
File testdir = new File(getTestDir());
-
+
deleteDirectory(testdir);
}
@@ -80,7 +80,7 @@
protected void tearDown() throws Exception
{
executor.shutdown();
-
+
if (journal != null)
{
try
@@ -91,7 +91,7 @@
{
e.printStackTrace(); // >> junit report
}
-
+
journal = null;
}
@@ -105,13 +105,13 @@
{
e.printStackTrace(); // >> junit report
}
-
+
jmsJournal = null;
}
super.tearDown();
}
-
+
/**
* @return
* @throws Exception
@@ -125,7 +125,7 @@
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
-
+
Map<Long, Queue> queues = new HashMap<Long, Queue>();
journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null, null);
@@ -136,7 +136,7 @@
*/
protected JournalStorageManager createJournalStorageManager(Configuration configuration)
{
- return new JournalStorageManager(configuration, execFactory);
+ return new JournalStorageManager(configuration, execFactory, null);
}
/**
@@ -150,7 +150,7 @@
jmsJournal = new JMSJournalStorageManagerImpl(new TimeAndCounterIDGenerator(), configuration, null);
jmsJournal.start();
-
+
jmsJournal.load();
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-09-19 15:30:13 UTC (rev 11364)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-09-19 15:31:38 UTC (rev 11365)
@@ -397,7 +397,7 @@
*/
private JournalStorageManager getStorage()
{
- return new JournalStorageManager(createDefaultConfig(), factory);
+ return new JournalStorageManager(createDefaultConfig(), factory, null);
}
/**
Modified: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2011-09-19 15:30:13 UTC (rev 11364)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2011-09-19 15:31:38 UTC (rev 11365)
@@ -95,7 +95,7 @@
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE);
- journal = new JournalStorageManager(configuration, factory);
+ journal = new JournalStorageManager(configuration, factory, null);
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
@@ -120,7 +120,7 @@
journal.stop();
- journal = new JournalStorageManager(configuration, factory);
+ journal = new JournalStorageManager(configuration, factory, null);
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
@@ -149,7 +149,7 @@
mapDups.clear();
- journal = new JournalStorageManager(configuration, factory);
+ journal = new JournalStorageManager(configuration, factory, null);
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
13 years, 3 months
JBoss hornetq SVN: r11364 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-19 11:30:13 -0400 (Mon, 19 Sep 2011)
New Revision: 11364
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java
Log:
javadoc
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java 2011-09-19 13:17:33 UTC (rev 11363)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java 2011-09-19 15:30:13 UTC (rev 11364)
@@ -129,7 +129,7 @@
Collection<Integer> getCurrentIds() throws Exception;
/**
- * Sends the pages with given ids to the replicator.
+ * Sends the pages with given IDs to the {@link ReplicationManager}.
* <p>
* Sending is done here to avoid exposing the internal {@link SequentialFile}s.
* @param replicator
13 years, 3 months
JBoss hornetq SVN: r11363 - in branches/STOMP11: hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl and 5 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-19 09:17:33 -0400 (Mon, 19 Sep 2011)
New Revision: 11363
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
tests
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java 2011-09-19 04:53:40 UTC (rev 11362)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java 2011-09-19 13:17:33 UTC (rev 11363)
@@ -238,15 +238,12 @@
{
if (binding.getFilter() == null || binding.getFilter().match(message))
{
- log.error("---------------------- route to binding: " + binding);
binding.getBindable().route(message, context);
routed = true;
}
}
}
-
- log.error("-------- now routed is: " + routed);
if (!routed)
{
@@ -279,7 +276,6 @@
if (theBinding != null)
{
- log.error("------------------- route theBinding: " + theBinding + " mesage: " + message);
theBinding.route(message, context);
}
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-09-19 04:53:40 UTC (rev 11362)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-09-19 13:17:33 UTC (rev 11363)
@@ -591,11 +591,7 @@
cleanupInternalPropertiesBeforeRouting(message);
}
- log.error("----------get address: " + address + " addressManager: " + addressManager);
-
Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
-
- log.error("-------------------Bindings: " + bindings);
if (bindings != null)
{
@@ -634,7 +630,6 @@
}
else
{
- log.error("----------processing route: " + context + " direct " + direct);
processRoute(message, context, direct);
}
@@ -971,8 +966,6 @@
message.incrementRefCount();
}
}
-
- log.error("In processing, tx: " + tx);
if (tx != null)
{
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-09-19 04:53:40 UTC (rev 11362)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-09-19 13:17:33 UTC (rev 11363)
@@ -220,7 +220,6 @@
// The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
// buffer is full, preventing any incoming buffers being handled and blocking failover
- log.error("------------------------ write buffer " + connection);
connection.getTransportConnection().write(buffer, flush, batch);
}
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java 2011-09-19 04:53:40 UTC (rev 11362)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java 2011-09-19 13:17:33 UTC (rev 11363)
@@ -72,8 +72,6 @@
public int sendMessage(ServerMessage message, long consumerID, int deliveryCount)
{
Packet packet = new SessionReceiveMessage(consumerID, message, deliveryCount);
-
- log.error("------------------channel sent " + channel);
channel.sendBatched(packet);
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-19 04:53:40 UTC (rev 11362)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-19 13:17:33 UTC (rev 11363)
@@ -469,7 +469,6 @@
public void sendFrame(StompFrame frame)
{
- log.error("--------------- sending reply: " + frame);
manager.sendReply(this, frame);
}
@@ -529,9 +528,7 @@
}
try
{
- log.error("--------------------- sending mesage: " + message);
stompSession.getSession().send(message, true);
- log.error("----------------------sent by " + stompSession.getSession());
}
catch (Exception e)
{
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-09-19 04:53:40 UTC (rev 11362)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-09-19 13:17:33 UTC (rev 11363)
@@ -579,8 +579,6 @@
data = data - pos;
// reset
-
- log.error("-------new Frame decoded: " + command + " headers " + headers + " content " + content);
StompFrame ret = new StompFrame(command, headers, content);
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-19 04:53:40 UTC (rev 11362)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-19 13:17:33 UTC (rev 11363)
@@ -177,15 +177,11 @@
public String getEscapedKey()
{
- log.error("----------------key is : |" + key + "|");
- log.error("----------------esc'd: |" + escape(key) + "|");
return escape(key);
}
public String getEscapedValue()
{
- log.error("----------------val is : |" + val + "|");
- log.error("----------------esc'd v: |" + escape(val) + "|");
return escape(val);
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-19 04:53:40 UTC (rev 11362)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-19 13:17:33 UTC (rev 11363)
@@ -156,6 +156,7 @@
if (sub.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL))
{
+ log.error("---------------------client-individual ack: " + id);
session.individualAcknowledge(consumerID, id);
}
else
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-09-19 04:53:40 UTC (rev 11362)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-09-19 13:17:33 UTC (rev 11363)
@@ -95,14 +95,10 @@
{
response = onUnknown(request.getCommand());
}
-
- log.error("-------------------- handled " + request);
if (response == null)
{
response = postprocess(request);
-
- log.error("---------------postprocessed response: " + response);
}
else
{
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-19 04:53:40 UTC (rev 11362)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-19 13:17:33 UTC (rev 11363)
@@ -153,7 +153,6 @@
@Override
public StompFrame onDisconnect(StompFrame frame)
{
- log.error("----------------- frame: " + frame);
if (this.heartBeater != null)
{
heartBeater.shutdown();
@@ -447,8 +446,6 @@
StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
- log.error("-------------------- frame created: " + frame);
-
return frame;
}
@@ -456,8 +453,6 @@
@Override
public void replySent(StompFrame reply)
{
- log.error("----------------------- reply sent notified: " + reply);
-
if (reply.getCommand().equals(Stomp.Responses.CONNECTED))
{
//kick off the pinger
@@ -574,8 +569,6 @@
{
dur2 = System.currentTimeMillis() - lastAccepted.get();
- log.error("-------------------------- dur2 is " + dur2);
-
if (dur2 > (2 * serverAcceptPing))
{
connection.disconnect();
@@ -614,21 +607,17 @@
try
{
- log.error("-------------------waiting for " + waitTime);
this.wait(waitTime);
- log.error("--------------------wake up " );
}
catch (InterruptedException e)
{
}
}
- log.error("-------------------------HeartBeat thread shut down!");
}
}
public void pingAccepted()
{
- log.error("------------------------Ping accepted!");
this.lastAccepted.set(System.currentTimeMillis());
}
}
@@ -638,7 +627,6 @@
{
if (heartBeater != null)
{
- log.error("----------------------PING accepted: " + request);
heartBeater.pingAccepted();
}
}
@@ -897,8 +885,6 @@
boolean isEscaping = false;
SimpleBytes holder = new SimpleBytes(1024);
- log.error("--------------------------------- Decoding command: " + decoder.command);
-
outer: while (true)
{
byte b = decoder.workingBuffer[decoder.pos++];
@@ -990,8 +976,6 @@
}
holder.reset();
- log.error("---------- A new header decoded: " + decoder.headerName + " : " + headerValue);
-
decoder.headers.put(decoder.headerName, headerValue);
if (decoder.headerName.equals(StompDecoder.CONTENT_LENGTH_HEADER_NAME))
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java 2011-09-19 04:53:40 UTC (rev 11362)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java 2011-09-19 13:17:33 UTC (rev 11363)
@@ -71,8 +71,6 @@
}
// Add a newline to separate the headers from the content.
head.append(Stomp.NEWLINE);
-
- log.error("------------------------_______now head: " + head);
buffer.writeBytes(head.toString().getBytes("UTF-8"));
if (bytesBody != null)
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java 2011-09-19 04:53:40 UTC (rev 11362)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java 2011-09-19 13:17:33 UTC (rev 11363)
@@ -279,7 +279,6 @@
public void route(final ServerMessage message, final RoutingContext context) throws Exception
{
- log.error("-------------------in queue route, context: " + context);
context.addQueue(address, this);
}
@@ -363,7 +362,6 @@
return;
}
- log.error("----------------checkingDirect " + checkDirect);
// The checkDirect flag is periodically set to true, if the delivery is specified as direct then this causes the
// directDeliver flag to be re-computed resulting in direct delivery if the queue is empty
// We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue
@@ -386,13 +384,10 @@
checkDirect = false;
}
- log.error("-----now direct " + direct + " directDeliver " + directDeliver );
if (direct && directDeliver && deliverDirect(ref))
{
return;
}
-
- log.error("------- ok, adding ref to the queue");
queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
@@ -401,8 +396,6 @@
directDeliver = false;
executor.execute(concurrentPoller);
-
- log.error("-----------executing : " + concurrentPoller);
}
public void deliverAsync()
@@ -1953,10 +1946,7 @@
HandleStatus status;
try
{
- log.error("-------------------Now let consumer " + consumer + " handle " + reference);
status = consumer.handle(reference);
-
- log.error("-------------- returned status: " + status);
}
catch (Throwable t)
{
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-09-19 04:53:40 UTC (rev 11362)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-09-19 13:17:33 UTC (rev 11363)
@@ -286,7 +286,6 @@
}
else
{
- log.error("--------------------- deliver standard");
deliverStandardMessage(ref, message);
}
@@ -696,7 +695,6 @@
*/
private void deliverStandardMessage(final MessageReference ref, final ServerMessage message)
{
- log.error("------------------ calling callback " + callback + " to send message");
int packetSize = callback.sendMessage(message, id, ref.getDeliveryCount());
if (availableCredits != null)
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java 2011-09-19 04:53:40 UTC (rev 11362)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java 2011-09-19 13:17:33 UTC (rev 11363)
@@ -72,7 +72,6 @@
String data = new String(sb.toString());
- System.out.println("---------------------------full frame is : " + data);
byte[] byteValue = data.getBytes("UTF-8");
ByteBuffer buffer = ByteBuffer.allocateDirect(byteValue.length);
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-19 04:53:40 UTC (rev 11362)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-19 13:17:33 UTC (rev 11363)
@@ -164,7 +164,6 @@
while (n >= 0)
{
- System.out.println("read " + n);
if (n > 0)
{
receiveBytes(n);
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV11.java 2011-09-19 04:53:40 UTC (rev 11362)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV11.java 2011-09-19 13:17:33 UTC (rev 11363)
@@ -46,15 +46,9 @@
public ClientStompFrame createFrame(final String data)
{
- System.out.println("Data: |" + data + "|");
//split the string at "\n\n"
String[] dataFields = data.split("\n\n");
- System.out.println("DataFields[0] |" + dataFields[0]);
- if (dataFields.length > 1)
- {
- System.out.println("DataFields[1] |" + dataFields[1]);
- }
StringTokenizer tokenizer = new StringTokenizer(dataFields[0], "\n");
String command = tokenizer.nextToken();
@@ -63,7 +57,6 @@
while (tokenizer.hasMoreTokens())
{
String header = tokenizer.nextToken();
- System.out.println("header is: " + header);
String[] fields = splitHeader(header);
frame.addHeader(fields[0], fields[1]);
}
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-19 04:53:40 UTC (rev 11362)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-19 13:17:33 UTC (rev 11363)
@@ -21,6 +21,7 @@
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
import junit.framework.Assert;
@@ -545,6 +546,8 @@
nack(connV11, "sub1", messageID);
+ unsubscribe(connV11, "sub1");
+
connV11.disconnect();
//Nack makes the message be dropped.
@@ -573,6 +576,8 @@
System.out.println("Receiver error: " + error);
+ unsubscribe(connV11, "sub1");
+
connV11.disconnect();
//message should be still there
@@ -601,6 +606,8 @@
System.out.println("Receiver error: " + error);
+ unsubscribe(connV11, "sub1");
+
connV11.disconnect();
//message should still there
@@ -626,6 +633,8 @@
ack(connV11, "sub1", messageID);
+ unsubscribe(connV11, "sub1");
+
connV11.disconnect();
//Nack makes the message be dropped.
@@ -654,6 +663,8 @@
System.out.println("Receiver error: " + error);
+ unsubscribe(connV11, "sub1");
+
connV11.disconnect();
//message should be still there
@@ -682,6 +693,8 @@
System.out.println("Receiver error: " + error);
+ unsubscribe(connV11, "sub1");
+
connV11.disconnect();
//message should still there
@@ -718,6 +731,8 @@
assertEquals("answer-me", error.getHeader("receipt-id"));
+ unsubscribe(connV11, "sub1");
+
connV11.disconnect();
//message should still there
@@ -754,6 +769,8 @@
assertEquals("answer-me", error.getHeader("receipt-id"));
+ unsubscribe(connV11, "sub1");
+
connV11.disconnect();
//message should still there
@@ -786,6 +803,8 @@
//ack the last
this.ack(connV11, "sub1", frame);
+ unsubscribe(connV11, "sub1");
+
connV11.disconnect();
//no messages can be received.
@@ -821,6 +840,8 @@
}
}
+ unsubscribe(connV11, "sub1");
+
connV11.disconnect();
//no messages can be received.
@@ -852,6 +873,8 @@
assertNotNull(frame);
}
+ unsubscribe(connV11, "sub1");
+
connV11.disconnect();
//no messages can be received.
@@ -880,6 +903,7 @@
frame = connV11.receiveFrame();
assertNotNull(frame);
+ System.out.println(i + " == received: " + frame);
//ack on even numbers
if (i%2 == 0)
{
@@ -887,18 +911,23 @@
}
}
+ unsubscribe(connV11, "sub1");
+
connV11.disconnect();
//no messages can be received.
MessageConsumer consumer = session.createConsumer(queue);
- Message message = null;
+ TextMessage message = null;
for (int i = 0; i < num/2; i++)
{
- message = consumer.receive(1000);
+ message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
+ System.out.println("Legal: " + message.getText());
}
- message = consumer.receive(1000);
+
+ message = (TextMessage) consumer.receive(1000);
+
Assert.assertNull(message);
}
@@ -908,12 +937,15 @@
String messageID = frame.getHeader("message-id");
ClientStompFrame ackFrame = connV11.createFrame("ACK");
- //give it a wrong sub id
+
ackFrame.addHeader("subscription", subId);
ackFrame.addHeader("message-id", messageID);
- ackFrame.addHeader("receipt", "answer-me");
- connV11.sendFrame(ackFrame);
+ ClientStompFrame response = connV11.sendFrame(ackFrame);
+ if (response != null)
+ {
+ throw new IOException("failed to ack " + response);
+ }
}
private void ack(StompClientConnection conn, String subId, String mid) throws IOException, InterruptedException
@@ -944,6 +976,14 @@
conn.sendFrame(subFrame);
}
+ private void unsubscribe(StompClientConnection conn, String subId) throws IOException, InterruptedException
+ {
+ ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
+ subFrame.addHeader("id", subId);
+
+ conn.sendFrame(subFrame);
+ }
+
}
13 years, 3 months
JBoss hornetq SVN: r11362 - in branches/STOMP11: tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11 and 1 other directory.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-19 00:53:40 -0400 (Mon, 19 Sep 2011)
New Revision: 11362
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
test
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-17 05:43:39 UTC (rev 11361)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-19 04:53:40 UTC (rev 11362)
@@ -86,8 +86,6 @@
StompFrame frame = connection.createStompMessage(serverMessage, subscription, deliveryCount);
- log.error("--------------lllll- Sending frame: " + frame);
-
if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
{
session.acknowledge(consumerID, serverMessage.getMessageID());
@@ -155,7 +153,15 @@
throw new HornetQStompException("subscription id " + subscriptionID + " does not match " + sub.getID());
}
}
- session.acknowledge(consumerID, id);
+
+ if (sub.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL))
+ {
+ session.individualAcknowledge(consumerID, id);
+ }
+ else
+ {
+ session.acknowledge(consumerID, id);
+ }
session.commit();
}
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-17 05:43:39 UTC (rev 11361)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-19 04:53:40 UTC (rev 11362)
@@ -761,7 +761,161 @@
Message message = consumer.receive(1000);
Assert.assertNotNull(message);
}
+
+ public void testAckModeClient() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+ subscribe(connV11, "sub1", "client");
+
+ int num = 50;
+ //send a bunch of messages
+ for (int i = 0; i < num; i++)
+ {
+ this.sendMessage("client-ack" + i);
+ }
+
+ ClientStompFrame frame = null;
+
+ for (int i = 0; i < num; i++)
+ {
+ frame = connV11.receiveFrame();
+ assertNotNull(frame);
+ }
+
+ //ack the last
+ this.ack(connV11, "sub1", frame);
+
+ connV11.disconnect();
+
+ //no messages can be received.
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testAckModeClient2() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ int num = 50;
+ //send a bunch of messages
+ for (int i = 0; i < num; i++)
+ {
+ this.sendMessage("client-ack" + i);
+ }
+
+ ClientStompFrame frame = null;
+
+ for (int i = 0; i < num; i++)
+ {
+ frame = connV11.receiveFrame();
+ assertNotNull(frame);
+
+ //ack the 49th
+ if (i == num - 2)
+ {
+ this.ack(connV11, "sub1", frame);
+ }
+ }
+
+ connV11.disconnect();
+
+ //no messages can be received.
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testAckModeAuto() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "auto");
+
+ int num = 50;
+ //send a bunch of messages
+ for (int i = 0; i < num; i++)
+ {
+ this.sendMessage("auto-ack" + i);
+ }
+
+ ClientStompFrame frame = null;
+
+ for (int i = 0; i < num; i++)
+ {
+ frame = connV11.receiveFrame();
+ assertNotNull(frame);
+ }
+
+ connV11.disconnect();
+
+ //no messages can be received.
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testAckModeClientIndividual() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client-individual");
+
+ int num = 50;
+ //send a bunch of messages
+ for (int i = 0; i < num; i++)
+ {
+ this.sendMessage("client-individual-ack" + i);
+ }
+
+ ClientStompFrame frame = null;
+
+ for (int i = 0; i < num; i++)
+ {
+ frame = connV11.receiveFrame();
+ assertNotNull(frame);
+
+ //ack on even numbers
+ if (i%2 == 0)
+ {
+ this.ack(connV11, "sub1", frame);
+ }
+ }
+
+ connV11.disconnect();
+
+ //no messages can be received.
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ Message message = null;
+ for (int i = 0; i < num/2; i++)
+ {
+ message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ }
+ message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ private void ack(StompClientConnection connV112, String subId,
+ ClientStompFrame frame) throws IOException, InterruptedException
+ {
+ String messageID = frame.getHeader("message-id");
+
+ ClientStompFrame ackFrame = connV11.createFrame("ACK");
+ //give it a wrong sub id
+ ackFrame.addHeader("subscription", subId);
+ ackFrame.addHeader("message-id", messageID);
+ ackFrame.addHeader("receipt", "answer-me");
+
+ connV11.sendFrame(ackFrame);
+ }
+
private void ack(StompClientConnection conn, String subId, String mid) throws IOException, InterruptedException
{
ClientStompFrame ackFrame = conn.createFrame("ACK");
13 years, 3 months
JBoss hornetq SVN: r11361 - branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-17 01:43:39 -0400 (Sat, 17 Sep 2011)
New Revision: 11361
Modified:
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
cluster cleanup
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-09-17 03:38:54 UTC (rev 11360)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-09-17 05:43:39 UTC (rev 11361)
@@ -182,11 +182,6 @@
deployClusterConnection(config);
}
}
-
- for (BridgeConfiguration config : configuration.getBridgeConfigurations())
- {
- deployBridge(config);
- }
}
public synchronized void start() throws Exception
@@ -214,6 +209,11 @@
}
}
+ for (BridgeConfiguration config : configuration.getBridgeConfigurations())
+ {
+ deployBridge(config);
+ }
+
for (Bridge bridge : bridges.values())
{
if (!backup)
13 years, 3 months