[jboss-cvs] JBoss Messaging SVN: r5381 - in trunk: src/main/org/jboss/messaging/core/config/cluster and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Nov 18 06:58:28 EST 2008
Author: timfox
Date: 2008-11-18 06:58:28 -0500 (Tue, 18 Nov 2008)
New Revision: 5381
Added:
trunk/src/main/org/jboss/messaging/core/config/cluster/
trunk/src/main/org/jboss/messaging/core/config/cluster/BroadcastGroupConfiguration.java
trunk/src/main/org/jboss/messaging/core/config/cluster/DiscoveryGroupConfiguration.java
trunk/src/main/org/jboss/messaging/core/config/cluster/MessageFlowConfiguration.java
trunk/src/main/org/jboss/messaging/core/server/cluster/
trunk/src/main/org/jboss/messaging/core/server/cluster/BroadcastGroup.java
trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java
trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryGroup.java
trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryListener.java
trunk/src/main/org/jboss/messaging/core/server/cluster/Forwarder.java
trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlow.java
trunk/src/main/org/jboss/messaging/core/server/cluster/Transformer.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/DiscoveryGroupImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java
Removed:
trunk/src/main/org/jboss/messaging/core/config/OutflowConfiguration.java
trunk/src/main/org/jboss/messaging/core/server/Forwarder.java
trunk/src/main/org/jboss/messaging/core/server/impl/ForwarderImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/config/Configuration.java
trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java
trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithFilterTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithWildcardTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SimpleOutflowTest.java
Log:
Clustering
Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-11-18 01:24:44 UTC (rev 5380)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -26,6 +26,9 @@
import java.util.List;
import java.util.Set;
+import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
import org.jboss.messaging.core.server.JournalType;
import org.jboss.messaging.util.SimpleString;
@@ -89,9 +92,17 @@
void setBackupConnectorConfiguration(TransportConfiguration config);
- Set<OutflowConfiguration> getOutflowConfigurations();
+ Set<BroadcastGroupConfiguration> getBroadcastGroupConfigurations();
+
+ void setBroadcastGroupConfigurations(Set<BroadcastGroupConfiguration> configs);
+
+ Set<DiscoveryGroupConfiguration> getDiscoveryGroupConfigurations();
+
+ void setDiscoveryGroupConfigurations(Set<DiscoveryGroupConfiguration> configs);
+
+ Set<MessageFlowConfiguration> getMessageFlowConfigurations();
- void setOutFlowConfigurations(final Set<OutflowConfiguration> configs);
+ void setMessageFlowConfigurations(final Set<MessageFlowConfiguration> configs);
SimpleString getManagementAddress();
Deleted: trunk/src/main/org/jboss/messaging/core/config/OutflowConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/OutflowConfiguration.java 2008-11-18 01:24:44 UTC (rev 5380)
+++ trunk/src/main/org/jboss/messaging/core/config/OutflowConfiguration.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -1,107 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.config;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * A OutflowConfiguration
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 12 Nov 2008 13:52:22
- *
- *
- */
-public class OutflowConfiguration implements Serializable
-{
- private static final long serialVersionUID = 6583525368508418953L;
-
- private final String name;
-
- private final String address;
-
- private final String filterString;
-
- private final boolean fanout;
-
- private final int maxBatchSize;
-
- private final long maxBatchTime;
-
- private final List<TransportConfiguration> connectors;
-
- public OutflowConfiguration(final String name,
- final String address,
- final String filterString,
- final boolean fanout,
- final int maxBatchSize,
- final long maxBatchTime,
- final List<TransportConfiguration> connectors)
- {
- this.name = name;
- this.address = address;
- this.filterString = filterString;
- this.fanout = fanout;
- this.maxBatchSize = maxBatchSize;
- this.maxBatchTime = maxBatchTime;
- this.connectors = connectors;
- }
-
- public String getName()
- {
- return name;
- }
-
- public String getAddress()
- {
- return address;
- }
-
- public String getFilterString()
- {
- return filterString;
- }
-
- public boolean isFanout()
- {
- return fanout;
- }
-
- public int getMaxBatchSize()
- {
- return maxBatchSize;
- }
-
- public long getMaxBatchTime()
- {
- return maxBatchTime;
- }
-
- public List<TransportConfiguration> getConnectors()
- {
- return connectors;
- }
-}
Modified: trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java 2008-11-18 01:24:44 UTC (rev 5380)
+++ trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -62,15 +62,59 @@
return params;
}
- public boolean equals(Object other)
+ private int hash = -1;
+
+ public int hashCode()
{
+ return factoryClassName.hashCode();
+ }
+
+ public boolean equals(final Object other)
+ {
if (other instanceof TransportConfiguration == false)
{
return false;
}
- TransportConfiguration ai = (TransportConfiguration)other;
-
- return this.factoryClassName.equals(ai.factoryClassName);
+ TransportConfiguration kother = (TransportConfiguration)other;
+
+ if (factoryClassName.equals(kother.factoryClassName))
+ {
+ if (params == null)
+ {
+ return kother.params == null;
+ }
+ else
+ {
+ if (kother.params == null)
+ {
+ return false;
+ }
+ else if (params.size() == kother.params.size())
+ {
+ for (Map.Entry<String, Object> entry : params.entrySet())
+ {
+ Object thisVal = entry.getValue();
+
+ Object otherVal = kother.params.get(entry.getKey());
+
+ if (otherVal == null || !otherVal.equals(thisVal))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+ else
+ {
+ return false;
+ }
}
+
}
Added: trunk/src/main/org/jboss/messaging/core/config/cluster/BroadcastGroupConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/cluster/BroadcastGroupConfiguration.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/config/cluster/BroadcastGroupConfiguration.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,99 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.config.cluster;
+
+import java.io.Serializable;
+
+/**
+ * A BroadcastGroupConfiguration
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 18 Nov 2008 08:44:30
+ *
+ *
+ */
+public class BroadcastGroupConfiguration implements Serializable
+{
+ private static final long serialVersionUID = 1052413739064253955L;
+
+ private final String name;
+
+ private final String localBindAddress;
+
+ private final int localBindPort;
+
+ private final String groupAddress;
+
+ private final int groupPort;
+
+ private final long broadcastPeriod;
+
+ public BroadcastGroupConfiguration(final String name,
+ final String localBindAddress,
+ final int localBindPort,
+ final String groupAddress,
+ final int groupPort,
+ final long broadcastPeriod)
+ {
+ super();
+ this.name = name;
+ this.localBindAddress = localBindAddress;
+ this.localBindPort = localBindPort;
+ this.groupAddress = groupAddress;
+ this.groupPort = groupPort;
+ this.broadcastPeriod = broadcastPeriod;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public String getLocalBindAddress()
+ {
+ return localBindAddress;
+ }
+
+ public int getLocalBindPort()
+ {
+ return localBindPort;
+ }
+
+ public String getGroupAddress()
+ {
+ return groupAddress;
+ }
+
+ public int getGroupPort()
+ {
+ return groupPort;
+ }
+
+ public long getBroadcastPeriod()
+ {
+ return broadcastPeriod;
+ }
+
+}
Added: trunk/src/main/org/jboss/messaging/core/config/cluster/DiscoveryGroupConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/cluster/DiscoveryGroupConfiguration.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/config/cluster/DiscoveryGroupConfiguration.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,80 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.config.cluster;
+
+import java.io.Serializable;
+
+/**
+ * A DiscoveryGroupConfiguration
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 18 Nov 2008 08:47:30
+ *
+ *
+ */
+public class DiscoveryGroupConfiguration implements Serializable
+{
+ private static final long serialVersionUID = 8657206421727863400L;
+
+ private final String name;
+
+ private final String groupAddress;
+
+ private final int groupPort;
+
+ private final long refreshTimeout;
+
+ public DiscoveryGroupConfiguration(final String name,
+ final String groupAddress,
+ final int groupPort,
+ final long refreshTimeout)
+ {
+ super();
+ this.name = name;
+ this.groupAddress = groupAddress;
+ this.groupPort = groupPort;
+ this.refreshTimeout = refreshTimeout;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public String getGroupAddress()
+ {
+ return groupAddress;
+ }
+
+ public int getGroupPort()
+ {
+ return groupPort;
+ }
+
+ public long getRefreshTimeout()
+ {
+ return refreshTimeout;
+ }
+}
Added: trunk/src/main/org/jboss/messaging/core/config/cluster/MessageFlowConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/cluster/MessageFlowConfiguration.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/config/cluster/MessageFlowConfiguration.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,145 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.config.cluster;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.jboss.messaging.core.config.TransportConfiguration;
+
+/**
+ * A MessageFlowConfiguration
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 12 Nov 2008 13:52:22
+ *
+ *
+ */
+public class MessageFlowConfiguration implements Serializable
+{
+ private static final long serialVersionUID = 6583525368508418953L;
+
+ private final String name;
+
+ private final String address;
+
+ private final String filterString;
+
+ private final boolean fanout;
+
+ private final int maxBatchSize;
+
+ private final long maxBatchTime;
+
+ private final List<TransportConfiguration> staticConnectors;
+
+ private final String discoveryGroupName;
+
+ private final String transformerClassName;
+
+ public MessageFlowConfiguration(final String name,
+ final String address,
+ final String filterString,
+ final boolean fanout,
+ final int maxBatchSize,
+ final long maxBatchTime,
+ final String transformerClassName,
+ final List<TransportConfiguration> connectors)
+ {
+ this.name = name;
+ this.address = address;
+ this.filterString = filterString;
+ this.fanout = fanout;
+ this.maxBatchSize = maxBatchSize;
+ this.maxBatchTime = maxBatchTime;
+ this.transformerClassName = transformerClassName;
+ this.staticConnectors = connectors;
+ this.discoveryGroupName = null;
+ }
+
+ public MessageFlowConfiguration(final String name,
+ final String address,
+ final String filterString,
+ final boolean fanout,
+ final int maxBatchSize,
+ final long maxBatchTime,
+ final String transformerClassName,
+ final String discoveryGroupName)
+ {
+ this.name = name;
+ this.address = address;
+ this.filterString = filterString;
+ this.fanout = fanout;
+ this.maxBatchSize = maxBatchSize;
+ this.maxBatchTime = maxBatchTime;
+ this.transformerClassName = transformerClassName;
+ this.staticConnectors = null;
+ this.discoveryGroupName = discoveryGroupName;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public String getAddress()
+ {
+ return address;
+ }
+
+ public String getFilterString()
+ {
+ return filterString;
+ }
+
+ public boolean isFanout()
+ {
+ return fanout;
+ }
+
+ public int getMaxBatchSize()
+ {
+ return maxBatchSize;
+ }
+
+ public long getMaxBatchTime()
+ {
+ return maxBatchTime;
+ }
+
+ public String getTransformerClassName()
+ {
+ return transformerClassName;
+ }
+
+ public List<TransportConfiguration> getConnectors()
+ {
+ return staticConnectors;
+ }
+
+ public String getDiscoveryGroupName()
+ {
+ return this.discoveryGroupName;
+ }
+}
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-11-18 01:24:44 UTC (rev 5380)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -18,8 +18,10 @@
import java.util.Set;
import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.OutflowConfiguration;
import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
import org.jboss.messaging.core.server.JournalType;
import org.jboss.messaging.util.SimpleString;
@@ -111,8 +113,13 @@
protected TransportConfiguration backupConnectorConfig;
- protected Set<OutflowConfiguration> outFlowConfigs = new HashSet<OutflowConfiguration>();
+ protected Set<MessageFlowConfiguration> outFlowConfigs = new HashSet<MessageFlowConfiguration>();
+
+ protected Set<BroadcastGroupConfiguration> broadcastGroupConfigurations = new HashSet<BroadcastGroupConfiguration>();
+
+ protected Set<DiscoveryGroupConfiguration> discoveryGroupConfigurations = new HashSet<DiscoveryGroupConfiguration>();
+
// Paging related attributes
protected long pagingMaxGlobalSize = -1;
@@ -253,16 +260,37 @@
backupConnectorConfig = config;
}
- public Set<OutflowConfiguration> getOutflowConfigurations()
+ public Set<MessageFlowConfiguration> getMessageFlowConfigurations()
{
return outFlowConfigs;
}
- public void setOutFlowConfigurations(final Set<OutflowConfiguration> configs)
+ public void setMessageFlowConfigurations(final Set<MessageFlowConfiguration> configs)
{
this.outFlowConfigs = configs;
}
+
+ public Set<BroadcastGroupConfiguration> getBroadcastGroupConfigurations()
+ {
+ return broadcastGroupConfigurations;
+ }
+ public void setBroadcastGroupConfigurations(Set<BroadcastGroupConfiguration> broadcastGroupConfigurations)
+ {
+ this.broadcastGroupConfigurations = broadcastGroupConfigurations;
+ }
+
+ public Set<DiscoveryGroupConfiguration> getDiscoveryGroupConfigurations()
+ {
+ return discoveryGroupConfigurations;
+ }
+
+ public void setDiscoveryGroupConfigurations(Set<DiscoveryGroupConfiguration> discoveryGroupConfigurations)
+ {
+ this.discoveryGroupConfigurations = discoveryGroupConfigurations;
+ }
+
+
public String getBindingsDirectory()
{
return bindingsDirectory;
@@ -491,5 +519,7 @@
cother.getSecurityInvalidationInterval() == getSecurityInvalidationInterval() &&
cother.getManagementAddress().equals(getManagementAddress());
}
+
+
}
Modified: trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java 2008-11-18 01:24:44 UTC (rev 5380)
+++ trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -46,6 +46,7 @@
* JBMTimestamp - the timestamp of the message
* JBMDurable - "DURABLE" or "NON_DURABLE"
* JBMExpiration - the expiration of the message
+* JBMSize - the encoded size of the full message in bytes
* Any other identifers that appear in a filter expression represent header values for the message
*
* String values must be set as <code>SimpleString</code>, not <code>java.lang.String</code> (see JBMESSAGING-1307).
@@ -81,6 +82,8 @@
private static final SimpleString JBM_PRIORITY = new SimpleString("JBMPriority");
private static final SimpleString JBM_MESSAGE_ID = new SimpleString("JBMMessageID");
+
+ private static final SimpleString JBM_SIZE = new SimpleString("JBMSize");
private static final SimpleString JBM_PREFIX = new SimpleString("JBM");
@@ -180,6 +183,10 @@
{
return msg.getExpiration();
}
+ else if (JBM_SIZE.equals(fieldName))
+ {
+ return msg.getEncodeSize();
+ }
else
{
return null;
Deleted: trunk/src/main/org/jboss/messaging/core/server/Forwarder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Forwarder.java 2008-11-18 01:24:44 UTC (rev 5380)
+++ trunk/src/main/org/jboss/messaging/core/server/Forwarder.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -1,39 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.server;
-
-
-/**
- * A Forwarder
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 15 Nov 2008 09:42:31
- *
- *
- */
-public interface Forwarder extends Consumer
-{
- void close() throws Exception;
-}
Added: trunk/src/main/org/jboss/messaging/core/server/cluster/BroadcastGroup.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/BroadcastGroup.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/BroadcastGroup.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,47 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.cluster;
+
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.server.MessagingComponent;
+
+/**
+ * A BroadcastGroup
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 18 Nov 2008 09:29:45
+ *
+ *
+ */
+public interface BroadcastGroup extends MessagingComponent
+{
+ void addConnector(final TransportConfiguration connector);
+
+ void removeConnector(final TransportConfiguration connector);
+
+ int size();
+
+ void broadcastConnectors() throws Exception;
+}
Added: trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,53 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.cluster;
+
+import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
+import org.jboss.messaging.core.server.MessagingComponent;
+
+/**
+ * A ClusterManager
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 18 Nov 2008 09:23:26
+ *
+ *
+ */
+public interface ClusterManager extends MessagingComponent
+{
+ void deployBroadcastGroup(BroadcastGroupConfiguration broadcastGroupConfig) throws Exception;
+
+ void deployDiscoveryGroup(DiscoveryGroupConfiguration discoveryGroupConfig) throws Exception;
+
+ void deployMessageFlow(MessageFlowConfiguration messageFlowConfig) throws Exception;
+
+ void undeployBroadcastGroup(String name) throws Exception;
+
+ void undeployDiscoveryGroup(String name) throws Exception;
+
+ void undeployMessageFlow(String name) throws Exception;
+}
Added: trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryGroup.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryGroup.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryGroup.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,49 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.cluster;
+
+import java.util.List;
+
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.server.MessagingComponent;
+
+/**
+ * A DiscoveryGroup
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 18 Nov 2008 09:26:54
+ *
+ *
+ */
+public interface DiscoveryGroup extends MessagingComponent
+{
+ List<TransportConfiguration> getConnectors();
+
+ boolean waitForBroadcast(long timeout);
+
+ void registerListener(final DiscoveryListener listener);
+
+ void unregisterListener(final DiscoveryListener listener);
+}
Added: trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryListener.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryListener.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryListener.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.cluster;
+
+/**
+ * A DiscoveryListener
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 17 Nov 2008 14:30:39
+ *
+ *
+ */
+public interface DiscoveryListener
+{
+ void connectorsChanged();
+}
Copied: trunk/src/main/org/jboss/messaging/core/server/cluster/Forwarder.java (from rev 5368, trunk/src/main/org/jboss/messaging/core/server/Forwarder.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/Forwarder.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/Forwarder.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.cluster;
+
+import org.jboss.messaging.core.server.Consumer;
+import org.jboss.messaging.core.server.MessagingComponent;
+
+
+/**
+ * A Forwarder: a Consumer that is also a MessagingComponent.
+ * The interface declares no methods of its own; it only combines the two contracts.
+ * ForwarderImpl registers itself as a consumer on a queue when it is started.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 15 Nov 2008 09:42:31
+ */
+public interface Forwarder extends Consumer, MessagingComponent
+{
+}
Added: trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlow.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlow.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlow.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.cluster;
+
+import org.jboss.messaging.core.server.MessagingComponent;
+
+/**
+ * A MessageFlow: marker interface for a message flow managed by ClusterManagerImpl.
+ * It declares no methods of its own; the start() and stop() calls made on flows by
+ * ClusterManagerImpl are inherited from MessagingComponent.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 18 Nov 2008 09:41:01
+ */
+public interface MessageFlow extends MessagingComponent
+{
+
+
+}
Added: trunk/src/main/org/jboss/messaging/core/server/cluster/Transformer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/Transformer.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/Transformer.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,40 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.cluster;
+
+import org.jboss.messaging.core.server.ServerMessage;
+
+/**
+ * A Transformer: maps a ServerMessage to the ServerMessage that should be used in its place.
+ * ForwarderImpl applies transform() to each message before sending it and sends the returned
+ * message. ClusterManagerImpl creates implementations by class name through newInstance(),
+ * so an implementation needs an accessible no-argument constructor.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 18 Nov 2008 11:01:50
+ */
+public interface Transformer
+{
+ ServerMessage transform(ServerMessage message);
+}
Added: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,181 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.cluster.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledFuture;
+
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.cluster.BroadcastGroup;
+
+/**
+ * A BroadcastGroupImpl
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 15 Nov 2008 09:45:32
+ *
+ */
+public class BroadcastGroupImpl implements BroadcastGroup, Runnable
+{
+ private static final Logger log = Logger.getLogger(BroadcastGroupImpl.class);
+
+ private final InetAddress localBindAddress;
+
+ private final int localPort;
+
+ private final InetAddress groupAddress;
+
+ private final int groupPort;
+
+ private DatagramSocket socket;
+
+ private final List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+
+ private volatile boolean started;
+
+ private volatile ScheduledFuture<?> future;
+
+ public BroadcastGroupImpl(final InetAddress localBindAddress,
+ final int localPort,
+ final InetAddress groupAddress,
+ final int groupPort) throws Exception
+ {
+ this.localBindAddress = localBindAddress;
+
+ this.localPort = localPort;
+
+ this.groupAddress = groupAddress;
+
+ this.groupPort = groupPort;
+
+ // FIXME - doesn't seem to work when specifying port and address
+
+ // this.socket = new DatagramSocket(localPort, localBindAddress);
+ }
+
+ public synchronized void start() throws Exception
+ {
+ if (started)
+ {
+ return;
+ }
+
+ socket = new DatagramSocket();
+
+ started = true;
+ }
+
+ public synchronized void stop()
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ if (future != null)
+ {
+ future.cancel(false);
+ }
+
+ socket.close();
+
+ started = false;
+ }
+
+ public boolean isStarted()
+ {
+ return started;
+ }
+
+ public void addConnector(final TransportConfiguration connector)
+ {
+ connectors.add(connector);
+ }
+
+ public void removeConnector(final TransportConfiguration connector)
+ {
+ connectors.remove(connector);
+ }
+
+ public int size()
+ {
+ return connectors.size();
+ }
+
+ public void broadcastConnectors() throws Exception
+ {
+ // TODO - for now we just use plain serialization to serialize the transport configs
+ // we should use our own format for a tighter representation
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+
+ oos.writeInt(connectors.size());
+
+ for (TransportConfiguration connector : connectors)
+ {
+ oos.writeObject(connector);
+ }
+
+ oos.flush();
+
+ byte[] data = bos.toByteArray();
+
+ DatagramPacket packet = new DatagramPacket(data, data.length, groupAddress, groupPort);
+
+ log.info("broadcasting packet");
+
+ socket.send(packet);
+ }
+
+ public void run()
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ try
+ {
+ broadcastConnectors();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to broadcast connector configs");
+ }
+ }
+
+ public void setScheduledFuture(final ScheduledFuture<?> future)
+ {
+ this.future = future;
+ }
+
+}
Added: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,303 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.cluster.impl;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+
+import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.cluster.BroadcastGroup;
+import org.jboss.messaging.core.server.cluster.ClusterManager;
+import org.jboss.messaging.core.server.cluster.DiscoveryGroup;
+import org.jboss.messaging.core.server.cluster.MessageFlow;
+import org.jboss.messaging.core.server.cluster.Transformer;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.util.ExecutorFactory;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A ClusterManagerImpl
+ *
+ * Keeps the deployed broadcast groups, discovery groups and message flows by name and
+ * starts/stops them together. Each deploy method creates the component from its
+ * configuration, starts it immediately and registers it under the configured name;
+ * each undeploy method stops the component and removes its registration.
+ *
+ * All public operations except isStarted() are synchronized on the instance.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 18 Nov 2008 09:23:49
+ *
+ *
+ */
+public class ClusterManagerImpl implements ClusterManager
+{
+   private final Map<String, BroadcastGroup> broadcastGroups = new HashMap<String, BroadcastGroup>();
+
+   private final Map<String, DiscoveryGroup> discoveryGroups = new HashMap<String, DiscoveryGroup>();
+
+   private final Map<String, MessageFlow> messageFlows = new HashMap<String, MessageFlow>();
+
+   private final ExecutorFactory executorFactory;
+
+   private final StorageManager storageManager;
+
+   private final PostOffice postOffice;
+
+   private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
+
+   private final ScheduledExecutorService scheduledExecutor;
+
+   private volatile boolean started;
+
+   public ClusterManagerImpl(final ExecutorFactory executorFactory,
+                             final StorageManager storageManager,
+                             final PostOffice postOffice,
+                             final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+                             final ScheduledExecutorService scheduledExecutor)
+   {
+      this.executorFactory = executorFactory;
+
+      this.storageManager = storageManager;
+
+      this.postOffice = postOffice;
+
+      this.queueSettingsRepository = queueSettingsRepository;
+
+      this.scheduledExecutor = scheduledExecutor;
+   }
+
+   /**
+    * Starts every registered broadcast group, discovery group and message flow, in that order.
+    * Does nothing if already started.
+    */
+   public synchronized void start() throws Exception
+   {
+      if (started)
+      {
+         return;
+      }
+
+      for (BroadcastGroup group : broadcastGroups.values())
+      {
+         group.start();
+      }
+
+      for (DiscoveryGroup group : discoveryGroups.values())
+      {
+         group.start();
+      }
+
+      for (MessageFlow flow : messageFlows.values())
+      {
+         flow.start();
+      }
+
+      started = true;
+   }
+
+   /**
+    * Stops every registered broadcast group, discovery group and message flow, in that order.
+    * The components stay registered. Does nothing if not started.
+    */
+   public synchronized void stop() throws Exception
+   {
+      if (!started)
+      {
+         return;
+      }
+
+      for (BroadcastGroup group : broadcastGroups.values())
+      {
+         group.stop();
+      }
+
+      for (DiscoveryGroup group : discoveryGroups.values())
+      {
+         group.stop();
+      }
+
+      for (MessageFlow flow : messageFlows.values())
+      {
+         flow.stop();
+      }
+
+      started = false;
+   }
+
+   public boolean isStarted()
+   {
+      return started;
+   }
+
+   /**
+    * Creates, starts and schedules a broadcast group, then registers it under config.getName().
+    *
+    * @throws IllegalArgumentException if a broadcast group with that name is already deployed
+    * @throws Exception if an address cannot be resolved or the group cannot be started
+    */
+   public synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration config) throws Exception
+   {
+      final String name = config.getName();
+
+      checkNotDeployed(broadcastGroups, "broadcast-group", name);
+
+      InetAddress localBindAddress = InetAddress.getByName(config.getLocalBindAddress());
+
+      InetAddress groupAddress = InetAddress.getByName(config.getGroupAddress());
+
+      BroadcastGroupImpl group = new BroadcastGroupImpl(localBindAddress,
+                                                        config.getLocalBindPort(),
+                                                        groupAddress,
+                                                        config.getGroupPort());
+
+      // Start before scheduling: the task runs with an initial delay of zero and skips
+      // the broadcast when the group is not started yet. Starting first also means a
+      // failed start leaves neither a scheduled task nor a registration behind.
+      group.start();
+
+      try
+      {
+         ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(group,
+                                                                              0L,
+                                                                              config.getBroadcastPeriod(),
+                                                                              MILLISECONDS);
+
+         group.setScheduledFuture(future);
+      }
+      catch (RuntimeException e)
+      {
+         // e.g. the executor rejected the task - release the socket opened by start()
+         group.stop();
+
+         throw e;
+      }
+
+      broadcastGroups.put(name, group);
+   }
+
+   /**
+    * Creates and starts a discovery group, then registers it under config.getName().
+    *
+    * @throws IllegalArgumentException if a discovery group with that name is already deployed
+    * @throws Exception if the group address cannot be resolved or the group cannot be created/started
+    */
+   public synchronized void deployDiscoveryGroup(final DiscoveryGroupConfiguration config) throws Exception
+   {
+      final String name = config.getName();
+
+      checkNotDeployed(discoveryGroups, "discovery-group", name);
+
+      InetAddress groupAddress = InetAddress.getByName(config.getGroupAddress());
+
+      DiscoveryGroup group = new DiscoveryGroupImpl(groupAddress, config.getGroupPort(), config.getRefreshTimeout());
+
+      // Register only after a successful start so a failure does not block the name
+      group.start();
+
+      discoveryGroups.put(name, group);
+   }
+
+   /**
+    * Creates and starts a message flow, then registers it under config.getName(). The flow gets
+    * its connectors from the named discovery group when config.getDiscoveryGroupName() is set,
+    * otherwise from the static list in config.getConnectors().
+    *
+    * @throws IllegalArgumentException if a message flow with that name is already deployed, if the
+    *         transformer class cannot be instantiated, or if the named discovery group is not deployed
+    * @throws Exception if the flow cannot be created or started
+    */
+   public synchronized void deployMessageFlow(final MessageFlowConfiguration config) throws Exception
+   {
+      final String name = config.getName();
+
+      checkNotDeployed(messageFlows, "message-flow", name);
+
+      Transformer transformer = instantiateTransformer(config.getTransformerClassName());
+
+      // Arguments shared by both ways of constructing the flow
+      SimpleString flowName = new SimpleString(name);
+
+      SimpleString address = new SimpleString(config.getAddress());
+
+      SimpleString filter = config.getFilterString() == null ? null : new SimpleString(config.getFilterString());
+
+      MessageFlow flow;
+
+      if (config.getDiscoveryGroupName() == null)
+      {
+         // Create message flow with list of static connectors
+
+         flow = new MessageFlowImpl(flowName,
+                                    address,
+                                    config.getMaxBatchSize(),
+                                    config.getMaxBatchTime(),
+                                    filter,
+                                    config.isFanout(),
+                                    executorFactory,
+                                    storageManager,
+                                    postOffice,
+                                    queueSettingsRepository,
+                                    transformer,
+                                    config.getConnectors());
+      }
+      else
+      {
+         // Create message flow with connectors from discovery group
+
+         DiscoveryGroup group = discoveryGroups.get(config.getDiscoveryGroupName());
+
+         if (group == null)
+         {
+            throw new IllegalArgumentException("There is no discovery-group with name " + config.getDiscoveryGroupName() +
+                                               " deployed");
+         }
+
+         flow = new MessageFlowImpl(flowName,
+                                    address,
+                                    config.getMaxBatchSize(),
+                                    config.getMaxBatchTime(),
+                                    filter,
+                                    config.isFanout(),
+                                    executorFactory,
+                                    storageManager,
+                                    postOffice,
+                                    queueSettingsRepository,
+                                    transformer,
+                                    group);
+      }
+
+      // Register only after a successful start so a failure does not block the name
+      flow.start();
+
+      messageFlows.put(name, flow);
+   }
+
+   /**
+    * Stops the named broadcast group and removes its registration. Unknown names are ignored.
+    */
+   public synchronized void undeployBroadcastGroup(final String name) throws Exception
+   {
+      // remove, not get: otherwise the name could never be deployed again and start()
+      // would restart a component that was undeployed
+      BroadcastGroup group = broadcastGroups.remove(name);
+
+      if (group != null)
+      {
+         group.stop();
+      }
+   }
+
+   /**
+    * Stops the named discovery group and removes its registration. Unknown names are ignored.
+    */
+   public synchronized void undeployDiscoveryGroup(final String name) throws Exception
+   {
+      DiscoveryGroup group = discoveryGroups.remove(name);
+
+      if (group != null)
+      {
+         group.stop();
+      }
+   }
+
+   /**
+    * Stops the named message flow and removes its registration. Unknown names are ignored.
+    */
+   public synchronized void undeployMessageFlow(final String name) throws Exception
+   {
+      MessageFlow flow = messageFlows.remove(name);
+
+      if (flow != null)
+      {
+         flow.stop();
+      }
+   }
+
+   /**
+    * Rejects a deployment whose name is already a key of the given registry.
+    *
+    * @param type component type as it appears in the error message, e.g. "broadcast-group"
+    */
+   private static void checkNotDeployed(final Map<String, ?> deployed, final String type, final String name)
+   {
+      if (deployed.containsKey(name))
+      {
+         throw new IllegalArgumentException("There is already a " + type + " with name " + name + " deployed");
+      }
+   }
+
+   /**
+    * Loads the class and creates an instance through its no-argument constructor.
+    *
+    * @return the transformer, or null when className is null
+    * @throws IllegalArgumentException if the class cannot be loaded, instantiated or is not a Transformer
+    */
+   private static Transformer instantiateTransformer(final String className)
+   {
+      if (className == null)
+      {
+         return null;
+      }
+
+      try
+      {
+         ClassLoader loader = Thread.currentThread().getContextClassLoader();
+
+         if (loader == null)
+         {
+            // A thread may have no context class loader; use the one that loaded this class
+            loader = ClusterManagerImpl.class.getClassLoader();
+         }
+
+         Class<?> clz = loader.loadClass(className);
+
+         return (Transformer)clz.newInstance();
+      }
+      catch (Exception e)
+      {
+         throw new IllegalArgumentException("Error instantiating transformer class \"" + className + "\"", e);
+      }
+   }
+
+}
Added: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/DiscoveryGroupImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/DiscoveryGroupImpl.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,269 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.cluster.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.InterruptedIOException;
+import java.io.ObjectInputStream;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.MulticastSocket;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.cluster.DiscoveryGroup;
+import org.jboss.messaging.core.server.cluster.DiscoveryListener;
+
+/**
+ * A DiscoveryGroupImpl
+ *
+ * Joins a multicast group and runs a receiver thread that reads broadcast packets.
+ * Each packet holds a connector count followed by that many serialized
+ * TransportConfiguration objects. Every connector is remembered together with the time
+ * it was last seen; connectors not seen for longer than the timeout are dropped the next
+ * time a packet is processed. Registered listeners are called after every processed packet.
+ *
+ * The socket is opened by the constructor and closed by stop(), and the receiver thread
+ * is created once, so an instance cannot be started again after it was stopped.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 17 Nov 2008 13:21:45
+ *
+ */
+public class DiscoveryGroupImpl implements Runnable, DiscoveryGroup
+{
+   private static final Logger log = Logger.getLogger(DiscoveryGroupImpl.class);
+
+   // Receive timeout in milliseconds; bounds how long the receiver thread needs to notice stop()
+   private static final int SOCKET_TIMEOUT = 500;
+
+   private final MulticastSocket socket;
+
+   // Guarded by "this"
+   private final List<DiscoveryListener> listeners = new ArrayList<DiscoveryListener>();
+
+   private final Thread thread;
+
+   // Set when a packet has been processed, cleared by waitForBroadcast(); guarded by waitLock
+   private boolean received;
+
+   private final Object waitLock = new Object();
+
+   // Connector -> time in milliseconds it was last seen; guarded by "this"
+   private final Map<TransportConfiguration, Long> connectors = new HashMap<TransportConfiguration, Long>();
+
+   private final long timeout;
+
+   private volatile boolean started;
+
+   /**
+    * Opens the multicast socket on groupPort and joins groupAddress.
+    *
+    * @param timeout time in milliseconds after which a connector that was not seen again is dropped
+    */
+   public DiscoveryGroupImpl(final InetAddress groupAddress, final int groupPort, final long timeout) throws Exception
+   {
+      socket = new MulticastSocket(groupPort);
+
+      socket.joinGroup(groupAddress);
+
+      socket.setSoTimeout(SOCKET_TIMEOUT);
+
+      this.timeout = timeout;
+
+      thread = new Thread(this);
+   }
+
+   /**
+    * Starts the receiver thread. Does nothing if already started.
+    *
+    * @throws IllegalThreadStateException if the instance was stopped before (the thread cannot be reused)
+    */
+   public synchronized void start() throws Exception
+   {
+      if (started)
+      {
+         return;
+      }
+
+      // The flag has to be set before the thread runs: run() exits as soon as it sees
+      // started == false
+      started = true;
+
+      try
+      {
+         thread.start();
+      }
+      catch (IllegalThreadStateException e)
+      {
+         started = false;
+
+         throw e;
+      }
+   }
+
+   /**
+    * Signals the receiver thread to exit, waits for it and closes the socket.
+    * Does nothing if not started.
+    */
+   public void stop()
+   {
+      synchronized (this)
+      {
+         if (!started)
+         {
+            return;
+         }
+
+         // Clear the flag first - the receiver loop only exits once it sees it cleared
+         started = false;
+      }
+
+      // Join outside the monitor: the receiver thread needs the monitor to store connectors,
+      // so joining while holding it could block forever. Also never join the current thread
+      // (stop() may be called from a listener running on the receiver thread).
+      if (Thread.currentThread() != thread)
+      {
+         try
+         {
+            thread.join();
+         }
+         catch (InterruptedException e)
+         {
+            // Keep the interrupt visible to the caller; the socket is still closed below
+            Thread.currentThread().interrupt();
+         }
+      }
+
+      socket.close();
+   }
+
+   public boolean isStarted()
+   {
+      return started;
+   }
+
+   /**
+    * @return a copy of the connectors currently known
+    */
+   public synchronized List<TransportConfiguration> getConnectors()
+   {
+      return new ArrayList<TransportConfiguration>(connectors.keySet());
+   }
+
+   /**
+    * Waits until a broadcast has been processed since the previous call, or the timeout elapses.
+    *
+    * @param timeout maximum time to wait in milliseconds
+    * @return true if a broadcast was processed, false if the wait timed out
+    */
+   public boolean waitForBroadcast(final long timeout)
+   {
+      boolean interrupted = false;
+
+      try
+      {
+         synchronized (waitLock)
+         {
+            long start = System.currentTimeMillis();
+
+            long toWait = timeout;
+
+            while (!received && toWait > 0)
+            {
+               try
+               {
+                  waitLock.wait(toWait);
+               }
+               catch (InterruptedException e)
+               {
+                  // Keep waiting for the remaining time, restore the flag on the way out
+                  interrupted = true;
+               }
+
+               long now = System.currentTimeMillis();
+
+               toWait -= now - start;
+
+               start = now;
+            }
+
+            boolean ret = received;
+
+            // Consume the notification so the next call waits for a new broadcast
+            received = false;
+
+            return ret;
+         }
+      }
+      finally
+      {
+         if (interrupted)
+         {
+            Thread.currentThread().interrupt();
+         }
+      }
+   }
+
+   /**
+    * Receiver loop: runs until stop() clears the started flag or the socket fails.
+    */
+   public void run()
+   {
+      //TODO - can we use a smaller buffer size?
+      final byte[] data = new byte[65535];
+
+      final DatagramPacket packet = new DatagramPacket(data, data.length);
+
+      while (started)
+      {
+         // receive() shrinks the packet length to the size of the datagram read, so make
+         // the whole buffer available again before every receive
+         packet.setLength(data.length);
+
+         try
+         {
+            socket.receive(packet);
+         }
+         catch (InterruptedIOException e)
+         {
+            // Receive timed out - go round again so the started flag is re-checked
+            continue;
+         }
+         catch (Exception e)
+         {
+            // The socket is unusable; a failure after stop() is expected and not reported
+            if (started)
+            {
+               log.error("Failed to receive datagram", e);
+            }
+
+            return;
+         }
+
+         log.info("Listener received packet");
+
+         try
+         {
+            handlePacket(packet);
+         }
+         catch (Exception e)
+         {
+            // A single bad packet must not end discovery - skip it and keep listening
+            log.error("Failed to process discovery packet, ignoring it", e);
+         }
+      }
+   }
+
+   public synchronized void registerListener(final DiscoveryListener listener)
+   {
+      this.listeners.add(listener);
+   }
+
+   public synchronized void unregisterListener(final DiscoveryListener listener)
+   {
+      this.listeners.remove(listener);
+   }
+
+   /**
+    * Stores the connectors of one packet, wakes up a thread blocked in waitForBroadcast()
+    * and calls the listeners.
+    */
+   private void handlePacket(final DatagramPacket packet) throws Exception
+   {
+      // Decode the whole packet first so a packet that fails half way stores nothing
+      List<TransportConfiguration> discovered = readConnectors(packet);
+
+      long now = System.currentTimeMillis();
+
+      synchronized (this)
+      {
+         for (TransportConfiguration connector : discovered)
+         {
+            connectors.put(connector, now);
+         }
+      }
+
+      synchronized (waitLock)
+      {
+         received = true;
+
+         waitLock.notify();
+      }
+
+      callListeners();
+   }
+
+   /**
+    * Decodes the connectors held in the received part of the packet.
+    */
+   private List<TransportConfiguration> readConnectors(final DatagramPacket packet) throws Exception
+   {
+      // NOTE(review): this is native Java deserialization of data received from the network;
+      // any host that can send to the group can supply the bytes. Consider a custom wire
+      // format (see the TODO in BroadcastGroupImpl) or restricting the accepted classes.
+
+      // Only read the bytes that belong to this datagram - the rest of the buffer may
+      // still contain data from an earlier, longer packet
+      ByteArrayInputStream bis = new ByteArrayInputStream(packet.getData(), packet.getOffset(), packet.getLength());
+
+      ObjectInputStream ois = new ObjectInputStream(bis);
+
+      int size = ois.readInt();
+
+      // Not pre-sized: the count comes from the network
+      List<TransportConfiguration> result = new ArrayList<TransportConfiguration>();
+
+      for (int i = 0; i < size; i++)
+      {
+         result.add((TransportConfiguration)ois.readObject());
+      }
+
+      return result;
+   }
+
+   /**
+    * Drops expired connectors and then notifies the listeners.
+    */
+   private void callListeners()
+   {
+      List<DiscoveryListener> toCall;
+
+      synchronized (this)
+      {
+         long now = System.currentTimeMillis();
+
+         Iterator<Map.Entry<TransportConfiguration, Long>> iter = connectors.entrySet().iterator();
+
+         //Weed out any expired connectors
+
+         while (iter.hasNext())
+         {
+            Map.Entry<TransportConfiguration, Long> entry = iter.next();
+
+            if (entry.getValue() + timeout <= now)
+            {
+               iter.remove();
+            }
+         }
+
+         toCall = new ArrayList<DiscoveryListener>(listeners);
+      }
+
+      // Listeners run outside the monitor so that listener code which blocks cannot
+      // stall register/unregister/getConnectors calls from other threads
+      for (DiscoveryListener listener : toCall)
+      {
+         try
+         {
+            listener.connectorsChanged();
+         }
+         catch (Throwable t)
+         {
+            //Catch it so exception doesn't prevent other listeners from running
+            log.error("Failed to call discovery listener", t);
+         }
+      }
+   }
+}
Copied: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java (from rev 5368, trunk/src/main/org/jboss/messaging/core/server/impl/ForwarderImpl.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,288 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.cluster.impl;
+
+import java.util.LinkedList;
+import java.util.concurrent.Executor;
+
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.HandleStatus;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.cluster.Forwarder;
+import org.jboss.messaging.core.server.cluster.Transformer;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.core.transaction.impl.TransactionImpl;
+import org.jboss.messaging.util.Future;
+
+/**
+ * A ForwarderImpl
+ *
+ * Consumes message references from a local queue, buffers them, and once maxBatchSize
+ * references have been collected sends the batch through a client session to the server
+ * described by the connector configuration. A batch is sent on the supplied executor; the
+ * remote session is committed first and the local transaction holding the acknowledgements
+ * second. While a batch is in flight handle() answers BUSY.
+ *
+ * maxBatchTime is stored but not used by this class yet, so a batch is only sent when it is full.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 12 Nov 2008 11:37:35
+ *
+ *
+ */
+public class ForwarderImpl implements Forwarder
+{
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(ForwarderImpl.class);
+
+   // Attributes ----------------------------------------------------
+
+   private final Queue queue;
+
+   private final Executor executor;
+
+   // True from the moment a batch is handed to the executor until it has been dealt with
+   private volatile boolean busy;
+
+   private final int maxBatchSize;
+
+   private final long maxBatchTime;
+
+   // Number of buffered references; guarded by "this"
+   private int count;
+
+   // References accepted by handle() and not yet sent; guarded by "this"
+   private final java.util.Queue<MessageReference> refs = new LinkedList<MessageReference>();
+
+   // Local transaction collecting the acknowledgements of the current batch; guarded by "this"
+   private Transaction tx;
+
+   private final StorageManager storageManager;
+
+   private final PostOffice postOffice;
+
+   private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
+
+   private final Transformer transformer;
+
+   private final ClientSessionFactory csf;
+
+   private ClientSession session;
+
+   private ClientProducer producer;
+
+   private volatile boolean started;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * @param queue queue the forwarder consumes from
+    * @param connectorConfig connector used to create the client session factory
+    * @param executor executor batches are sent on
+    * @param maxBatchSize number of buffered references that triggers a send
+    * @param maxBatchTime currently unused
+    * @param transformer applied to every message before it is sent; may be null
+    */
+   public ForwarderImpl(final Queue queue,
+                        final TransportConfiguration connectorConfig,
+                        final Executor executor,
+                        final int maxBatchSize,
+                        final long maxBatchTime,
+                        final StorageManager storageManager,
+                        final PostOffice postOffice,
+                        final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+                        final Transformer transformer) throws Exception
+   {
+      this.queue = queue;
+
+      this.executor = executor;
+
+      this.maxBatchSize = maxBatchSize;
+
+      this.maxBatchTime = maxBatchTime;
+
+      this.storageManager = storageManager;
+
+      this.postOffice = postOffice;
+
+      this.queueSettingsRepository = queueSettingsRepository;
+
+      this.transformer = transformer;
+
+      this.csf = new ClientSessionFactoryImpl(connectorConfig);
+   }
+
+   /**
+    * Creates the local transaction, the client session and producer, and registers this
+    * forwarder as a consumer on the queue. Does nothing if already started.
+    */
+   public synchronized void start() throws Exception
+   {
+      if (started)
+      {
+         return;
+      }
+
+      createTx();
+
+      session = csf.createSession(false, false, false);
+
+      producer = session.createProducer(null);
+
+      queue.addConsumer(this);
+
+      started = true;
+   }
+
+   /**
+    * Removes this forwarder from the queue, waits up to 10 seconds for work already handed
+    * to the executor and closes the client session. Does nothing if not started.
+    */
+   public void stop() throws Exception
+   {
+      final ClientSession sessionToClose;
+
+      synchronized (this)
+      {
+         if (!started)
+         {
+            // Never started (or already stopped): there is no session to close
+            return;
+         }
+
+         // From here on handle() answers BUSY
+         started = false;
+
+         sessionToClose = session;
+      }
+
+      // The rest runs without the monitor. sendBatch() needs the monitor, so waiting for the
+      // executor while holding it would keep a queued batch from running and the wait below
+      // would always time out.
+      // NOTE(review): removeConsumer is also called outside the monitor on the assumption that
+      // the queue may hold its own lock while calling handle() - confirm against the Queue
+      // implementation.
+      queue.removeConsumer(this);
+
+      // Wait until all batches are complete
+      // NOTE(review): this relies on the executor running tasks in submission order - confirm.
+
+      Future future = new Future();
+
+      executor.execute(future);
+
+      boolean ok = future.await(10000);
+
+      if (!ok)
+      {
+         log.warn("Timed out waiting for batch to be sent");
+      }
+
+      sessionToClose.close();
+   }
+
+   public boolean isStarted()
+   {
+      return started;
+   }
+
+   // Consumer implementation ---------------------------------------
+
+   /**
+    * Buffers the reference and schedules a batch send once maxBatchSize references are buffered.
+    *
+    * @return BUSY while a batch is in flight or the forwarder is not started, otherwise HANDLED
+    */
+   public HandleStatus handle(final MessageReference reference) throws Exception
+   {
+      // Cheap check without the lock; repeated under the lock below
+      if (busy)
+      {
+         return HandleStatus.BUSY;
+      }
+
+      synchronized (this)
+      {
+         if (!started || busy)
+         {
+            return HandleStatus.BUSY;
+         }
+
+         refs.add(reference);
+
+         count++;
+
+         // >= rather than == so that a non-positive batch size still results in sends
+         if (count >= maxBatchSize)
+         {
+            busy = true;
+
+            executor.execute(new BatchSender());
+         }
+
+         return HandleStatus.HANDLED;
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Sends all buffered references as one batch. Whatever the outcome, the forwarder is
+    * left with a fresh local transaction and is no longer busy.
+    */
+   private void sendBatch()
+   {
+      boolean sent = false;
+
+      synchronized (this)
+      {
+         try
+         {
+            // TODO - duplicate detection on sendee and if batch size = 1 then don't need tx
+
+            MessageReference ref;
+
+            while ((ref = refs.poll()) != null)
+            {
+               tx.addAcknowledgement(ref);
+
+               ServerMessage message = ref.getMessage();
+
+               if (transformer != null)
+               {
+                  message = transformer.transform(message);
+               }
+
+               producer.send(message.getDestination(), message);
+            }
+
+            // Remote side first, then the local acknowledgements
+            session.commit();
+
+            tx.commit();
+
+            sent = true;
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to forward batch", e);
+
+            rollbackBatch();
+         }
+
+         // Reset the batch state on success and on failure; leaving busy set after a
+         // failure would make handle() answer BUSY for ever
+         createTx();
+
+         count = refs.size();
+
+         busy = false;
+      }
+
+      if (sent)
+      {
+         // Not triggered after a failure, so a broken connection does not turn into a
+         // tight deliver/fail/rollback loop
+         try
+         {
+            queue.deliverAsync(executor);
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to trigger delivery after forwarding batch", e);
+         }
+      }
+   }
+
+   /**
+    * Undoes a failed batch. Caller must hold the monitor.
+    */
+   private void rollbackBatch()
+   {
+      try
+      {
+         // Discard what was already sent in the remote session's transaction, otherwise it
+         // would be committed together with the next batch
+         session.rollback();
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to rollback session", e);
+      }
+
+      try
+      {
+         // References still buffered were already reported as HANDLED to the queue. Enlist
+         // them too so that the rollback covers the whole batch.
+         // NOTE(review): assumes rolling back an acknowledgement hands the reference back to
+         // its queue - confirm against TransactionImpl.
+         MessageReference ref;
+
+         while ((ref = refs.poll()) != null)
+         {
+            tx.addAcknowledgement(ref);
+         }
+
+         tx.rollback(queueSettingsRepository);
+      }
+      catch (Exception e2)
+      {
+         log.error("Failed to rollback", e2);
+      }
+   }
+
+   private void createTx()
+   {
+      tx = new TransactionImpl(storageManager, postOffice);
+   }
+
+   // Inner classes -------------------------------------------------
+
+   /** Runs sendBatch() on the executor. */
+   private class BatchSender implements Runnable
+   {
+      public void run()
+      {
+         sendBatch();
+      }
+   }
+
+}
Added: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,292 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.cluster.impl;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.filter.impl.FilterImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.cluster.DiscoveryGroup;
+import org.jboss.messaging.core.server.cluster.DiscoveryListener;
+import org.jboss.messaging.core.server.cluster.Forwarder;
+import org.jboss.messaging.core.server.cluster.MessageFlow;
+import org.jboss.messaging.core.server.cluster.Transformer;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.util.ExecutorFactory;
+import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.UUIDGenerator;
+
+/**
+ * A MessageFlowImpl
+ *
+ * Keeps one Forwarder per connector for a single address. The connectors are either
+ * passed as a fixed list at construction time or read from a DiscoveryGroup; in the
+ * latter case this object registers itself as a DiscoveryListener while started and
+ * re-synchronises its forwarders whenever the connectors change.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 17 Nov 2008 15:55:49
+ *
+ *
+ */
+public class MessageFlowImpl implements DiscoveryListener, MessageFlow
+{
+   private static final Logger log = Logger.getLogger(MessageFlowImpl.class);
+
+   private final SimpleString name;
+
+   private final SimpleString address;
+
+   private final SimpleString filterString;
+
+   private final boolean fanout;
+
+   private final int maxBatchSize;
+
+   private final long maxBatchTime;
+
+   private final ExecutorFactory executorFactory;
+
+   private final StorageManager storageManager;
+
+   private final PostOffice postOffice;
+
+   private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
+
+   private final Transformer transformer;
+
+   // Live forwarders, keyed by the connector they forward to.
+   private Map<TransportConfiguration, Forwarder> forwarders = new HashMap<TransportConfiguration, Forwarder>();
+
+   // Null when the flow was built from a static connector list.
+   private final DiscoveryGroup discoveryGroup;
+
+   private volatile boolean started;
+
+   /*
+    * Constructor using static list of connectors.
+    *
+    * The forwarders for the given connectors are created and started here,
+    * before start() is ever called.
+    */
+   public MessageFlowImpl(final SimpleString name,
+                          final SimpleString address,
+                          final int maxBatchSize,
+                          final long maxBatchTime,
+                          final SimpleString filterString,
+                          final boolean fanout,
+                          final ExecutorFactory executorFactory,
+                          final StorageManager storageManager,
+                          final PostOffice postOffice,
+                          final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+                          final Transformer transformer,
+                          final List<TransportConfiguration> connectors) throws Exception
+   {
+      this(name,
+           address,
+           maxBatchSize,
+           maxBatchTime,
+           filterString,
+           fanout,
+           executorFactory,
+           storageManager,
+           postOffice,
+           queueSettingsRepository,
+           transformer,
+           (DiscoveryGroup)null);
+
+      updateConnectors(connectors);
+   }
+
+   /*
+    * Constructor using discovery to get connectors.
+    *
+    * No forwarder is created until start() reads the connectors from the discovery group.
+    */
+   public MessageFlowImpl(final SimpleString name,
+                          final SimpleString address,
+                          final int maxBatchSize,
+                          final long maxBatchTime,
+                          final SimpleString filterString,
+                          final boolean fanout,
+                          final ExecutorFactory executorFactory,
+                          final StorageManager storageManager,
+                          final PostOffice postOffice,
+                          final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+                          final Transformer transformer,
+                          final DiscoveryGroup discoveryGroup) throws Exception
+   {
+      this.name = name;
+      this.address = address;
+      this.maxBatchSize = maxBatchSize;
+      this.maxBatchTime = maxBatchTime;
+      this.filterString = filterString;
+      this.fanout = fanout;
+      this.executorFactory = executorFactory;
+      this.storageManager = storageManager;
+      this.postOffice = postOffice;
+      this.queueSettingsRepository = queueSettingsRepository;
+      this.transformer = transformer;
+      this.discoveryGroup = discoveryGroup;
+   }
+
+   /**
+    * Starts the flow; a no-op when already started. With a discovery group, the
+    * forwarders are first synchronised with its current connectors and this object
+    * is then registered as a listener.
+    */
+   public synchronized void start() throws Exception
+   {
+      if (!started)
+      {
+         if (discoveryGroup != null)
+         {
+            updateConnectors(discoveryGroup.getConnectors());
+
+            discoveryGroup.registerListener(this);
+         }
+
+         started = true;
+      }
+   }
+
+   /**
+    * Stops the flow; a no-op when not started. Unregisters from the discovery group
+    * (if any) and stops every forwarder.
+    */
+   public synchronized void stop() throws Exception
+   {
+      if (started)
+      {
+         if (discoveryGroup != null)
+         {
+            discoveryGroup.unregisterListener(this);
+         }
+
+         // NOTE(review): the stopped forwarders stay in the map and start() does not
+         // start them again - confirm whether a stop()/start() cycle is expected to work.
+         for (Forwarder forwarder : forwarders.values())
+         {
+            forwarder.stop();
+         }
+
+         started = false;
+      }
+   }
+
+   public boolean isStarted()
+   {
+      return started;
+   }
+
+   // DiscoveryListener implementation ------------------------------------------------------------------
+
+   /**
+    * Re-reads the connectors from the discovery group and synchronises the forwarders
+    * with them. Failures are logged, not propagated.
+    */
+   public void connectorsChanged()
+   {
+      // NOTE(review): unlike start()/stop() this method does not hold the object lock
+      // while the forwarders HashMap is modified - confirm the listener can never run
+      // concurrently with them.
+      try
+      {
+         updateConnectors(discoveryGroup.getConnectors());
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to update connectors", e);
+      }
+   }
+
+   /**
+    * Brings the forwarders in line with the given connectors: forwarders whose
+    * connector is no longer listed are stopped and dropped first, then a forwarder
+    * is deployed for every listed connector that has none yet.
+    */
+   private void updateConnectors(final List<TransportConfiguration> connectors) throws Exception
+   {
+      removeStaleForwarders(new HashSet<TransportConfiguration>(connectors));
+
+      for (TransportConfiguration connector : connectors)
+      {
+         if (!forwarders.containsKey(connector))
+         {
+            deployForwarder(connector);
+         }
+      }
+   }
+
+   /**
+    * Stops and removes every forwarder whose connector is not in the given set.
+    */
+   private void removeStaleForwarders(final Set<TransportConfiguration> currentConnectors) throws Exception
+   {
+      for (Iterator<Map.Entry<TransportConfiguration, Forwarder>> it = forwarders.entrySet().iterator(); it.hasNext();)
+      {
+         final Map.Entry<TransportConfiguration, Forwarder> entry = it.next();
+
+         if (currentConnectors.contains(entry.getKey()))
+         {
+            continue;
+         }
+
+         // Connector no longer there - we should remove and close it
+
+         entry.getValue().stop();
+
+         it.remove();
+      }
+   }
+
+   /**
+    * Creates the queue binding and the forwarder for one connector, records the
+    * forwarder, adds it as a consumer of the queue and starts it.
+    */
+   private void deployForwarder(final TransportConfiguration connector) throws Exception
+   {
+      // NOTE(review): the name embeds a freshly generated UUID, so the lookup below
+      // presumably never finds an existing binding - confirm.
+      final SimpleString queueName = new SimpleString("outflow." + name +
+                                                      "." +
+                                                      UUIDGenerator.getInstance().generateSimpleStringUUID());
+
+      Binding binding = postOffice.getBinding(queueName);
+
+      // TODO need to delete store and forward queues that are no longer in the config
+      // and also allow ability to change filterstring etc. while keeping the same name
+      if (binding == null)
+      {
+         Filter filter = null;
+
+         if (filterString != null)
+         {
+            filter = new FilterImpl(filterString);
+         }
+
+         binding = postOffice.addBinding(address, queueName, filter, true, false, fanout);
+      }
+
+      final Forwarder forwarder = new ForwarderImpl(binding.getQueue(),
+                                                    connector,
+                                                    executorFactory.getExecutor(),
+                                                    maxBatchSize,
+                                                    maxBatchTime,
+                                                    storageManager,
+                                                    postOffice,
+                                                    queueSettingsRepository,
+                                                    transformer);
+
+      // Record the forwarder before it is attached to the queue and started.
+      forwarders.put(connector, forwarder);
+
+      binding.getQueue().addConsumer(forwarder);
+
+      forwarder.start();
+   }
+
+}
Deleted: trunk/src/main/org/jboss/messaging/core/server/impl/ForwarderImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ForwarderImpl.java 2008-11-18 01:24:44 UTC (rev 5380)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ForwarderImpl.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -1,254 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.server.impl;
-
-import java.util.LinkedList;
-import java.util.concurrent.Executor;
-
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.message.Message;
-import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.Forwarder;
-import org.jboss.messaging.core.server.HandleStatus;
-import org.jboss.messaging.core.server.MessageReference;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.settings.HierarchicalRepository;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.core.transaction.Transaction;
-import org.jboss.messaging.core.transaction.impl.TransactionImpl;
-import org.jboss.messaging.util.Future;
-
-/**
- * A ForwarderImpl
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 12 Nov 2008 11:37:35
- *
- *
- */
-public class ForwarderImpl implements Forwarder
-{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(ForwarderImpl.class);
-
- // Attributes ----------------------------------------------------
-
- private final Queue queue;
-
- private Executor executor;
-
- private volatile boolean busy;
-
- private int maxBatchSize;
-
- private long maxBatchTime;
-
- private int count;
-
- private java.util.Queue<MessageReference> refs = new LinkedList<MessageReference>();
-
- private boolean closed;
-
- private Transaction tx;
-
- private final StorageManager storageManager;
-
- private final PostOffice postOffice;
-
- private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
-
- private final ClientSession session;
-
- private final ClientProducer producer;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public ForwarderImpl(final Queue queue,
- final TransportConfiguration connectorConfig, final Executor executor, final int maxBatchSize,
- final long maxBatchTime,
- final StorageManager storageManager, final PostOffice postOffice,
- final HierarchicalRepository<QueueSettings> queueSettingsRepository)
- throws Exception
- {
- this.queue = queue;
-
- this.executor = executor;
-
- this.maxBatchSize = maxBatchSize;
-
- this.maxBatchTime = maxBatchTime;
-
- this.storageManager = storageManager;
-
- this.postOffice = postOffice;
-
- this.queueSettingsRepository = queueSettingsRepository;
-
- createTx();
-
- ClientSessionFactory csf = new ClientSessionFactoryImpl(connectorConfig);
-
- session = csf.createSession(false, false, false);
-
- producer = session.createProducer(null);
-
- queue.addConsumer(this);
- }
-
- public synchronized void close() throws Exception
- {
- closed = true;
-
- queue.removeConsumer(this);
-
- // Wait until all batches are complete
-
- Future future = new Future();
-
- executor.execute(future);
-
- boolean ok = future.await(10000);
-
- if (!ok)
- {
- log.warn("Timed out waiting for batch to be sent");
- }
- }
-
- // Consumer implementation ---------------------------------------
-
- public HandleStatus handle(final MessageReference reference) throws Exception
- {
- if (busy)
- {
- return HandleStatus.BUSY;
- }
-
- synchronized (this)
- {
- if (closed)
- {
- return HandleStatus.BUSY;
- }
-
- refs.add(reference);
-
- count++;
-
- if (count == maxBatchSize)
- {
- busy = true;
-
- executor.execute(new BatchSender());
- }
-
- return HandleStatus.HANDLED;
- }
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- private void sendBatch()
- {
- try
- {
- synchronized (this)
- {
- //TODO - duplicate detection on sendee and if batch size = 1 then don't need tx
-
- while (true)
- {
- MessageReference ref = refs.poll();
-
- if (ref == null)
- {
- break;
- }
-
- tx.addAcknowledgement(ref);
-
- Message message = ref.getMessage();
-
- producer.send(message.getDestination(), message);
- }
-
- session.commit();
-
- tx.commit();
-
- createTx();
-
- busy = false;
-
- count = 0;
- }
-
- queue.deliverAsync(executor);
- }
- catch (Exception e)
- {
- log.error("Failed to forward batch", e);
-
- try
- {
- tx.rollback(queueSettingsRepository);
- }
- catch (Exception e2)
- {
- log.error("Failed to rollback", e2);
- }
- }
- }
-
- private void createTx()
- {
- tx = new TransactionImpl(storageManager, postOffice);
- }
-
- // Inner classes -------------------------------------------------
-
- private class BatchSender implements Runnable
- {
- public void run()
- {
- sendBatch();
- }
- }
-
-}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-11-18 01:24:44 UTC (rev 5380)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -25,11 +25,11 @@
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.OutflowConfiguration;
import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.filter.Filter;
-import org.jboss.messaging.core.filter.impl.FilterImpl;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.management.ManagementService;
import org.jboss.messaging.core.management.MessagingServerControlMBean;
@@ -38,7 +38,6 @@
import org.jboss.messaging.core.paging.impl.PagingManagerFactoryNIO;
import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.postoffice.impl.PostOfficeImpl;
import org.jboss.messaging.core.remoting.Channel;
@@ -54,11 +53,12 @@
import org.jboss.messaging.core.security.Role;
import org.jboss.messaging.core.security.SecurityStore;
import org.jboss.messaging.core.security.impl.SecurityStoreImpl;
-import org.jboss.messaging.core.server.Forwarder;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.QueueFactory;
import org.jboss.messaging.core.server.ServerSession;
+import org.jboss.messaging.core.server.cluster.ClusterManager;
+import org.jboss.messaging.core.server.cluster.impl.ClusterManagerImpl;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.HierarchicalObjectRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -68,8 +68,6 @@
import org.jboss.messaging.util.ExecutorFactory;
import org.jboss.messaging.util.JBMThreadFactory;
import org.jboss.messaging.util.OrderedExecutorFactory;
-import org.jboss.messaging.util.SimpleString;
-import org.jboss.messaging.util.UUIDGenerator;
import org.jboss.messaging.util.VersionLoader;
/**
@@ -122,6 +120,8 @@
private final Map<String, ServerSession> sessions = new ConcurrentHashMap<String, ServerSession>();
+ private ClusterManager clusterManager;
+
// plugins
private StorageManager storageManager;
@@ -218,7 +218,11 @@
storeFactory.setPagingManager(pagingManager);
- resourceManager = new ResourceManagerImpl((int) configuration.getTransactionTimeout()/1000, configuration.getTransactionTimeoutScanPeriod(), storageManager, postOffice, queueSettingsRepository);
+ resourceManager = new ResourceManagerImpl((int)configuration.getTransactionTimeout() / 1000,
+ configuration.getTransactionTimeoutScanPeriod(),
+ storageManager,
+ postOffice,
+ queueSettingsRepository);
postOffice = new PostOfficeImpl(storageManager,
pagingManager,
queueFactory,
@@ -269,9 +273,35 @@
ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS);
}
remotingService.setMessagingServer(this);
-
- startOutflows();
+ if (configuration.isClustered())
+ {
+ clusterManager = new ClusterManagerImpl(executorFactory,
+ storageManager,
+ postOffice,
+ queueSettingsRepository,
+ scheduledExecutor);
+
+ clusterManager.start();
+
+ //Deploy the cluster artifacts
+
+ for (BroadcastGroupConfiguration config: configuration.getBroadcastGroupConfigurations())
+ {
+ clusterManager.deployBroadcastGroup(config);
+ }
+
+ for (DiscoveryGroupConfiguration config: configuration.getDiscoveryGroupConfigurations())
+ {
+ clusterManager.deployDiscoveryGroup(config);
+ }
+
+ for (MessageFlowConfiguration config: configuration.getMessageFlowConfigurations())
+ {
+ clusterManager.deployMessageFlow(config);
+ }
+ }
+
started = true;
}
@@ -281,9 +311,12 @@
{
return;
}
-
- stopOutflows();
+ if (clusterManager != null)
+ {
+ clusterManager.stop();
+ }
+
asyncDeliveryPool.shutdown();
try
@@ -310,7 +343,7 @@
queueFactory = null;
resourceManager = null;
serverManagement = null;
-
+
started = false;
}
@@ -407,7 +440,7 @@
{
return resourceManager;
}
-
+
public Version getVersion()
{
return version;
@@ -464,53 +497,6 @@
connection.freeze();
}
}
-
- private Set<Forwarder> forwarders = new HashSet<Forwarder>();
-
- private void startOutflows() throws Exception
- {
- Set<OutflowConfiguration> outflows = configuration.getOutflowConfigurations();
-
- for (OutflowConfiguration outflowConfig: outflows)
- {
- for (TransportConfiguration connectorConfig: outflowConfig.getConnectors())
- {
- SimpleString queueName = new SimpleString("outflow." + outflowConfig.getName() + "." +
- UUIDGenerator.getInstance().generateSimpleStringUUID());
-
- Binding binding = postOffice.getBinding(queueName);
-
- //TODO need to delete store and forward queues that are no longer in the config
- //and also allow ability to change filterstring etc. while keeping the same name
- if (binding == null)
- {
- SimpleString address = new SimpleString(outflowConfig.getAddress());
-
- SimpleString filterString = outflowConfig.getFilterString() == null ? null : new SimpleString(outflowConfig.getFilterString());
-
- Filter filter = filterString == null ? null : new FilterImpl(filterString);
-
- binding = postOffice.addBinding(address, queueName, filter, true, false, outflowConfig.isFanout());
- }
-
- Forwarder forwarder = new ForwarderImpl(binding.getQueue(), connectorConfig, executorFactory.getExecutor(),
- outflowConfig.getMaxBatchSize(), outflowConfig.getMaxBatchTime(),
- storageManager, postOffice, queueSettingsRepository);
-
- forwarders.add(forwarder);
-
- binding.getQueue().addConsumer(forwarder);
- }
- }
- }
-
- private void stopOutflows() throws Exception
- {
- for (Forwarder forwarder: forwarders)
- {
- forwarder.close();
- }
- }
public ReattachSessionResponseMessage reattachSession(final RemotingConnection connection,
final String name,
@@ -569,7 +555,7 @@
final boolean autoCommitAcks,
final boolean xa,
final int sendWindowSize) throws Exception
- {
+ {
checkActivate(connection);
return doCreateSession(name,
@@ -693,7 +679,7 @@
executorFactory.getExecutor(),
channel,
managementService,
- this,
+ this,
configuration.getManagementAddress());
sessions.put(name, session);
@@ -703,7 +689,7 @@
channel.setHandler(handler);
connection.addFailureListener(session);
-
+
return new CreateSessionResponseMessage(version.getIncrementingVersion());
}
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java 2008-11-18 01:24:44 UTC (rev 5380)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -1,223 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
- * Middleware LLC, and individual contributors by the @authors tag. See the
- * copyright.txt in the distribution for a full listing of individual
- * contributors.
- *
- * This is free software; you can redistribute it and/or modify it under the
- * terms of the GNU Lesser General Public License as published by the Free
- * Software Foundation; either version 2.1 of the License, or (at your option)
- * any later version.
- *
- * This software is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this software; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.cluster.distribution;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import junit.framework.TestCase;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.OutflowConfiguration;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
-import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- *
- * A OutflowBatchSizeTest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 15 Nov 2008 09:06:55
- *
- *
- */
-public class OutflowBatchSizeTest extends TestCase
-{
- private static final Logger log = Logger.getLogger(OutflowBatchSizeTest.class);
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private MessagingService service0;
-
- private MessagingService service1;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testBatchSize() throws Exception
- {
- Configuration service0Conf = new ConfigurationImpl();
- service0Conf.setSecurityEnabled(false);
- Map<String, Object> service0Params = new HashMap<String, Object>();
- service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
- service0Conf.getAcceptorConfigurations()
- .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
- service0Params));
-
- Configuration service1Conf = new ConfigurationImpl();
- service1Conf.setSecurityEnabled(false);
- Map<String, Object> service1Params = new HashMap<String, Object>();
- service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-
- service1Conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
- service1Params));
- service1 = MessagingServiceImpl.newNullStorageMessagingServer(service1Conf);
- service1.start();
-
- List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
- TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
- service1Params);
- connectors.add(server1tc);
-
- final SimpleString address1 = new SimpleString("testaddress");
-
- final int batchSize = 10;
-
- OutflowConfiguration ofconfig = new OutflowConfiguration("outflow1", address1.toString(), null, true, batchSize, 0, connectors);
- Set<OutflowConfiguration> ofconfigs = new HashSet<OutflowConfiguration>();
- ofconfigs.add(ofconfig);
- service0Conf.setOutFlowConfigurations(ofconfigs);
-
- service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
- service0.start();
-
- TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
- service0Params);
-
- ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
-
- ClientSession session0 = csf0.createSession(false, true, true);
-
- ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
-
- ClientSession session1 = csf1.createSession(false, true, true);
-
- session0.createQueue(address1, address1, null, false, false, true);
-
- session1.createQueue(address1, address1, null, false, false, true);
-
- ClientProducer prod0_1 = session0.createProducer(address1);
-
- ClientConsumer cons0_1 = session0.createConsumer(address1);
-
- ClientConsumer cons1_1 = session1.createConsumer(address1);
-
- session0.start();
-
- session1.start();
-
- final SimpleString propKey = new SimpleString("testkey");
-
- for (int j = 0; j < 10; j++)
- {
-
- for (int i = 0; i < batchSize - 1; i++)
- {
- ClientMessage message = session0.createClientMessage(false);
- message.putIntProperty(propKey, i);
- message.getBody().flip();
-
- prod0_1.send(message);
- }
-
- for (int i = 0; i < batchSize - 1; i++)
- {
- ClientMessage rmessage1 = cons0_1.receive(1000);
-
- assertNotNull(rmessage1);
-
- assertEquals(i, rmessage1.getProperty(propKey));
- }
-
- ClientMessage rmessage1 = cons1_1.receive(250);
-
- assertNull(rmessage1);
-
- ClientMessage message = session0.createClientMessage(false);
- message.putIntProperty(propKey, batchSize - 1);
- message.getBody().flip();
-
- prod0_1.send(message);
-
- rmessage1 = cons0_1.receive(1000);
-
- assertNotNull(rmessage1);
-
- assertEquals(batchSize - 1, rmessage1.getProperty(propKey));
-
- for (int i = 0; i < batchSize; i++)
- {
- rmessage1 = cons1_1.receive(1000);
-
- assertNotNull(rmessage1);
-
- assertEquals(i, rmessage1.getProperty(propKey));
- }
- }
-
- session0.close();
-
- session1.close();
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- @Override
- protected void setUp() throws Exception
- {
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- service0.stop();
-
- assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
-
- service1.stop();
-
- assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
-
- assertEquals(0, InVMRegistry.instance.size());
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
-
-
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -0,0 +1,225 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * Integration test checking that a message flow configured with a batch size only forwards
+ * messages to the remote server once a complete batch has accumulated.
+ * <p>
+ * Two in-VM servers are started. Server 0 is given a {@link MessageFlowConfiguration} for the test
+ * address whose connector points at server 1. The test asserts that a consumer on server 1 receives
+ * nothing while fewer than {@code batchSize} messages have been sent to server 0, and receives the
+ * whole batch, in send order, once the {@code batchSize}-th message has been sent.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 15 Nov 2008 09:06:55
+ */
+public class OutflowBatchSizeTest extends TestCase
+{
+   private static final Logger log = Logger.getLogger(OutflowBatchSizeTest.class);
+
+   // Constants -----------------------------------------------------
+
+   private static final String INVM_ACCEPTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory";
+
+   private static final String INVM_CONNECTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
+
+   /** Number of times the "fill all but one, then complete the batch" cycle is repeated. */
+   private static final int ROUNDS = 10;
+
+   /** Receive timeout (ms) used when a message is expected to arrive. */
+   private static final int RECEIVE_TIMEOUT = 1000;
+
+   /** Receive timeout (ms) used when no message is expected to arrive. */
+   private static final int NO_MESSAGE_TIMEOUT = 250;
+
+   // Attributes ----------------------------------------------------
+
+   /** Server that owns the message flow; created and started by the test, stopped in tearDown. */
+   private MessagingService service0;
+
+   /** Server the message flow points at; created and started by the test, stopped in tearDown. */
+   private MessagingService service1;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Sends messages to server 0 and asserts that the consumer on server 1 sees none of them until
+    * exactly {@code batchSize} have been sent, at which point it must see all of them in order.
+    * The cycle is repeated {@link #ROUNDS} times.
+    *
+    * @throws Exception if a server or session operation fails
+    */
+   public void testBatchSize() throws Exception
+   {
+      Map<String, Object> service0Params = new HashMap<String, Object>();
+      service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
+      Configuration service0Conf = createClusteredConfiguration(service0Params);
+
+      Map<String, Object> service1Params = new HashMap<String, Object>();
+      service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+      Configuration service1Conf = createClusteredConfiguration(service1Params);
+
+      // Server 1 is started before server 0 is even created, presumably so that the flow's
+      // connector has something to connect to when server 0 starts.
+      service1 = MessagingServiceImpl.newNullStorageMessagingServer(service1Conf);
+      service1.start();
+
+      TransportConfiguration server1tc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, service1Params);
+      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+      connectors.add(server1tc);
+
+      final SimpleString address1 = new SimpleString("testaddress");
+
+      final int batchSize = 10;
+
+      // Arguments: name, address, filter (none), a boolean that the sibling outflow tests suggest
+      // selects fanout (true) vs. round-robin (false), the batch size under test, and two further
+      // values (0, null) whose meaning is not visible here -- NOTE(review): confirm against
+      // MessageFlowConfiguration.
+      MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1",
+                                                                       address1.toString(),
+                                                                       null,
+                                                                       true,
+                                                                       batchSize,
+                                                                       0,
+                                                                       null,
+                                                                       connectors);
+      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+      ofconfigs.add(ofconfig);
+      service0Conf.setMessageFlowConfigurations(ofconfigs);
+
+      service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
+      service0.start();
+
+      TransportConfiguration server0tc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, service0Params);
+
+      ClientSession session0 = null;
+      ClientSession session1 = null;
+
+      try
+      {
+         ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+         session0 = csf0.createSession(false, true, true);
+
+         ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+         session1 = csf1.createSession(false, true, true);
+
+         session0.createQueue(address1, address1, null, false, false, true);
+         session1.createQueue(address1, address1, null, false, false, true);
+
+         ClientProducer producer = session0.createProducer(address1);
+         ClientConsumer localConsumer = session0.createConsumer(address1);
+         ClientConsumer remoteConsumer = session1.createConsumer(address1);
+
+         session0.start();
+         session1.start();
+
+         final SimpleString propKey = new SimpleString("testkey");
+
+         for (int round = 0; round < ROUNDS; round++)
+         {
+            // Send one message fewer than a full batch.
+            for (int i = 0; i < batchSize - 1; i++)
+            {
+               sendNumbered(session0, producer, propKey, i);
+            }
+
+            // The consumer on server 0 sees every message straight away ...
+            for (int i = 0; i < batchSize - 1; i++)
+            {
+               assertNextNumbered(localConsumer, propKey, i);
+            }
+
+            // ... but nothing may have reached server 1 yet: the batch is incomplete.
+            assertNull(remoteConsumer.receive(NO_MESSAGE_TIMEOUT));
+
+            // Completing the batch must release all batchSize messages to server 1.
+            sendNumbered(session0, producer, propKey, batchSize - 1);
+
+            assertNextNumbered(localConsumer, propKey, batchSize - 1);
+
+            for (int i = 0; i < batchSize; i++)
+            {
+               assertNextNumbered(remoteConsumer, propKey, i);
+            }
+         }
+      }
+      finally
+      {
+         // Close the sessions even when an assertion fails; otherwise the open connections make
+         // the connection-count checks in tearDown fail too and obscure the real failure.
+         try
+         {
+            if (session0 != null)
+            {
+               session0.close();
+            }
+         }
+         finally
+         {
+            if (session1 != null)
+            {
+               session1.close();
+            }
+         }
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   /**
+    * Nothing to prepare: {@link #testBatchSize()} builds and starts both servers itself.
+    */
+   @Override
+   protected void setUp() throws Exception
+   {
+   }
+
+   /**
+    * Stops whichever servers were started and verifies that no connections and no in-VM registry
+    * entries are left behind. Server 1 is stopped even if stopping or checking server 0 fails.
+    *
+    * @throws Exception if stopping a server fails
+    */
+   @Override
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         stopAndAssertNoConnections(service0);
+      }
+      finally
+      {
+         stopAndAssertNoConnections(service1);
+      }
+
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   // Private -------------------------------------------------------
+
+   /** Builds a clustered, security-disabled configuration with a single in-VM acceptor using the given params. */
+   private static Configuration createClusteredConfiguration(final Map<String, Object> acceptorParams)
+   {
+      Configuration conf = new ConfigurationImpl();
+      conf.setClustered(true);
+      conf.setSecurityEnabled(false);
+      conf.getAcceptorConfigurations().add(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, acceptorParams));
+      return conf;
+   }
+
+   /** Sends one message carrying {@code value} as an int property under {@code key}. */
+   private static void sendNumbered(final ClientSession session,
+                                    final ClientProducer producer,
+                                    final SimpleString key,
+                                    final int value) throws Exception
+   {
+      ClientMessage message = session.createClientMessage(false);
+      message.putIntProperty(key, value);
+      message.getBody().flip();
+
+      producer.send(message);
+   }
+
+   /** Asserts that the next message arrives within {@link #RECEIVE_TIMEOUT} ms and carries {@code expected} under {@code key}. */
+   private static void assertNextNumbered(final ClientConsumer consumer,
+                                          final SimpleString key,
+                                          final int expected) throws Exception
+   {
+      ClientMessage received = consumer.receive(RECEIVE_TIMEOUT);
+
+      assertNotNull(received);
+
+      assertEquals(expected, received.getProperty(key));
+   }
+
+   /** Stops the server, if it was ever created, and asserts that it has no remaining connections. */
+   private static void stopAndAssertNoConnections(final MessagingService service) throws Exception
+   {
+      if (service == null)
+      {
+         // The test failed before this server was created; there is nothing to stop.
+         return;
+      }
+
+      service.stop();
+
+      assertEquals(0, service.getServer().getRemotingService().getConnections().size());
+   }
+
+   // Inner classes -------------------------------------------------
+}
+
+
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithFilterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithFilterTest.java 2008-11-18 01:24:44 UTC (rev 5380)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithFilterTest.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -38,8 +38,8 @@
import org.jboss.messaging.core.client.ClientSessionFactory;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.OutflowConfiguration;
import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
@@ -79,6 +79,7 @@
public void testWithWildcard() throws Exception
{
Configuration service0Conf = new ConfigurationImpl();
+ service0Conf.setClustered(true);
service0Conf.setSecurityEnabled(false);
Map<String, Object> service0Params = new HashMap<String, Object>();
service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
@@ -87,6 +88,7 @@
service0Params));
Configuration service1Conf = new ConfigurationImpl();
+ service1Conf.setClustered(true);
service1Conf.setSecurityEnabled(false);
Map<String, Object> service1Params = new HashMap<String, Object>();
service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
@@ -105,10 +107,10 @@
final String filter = "selectorkey='ORANGES'";
- OutflowConfiguration ofconfig = new OutflowConfiguration("outflow1", address1.toString(), filter, true, 1, 0, connectors);
- Set<OutflowConfiguration> ofconfigs = new HashSet<OutflowConfiguration>();
+ MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1", address1.toString(), filter, true, 1, 0, null, connectors);
+ Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
ofconfigs.add(ofconfig);
- service0Conf.setOutFlowConfigurations(ofconfigs);
+ service0Conf.setMessageFlowConfigurations(ofconfigs);
service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
service0.start();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithWildcardTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithWildcardTest.java 2008-11-18 01:24:44 UTC (rev 5380)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithWildcardTest.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -38,8 +38,8 @@
import org.jboss.messaging.core.client.ClientSessionFactory;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.OutflowConfiguration;
import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
@@ -79,6 +79,7 @@
public void testWithWildcard() throws Exception
{
Configuration service0Conf = new ConfigurationImpl();
+ service0Conf.setClustered(true);
service0Conf.setSecurityEnabled(false);
Map<String, Object> service0Params = new HashMap<String, Object>();
service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
@@ -87,6 +88,7 @@
service0Params));
Configuration service1Conf = new ConfigurationImpl();
+ service1Conf.setClustered(true);
service1Conf.setSecurityEnabled(false);
Map<String, Object> service1Params = new HashMap<String, Object>();
service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
@@ -112,10 +114,10 @@
final SimpleString match1 = new SimpleString("cheese.#");
- OutflowConfiguration ofconfig = new OutflowConfiguration("outflow1", match1.toString(), null, true, 1, 0, connectors);
- Set<OutflowConfiguration> ofconfigs = new HashSet<OutflowConfiguration>();
+ MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1", match1.toString(), null, true, 1, 0, null, connectors);
+ Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
ofconfigs.add(ofconfig);
- service0Conf.setOutFlowConfigurations(ofconfigs);
+ service0Conf.setMessageFlowConfigurations(ofconfigs);
service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
service0.start();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SimpleOutflowTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SimpleOutflowTest.java 2008-11-18 01:24:44 UTC (rev 5380)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SimpleOutflowTest.java 2008-11-18 11:58:28 UTC (rev 5381)
@@ -38,8 +38,8 @@
import org.jboss.messaging.core.client.ClientSessionFactory;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.OutflowConfiguration;
import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
@@ -79,6 +79,7 @@
public void testSimpleOutflowFanout() throws Exception
{
Configuration service0Conf = new ConfigurationImpl();
+ service0Conf.setClustered(true);
service0Conf.setSecurityEnabled(false);
Map<String, Object> service0Params = new HashMap<String, Object>();
service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
@@ -87,6 +88,7 @@
service0Params));
Configuration service1Conf = new ConfigurationImpl();
+ service1Conf.setClustered(true);
service1Conf.setSecurityEnabled(false);
Map<String, Object> service1Params = new HashMap<String, Object>();
service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
@@ -103,10 +105,10 @@
final SimpleString testAddress = new SimpleString("testaddress");
- OutflowConfiguration ofconfig = new OutflowConfiguration("outflow1", testAddress.toString(), null, true, 1, 0, connectors);
- Set<OutflowConfiguration> ofconfigs = new HashSet<OutflowConfiguration>();
+ MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1", testAddress.toString(), null, true, 1, 0, null, connectors);
+ Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
ofconfigs.add(ofconfig);
- service0Conf.setOutFlowConfigurations(ofconfigs);
+ service0Conf.setMessageFlowConfigurations(ofconfigs);
service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
service0.start();
@@ -168,6 +170,7 @@
public void testSimpleOutflowRoundRobin() throws Exception
{
Configuration service0Conf = new ConfigurationImpl();
+ service0Conf.setClustered(true);
service0Conf.setSecurityEnabled(false);
Map<String, Object> service0Params = new HashMap<String, Object>();
service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
@@ -176,6 +179,7 @@
service0Params));
Configuration service1Conf = new ConfigurationImpl();
+ service1Conf.setClustered(true);
service1Conf.setSecurityEnabled(false);
Map<String, Object> service1Params = new HashMap<String, Object>();
service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
@@ -192,10 +196,10 @@
final SimpleString testAddress = new SimpleString("testaddress");
- OutflowConfiguration ofconfig = new OutflowConfiguration("outflow1", testAddress.toString(), null, false, 1, 0, connectors);
- Set<OutflowConfiguration> ofconfigs = new HashSet<OutflowConfiguration>();
+ MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1", testAddress.toString(), null, false, 1, 0, null, connectors);
+ Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
ofconfigs.add(ofconfig);
- service0Conf.setOutFlowConfigurations(ofconfigs);
+ service0Conf.setMessageFlowConfigurations(ofconfigs);
service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
service0.start();
More information about the jboss-cvs-commits
mailing list