[hornetq-commits] JBoss hornetq SVN: r8126 - in branches/hornetq_grouping: src/main/org/hornetq/core/config/impl and 15 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Oct 19 07:30:50 EDT 2009
Author: ataylor
Date: 2009-10-19 07:30:49 -0400 (Mon, 19 Oct 2009)
New Revision: 8126
Added:
branches/hornetq_grouping/src/main/org/hornetq/core/persistence/GroupingInfo.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupBinding.java
Modified:
branches/hornetq_grouping/src/main/org/hornetq/core/config/Configuration.java
branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/FileConfiguration.java
branches/hornetq_grouping/src/main/org/hornetq/core/persistence/StorageManager.java
branches/hornetq_grouping/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/hornetq_grouping/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/HornetQServer.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
Log:
some refactoring
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/config/Configuration.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/config/Configuration.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/config/Configuration.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -130,9 +130,9 @@
void setDiscoveryGroupConfigurations(Map<String, DiscoveryGroupConfiguration> configs);
- List<GroupingHandlerConfiguration> getGroupingHandlerConfigurations();
+ GroupingHandlerConfiguration getGroupingHandlerConfiguration();
- void setGroupingHandlerConfigurationConfigurations(List<GroupingHandlerConfiguration> groupingHandlerConfiguration);
+ void setGroupingHandlerConfiguration(GroupingHandlerConfiguration groupingHandlerConfiguration);
List<BridgeConfiguration> getBridgeConfigurations();
@@ -309,5 +309,4 @@
void setMessageExpiryThreadPriority(int messageExpiryThreadPriority);
-
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -230,7 +230,7 @@
protected Map<String, DiscoveryGroupConfiguration> discoveryGroupConfigurations = new LinkedHashMap<String, DiscoveryGroupConfiguration>();
- protected List<GroupingHandlerConfiguration> groupingHandlerConfiguration = new ArrayList<GroupingHandlerConfiguration>();
+ protected GroupingHandlerConfiguration groupingHandlerConfiguration;
// Paging related attributes ------------------------------------------------------------
@@ -484,12 +484,12 @@
this.backupConnectorName = backupConnectorName;
}
- public List<GroupingHandlerConfiguration> getGroupingHandlerConfigurations()
+ public GroupingHandlerConfiguration getGroupingHandlerConfiguration()
{
return groupingHandlerConfiguration;
}
- public void setGroupingHandlerConfigurationConfigurations(List<GroupingHandlerConfiguration> groupingHandlerConfiguration)
+ public void setGroupingHandlerConfiguration(GroupingHandlerConfiguration groupingHandlerConfiguration)
{
this.groupingHandlerConfiguration = groupingHandlerConfiguration;
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -577,19 +577,16 @@
}
private void parseGroupingHandlerConfiguration(final Element node)
- {
- String name = node.getAttribute("name");
- String type = getString(node, "type", null, NOT_NULL_OR_EMPTY);
- String address = getString(node, "address",null, NOT_NULL_OR_EMPTY);
- Integer timeout = getInteger(node, "timeout", GroupingHandlerConfiguration.DEFAULT_TIMEOUT, GT_ZERO);
- GroupingHandlerConfiguration arbitratorConfiguration =
- new GroupingHandlerConfiguration(new SimpleString(name),
- type.equals(GroupingHandlerConfiguration.TYPE.LOCAL.getType())? GroupingHandlerConfiguration.TYPE.LOCAL: GroupingHandlerConfiguration.TYPE.REMOTE,
- new SimpleString(address),
- timeout);
- System.out.println("arbitratorConfiguration = " + arbitratorConfiguration);
- groupingHandlerConfiguration.add(arbitratorConfiguration);
- }
+ {
+ String name = node.getAttribute("name");
+ String type = getString(node, "type", null, NOT_NULL_OR_EMPTY);
+ String address = getString(node, "address",null, NOT_NULL_OR_EMPTY);
+ Integer timeout = getInteger(node, "timeout", GroupingHandlerConfiguration.DEFAULT_TIMEOUT, GT_ZERO);
+ groupingHandlerConfiguration = new GroupingHandlerConfiguration(new SimpleString(name),
+ type.equals(GroupingHandlerConfiguration.TYPE.LOCAL.getType())? GroupingHandlerConfiguration.TYPE.LOCAL: GroupingHandlerConfiguration.TYPE.REMOTE,
+ new SimpleString(address),
+ timeout);
+ }
private void parseBridgeConfiguration(final Element brNode)
Added: branches/hornetq_grouping/src/main/org/hornetq/core/persistence/GroupingInfo.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/persistence/GroupingInfo.java (rev 0)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/persistence/GroupingInfo.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.persistence;
+
+import org.hornetq.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * Created Oct 18, 2009
+ */
+public interface GroupingInfo
+{
+ public SimpleString getClusterName();
+
+ public SimpleString getGroupId();
+
+ public long getId();
+}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -26,6 +26,7 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
@@ -111,5 +112,11 @@
void deleteQueueBinding(long queueBindingID) throws Exception;
- void loadBindingJournal(List<QueueBindingInfo> queueBindingInfos) throws Exception;
+ void loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception;
+
+ //grouping related operations
+ void addGrouping(GroupBinding groupBinding) throws Exception;
+
+
+ void deleteGrouping(GroupBinding groupBinding) throws Exception;
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -50,6 +50,7 @@
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.remoting.impl.wireformat.XidCodecSupport;
import org.hornetq.core.remoting.spi.HornetQBuffer;
@@ -58,6 +59,7 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.core.transaction.Transaction;
@@ -85,6 +87,8 @@
private static final long CHECKPOINT_BATCH_SIZE = Integer.MAX_VALUE;
+ //grouping journal record type
+ public static final byte GROUP_RECORD = 41;
// Bindings journal record type
public static final byte QUEUE_BINDING_RECORD = 21;
@@ -988,7 +992,19 @@
resourceManager.putTransaction(xid, tx);
}
}
+ //grouping handler operations
+ public void addGrouping(GroupBinding groupBinding) throws Exception
+ {
+ GroupingEncoding groupingEncoding = new GroupingEncoding(groupBinding.getId(), groupBinding.getGroupId(), groupBinding.getClusterName());
+ bindingsJournal.appendAddRecord(groupBinding.getId(), GROUP_RECORD, groupingEncoding, true);
+ }
+
+ public void deleteGrouping(GroupBinding groupBinding) throws Exception
+ {
+ bindingsJournal.appendDeleteRecord(groupBinding.getId(), true);
+ }
+
// Bindings operations
public void addQueueBinding(final Binding binding) throws Exception
@@ -1011,7 +1027,7 @@
bindingsJournal.appendDeleteRecord(queueBindingID, true);
}
- public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos) throws Exception
+ public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos, final List<GroupingInfo> groupingInfos) throws Exception
{
List<RecordInfo> records = new ArrayList<RecordInfo>();
@@ -1045,6 +1061,13 @@
persistentID = encoding.uuid;
}
+ else if(rec == GROUP_RECORD)
+ {
+ GroupingEncoding encoding = new GroupingEncoding();
+ encoding.decode(buffer);
+ encoding.setId(id);
+ groupingInfos.add(encoding);
+ }
else if (rec == BatchingIDGenerator.ID_COUNTER_RECORD)
{
idGenerator.loadState(record.id, buffer);
@@ -1259,6 +1282,63 @@
}
}
+ private static class GroupingEncoding implements EncodingSupport, GroupingInfo
+ {
+ long id;
+
+ SimpleString groupId;
+
+ SimpleString clusterName;
+
+ public GroupingEncoding(long id, SimpleString groupId, SimpleString clusterName)
+ {
+ this.id = id;
+ this.groupId = groupId;
+ this.clusterName = clusterName;
+ }
+
+ public GroupingEncoding()
+ {
+ }
+
+ public int getEncodeSize()
+ {
+ return SimpleString.sizeofString(groupId) + SimpleString.sizeofString(clusterName);
+ }
+
+ public void encode(HornetQBuffer buffer)
+ {
+ buffer.writeSimpleString(groupId);
+ buffer.writeSimpleString(clusterName);
+ }
+
+ public void decode(HornetQBuffer buffer)
+ {
+ groupId = buffer.readSimpleString();
+ clusterName = buffer.readSimpleString();
+ }
+
+ public long getId()
+ {
+ return id;
+ }
+
+ public void setId(long id)
+ {
+ this.id = id;
+ }
+
+ public SimpleString getGroupId()
+ {
+ return groupId;
+ }
+
+ public SimpleString getClusterName()
+ {
+ return clusterName;
+ }
+ }
+
private static class PersistentQueueBindingEncoding implements EncodingSupport, QueueBindingInfo
{
long id;
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -24,11 +24,13 @@
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
@@ -74,7 +76,7 @@
{
}
- public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos) throws Exception
+ public void loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception
{
}
@@ -247,4 +249,14 @@
{
}
+ public void addGrouping(GroupBinding groupBinding) throws Exception
+ {
+ // No-op: this storage manager implementation does nothing when a group binding is added.
+ }
+
+ public void deleteGrouping(GroupBinding groupBinding) throws Exception
+ {
+ // No-op: this storage manager implementation does nothing when a group binding is deleted.
+ }
+
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -64,8 +64,4 @@
void sendQueueInfoToQueue(SimpleString queueName, SimpleString address) throws Exception;
Object getNotificationLock();
-
- void setGroupingHandler(GroupingHandler groupingHandler);
-
- GroupingHandler getGroupingHandler();
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -28,7 +28,6 @@
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
-import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.Bindable;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
@@ -60,11 +59,11 @@
private volatile boolean routeWhenNoConsumers;
- private final PostOffice postOffice;
+ private final GroupingHandler groupingHandler;
- public BindingsImpl(PostOffice postOffice)
+ public BindingsImpl(GroupingHandler groupingHandler)
{
- this.postOffice = postOffice;
+ this.groupingHandler = groupingHandler;
}
public void setRouteWhenNoConsumers(final boolean routeWhenNoConsumers)
@@ -278,15 +277,13 @@
if (!routed)
{
- GroupingHandler groupingGroupingHandler = postOffice.getGroupingHandler();
-
if (message.getProperty(MessageImpl.HDR_FROM_CLUSTER) != null)
{
routed = routeFromCluster(message, tx);
}
- else if (groupingGroupingHandler != null && message.getProperty(MessageImpl.HDR_GROUP_ID) != null)
+ else if (groupingHandler != null && message.getProperty(MessageImpl.HDR_GROUP_ID) != null)
{
- routeUsingStrictOrdering(message, tx, groupingGroupingHandler);
+ routeUsingStrictOrdering(message, tx, groupingHandler);
}
else
{
@@ -462,12 +459,12 @@
resp = groupingGroupingHandler.propose(new Proposal(fullID, chosen.getClusterName()));
- if (resp.getAlternative() != null)
+ if (resp.getAlternativeClusterName() != null)
{
chosen = null;
for (Binding binding : bindings)
{
- if (binding.getClusterName().equals(resp.getAlternative()))
+ if (binding.getClusterName().equals(resp.getAlternativeClusterName()))
{
chosen = binding;
break;
@@ -483,7 +480,7 @@
}
else
{
- throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "queue " + resp.getChosen() + " has been removed cannot deliver message, queues should not be removed when grouping is used");
+ throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "queue " + resp.getChosenClusterName() + " has been removed cannot deliver message, queues should not be removed when grouping is used");
}
}
else
@@ -491,7 +488,7 @@
Binding chosen = null;
for (Binding binding : bindings)
{
- if (binding.getClusterName().equals(resp.getChosen()))
+ if (binding.getClusterName().equals(resp.getChosenClusterName()))
{
chosen = binding;
break;
@@ -505,7 +502,7 @@
}
else
{
- throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "queue " + resp.getChosen() + " has been removed cannot deliver message, queues should not be removed when grouping is used");
+ throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "queue " + resp.getChosenClusterName() + " has been removed cannot deliver message, queues should not be removed when grouping is used");
}
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -111,7 +111,7 @@
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
- private GroupingHandler groupingGroupingHandler;
+ private final HornetQServer server;
public PostOfficeImpl(final HornetQServer server,
final StorageManager storageManager,
@@ -124,7 +124,7 @@
final int idCacheSize,
final boolean persistIDCache,
final ExecutorFactory orderedExecutorFactory,
- HierarchicalRepository<AddressSettings> addressSettingsRepository)
+ final HierarchicalRepository<AddressSettings> addressSettingsRepository)
{
this.storageManager = storageManager;
@@ -155,6 +155,8 @@
this.redistributorExecutorFactory = orderedExecutorFactory;
this.addressSettingsRepository = addressSettingsRepository;
+
+ this.server = server;
}
// HornetQComponent implementation ---------------------------------------
@@ -694,18 +696,6 @@
return notificationLock;
}
-
- public void setGroupingHandler(GroupingHandler groupingHandler)
- {
- groupingGroupingHandler = groupingHandler;
- managementService.addNotificationListener(groupingGroupingHandler);
- }
-
- public GroupingHandler getGroupingHandler()
- {
- return groupingGroupingHandler;
- }
-
public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception
{
// We send direct to the queue so we can send it to the same queue that is bound to the notifications address -
@@ -1047,6 +1037,6 @@
public Bindings createBindings()
{
- return new BindingsImpl(this);
+ return new BindingsImpl(server.getGroupingHandler());
}
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/HornetQServer.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/HornetQServer.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -15,6 +15,7 @@
import java.util.List;
import java.util.Set;
+import java.nio.channels.DatagramChannel;
import javax.management.MBeanServer;
@@ -30,6 +31,7 @@
import org.hornetq.core.security.HornetQSecurityManager;
import org.hornetq.core.security.Role;
import org.hornetq.core.server.cluster.ClusterManager;
+import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.ResourceManager;
@@ -126,4 +128,8 @@
void destroyQueue(SimpleString queueName, ServerSession session) throws Exception;
ExecutorFactory getExecutorFactory();
+
+ void setGroupingHandler(GroupingHandler groupingHandler);
+
+ GroupingHandler getGroupingHandler();
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -554,10 +554,10 @@
}
SimpleString val = (SimpleString) message.getProperty(ManagementHelper.HDR_PROPOSAL_VALUE);
Integer hops = (Integer) message.getProperty(ManagementHelper.HDR_DISTANCE);
- Response response = postOffice.getGroupingHandler().receive(new Proposal(type, val), hops + 1);
+ Response response = server.getGroupingHandler().receive(new Proposal(type, val), hops + 1);
if(response != null)
{
- postOffice.getGroupingHandler().send(response, 0);
+ server.getGroupingHandler().send(response, 0);
}
}
@@ -572,8 +572,8 @@
SimpleString alt = (SimpleString) message.getProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE);
Integer hops = (Integer) message.getProperty(ManagementHelper.HDR_DISTANCE);
Response response = new Response(type, val, alt);
- postOffice.getGroupingHandler().proposed(response);
- postOffice.getGroupingHandler().send(response, hops + 1);
+ server.getGroupingHandler().proposed(response);
+ server.getGroupingHandler().send(response, hops + 1);
}
private synchronized void clearBindings() throws Exception
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -37,7 +37,6 @@
import org.hornetq.core.management.ManagementService;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
-import org.hornetq.core.remoting.Channel;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
@@ -149,11 +148,6 @@
{
deployClusterConnection(config);
}
-
- for (GroupingHandlerConfiguration config : configuration.getGroupingHandlerConfigurations())
- {
- deployGroupingHandlerConfigurations(config);
- }
started = true;
}
@@ -486,21 +480,6 @@
bridge.start();
}
- private synchronized void deployGroupingHandlerConfigurations(final GroupingHandlerConfiguration config) throws Exception
- {
- GroupingHandler groupingHandler;
- if (config.getType() == GroupingHandlerConfiguration.TYPE.LOCAL)
- {
- groupingHandler = new LocalGroupingHandler(managementService, config.getName(), config.getAddress());
- }
- else
- {
- groupingHandler = new RemoteGroupingHandler(managementService, config.getName(), config.getAddress(), config.getTimeout());
- }
- log.info("deploying grouping handler: " + groupingHandler);
- postOffice.setGroupingHandler(groupingHandler);
- }
-
private synchronized void deployClusterConnection(final ClusterConnectionConfiguration config) throws Exception
{
if (config.getName() == null)
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -15,6 +15,7 @@
import org.hornetq.utils.SimpleString;
import org.hornetq.core.server.group.impl.Proposal;
import org.hornetq.core.server.group.impl.Response;
+import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.management.NotificationListener;
import org.hornetq.core.management.Notification;
@@ -33,5 +34,5 @@
Response receive(Proposal proposal, int distance) throws Exception;
- void onNotification(Notification notification);
+ void addGroupBinding(GroupBinding groupBinding);
}
Added: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupBinding.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupBinding.java (rev 0)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupBinding.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * Created Oct 19, 2009
+ */
+public class GroupBinding
+{
+ private long id;
+
+ private final SimpleString groupId;
+
+ private final SimpleString clusterName;
+
+ public GroupBinding(SimpleString groupId, SimpleString clusterName)
+ {
+ this.groupId = groupId;
+ this.clusterName = clusterName;
+ }
+
+ public GroupBinding(long id, SimpleString groupId, SimpleString clusterName)
+ {
+ this.id = id;
+ this.groupId = groupId;
+ this.clusterName = clusterName;
+ }
+
+ public long getId()
+ {
+ return id;
+ }
+
+ public void setId(long id)
+ {
+ this.id = id;
+ }
+
+ public SimpleString getGroupId()
+ {
+ return groupId;
+ }
+
+ public SimpleString getClusterName()
+ {
+ return clusterName;
+ }
+}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -19,13 +19,11 @@
import org.hornetq.core.postoffice.BindingType;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.core.persistence.StorageManager;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.TypedProperties;
-import org.hornetq.utils.ConcurrentHashSet;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.HashMap;
/**
@@ -35,21 +33,23 @@
{
private static Logger log = Logger.getLogger(LocalGroupingHandler.class);
- private ConcurrentHashMap<SimpleString, SimpleString> map = new ConcurrentHashMap<SimpleString, SimpleString>();
+ private ConcurrentHashMap<SimpleString, GroupBinding> map = new ConcurrentHashMap<SimpleString, GroupBinding>();
- private HashMap<SimpleString, SimpleString> groupMap = new HashMap<SimpleString, SimpleString>();
+ private HashMap<SimpleString, GroupBinding> groupMap = new HashMap<SimpleString, GroupBinding>();
private final SimpleString name;
private final ManagementService managementService;
private SimpleString address;
+ private StorageManager storageManager;
- public LocalGroupingHandler(final ManagementService managementService, final SimpleString name, final SimpleString address)
+ public LocalGroupingHandler(final ManagementService managementService, final SimpleString name, final SimpleString address, StorageManager storageManager)
{
this.managementService = managementService;
this.name = name;
this.address = address;
+ this.storageManager = storageManager;
}
public SimpleString getName()
@@ -60,20 +60,23 @@
public Response propose(Proposal proposal) throws Exception
{
- if(proposal.getProposal() == null)
+ if(proposal.getClusterName() == null)
{
- SimpleString original = map.get(proposal.getProposalType());
- return original == null?null:new Response(proposal.getProposalType(), original);
+ GroupBinding original = map.get(proposal.getGroupId());
+ return original == null?null:new Response(proposal.getGroupId(), original.getClusterName());
}
- Response response = new Response(proposal.getProposalType(), proposal.getProposal());
- if (map.putIfAbsent(response.getResponseType(), response.getChosen()) == null)
+ GroupBinding groupBinding = new GroupBinding(proposal.getGroupId(), proposal.getClusterName());
+ if (map.putIfAbsent(groupBinding.getGroupId(), groupBinding) == null)
{
- groupMap.put(response.getChosen(), response.getResponseType());
- return response;
+ groupBinding.setId(storageManager.generateUniqueID());
+ groupMap.put(groupBinding.getClusterName(), groupBinding);
+ storageManager.addGrouping(groupBinding);
+ return new Response(groupBinding.getGroupId(), groupBinding.getClusterName());
}
else
{
- return new Response(proposal.getProposalType(), proposal.getProposal(), map.get(proposal.getProposalType()));
+ groupBinding = map.get(proposal.getGroupId());
+ return new Response(groupBinding.getGroupId(), proposal.getClusterName(), groupBinding.getClusterName());
}
}
@@ -84,9 +87,9 @@
public void send(Response response, int distance) throws Exception
{
TypedProperties props = new TypedProperties();
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, response.getResponseType());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, response.getOriginal());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE, response.getAlternative());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, response.getGroupId());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, response.getClusterName());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE, response.getAlternativeClusterName());
props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
@@ -99,16 +102,30 @@
return propose(proposal);
}
+ public void addGroupBinding(GroupBinding groupBinding)
+ {
+ map.put(groupBinding.getGroupId(), groupBinding);
+ groupMap.put(groupBinding.getClusterName(), groupBinding);
+ }
+
public void onNotification(Notification notification)
{
if(notification.getType() == NotificationType.BINDING_REMOVED)
{
SimpleString clusterName = (SimpleString) notification.getProperties().getProperty(ManagementHelper.HDR_CLUSTER_NAME);
- SimpleString val = groupMap.get(clusterName);
+ GroupBinding val = groupMap.get(clusterName);
if(val != null)
{
groupMap.remove(clusterName);
- map.remove(val);
+ map.remove(val.getGroupId());
+ try
+ {
+ storageManager.deleteGrouping(val);
+ }
+ catch (Exception e)
+ {
+ log.warn("Unable to delete group binding info " + val.getGroupId(), e);
+ }
}
}
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -19,26 +19,26 @@
*/
public class Proposal
{
- private final SimpleString proposalType;
- private final SimpleString proposal;
+ private final SimpleString groupId;
+ private final SimpleString clusterName;
public static final String PROPOSAL_TYPE_HEADER = "_JBM_PROPOSAL_TYPE";
public static final String PROPOSAL_HEADER = "_JBM_PROPOSAL";
- public Proposal(SimpleString proposalType, SimpleString proposal)
+ public Proposal(SimpleString groupId, SimpleString clusterName)
{
- this.proposal = proposal;
- this.proposalType = proposalType;
+ this.clusterName = clusterName;
+ this.groupId = groupId;
}
- public SimpleString getProposalType()
+ public SimpleString getGroupId()
{
- return proposalType;
+ return groupId;
}
- public SimpleString getProposal()
+ public SimpleString getClusterName()
{
- return proposal;
+ return clusterName;
}
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -67,12 +67,12 @@
public Response propose(final Proposal proposal) throws Exception
{
- Response response = responses.get(proposal.getProposalType());
+ Response response = responses.get(proposal.getGroupId());
if( response != null)
{
return response;
}
- if (proposal.getProposal() == null)
+ if (proposal.getClusterName() == null)
{
return null;
}
@@ -80,15 +80,15 @@
{
lock.lock();
TypedProperties props = new TypedProperties();
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getProposalType());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, proposal.getProposal());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getGroupId());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, proposal.getClusterName());
props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
props.putIntProperty(ManagementHelper.HDR_DISTANCE, 0);
Notification notification = new Notification(null, NotificationType.PROPOSAL, props);
managementService.sendNotification(notification);
sendCondition.await(timeout, TimeUnit.MILLISECONDS);
- response = responses.get(proposal.getProposalType());
+ response = responses.get(proposal.getGroupId());
}
finally
{
@@ -96,7 +96,7 @@
}
if(response == null)
{
- throw new IllegalStateException("no response received from group handler for " + proposal.getProposalType());
+ throw new IllegalStateException("no response received from group handler for " + proposal.getGroupId());
}
return response;
}
@@ -106,8 +106,8 @@
try
{
lock.lock();
- responses.put(response.getResponseType(), response);
- groupMap.put(response.getChosen(), response.getResponseType());
+ responses.put(response.getGroupId(), response);
+ groupMap.put(response.getChosenClusterName(), response.getGroupId());
sendCondition.signal();
}
finally
@@ -119,8 +119,8 @@
public Response receive(Proposal proposal, int distance) throws Exception
{
TypedProperties props = new TypedProperties();
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getProposalType());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, proposal.getProposal());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getGroupId());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, proposal.getClusterName());
props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
@@ -133,6 +133,11 @@
{
}
+ public void addGroupBinding(GroupBinding groupBinding)
+ {
+
+ }
+
public void onNotification(Notification notification)
{
if(notification.getType() == NotificationType.BINDING_REMOVED)
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -21,23 +21,23 @@
{
private final boolean accepted;
- private final SimpleString original;
+ private final SimpleString clusterName;
- private final SimpleString alternative;
+ private final SimpleString alternativeClusterName;
- private SimpleString responseType;
+ private SimpleString groupId;
- public Response(SimpleString responseType, SimpleString original)
+ public Response(SimpleString groupId, SimpleString clusterName)
{
- this(responseType, original, null);
+ this(groupId, clusterName, null);
}
- public Response(SimpleString responseType, SimpleString original, SimpleString alternative)
+ public Response(SimpleString groupId, SimpleString clusterName, SimpleString alternativeClusterName)
{
- this.responseType = responseType;
- this.accepted = alternative == null;
- this.original = original;
- this.alternative = alternative;
+ this.groupId = groupId;
+ this.accepted = alternativeClusterName == null;
+ this.clusterName = clusterName;
+ this.alternativeClusterName = alternativeClusterName;
}
public boolean isAccepted()
@@ -45,29 +45,29 @@
return accepted;
}
- public SimpleString getOriginal()
+ public SimpleString getClusterName()
{
- return original;
+ return clusterName;
}
- public SimpleString getAlternative()
+ public SimpleString getAlternativeClusterName()
{
- return alternative;
+ return alternativeClusterName;
}
- public SimpleString getChosen()
+ public SimpleString getChosenClusterName()
{
- return alternative != null?alternative:original;
+ return alternativeClusterName != null? alternativeClusterName : clusterName;
}
@Override
public String toString()
{
- return "accepted = " + accepted + " original = " + original + " alternative = " + alternative;
+ return "accepted = " + accepted + " clusterName = " + clusterName + " alternativeClusterName = " + alternativeClusterName;
}
- public SimpleString getResponseType()
+ public SimpleString getGroupId()
{
- return responseType;
+ return groupId;
}
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -57,6 +57,7 @@
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.postoffice.Binding;
@@ -84,6 +85,8 @@
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.QueueFactory;
import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.core.server.group.impl.*;
import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.core.server.cluster.Transformer;
import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
@@ -194,6 +197,8 @@
private final Set<ActivateCallback> activateCallbacks = new HashSet<ActivateCallback>();
+ private GroupingHandler groupingHandler;
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -309,6 +314,11 @@
clusterManager.stop();
}
+ if(groupingHandler != null)
+ {
+ managementService.removeNotificationListener(groupingHandler);
+ groupingHandler = null;
+ }
// Need to flush all sessions to make sure all confirmations get sent back to client
for (ServerSession session : sessions.values())
@@ -864,6 +874,17 @@
return executorFactory;
}
+ public void setGroupingHandler(GroupingHandler groupingHandler)
+ {
+ this.groupingHandler = groupingHandler;
+ // managementService.addNotificationListener(groupingHandler);
+ }
+
+ public GroupingHandler getGroupingHandler()
+ {
+ return groupingHandler;
+ }
+
// Public
// ---------------------------------------------------------------------------------------
@@ -1114,6 +1135,8 @@
clusterManager.start();
}
+ deployGroupingHandlerConfiguration(configuration.getGroupingHandlerConfiguration());
+
if (deploymentManager != null)
{
deploymentManager.start();
@@ -1154,8 +1177,10 @@
{
List<QueueBindingInfo> queueBindingInfos = new ArrayList<QueueBindingInfo>();
- storageManager.loadBindingJournal(queueBindingInfos);
+ List<GroupingInfo> groupingInfos = new ArrayList<GroupingInfo>();
+ storageManager.loadBindingJournal(queueBindingInfos, groupingInfos);
+
// Set the node id - must be before we load the queues into the postoffice, but after we load the journal
setNodeID();
@@ -1187,6 +1212,14 @@
managementService.registerQueue(queue, queueBindingInfo.getAddress(), storageManager);
}
+ for (GroupingInfo groupingInfo : groupingInfos)
+ {
+ if(groupingHandler != null)
+ {
+ groupingHandler.addGroupBinding(new GroupBinding(groupingInfo.getId(), groupingInfo.getGroupId(), groupingInfo.getClusterName()));
+ }
+ }
+
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
storageManager.loadMessageJournal(pagingManager, resourceManager, queues, duplicateIDMap);
@@ -1348,6 +1381,25 @@
}
}
+ private synchronized void deployGroupingHandlerConfiguration(final GroupingHandlerConfiguration config) throws Exception
+ {
+ if (config != null)
+ {
+ GroupingHandler groupingHandler;
+ if (config.getType() == GroupingHandlerConfiguration.TYPE.LOCAL)
+ {
+ groupingHandler = new LocalGroupingHandler(managementService, config.getName(), config.getAddress(), getStorageManager());
+ }
+ else
+ {
+ groupingHandler = new RemoteGroupingHandler(managementService, config.getName(), config.getAddress(), config.getTimeout());
+ }
+ log.info("deploying grouping handler: " + groupingHandler);
+ this.groupingHandler = groupingHandler;
+ managementService.addNotificationListener(groupingHandler);
+ }
+ }
+
private Transformer instantiateTransformer(final String transformerClassName)
{
Transformer transformer = null;
Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -45,8 +45,6 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.JournalType;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
-import org.hornetq.core.server.group.impl.LocalGroupingHandler;
-import org.hornetq.core.server.group.impl.RemoteGroupingHandler;
import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.cluster.RemoteQueueBinding;
@@ -60,25 +58,23 @@
* A ClusterTestBase
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 30 Jan 2009 11:29:43
- *
- *
+ * <p/>
+ * Created 30 Jan 2009 11:29:43
*/
public class ClusterTestBase extends ServiceTestBase
{
private static final Logger log = Logger.getLogger(ClusterTestBase.class);
private static final int[] PORTS = {TransportConstants.DEFAULT_PORT,
- TransportConstants.DEFAULT_PORT + 1,
- TransportConstants.DEFAULT_PORT + 2,
- TransportConstants.DEFAULT_PORT + 3,
- TransportConstants.DEFAULT_PORT + 4,
- TransportConstants.DEFAULT_PORT + 5,
- TransportConstants.DEFAULT_PORT + 6,
- TransportConstants.DEFAULT_PORT + 7,
- TransportConstants.DEFAULT_PORT + 8,
- TransportConstants.DEFAULT_PORT + 9,
+ TransportConstants.DEFAULT_PORT + 1,
+ TransportConstants.DEFAULT_PORT + 2,
+ TransportConstants.DEFAULT_PORT + 3,
+ TransportConstants.DEFAULT_PORT + 4,
+ TransportConstants.DEFAULT_PORT + 5,
+ TransportConstants.DEFAULT_PORT + 6,
+ TransportConstants.DEFAULT_PORT + 7,
+ TransportConstants.DEFAULT_PORT + 8,
+ TransportConstants.DEFAULT_PORT + 9,
};
private static final long WAIT_TIMEOUT = 10000;
@@ -89,21 +85,21 @@
super.setUp();
checkFreePort(PORTS);
-
+
clearData();
}
-
+
@Override
protected void tearDown() throws Exception
{
checkFreePort(PORTS);
-
+
servers = null;
sfs = null;
-
+
consumers = null;
-
+
consumers = new ConsumerHolder[MAX_CONSUMERS];
super.tearDown();
@@ -173,8 +169,8 @@
//System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
throw new IllegalStateException("Timed out waiting for messages (messageCount = " + messageCount +
- ", expecting = " +
- count);
+ ", expecting = " +
+ count);
}
protected void waitForBindings(int node,
@@ -219,7 +215,7 @@
{
if ((binding instanceof LocalQueueBinding && local) || (binding instanceof RemoteQueueBinding && !local))
{
- QueueBinding qBinding = (QueueBinding)binding;
+ QueueBinding qBinding = (QueueBinding) binding;
bindingCount++;
@@ -242,8 +238,8 @@
// System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
String msg = "Timed out waiting for bindings (bindingCount = " + bindingCount +
- ", totConsumers = " +
- totConsumers;
+ ", totConsumers = " +
+ totConsumers;
log.error(msg);
@@ -424,7 +420,7 @@
{
sendInRange(node, address, 0, numMessages, durable, key, val);
}
-
+
protected void sendInRange(int node, String address, int msgStart, int msgEnd, boolean durable, SimpleString key, SimpleString val) throws Exception
{
ClientSessionFactory sf = this.sfs[node];
@@ -450,28 +446,20 @@
session.close();
}
- protected void setUpGroupHandler(GroupingHandlerConfiguration.TYPE type, int node)
+ protected void setUpGroupHandler(GroupingHandlerConfiguration.TYPE type, int node)
{
setUpGroupHandler(type, node, 5000);
}
- protected void setUpGroupHandler(GroupingHandlerConfiguration.TYPE type, int node, int timeout)
+ protected void setUpGroupHandler(GroupingHandlerConfiguration.TYPE type, int node, int timeout)
{
- GroupingHandler groupingHandler;
- if(type == GroupingHandlerConfiguration.TYPE.LOCAL)
- {
- groupingHandler = new LocalGroupingHandler(servers[node].getManagementService(), new SimpleString("grouparbitrator"), new SimpleString("queues"));
- }
- else
- {
- groupingHandler = new RemoteGroupingHandler(servers[node].getManagementService(), new SimpleString("grouparbitrator"), new SimpleString("queues"), timeout);
- }
- this.servers[node].getPostOffice().setGroupingHandler(groupingHandler);
+ this.servers[node].getConfiguration().setGroupingHandlerConfiguration(
+ new GroupingHandlerConfiguration(new SimpleString("grouparbitrator"), type, new SimpleString("queues"), timeout));
}
- protected void setUpGroupHandler(GroupingHandler groupingHandler, int node)
+ protected void setUpGroupHandler(GroupingHandler groupingHandler, int node)
{
- this.servers[node].getPostOffice().setGroupingHandler(groupingHandler);
+ this.servers[node].setGroupingHandler(groupingHandler);
}
protected void send(int node, String address, int numMessages, boolean durable, String filterVal) throws Exception
@@ -490,25 +478,25 @@
}
protected void verifyReceiveAllWithGroupIDRoundRobin(
- int msgStart,
- int msgEnd,
- int... consumerIDs) throws Exception
+ int msgStart,
+ int msgEnd,
+ int... consumerIDs) throws Exception
{
- verifyReceiveAllWithGroupIDRoundRobin(true, -1, msgStart, msgEnd, consumerIDs);
+ verifyReceiveAllWithGroupIDRoundRobin(true, -1, msgStart, msgEnd, consumerIDs);
}
protected int verifyReceiveAllOnSingleConsumer(int msgStart,
- int msgEnd,
- int... consumerIDs) throws Exception
+ int msgEnd,
+ int... consumerIDs) throws Exception
{
- return verifyReceiveAllOnSingleConsumer(true, msgStart, msgEnd, consumerIDs);
+ return verifyReceiveAllOnSingleConsumer(true, msgStart, msgEnd, consumerIDs);
}
protected void verifyReceiveAllWithGroupIDRoundRobin(boolean ack,
- long firstReceiveTime,
- int msgStart,
- int msgEnd,
- int... consumerIDs) throws Exception
+ long firstReceiveTime,
+ int msgStart,
+ int msgEnd,
+ int... consumerIDs) throws Exception
{
HashMap<SimpleString, Integer> groupIdsReceived = new HashMap<SimpleString, Integer>();
for (int i = 0; i < consumerIDs.length; i++)
@@ -544,8 +532,8 @@
}
SimpleString id = (SimpleString) message.getProperty(MessageImpl.HDR_GROUP_ID);
- System.out.println("received " + id + " on consumer " + i);
- if(groupIdsReceived.get(id) == null)
+ System.out.println("received " + id + " on consumer " + i);
+ if (groupIdsReceived.get(id) == null)
{
groupIdsReceived.put(id, i);
}
@@ -562,9 +550,9 @@
}
protected int verifyReceiveAllOnSingleConsumer(boolean ack,
- int msgStart,
- int msgEnd,
- int... consumerIDs) throws Exception
+ int msgStart,
+ int msgEnd,
+ int... consumerIDs) throws Exception
{
int groupIdsReceived = -1;
for (int i = 0; i < consumerIDs.length; i++)
@@ -628,7 +616,7 @@
assertNotNull("consumer " + consumerIDs[i] + " did not receive message " + j, message);
}
-
+
if (ack)
{
message.acknowledge();
@@ -639,7 +627,7 @@
assertTrue("Message received too soon", System.currentTimeMillis() >= firstReceiveTime);
}
- if (j != (Integer)(message.getProperty(COUNT_PROP)))
+ if (j != (Integer) (message.getProperty(COUNT_PROP)))
{
outOfOrder = true;
System.out.println("Message j=" + j + " was received out of order = " + message.getProperty(COUNT_PROP));
@@ -697,8 +685,8 @@
if (message != null)
{
log.info("check receive Consumer " + consumerIDs[i] +
- " received message " +
- message.getProperty(COUNT_PROP));
+ " received message " +
+ message.getProperty(COUNT_PROP));
}
else
{
@@ -769,7 +757,7 @@
if (message != null)
{
- int count = (Integer)message.getProperty(COUNT_PROP);
+ int count = (Integer) message.getProperty(COUNT_PROP);
Integer prevCount = countMap.get(i);
@@ -793,7 +781,7 @@
}
else
{
- // log.info("consumer " + consumerIDs[i] +" returns null");
+ // log.info("consumer " + consumerIDs[i] +" returns null");
}
}
while (message != null);
@@ -831,7 +819,7 @@
if (message != null)
{
- int count = (Integer)message.getProperty(COUNT_PROP);
+ int count = (Integer) message.getProperty(COUNT_PROP);
// log.info("consumer " + consumerIDs[i] + " received message " + count);
@@ -879,7 +867,7 @@
assertNotNull(list);
- int elem = (Integer)list.poll();
+ int elem = (Integer) list.poll();
assertEquals(messageCounts[i], elem);
@@ -919,7 +907,7 @@
if (message != null)
{
- int count = (Integer)message.getProperty(COUNT_PROP);
+ int count = (Integer) message.getProperty(COUNT_PROP);
ints.add(count);
}
@@ -1015,7 +1003,7 @@
}
ClientSessionFactory sf = new ClientSessionFactoryImpl(serverTotc, serverBackuptc);
-
+
sf.setFailoverOnServerShutdown(false);
sf.setRetryInterval(100);
sf.setRetryIntervalMultiplier(1d);
@@ -1127,6 +1115,78 @@
servers[node] = server;
}
+ protected void setupSharedStorageServer(int node, boolean fileStorage, boolean netty, int backupNode)
+ {
+ if (servers[node] != null)
+ {
+ throw new IllegalArgumentException("Already a server at node " + node);
+ }
+
+ Configuration configuration = new ConfigurationImpl();
+
+ configuration.setSecurityEnabled(false);
+ configuration.setBindingsDirectory(getBindingsDir(backupNode, false));
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalMaxAIO(1000);
+ configuration.setJournalDirectory(getJournalDir(backupNode, false));
+ configuration.setJournalFileSize(100 * 1024);
+ configuration.setJournalType(JournalType.ASYNCIO);
+ configuration.setJournalMaxAIO(1000);
+ configuration.setPagingDirectory(getPageDir(backupNode, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(backupNode, false));
+ configuration.setClustered(true);
+ configuration.setJournalCompactMinFiles(0);
+ configuration.setBackup(true);
+ configuration.setSharedStore(true);
+
+ if (backupNode != -1)
+ {
+ Map<String, Object> backupParams = generateParams(backupNode, netty);
+
+ if (netty)
+ {
+ TransportConfiguration nettyBackuptc = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, backupParams);
+
+ configuration.getConnectorConfigurations().put(nettyBackuptc.getName(), nettyBackuptc);
+
+ configuration.setBackupConnectorName(nettyBackuptc.getName());
+ }
+ else
+ {
+ TransportConfiguration invmBackuptc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, backupParams);
+
+ configuration.getConnectorConfigurations().put(invmBackuptc.getName(), invmBackuptc);
+
+ configuration.setBackupConnectorName(invmBackuptc.getName());
+ }
+ }
+
+ configuration.getAcceptorConfigurations().clear();
+
+ Map<String, Object> params = generateParams(node, netty);
+
+ TransportConfiguration invmtc = new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params);
+ configuration.getAcceptorConfigurations().add(invmtc);
+
+ if (netty)
+ {
+ TransportConfiguration nettytc = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
+ configuration.getAcceptorConfigurations().add(nettytc);
+ }
+
+ HornetQServer server;
+
+ if (fileStorage)
+ {
+ server = HornetQ.newHornetQServer(configuration);
+ }
+ else
+ {
+ server = HornetQ.newHornetQServer(configuration, false);
+ }
+ servers[node] = server;
+ }
+
protected void setupServerWithDiscovery(int node,
String groupAddress,
int port,
@@ -1223,21 +1283,21 @@
configuration.getConnectorConfigurations().put(nettytc_c.getName(), nettytc_c);
connectorPairs.add(new Pair<String, String>(nettytc_c.getName(),
- nettyBackuptc == null ? null : nettyBackuptc.getName()));
+ nettyBackuptc == null ? null : nettyBackuptc.getName()));
}
else
{
connectorPairs.add(new Pair<String, String>(invmtc_c.getName(), invmBackuptc == null ? null
- : invmBackuptc.getName()));
+ : invmBackuptc.getName()));
}
BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
- null,
- -1,
- groupAddress,
- port,
- 250,
- connectorPairs);
+ null,
+ -1,
+ groupAddress,
+ port,
+ 250,
+ connectorPairs);
configuration.getBroadcastGroupConfigurations().add(bcConfig);
@@ -1266,7 +1326,7 @@
if (netty)
{
params.put(org.hornetq.integration.transports.netty.TransportConstants.PORT_PROP_NAME,
- org.hornetq.integration.transports.netty.TransportConstants.DEFAULT_PORT + node);
+ org.hornetq.integration.transports.netty.TransportConstants.DEFAULT_PORT + node);
}
return params;
@@ -1333,12 +1393,12 @@
pairs.add(connectorPair);
ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
- address,
- 100,
- true,
- forwardWhenNoConsumers,
- maxHops,
- pairs);
+ address,
+ 100,
+ true,
+ forwardWhenNoConsumers,
+ maxHops,
+ pairs);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1384,12 +1444,12 @@
}
ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
- address,
- 250,
- true,
- forwardWhenNoConsumers,
- maxHops,
- pairs);
+ address,
+ 250,
+ true,
+ forwardWhenNoConsumers,
+ maxHops,
+ pairs);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1454,12 +1514,12 @@
}
ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
- address,
- 250,
- true,
- forwardWhenNoConsumers,
- maxHops,
- pairs);
+ address,
+ 250,
+ true,
+ forwardWhenNoConsumers,
+ maxHops,
+ pairs);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1480,12 +1540,12 @@
}
ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
- address,
- 100,
- true,
- forwardWhenNoConsumers,
- maxHops,
- discoveryGroupName);
+ address,
+ 100,
+ true,
+ forwardWhenNoConsumers,
+ maxHops,
+ discoveryGroupName);
List<ClusterConnectionConfiguration> clusterConfs = server.getConfiguration().getClusterConfigurations();
clusterConfs.add(clusterConf);
Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -16,6 +16,7 @@
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
import org.hornetq.core.server.group.impl.Response;
import org.hornetq.core.server.group.impl.Proposal;
+import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.management.Notification;
@@ -42,13 +43,14 @@
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
startServers(0, 1, 2);
try
{
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -98,6 +100,10 @@
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 1);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2, 1);
+
startServers(0, 1, 2);
try
@@ -106,7 +112,7 @@
{
public SimpleString getName()
{
- return null;
+ return null;
}
public Response propose(Proposal proposal) throws Exception
@@ -116,12 +122,12 @@
public void proposed(Response response) throws Exception
{
-
+ System.out.println("ClusteredGroupingTest.proposed");
}
public void send(Response response, int distance) throws Exception
{
-
+ System.out.println("ClusteredGroupingTest.send");
}
public Response receive(Proposal proposal, int distance) throws Exception
@@ -131,11 +137,14 @@
public void onNotification(Notification notification)
{
+ System.out.println("ClusteredGroupingTest.onNotification");
+ }
+ public void addGroupBinding(GroupBinding groupBinding)
+ {
+ System.out.println("ClusteredGroupingTest.addGroupBinding");
}
}, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2, 1);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -192,14 +201,15 @@
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+
startServers(0, 1, 2);
try
{
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
-
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
@@ -251,14 +261,15 @@
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+
startServers(0, 1, 2);
try
{
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
-
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
@@ -313,14 +324,15 @@
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+
startServers(0, 1, 2);
try
{
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
-
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
@@ -373,14 +385,15 @@
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+
startServers(0, 1, 2);
try
{
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
-
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
@@ -436,14 +449,15 @@
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+
startServers(0, 1, 2);
try
{
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
-
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
@@ -496,14 +510,15 @@
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+
startServers(0, 1, 2);
try
{
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
-
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
@@ -547,11 +562,11 @@
waitForBindings(2, "queues.testaddress", 1, 1, false);
sendInRange(0, "queues.testaddress", 30, 40, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
-
+ verifyReceiveAllInRange(30, 40, 3);
sendInRange(1, "queues.testaddress", 40, 50, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
-
+ verifyReceiveAllInRange(40, 50, 3);
sendInRange(2, "queues.testaddress", 50, 60, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
- verifyReceiveAllInRange(30, 50, 3);
+ verifyReceiveAllInRange(50, 60, 3);
System.out.println("*****************************************************************************");
}
finally
@@ -576,13 +591,14 @@
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
startServers(0, 1, 2);
try
{
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -642,13 +658,14 @@
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
startServers(0, 1, 2);
try
{
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -677,8 +694,6 @@
sendInRange(2, "queues.testaddress", 10, 20, true, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
- sendInRange(0, "queues.testaddress", 20, 30, true, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
-
stopServers(1);
startServers(1);
@@ -688,7 +703,7 @@
waitForBindings(1, "queues.testaddress", 1, 1, true);
- verifyReceiveAllInRange(10, 30, 1);
+ verifyReceiveAllInRange(10, 20, 1);
System.out.println("*****************************************************************************");
@@ -716,13 +731,14 @@
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
startServers(0, 1, 2);
try
{
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -788,13 +804,15 @@
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
startServers(0, 1, 2);
try
{
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -817,18 +835,19 @@
sendInRange(1, "queues.testaddress", 0, 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
verifyReceiveAllInRange(0, 10, 0);
-
+ closeSessionFactory(0);
stopServers(0);
sendInRange(2, "queues.testaddress", 10, 20, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
startServers(0);
-
waitForBindings(0, "queues.testaddress", 1, 0, true);
+ setupSessionFactory(0, isNetty());
+ verifyReceiveAllInRange(10, 20, 0);
sendInRange(0, "queues.testaddress", 20, 30, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
- verifyReceiveAllInRange(10, 20, 0);
verifyReceiveAllInRange(20, 30, 0);
System.out.println("*****************************************************************************");
@@ -854,14 +873,14 @@
setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
startServers(0, 1, 2);
try
{
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -919,14 +938,14 @@
setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
startServers(0, 1, 2);
try
{
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -951,9 +970,9 @@
CountDownLatch latch = new CountDownLatch(1);
Thread[] threads = new Thread[9];
int range = 0;
- for(int i = 0 ; i < 9; i++,range+=10)
+ for (int i = 0; i < 9; i++, range += 10)
{
- threads[i] = new Thread(new ThreadSender(range, range+10, 1, new SimpleString("id" + i), latch, i < 8));
+ threads[i] = new Thread(new ThreadSender(range, range + 10, 1, new SimpleString("id" + i), latch, i < 8));
}
for (Thread thread : threads)
{
@@ -975,6 +994,61 @@
}
+ public void testGroupingLocalHandlerFails() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupSharedStorageServer(2, isFileStorage(), isNetty(), 0);
+ setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
+
+ setupClusterConnectionWithBackups("cluster1", "queues", false, 1, isNetty(), 1, new int[]{0}, new int[]{2});
+
+ setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 2, 1);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 2);
+
+
+ startServers(0, 1, 2);
+
+ try
+ {
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, false);
+ waitForBindings(1, "queues.testaddress", 1, 1, false);
+
+ sendWithProperty(0, "queues.testaddress", 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAll(10, 0);
+ closeSessionFactory(0);
+ stopServers(0);
+ Thread.sleep(5000);
+ //setupSessionFactory(3, isNetty());
+ sendWithProperty(1, "queues.testaddress", 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+ System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
public boolean isNetty()
{
return true;
Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -23,6 +23,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.QueueBindingInfo;
+import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.server.JournalType;
import org.hornetq.core.server.Queue;
@@ -70,7 +71,7 @@
List<QueueBindingInfo> queueBindingInfos = new ArrayList<QueueBindingInfo>();
- journal.loadBindingJournal(queueBindingInfos);
+ journal.loadBindingJournal(queueBindingInfos, new ArrayList<GroupingInfo>());
Map<Long, Queue> queues = new HashMap<Long, Queue>();
@@ -88,7 +89,7 @@
queueBindingInfos = new ArrayList<QueueBindingInfo>();
- journal.loadBindingJournal(queueBindingInfos);
+ journal.loadBindingJournal(queueBindingInfos, new ArrayList<GroupingInfo>());
journal.start();
}
Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -43,6 +43,7 @@
import org.hornetq.core.paging.impl.TestSupportPageStore;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.remoting.spi.HornetQBuffer;
@@ -50,6 +51,7 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -842,7 +844,15 @@
{
}
- /* (non-Javadoc)
+ public void addGrouping(GroupBinding groupBinding) throws Exception
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void deleteGrouping(GroupBinding groupBinding) throws Exception
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#addQueueBinding(org.hornetq.core.postoffice.Binding)
*/
public void addQueueBinding(final Binding binding) throws Exception
@@ -933,13 +943,14 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#loadBindingJournal(java.util.List)
*/
- public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos) throws Exception
+ public void loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception
{
+
}
/* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#loadMessageJournal(org.hornetq.core.paging.PagingManager, java.util.Map, org.hornetq.core.transaction.ResourceManager, java.util.Map)
- */
+ * @see org.hornetq.core.persistence.StorageManager#loadMessageJournal(org.hornetq.core.paging.PagingManager, java.util.Map, org.hornetq.core.transaction.ResourceManager, java.util.Map)
+ */
public void loadMessageJournal(PagingManager pagingManager,
ResourceManager resourceManager,
Map<Long, Queue> queues,
Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -26,6 +26,7 @@
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.persistence.QueueBindingInfo;
+import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.impl.DuplicateIDCacheImpl;
@@ -86,7 +87,7 @@
journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
journal.start();
- journal.loadBindingJournal(new ArrayList<QueueBindingInfo>());
+ journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
HashMap<SimpleString, List<Pair<byte[], Long>>> mapDups = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
@@ -108,7 +109,7 @@
journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
journal.start();
- journal.loadBindingJournal(new ArrayList<QueueBindingInfo>());
+ journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
journal.loadMessageJournal(new FakePagingManager(),
new ResourceManagerImpl(0, 0, scheduledThreadPool),
@@ -135,7 +136,7 @@
journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
journal.start();
- journal.loadBindingJournal(new ArrayList<QueueBindingInfo>());
+ journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
journal.loadMessageJournal(new FakePagingManager(),
new ResourceManagerImpl(0, 0, scheduledThreadPool),
Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2009-10-17 12:38:46 UTC (rev 8125)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2009-10-19 11:30:49 UTC (rev 8126)
@@ -150,13 +150,4 @@
{
}
-
- public void setGroupingHandler(GroupingHandler groupingHandler)
- {
- }
-
- public GroupingHandler getGroupingHandler()
- {
- return null;
- }
}
\ No newline at end of file
More information about the hornetq-commits mailing list