JBoss hornetq SVN: r7997 - trunk/docs/user-manual/en.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-09-28 04:59:11 -0400 (Mon, 28 Sep 2009)
New Revision: 7997
Modified:
trunk/docs/user-manual/en/embedding-hornetq.xml
Log:
documentation update
* replaced occurrences of Messaging & MessagingServer types by
HornetQ & HornetQServer
Modified: trunk/docs/user-manual/en/embedding-hornetq.xml
===================================================================
--- trunk/docs/user-manual/en/embedding-hornetq.xml 2009-09-26 01:59:13 UTC (rev 7996)
+++ trunk/docs/user-manual/en/embedding-hornetq.xml 2009-09-28 08:59:11 UTC (rev 7997)
@@ -64,20 +64,20 @@
config.setAcceptorConfigurations(transports);</programlisting>
<para>You need to instantiate and start HornetQ server. The class <literal
- >org.hornetq.core.server.Messaging</literal> has a few static methods for creating
+ >org.hornetq.core.server.HornetQ</literal> has a few static methods for creating
servers with common configurations.</para>
- <programlisting>import org.hornetq.core.server.Messaging;
-import org.hornetq.core.server.MessagingServer;
+ <programlisting>import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
...
-MessagingServer server = Messaging.newMessagingServer(config);
+HornetQServer server = HornetQ.newHornetQServer(config);
server.start();</programlisting>
- <para>You also have the option of instantiating <literal>MessagingServerImpl</literal>
+ <para>You also have the option of instantiating <literal>HornetQServerImpl</literal>
directly:</para>
- <programlisting>MessagingServer server =
- new MessagingServerImpl(config);
+ <programlisting>HornetQServer server =
+ new HornetQServerImpl(config);
server.start();</programlisting>
</section>
<section>
@@ -89,7 +89,7 @@
part of the HornetQ distribution provide a very complete implementation of what's needed
to bootstrap the server using JBoss Micro Container. </para>
<para>When using JBoss Micro Container, you need to provide an XML file declaring the
- <literal>MessagingServer</literal> and <literal>Configuration</literal> object, you
+ <literal>HornetQServer</literal> and <literal>Configuration</literal> object, you
can also inject a security manager and a MBean server if you want, but those are
optional.</para>
<para>A very basic XML Bean declaration for the JBoss Micro Container would be:</para>
@@ -103,8 +103,8 @@
</bean>
<!-- The core server -->
- <bean name="MessagingServer"
- class="org.hornetq.core.server.impl.MessagingServerImpl">
+ <bean name="HornetQServer"
+ class="org.hornetq.core.server.impl.HornetQServerImpl">
<constructor>
<parameter>
<inject bean="Configuration"/>
15 years, 3 months
JBoss hornetq SVN: r7996 - in branches/Replication_Clebert: src/main/org/hornetq/core/remoting/server and 6 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-09-25 21:59:13 -0400 (Fri, 25 Sep 2009)
New Revision: 7996
Added:
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/HandlerFactory.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/
branches/Replication_Clebert/src/main/org/hornetq/core/replication/BackupListener.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationToken.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationPacketHandler.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
Just backing up ideas.. a lot of this will change
Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/HandlerFactory.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/HandlerFactory.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/HandlerFactory.java 2009-09-26 01:59:13 UTC (rev 7996)
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.server;
+
+import org.hornetq.core.remoting.Channel;
+import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.RemotingConnection;
+
+/**
+ * The RemotingService could be used by either the Replication Manager or by the Server.
+ *
+ * Each will need to use a different Handler, so this factory may be used to pass what handler needs to be created
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface HandlerFactory
+{
+ ChannelHandler getHandler(RemotingConnection conn, Channel channel);
+}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-09-26 01:57:13 UTC (rev 7995)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-09-26 01:59:13 UTC (rev 7996)
@@ -15,7 +15,6 @@
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.DISCONNECT;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -43,6 +42,7 @@
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.core.remoting.impl.wireformat.Ping;
+import org.hornetq.core.remoting.server.HandlerFactory;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.spi.Acceptor;
import org.hornetq.core.remoting.spi.AcceptorFactory;
@@ -52,6 +52,7 @@
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.impl.HornetQPacketHandler;
+import org.hornetq.utils.ExecutorFactory;
/**
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
@@ -83,8 +84,10 @@
private final Configuration config;
- private volatile HornetQServer server;
+ private final ExecutorFactory executorFactory;
+ private final HandlerFactory handlerFactory;
+
private ManagementService managementService;
private volatile RemotingConnection serverSideReplicatingConnection;
@@ -102,7 +105,8 @@
// Constructors --------------------------------------------------
public RemotingServiceImpl(final Configuration config,
- final HornetQServer server,
+ final HandlerFactory handlerFactory,
+ final ExecutorFactory executorFactory,
final ManagementService managementService,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool,
@@ -110,6 +114,10 @@
{
transportConfigs = config.getAcceptorConfigurations();
+ this.executorFactory = executorFactory;
+
+ this.handlerFactory = handlerFactory;
+
ClassLoader loader = Thread.currentThread().getContextClassLoader();
for (String interceptorClass : config.getInterceptorClassNames())
{
@@ -125,7 +133,6 @@
}
this.config = config;
- this.server = server;
this.managementService = managementService;
this.threadPool = threadPool;
this.scheduledThreadPool = scheduledThreadPool;
@@ -176,7 +183,7 @@
// Remove this code when this is implemented without having to require a special acceptor
// https://jira.jboss.org/jira/browse/JBMESSAGING-1649
- if (config.isJMXManagementEnabled())
+ if (config.isJMXManagementEnabled() && managementService != null)
{
Map<String, Object> params = new HashMap<String, Object>();
@@ -188,12 +195,9 @@
acceptors.add(acceptor);
- if (managementService != null)
- {
- TransportConfiguration info = new TransportConfiguration(InVMAcceptorFactory.class.getName(), params);
+ TransportConfiguration info = new TransportConfiguration(InVMAcceptorFactory.class.getName(), params);
- managementService.registerAcceptor(acceptor, info);
- }
+ managementService.registerAcceptor(acceptor, info);
}
for (Acceptor a : acceptors)
@@ -233,28 +237,31 @@
}
failureCheckThread.close();
-
+
// We need to stop them accepting first so no new connections are accepted after we send the disconnect message
for (Acceptor acceptor : acceptors)
{
acceptor.pause();
}
-
+
for (ConnectionEntry entry : connections.values())
- {
+ {
entry.connection.getChannel(0, -1, false).sendAndFlush(new PacketImpl(DISCONNECT));
}
-
+
for (Acceptor acceptor : acceptors)
{
acceptor.stop();
}
-
+
acceptors.clear();
connections.clear();
- managementService.unregisterAcceptors();
+ if (managementService != null)
+ {
+ managementService.unregisterAcceptors();
+ }
started = false;
}
@@ -299,31 +306,14 @@
public void connectionCreated(final Connection connection)
{
- if (server == null)
- {
- throw new IllegalStateException("Unable to create connection, server hasn't finished starting up");
- }
-
- RemotingConnection rc = new RemotingConnectionImpl(connection,
- interceptors,
- server.getConfiguration().isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory()
- .getExecutor()
- : null);
+ RemotingConnection rc = createChannel(connection);
- Channel channel1 = rc.getChannel(1, -1, false);
-
- ChannelHandler handler = new HornetQPacketHandler(server, channel1, rc);
-
- channel1.setHandler(handler);
-
long ttl = ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
if (config.getConnectionTTLOverride() != -1)
{
ttl = config.getConnectionTTLOverride();
}
- final ConnectionEntry entry = new ConnectionEntry(rc,
- System.currentTimeMillis(),
- ttl);
+ final ConnectionEntry entry = new ConnectionEntry(rc, System.currentTimeMillis(), ttl);
connections.put(connection.getID(), entry);
@@ -402,6 +392,29 @@
// Protected -----------------------------------------------------
+ protected List<Interceptor> getInterceptors()
+ {
+ return this.interceptors;
+ }
+
+ /**
+ * Subclasses (on tests) may use this to create a different channel.
+ */
+ protected RemotingConnection createChannel(final Connection connection)
+ {
+ RemotingConnection rc = new RemotingConnectionImpl(connection,
+ interceptors,
+ executorFactory != null ? executorFactory.getExecutor() : null);
+
+ Channel channel1 = rc.getChannel(1, -1, false);
+
+ ChannelHandler handler = handlerFactory.getHandler(rc, channel1);
+
+ channel1.setHandler(handler);
+
+ return rc;
+ }
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
@@ -467,7 +480,7 @@
}
public void run()
- {
+ {
while (!closed)
{
long now = System.currentTimeMillis();
@@ -499,9 +512,9 @@
RemotingConnection conn = removeConnection(id);
HornetQException me = new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Did not receive ping from " + conn.getRemoteAddress() +
- ". It is likely the client has exited or crashed without " +
- "closing its connection, or the network between the server and client has failed. The connection will now be closed.");
+ "Did not receive ping from " + conn.getRemoteAddress() +
+ ". It is likely the client has exited or crashed without " +
+ "closing its connection, or the network between the server and client has failed. The connection will now be closed.");
conn.fail(me);
}
Added: branches/Replication_Clebert/src/main/org/hornetq/core/replication/BackupListener.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/BackupListener.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/BackupListener.java 2009-09-26 01:59:13 UTC (rev 7996)
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.replication;
+
+/**
+ * A ReplicationListener
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface BackupListener
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ void onReplication(byte data[]);
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-09-26 01:59:13 UTC (rev 7996)
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.replication;
+
+
+/**
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface ReplicationManager
+{
+ void replicate(byte[] bytes, ReplicationToken token);
+
+
+ /** to be used on the backup node only */
+ void addBackupListener(BackupListener listener);
+}
Added: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationToken.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationToken.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationToken.java 2009-09-26 01:59:13 UTC (rev 7996)
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.replication;
+
+
+/**
+ * This represents a set of operations done as part of replication.
+ * When the entire set is done a group of Runnables can be executed.
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface ReplicationToken
+{
+ /** To be called by the replication manager, when new replication is added to the queue */
+ void linedUp();
+
+ /** To be called by the replication manager, when data is confirmed on the channel */
+ void replicated();
+
+ void addFutureCompletion(Runnable runnable);
+
+ /** To be called when there are no more operations pending */
+ void complete();
+
+}
Added: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-09-26 01:59:13 UTC (rev 7996)
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.replication.impl;
+
+import org.hornetq.core.replication.BackupListener;
+import org.hornetq.core.replication.ReplicationManager;
+import org.hornetq.core.replication.ReplicationToken;
+
+/**
+ * A RepplicationManagerImpl
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationManagerImpl implements ReplicationManager
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#addListener(org.hornetq.core.replication.ReplicationListener)
+ */
+ public void addBackupListener(BackupListener listener)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#replicate(byte[], org.hornetq.core.replication.ReplicationToken)
+ */
+ public void replicate(byte[] bytes, ReplicationToken token)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationPacketHandler.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationPacketHandler.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationPacketHandler.java 2009-09-26 01:59:13 UTC (rev 7996)
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.replication.impl;
+
+import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.Packet;
+
+/**
+ * A ReplicationPacketHandler
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationPacketHandler implements ChannelHandler
+{
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
+ */
+ public void handlePacket(Packet packet)
+ {
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java 2009-09-26 01:59:13 UTC (rev 7996)
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.replication.impl;
+
+import java.util.ArrayList;
+import java.util.concurrent.Executor;
+
+import org.hornetq.core.replication.ReplicationToken;
+
+/**
+ * A ReplicationToken
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationTokenImpl implements ReplicationToken
+{
+ final Executor executor;
+
+ private ArrayList<Runnable> tasks;
+
+ private int pendings;
+
+ /**
+ * @param executor
+ */
+ public ReplicationTokenImpl(Executor executor)
+ {
+ super();
+ this.executor = executor;
+ }
+
+ /** To be called by the replication manager, when new replication is added to the queue */
+ public synchronized void linedUp()
+ {
+ pendings++;
+ }
+
+ /** To be called by the replication manager, when data is confirmed on the channel */
+ public synchronized void replicated()
+ {
+ if (pendings-- == 0)
+ {
+ if (tasks != null)
+ {
+ for (Runnable run : tasks)
+ {
+ executor.execute(run);
+ }
+ tasks.clear();
+ }
+ }
+ }
+
+ /** You may have several actions to be done after a replication operation is completed. */
+ public synchronized void addFutureCompletion(Runnable runnable)
+ {
+ if (pendings == 0)
+ {
+ executor.execute(runnable);
+ }
+ else
+ {
+ if (tasks == null)
+ {
+ tasks = new ArrayList<Runnable>();
+ }
+
+ tasks.add(runnable);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationToken#complete()
+ */
+ public void complete()
+ {
+ // TODO Auto-generated method stub
+
+ }
+}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-09-26 01:57:13 UTC (rev 7995)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-09-26 01:59:13 UTC (rev 7996)
@@ -65,9 +65,11 @@
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
import org.hornetq.core.remoting.Channel;
+import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
+import org.hornetq.core.remoting.server.HandlerFactory;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
import org.hornetq.core.security.CheckType;
@@ -959,8 +961,19 @@
managementService = new ManagementServiceImpl(mbeanServer, configuration, managementConnectorID);
+ final HandlerFactory handlerFactory = new HandlerFactory()
+ {
+
+ public ChannelHandler getHandler(RemotingConnection conn, Channel channel)
+ {
+ return new HornetQPacketHandler(HornetQServerImpl.this, channel, conn);
+ }
+
+ };
+
remotingService = new RemotingServiceImpl(configuration,
- this,
+ handlerFactory,
+ (configuration.isAsyncConnectionExecutionEnabled() ? this.executorFactory : null),
managementService,
threadPool,
scheduledPool,
Added: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java (rev 0)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-09-26 01:59:13 UTC (rev 7996)
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.replication;
+
+import static org.hornetq.tests.util.RandomUtil.randomString;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+
+import javax.management.MBeanServer;
+
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ConnectionManager;
+import org.hornetq.core.client.impl.ConnectionManagerImpl;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.management.ManagementService;
+import org.hornetq.core.management.impl.HornetQServerControlImpl;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.remoting.Channel;
+import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.Interceptor;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
+import org.hornetq.core.remoting.server.HandlerFactory;
+import org.hornetq.core.remoting.server.RemotingService;
+import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
+import org.hornetq.core.remoting.spi.Connection;
+import org.hornetq.core.security.HornetQSecurityManager;
+import org.hornetq.core.security.Role;
+import org.hornetq.core.server.ActivateCallback;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.QueueFactory;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.cluster.ClusterManager;
+import org.hornetq.core.server.impl.HornetQPacketHandler;
+import org.hornetq.core.settings.HierarchicalRepository;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.transaction.ResourceManager;
+import org.hornetq.core.version.Version;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.HornetQThreadFactory;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * A ReplicationTest
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private RemotingService remoting;
+
+ private ThreadFactory tFactory;
+
+ private ExecutorService executor;
+
+ private ConnectionManager connectionManagerLive;
+
+ private ScheduledExecutorService scheduledExecutor;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testBasicConnection() throws Exception
+ {
+
+ RemotingConnection conn = connectionManagerLive.getConnection(1);
+
+ Channel chann = conn.getChannel(2, -1, false);
+
+ chann.close();
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ class LocalHandler implements ChannelHandler
+ {
+
+ final Channel channel;
+
+ /**
+ * @param channel
+ */
+ public LocalHandler(Channel channel)
+ {
+ super();
+ this.channel = channel;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
+ */
+ public void handlePacket(Packet packet)
+ {
+ channel.send(new NullResponseMessage());
+ }
+
+ }
+
+ HandlerFactory handlerFactory = new HandlerFactory()
+ {
+
+ public ChannelHandler getHandler(RemotingConnection conn, Channel channel)
+ {
+ System.out.println("Created a handler");
+ return new LocalHandler(channel);
+ }
+
+ };
+
+ // Protected -----------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ Configuration config = createDefaultConfig(false);
+
+ tFactory = new HornetQThreadFactory("HornetQ-ReplicationTest", false);
+
+ executor = Executors.newCachedThreadPool(tFactory);
+
+ scheduledExecutor = new ScheduledThreadPoolExecutor(10, tFactory);
+
+ remoting = new RemotingServiceImpl(config, handlerFactory, null, null, executor, scheduledExecutor, 0);
+
+ remoting.start();
+
+ TransportConfiguration connectorConfig = new TransportConfiguration(InVMConnectorFactory.class.getName(),
+ new HashMap<String, Object>(),
+ randomString());
+
+ List<Interceptor> interceptors = new ArrayList<Interceptor>();
+
+ connectionManagerLive = new ConnectionManagerImpl(null,
+ connectorConfig,
+ null,
+ false,
+ 1,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
+ 0,
+ 1.0d,
+ 0,
+ false,
+ executor,
+ scheduledExecutor,
+ interceptors);
+
+ }
+
+ protected void tearDown() throws Exception
+ {
+ remoting.stop();
+
+ executor.shutdown();
+
+ scheduledExecutor.shutdown();
+
+ remoting = null;
+
+ tFactory = null;
+
+ scheduledExecutor = null;
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
15 years, 3 months
JBoss hornetq SVN: r7995 - branches.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-09-25 21:57:13 -0400 (Fri, 25 Sep 2009)
New Revision: 7995
Added:
branches/Replication_Clebert/
Log:
Creating temporary branch for work
Copied: branches/Replication_Clebert (from rev 7994, trunk)
15 years, 3 months
JBoss hornetq SVN: r7993 - trunk/tests/jms-tests/src/org/hornetq/jms/tests.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-09-24 22:45:51 -0400 (Thu, 24 Sep 2009)
New Revision: 7993
Modified:
trunk/tests/jms-tests/src/org/hornetq/jms/tests/ReferenceableTest.java
Log:
Some libraries are not available on jms-tests. Reverting some changes temporarily.
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/ReferenceableTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/ReferenceableTest.java 2009-09-24 23:06:29 UTC (rev 7992)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/ReferenceableTest.java 2009-09-25 02:45:51 UTC (rev 7993)
@@ -30,9 +30,6 @@
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.referenceable.ConnectionFactoryObjectFactory;
import org.hornetq.jms.referenceable.DestinationObjectFactory;
-import org.hornetq.ra.HornetQRAConnectionFactory;
-import org.hornetq.ra.HornetQRAManagedConnectionFactory;
-import org.hornetq.ra.HornetQResourceAdapter;
/**
*
@@ -139,34 +136,34 @@
}
- public void testReferenceRAManagedCF() throws Exception
- {
- HornetQResourceAdapter ra = new HornetQResourceAdapter();
-
- HornetQRAManagedConnectionFactory mcf = new HornetQRAManagedConnectionFactory();
-
- mcf.setResourceAdapter(ra);
-
- ConnectionFactory racf = (ConnectionFactory)mcf.createConnectionFactory();
-
- Reference cfRef = ((Referenceable)racf).getReference();
-
- String factoryName = cfRef.getFactoryClassName();
-
- Class factoryClass = Class.forName(factoryName);
-
- ConnectionFactoryObjectFactory factory = (ConnectionFactoryObjectFactory)factoryClass.newInstance();
-
- Object instance = factory.getObjectInstance(cfRef, null, null, null);
-
- assertTrue(instance instanceof HornetQRAConnectionFactory);
-
- HornetQRAConnectionFactory racf2 = (HornetQRAConnectionFactory)instance;
-
- //TODO implement simpleSendReceive with test inside the container or wait until https://jira.jboss.org/jira/browse/HORNETQ-140 is done.
- }
+// public void testReferenceRAManagedCF() throws Exception
+// {
+// HornetQResourceAdapter ra = new HornetQResourceAdapter();
+//
+// HornetQRAManagedConnectionFactory mcf = new HornetQRAManagedConnectionFactory();
+//
+// mcf.setResourceAdapter(ra);
+//
+// ConnectionFactory racf = (ConnectionFactory)mcf.createConnectionFactory();
+//
+// Reference cfRef = ((Referenceable)racf).getReference();
+//
+// String factoryName = cfRef.getFactoryClassName();
+//
+// Class factoryClass = Class.forName(factoryName);
+//
+// ConnectionFactoryObjectFactory factory = (ConnectionFactoryObjectFactory)factoryClass.newInstance();
+//
+// Object instance = factory.getObjectInstance(cfRef, null, null, null);
+//
+// assertTrue(instance instanceof HornetQRAConnectionFactory);
+//
+// HornetQRAConnectionFactory racf2 = (HornetQRAConnectionFactory)instance;
+//
+// //TODO implement simpleSendReceive with test inside the container or wait until https://jira.jboss.org/jira/browse/HORNETQ-140 is done.
+// }
+//
-
protected void simpleSendReceive(ConnectionFactory cf, Destination dest) throws Exception
{
Connection conn = null;
15 years, 3 months
JBoss hornetq SVN: r7992 - in trunk: src/main/org/hornetq/jms/client and 3 other directories.
by do-not-reply@jboss.org
Author: plugtree
Date: 2009-09-24 19:06:29 -0400 (Thu, 24 Sep 2009)
New Revision: 7992
Modified:
trunk/src/main/org/hornetq/jms/HornetQDestination.java
trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
trunk/src/main/org/hornetq/jms/referenceable/ConnectionFactoryObjectFactory.java
trunk/src/main/org/hornetq/ra/HornetQRAConnectionFactory.java
trunk/src/main/org/hornetq/ra/HornetQRAConnectionFactoryImpl.java
trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/ReferenceableTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-24 Added Referenceable support for Destinations and ConnectionFactories.
Modified: trunk/src/main/org/hornetq/jms/HornetQDestination.java
===================================================================
--- trunk/src/main/org/hornetq/jms/HornetQDestination.java 2009-09-24 22:02:11 UTC (rev 7991)
+++ trunk/src/main/org/hornetq/jms/HornetQDestination.java 2009-09-24 23:06:29 UTC (rev 7992)
@@ -19,6 +19,7 @@
import javax.jms.Destination;
import javax.naming.NamingException;
import javax.naming.Reference;
+import javax.naming.Referenceable;
import org.hornetq.jms.referenceable.DestinationObjectFactory;
import org.hornetq.jms.referenceable.SerializableObjectRefAddr;
@@ -32,7 +33,7 @@
*
* $Id$
*/
-public abstract class HornetQDestination implements Destination, Serializable/*, Referenceable http://jira.jboss.org/jira/browse/JBMESSAGING-395*/
+public abstract class HornetQDestination implements Destination, Serializable, Referenceable
{
// Constants -----------------------------------------------------
Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2009-09-24 22:02:11 UTC (rev 7991)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2009-09-24 23:06:29 UTC (rev 7992)
@@ -32,6 +32,7 @@
import javax.jms.XATopicConnectionFactory;
import javax.naming.NamingException;
import javax.naming.Reference;
+import javax.naming.Referenceable;
import org.hornetq.core.client.ClientSessionFactory;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
@@ -47,10 +48,7 @@
* @version <tt>$Revision$</tt> $Id$
*/
public class HornetQConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory,
- XAConnectionFactory, XAQueueConnectionFactory, XATopicConnectionFactory, Serializable/*
- * , Referenceable
- * http://jira.jboss.org/jira/browse/JBMESSAGING-395
- */
+ XAConnectionFactory, XAQueueConnectionFactory, XATopicConnectionFactory, Serializable, Referenceable
{
// Constants ------------------------------------------------------------------------------------
Modified: trunk/src/main/org/hornetq/jms/referenceable/ConnectionFactoryObjectFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/referenceable/ConnectionFactoryObjectFactory.java 2009-09-24 22:02:11 UTC (rev 7991)
+++ trunk/src/main/org/hornetq/jms/referenceable/ConnectionFactoryObjectFactory.java 2009-09-24 23:06:29 UTC (rev 7992)
@@ -15,13 +15,12 @@
import java.util.Hashtable;
+import javax.jms.ConnectionFactory;
import javax.naming.Context;
import javax.naming.Name;
import javax.naming.Reference;
import javax.naming.spi.ObjectFactory;
-import org.hornetq.jms.client.HornetQConnectionFactory;
-
/**
*
* A ConnectionFactoryObjectFactory.
@@ -43,7 +42,7 @@
byte[] bytes = (byte[])r.get("HornetQ-CF").getContent();
// Deserialize
- return (HornetQConnectionFactory)SerializableObjectRefAddr.deserialize(bytes);
+ return (ConnectionFactory)SerializableObjectRefAddr.deserialize(bytes);
}
}
Modified: trunk/src/main/org/hornetq/ra/HornetQRAConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRAConnectionFactory.java 2009-09-24 22:02:11 UTC (rev 7991)
+++ trunk/src/main/org/hornetq/ra/HornetQRAConnectionFactory.java 2009-09-24 23:06:29 UTC (rev 7992)
@@ -21,6 +21,7 @@
import javax.jms.XAConnectionFactory;
import javax.jms.XAQueueConnectionFactory;
import javax.jms.XATopicConnectionFactory;
+import javax.naming.Referenceable;
/**
* An aggregate interface for the JMS connection factories
@@ -30,7 +31,7 @@
* @version $Revision: $
*/
public interface HornetQRAConnectionFactory extends ConnectionFactory, TopicConnectionFactory, QueueConnectionFactory,
- XAConnectionFactory, XAQueueConnectionFactory, XATopicConnectionFactory, Serializable
+ XAConnectionFactory, XAQueueConnectionFactory, XATopicConnectionFactory, Serializable, Referenceable
{
/** Connection factory capable of handling connections */
public static final int CONNECTION = 0;
Modified: trunk/src/main/org/hornetq/ra/HornetQRAConnectionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRAConnectionFactoryImpl.java 2009-09-24 22:02:11 UTC (rev 7991)
+++ trunk/src/main/org/hornetq/ra/HornetQRAConnectionFactoryImpl.java 2009-09-24 23:06:29 UTC (rev 7992)
@@ -20,11 +20,13 @@
import javax.jms.XAConnection;
import javax.jms.XAQueueConnection;
import javax.jms.XATopicConnection;
+import javax.naming.NamingException;
import javax.naming.Reference;
-import javax.resource.Referenceable;
import javax.resource.spi.ConnectionManager;
import org.hornetq.core.logging.Logger;
+import org.hornetq.jms.referenceable.ConnectionFactoryObjectFactory;
+import org.hornetq.jms.referenceable.SerializableObjectRefAddr;
/**
* The connection factory
@@ -33,7 +35,7 @@
* @author <a href="mailto:jesper.pedersen@jboss.org">Jesper Pedersen</a>
* @version $Revision: $
*/
-public class HornetQRAConnectionFactoryImpl implements HornetQRAConnectionFactory, Referenceable
+public class HornetQRAConnectionFactoryImpl implements HornetQRAConnectionFactory
{
/** Serial version UID */
static final long serialVersionUID = 7981708919479859360L;
@@ -111,8 +113,24 @@
{
log.trace("getReference()");
}
-
- return reference;
+ if(reference == null)
+ {
+ try
+ {
+ reference = new Reference(this.getClass().getCanonicalName(),
+ new SerializableObjectRefAddr("HornetQ-CF", this),
+ ConnectionFactoryObjectFactory.class.getCanonicalName(),
+ null);
+ }
+ catch (NamingException e)
+ {
+ log.error("Error while giving object Reference.", e);
+ }
+ }
+
+ return reference;
+
+
}
/**
Modified: trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2009-09-24 22:02:11 UTC (rev 7991)
+++ trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2009-09-24 23:06:29 UTC (rev 7992)
@@ -12,6 +12,7 @@
*/
package org.hornetq.ra;
+import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -43,7 +44,7 @@
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* @version $Revision: $
*/
-public class HornetQResourceAdapter implements ResourceAdapter
+public class HornetQResourceAdapter implements ResourceAdapter, Serializable
{
/**
* The logger
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/ReferenceableTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/ReferenceableTest.java 2009-09-24 22:02:11 UTC (rev 7991)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/ReferenceableTest.java 2009-09-24 23:06:29 UTC (rev 7992)
@@ -22,7 +22,18 @@
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import org.hornetq.jms.HornetQQueue;
+import org.hornetq.jms.HornetQTopic;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.referenceable.ConnectionFactoryObjectFactory;
+import org.hornetq.jms.referenceable.DestinationObjectFactory;
+import org.hornetq.ra.HornetQRAConnectionFactory;
+import org.hornetq.ra.HornetQRAManagedConnectionFactory;
+import org.hornetq.ra.HornetQResourceAdapter;
+
/**
*
* A ReferenceableTest.
@@ -55,15 +66,14 @@
assertTrue(topic1 instanceof Serializable);
}
- /* http://jira.jboss.org/jira/browse/JBMESSAGING-395
public void testReferenceable() throws Exception
{
assertTrue(cf instanceof Referenceable);
- assertTrue(queue instanceof Referenceable);
+ assertTrue(queue1 instanceof Referenceable);
- assertTrue(topic instanceof Referenceable);
+ assertTrue(topic1 instanceof Referenceable);
}
public void testReferenceCF() throws Exception
@@ -78,16 +88,16 @@
Object instance = factory.getObjectInstance(cfRef, null, null, null);
- assertTrue(instance instanceof HornetQRAConnectionFactory);
+ assertTrue(instance instanceof HornetQConnectionFactory);
- HornetQRAConnectionFactory cf2 = (HornetQRAConnectionFactory)instance;
+ HornetQConnectionFactory cf2 = (HornetQConnectionFactory)instance;
- simpleSendReceive(cf2, queue);
+ simpleSendReceive(cf2, queue1);
}
public void testReferenceQueue() throws Exception
{
- Reference queueRef = ((Referenceable)queue).getReference();
+ Reference queueRef = ((Referenceable)queue1).getReference();
String factoryName = queueRef.getFactoryClassName();
@@ -101,7 +111,7 @@
HornetQQueue queue2 = (HornetQQueue)instance;
- assertEquals(queue.getQueueName(), queue2.getQueueName());
+ assertEquals(queue1.getQueueName(), queue2.getQueueName());
simpleSendReceive(cf, queue2);
@@ -109,7 +119,7 @@
public void testReferenceTopic() throws Exception
{
- Reference topicRef = ((Referenceable)topic).getReference();
+ Reference topicRef = ((Referenceable)topic1).getReference();
String factoryName = topicRef.getFactoryClassName();
@@ -123,14 +133,40 @@
HornetQTopic topic2 = (HornetQTopic)instance;
- assertEquals(topic.getTopicName(), topic2.getTopicName());
+ assertEquals(topic1.getTopicName(), topic2.getTopicName());
simpleSendReceive(cf, topic2);
}
- */
+ public void testReferenceRAManagedCF() throws Exception
+ {
+ HornetQResourceAdapter ra = new HornetQResourceAdapter();
+
+ HornetQRAManagedConnectionFactory mcf = new HornetQRAManagedConnectionFactory();
+
+ mcf.setResourceAdapter(ra);
+
+ ConnectionFactory racf = (ConnectionFactory)mcf.createConnectionFactory();
+
+ Reference cfRef = ((Referenceable)racf).getReference();
+
+ String factoryName = cfRef.getFactoryClassName();
+
+ Class factoryClass = Class.forName(factoryName);
+
+ ConnectionFactoryObjectFactory factory = (ConnectionFactoryObjectFactory)factoryClass.newInstance();
+
+ Object instance = factory.getObjectInstance(cfRef, null, null, null);
+
+ assertTrue(instance instanceof HornetQRAConnectionFactory);
+
+ HornetQRAConnectionFactory racf2 = (HornetQRAConnectionFactory)instance;
+
+ //TODO implement simpleSendReceive with test inside the container or wait until https://jira.jboss.org/jira/browse/HORNETQ-140 is done.
+ }
+
protected void simpleSendReceive(ConnectionFactory cf, Destination dest) throws Exception
{
Connection conn = null;
15 years, 3 months
JBoss hornetq SVN: r7990 - in branches/Branch_Replication_Changes: src/main/org/hornetq/core/server/impl and 1 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-09-24 12:14:43 -0400 (Thu, 24 Sep 2009)
New Revision: 7990
Modified:
branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
Log:
replication changes
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-09-24 15:50:40 UTC (rev 7989)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-09-24 16:14:43 UTC (rev 7990)
@@ -500,22 +500,22 @@
private void fail()
{
+ log.info("bridge " + name + " has failed");
+
if (started)
{
- //executor.execute(new FailRunnable());
-
try
{
- cancelRefs();
+ setupNotificationConsumer();
- //setupNotificationConsumer();
+ cancelRefs();
}
catch (Exception e)
{
log.error("Failed to handle failure", e);
}
- }
- }
+ }
+ }
private ClientConsumer notifConsumer;
@@ -620,8 +620,7 @@
}
try
- {
- csf = null;
+ {
if (discoveryAddress != null)
{
csf = new ClientSessionFactoryImpl(discoveryAddress, discoveryPort);
@@ -708,56 +707,6 @@
}
}
-// private class FailRunnable implements Runnable
-// {
-// public void run()
-// {
-// synchronized (BridgeImpl.this)
-// {
-//
-// if (!started)
-// {
-// return;
-// }
-//
-// if (flowRecord != null)
-// {
-// try
-// {
-// // flowRecord.reset();
-// }
-// catch (Exception e)
-// {
-// log.error("Failed to reset", e);
-// }
-// }
-//
-// active = false;
-// }
-//
-// try
-// {
-// queue.removeConsumer(BridgeImpl.this);
-//
-// session.cleanUp();
-//
-// cancelRefs();
-//
-// csf.close();
-// }
-// catch (Exception e)
-// {
-// log.error("Failed to stop", e);
-// }
-//
-// if (!createObjects())
-// {
-// started = false;
-// }
-// }
-// }
-// }
-
private class CreateObjectsRunnable implements Runnable
{
public synchronized void run()
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-09-24 15:50:40 UTC (rev 7989)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-09-24 16:14:43 UTC (rev 7990)
@@ -388,7 +388,7 @@
retryInterval,
1d,
-1,
- false,
+ true,
useDuplicateDetection,
managementService.getManagementAddress(),
managementService.getManagementNotificationAddress(),
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-09-24 15:50:40 UTC (rev 7989)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-09-24 16:14:43 UTC (rev 7990)
@@ -1348,12 +1348,19 @@
public void handleCloseConsumer(final SessionConsumerCloseMessage packet)
{
final ServerConsumer consumer = consumers.get(packet.getConsumerID());
-
+
Packet response;
try
{
- consumer.close();
+ if (consumer != null)
+ {
+ consumer.close();
+ }
+ else
+ {
+ log.error("Cannot find consumer with id " + packet.getConsumerID());
+ }
response = new NullResponseMessage();
}
Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2009-09-24 15:50:40 UTC (rev 7989)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2009-09-24 16:14:43 UTC (rev 7990)
@@ -147,8 +147,12 @@
long start = System.currentTimeMillis();
+ log.info("stopping server 1");
+
stopServers(1);
+ log.info("restarting server 1");
+
startServers(1);
long end = System.currentTimeMillis();
15 years, 3 months
JBoss hornetq SVN: r7989 - trunk/docs/user-manual/en.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-09-24 11:50:40 -0400 (Thu, 24 Sep 2009)
New Revision: 7989
Modified:
trunk/docs/user-manual/en/management.xml
Log:
documentation
* added section about removeMessages() filter. Setting it to an empty string
means to select all messages
Modified: trunk/docs/user-manual/en/management.xml
===================================================================
--- trunk/docs/user-manual/en/management.xml 2009-09-24 11:47:36 UTC (rev 7988)
+++ trunk/docs/user-manual/en/management.xml 2009-09-24 15:50:40 UTC (rev 7989)
@@ -175,7 +175,9 @@
<para>Messages can also be removed from the queue by using the <literal
>removeMessages()</literal> method which returns a <literal
>boolean</literal> for the single message ID variant or the number of
- removed messages for the filter variant.</para>
+ removed messages for the filter variant. The <literal>removeMessages()</literal>
+ method takes a <literal>filter</literal> argument to remove only filtered messages. Setting
+ the filter to an empty string will in effect remove all messages.</para>
</listitem>
<listitem>
<para>Counting messages</para>
@@ -396,7 +398,10 @@
<para>Messages can also be removed from the queue by using the <literal
>removeMessages()</literal> method which returns a <literal
>boolean</literal> for the single message ID variant or the number of
- removed messages for the filter variant.</para>
+ removed messages for the filter variant.
+ The <literal>removeMessages()</literal>
+ method takes a <literal>filter</literal> argument to remove only filtered messages. Setting
+ the filter to an empty string will in effect remove all messages.</para>
</listitem>
<listitem>
<para>Counting messages</para>
15 years, 3 months
JBoss hornetq SVN: r7988 - in branches/hornetq_grouping: src/main/org/hornetq/core/postoffice/impl and 1 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-09-24 07:47:36 -0400 (Thu, 24 Sep 2009)
New Revision: 7988
Modified:
branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/jms/example/ClusteredGroupingExample.java
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
Log:
example fixes
Modified: branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/jms/example/ClusteredGroupingExample.java
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/jms/example/ClusteredGroupingExample.java 2009-09-24 09:58:07 UTC (rev 7987)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/jms/example/ClusteredGroupingExample.java 2009-09-24 11:47:36 UTC (rev 7988)
@@ -39,7 +39,6 @@
public boolean runExample() throws Exception
{
- Thread.sleep(5000);
Connection connection0 = null;
Connection connection1 = null;
@@ -73,7 +72,7 @@
ic2 = getContext(2);
// Step 5. Look-up a JMS Connection Factory object from JNDI on server 1
- ConnectionFactory cf2 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
+ ConnectionFactory cf2 = (ConnectionFactory)ic2.lookup("/ConnectionFactory");
// Step 6. We create a JMS Connection connection0 which is a connection to server 0
connection0 = cf0.createConnection();
@@ -122,11 +121,30 @@
producer0.send(message);
+ System.out.println("Sent messages: " + message.getText() + " to node 0");
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage message = session1.createTextMessage("This is text message " + (i + 10));
+
+ message.setStringProperty(HornetQMessage.JMSXGROUPID, "Group-0");
+
producer1.send(message);
+ System.out.println("Sent messages: " + message.getText() + " to node 1");
+
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage message = session2.createTextMessage("This is text message " + (i + 20));
+
+ message.setStringProperty(HornetQMessage.JMSXGROUPID, "Group-0");
+
producer2.send(message);
- System.out.println("Sent messages: " + message.getText());
+ System.out.println("Sent messages: " + message.getText() + " to node 2");
}
// Step 14. We now consume those messages on *both* server 0 and server 1.
@@ -134,7 +152,7 @@
// JMS Queues implement point-to-point message where each message is only ever consumed by a
// maximum of one consumer
- for (int i = 0; i < numMessages; i += 2)
+ for (int i = 0; i < numMessages * 3; i++)
{
TextMessage message0 = (TextMessage)consumer.receive(5000);
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-09-24 09:58:07 UTC (rev 7987)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-09-24 11:47:36 UTC (rev 7988)
@@ -264,11 +264,6 @@
public void route(final ServerMessage message, final Transaction tx) throws Exception
{
boolean routed = false;
- Object o = message.getProperty("count_prop");
- if (o != null)
- {
- System.out.println("routing message " + o);
- }
if (!exclusiveBindings.isEmpty())
{
for (Binding binding : exclusiveBindings)
@@ -478,7 +473,6 @@
if( chosen != null )
{
- System.out.println("sending message" + message.getProperty("count_prop") + " to " + chosen.getClusterName());
chosen.willRoute(message);
chosen.getBindable().preroute(message, tx);
chosen.getBindable().route(message, tx);
@@ -503,14 +497,12 @@
}
if( chosen != null)
{
- System.out.println("found sending message" + message.getProperty("count_prop") + " to " + chosen.getClusterName());
chosen.willRoute(message);
chosen.getBindable().preroute(message, tx);
chosen.getBindable().route(message, tx);
}
else
{
- System.out.println("BindingsImpl.route");
throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "queue " + resp.getChosen() + " has been removed cannot deliver message, queues should not be removed when grouping is used");
}
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2009-09-24 09:58:07 UTC (rev 7987)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2009-09-24 11:47:36 UTC (rev 7988)
@@ -70,12 +70,10 @@
Response response = new Response(proposal.getProposalType(), proposal.getProposal());
if (map.putIfAbsent(response.getResponseType(), response.getChosen()) == null)
{
- log.info("accepted proposal for " + proposal.getProposalType() + " with value " + proposal.getProposal());
return response;
}
else
{
- log.info("denied proposal for " + proposal.getProposalType() + " with value " + proposal.getProposal());
return new Response(proposal.getProposalType(), proposal.getProposal(), map.get(proposal.getProposalType()));
}
}
@@ -87,7 +85,6 @@
public void send(Response response, int distance) throws Exception
{
Object value = response.getAlternative() != null ? response.getAlternative() : response.getOriginal();
- log.info("sending proposal response for " + response.getResponseType() + " with value " + value);
TypedProperties props = new TypedProperties();
props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, response.getResponseType());
props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)response.getOriginal());
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java 2009-09-24 09:58:07 UTC (rev 7987)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java 2009-09-24 11:47:36 UTC (rev 7988)
@@ -77,7 +77,6 @@
{
lock.lock();
TypedProperties props = new TypedProperties();
- log.info("sending proposal for " + proposal.getProposalType() + " with value " + proposal.getProposal());
props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getProposalType());
props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)proposal.getProposal());
props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
@@ -102,7 +101,6 @@
public void proposed(Response response) throws Exception
{
Object value = response.getAlternative() != null ? response.getAlternative() : response.getOriginal();
- log.info("received proposal response for " + response.getResponseType() + " with value " + value);
try
{
lock.lock();
15 years, 3 months
JBoss hornetq SVN: r7987 - in branches/hornetq_grouping: examples/jms/clustered-grouping and 18 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-09-24 05:58:07 -0400 (Thu, 24 Sep 2009)
New Revision: 7987
Added:
branches/hornetq_grouping/examples/jms/clustered-grouping/
branches/hornetq_grouping/examples/jms/clustered-grouping/build.xml
branches/hornetq_grouping/examples/jms/clustered-grouping/readme.html
branches/hornetq_grouping/examples/jms/clustered-grouping/server0/
branches/hornetq_grouping/examples/jms/clustered-grouping/server0/client-jndi.properties
branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-beans.xml
branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-configuration.xml
branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-jms.xml
branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-users.xml
branches/hornetq_grouping/examples/jms/clustered-grouping/server1/
branches/hornetq_grouping/examples/jms/clustered-grouping/server1/client-jndi.properties
branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-beans.xml
branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-configuration.xml
branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-jms.xml
branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-users.xml
branches/hornetq_grouping/examples/jms/clustered-grouping/server2/
branches/hornetq_grouping/examples/jms/clustered-grouping/server2/client-jndi.properties
branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-beans.xml
branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-configuration.xml
branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-jms.xml
branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-users.xml
branches/hornetq_grouping/examples/jms/clustered-grouping/src/
branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/
branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/
branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/jms/
branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/jms/example/
branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/jms/example/ClusteredGroupingExample.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
Removed:
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Arbitrator.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/ArbitratorConfiguration.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalArbitrator.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteArbitrator.java
Modified:
branches/hornetq_grouping/src/config/common/schema/hornetq-configuration.xsd
branches/hornetq_grouping/src/main/org/hornetq/core/config/Configuration.java
branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/FileConfiguration.java
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
Log:
example and fixes
Copied: branches/hornetq_grouping/examples/jms/clustered-grouping/build.xml (from rev 7977, branches/hornetq_grouping/examples/jms/clustered-queue/build.xml)
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/build.xml (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/build.xml 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE project [
+ <!ENTITY libraries SYSTEM "../../../thirdparty/libraries.ent">
+ ]>
+<!--
+ ~ Copyright 2009 Red Hat, Inc.
+ ~ Red Hat licenses this file to you under the Apache License, version
+ ~ 2.0 (the "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ ~ implied. See the License for the specific language governing
+ ~ permissions and limitations under the License.
+ -->
+<project default="run" name="HornetQ JMS Clustered Grouping Example">
+
+ <import file="../../common/build.xml"/>
+
+ <target name="run">
+ <antcall target="runExample">
+ <param name="example.classname" value="org.hornetq.jms.example.ClusteredGroupingExample"/>
+ <param name="hornetq.example.beans.file" value="server0 server1 server2"/>
+ </antcall>
+ </target>
+
+ <target name="runRemote">
+ <antcall target="runExample">
+ <param name="example.classname" value="org.hornetq.jms.example.ClusteredGroupingExample"/>
+ <param name="hornetq.example.runServer" value="false"/>
+ </antcall>
+ </target>
+
+</project>
Copied: branches/hornetq_grouping/examples/jms/clustered-grouping/readme.html (from rev 7977, branches/hornetq_grouping/examples/jms/clustered-queue/readme.html)
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/readme.html (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/readme.html 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,174 @@
+<html>
+ <head>
+ <title>HornetQ JMS Load Balanced Queue Example</title>
+ <link rel="stylesheet" type="text/css" href="../../common/common.css">
+ </head>
+ <body>
+ <h1>HornetQ JMS Load Balanced Clustered Queue Example</h1>
+ <br>
+ <p>This example demonstrates a JMS queue deployed on two different nodes. The two nodes are configured to form a cluster.</p>
+ <p>We then create a consumer on the queue on each node, and we create a producer on only one of the nodes.</p>
+ <p>We then send some messages via the producer, and we verify that <b>both</b> consumers receive the sent messages
+ in a round-robin fashion.</p>
+ <p>In other words, HornetQ <b>load balances</b> the sent messages across all consumers on the cluster</p>
+ <p>This example uses JNDI to lookup the JMS Queue and ConnectionFactory objects. If you prefer not to use
+ JNDI, these could be instantiated directly.</p>
+ <p>Here's the relevant snippet from the server configuration, which tells the server to form a cluster between the two nodes
+ and to load balance the messages between the nodes.</p>
+ <pre>
+ <code><cluster-connection name="my-cluster">
+ <address>jms</address>
+ <retry-interval>500</retry-interval>
+ <use-duplicate-detection>true</use-duplicate-detection>
+ <forward-when-no-consumers>true</forward-when-no-consumers>
+ <max-hops>1</max-hops>
+ <discovery-group-ref discovery-group-name="my-discovery-group"/>
+ </cluster-connection>
+ </code>
+ </pre>
+ <p>For more information on HornetQ load balancing, and clustering in general, please see the clustering
+ section of the user manual.</p>
+ <h2>Example step-by-step</h2>
+ <p><i>To run the example, simply type <code>ant</code> from this directory</i></p>
+ <br>
+ <ol>
+ <li> Get an initial context for looking up JNDI from server 0.</li>
+ <pre>
+ <code>
+ ic0 = getContext(0);
+ </code>
+ </pre>
+
+ <li>Look-up the JMS Queue object from JNDI</li>
+ <pre>
+ <code>Queue queue = (Queue)ic0.lookup("/queue/exampleQueue");</code>
+ </pre>
+
+ <li>Look-up a JMS Connection Factory object from JNDI on server 0</li>
+ <pre>
+ <code>ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");</code>
+ </pre>
+
+ <li>Get an initial context for looking up JNDI from server 1.</li>
+ <pre>
+ <code>ic1 = getContext(1);</code>
+ </pre>
+
+ <li>Look-up a JMS Connection Factory object from JNDI on server 1</li>
+ <pre>
+ <code>ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
+ </code>
+ </pre>
+
+ <li>We create a JMS Connection connection0 which is a connection to server 0</li>
+ <pre>
+ <code>
+ connection0 = cf0.createConnection();
+ </code>
+ </pre>
+
+ <li>We create a JMS Connection connection1 which is a connection to server 1</li>
+ <pre>
+ <code>
+ connection1 = cf1.createConnection();
+ </code>
+ </pre>
+
+ <li>We create a JMS Session on server 0</li>
+ <pre>
+ <code>
+ Session session0 = connection0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ </code>
+ </pre>
+
+ <li>We create a JMS Session on server 1</li>
+ <pre>
+ <code>
+ Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ </code>
+ </pre>
+
+ <li>We start the connections to ensure delivery occurs on them</li>
+ <pre>
+ <code>
+ connection0.start();
+
+ connection1.start();
+ </code>
+ </pre>
+
+ <li>We create JMS MessageConsumer objects on server 0 and server 1</li>
+ <pre>
+ <code>
+ MessageConsumer consumer0 = session0.createConsumer(queue);
+
+ MessageConsumer consumer1 = session1.createConsumer(queue);
+ </code>
+ </pre>
+
+ <li>We create a JMS MessageProducer object on server 0.</li>
+ <pre>
+ <code>
+ MessageProducer producer = session0.createProducer(queue);</code>
+ </pre>
+
+ <li>We send some messages to server 0.</li>
+ <pre>
+ <code>
+ final int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage message = session0.createTextMessage("This is text message " + i);
+
+ producer.send(message);
+
+ System.out.println("Sent message: " + message.getText());
+ }
+ </code>
+ </pre>
+
+ <li>We now consume those messages on *both* server 0 and server 1.
+ We note the messages have been distributed between servers in a round robin fashion.
+ HornetQ has <b>load balanced</b> the messages between the available consumers on the different nodes.
+ HornetQ can be configured to always load balance messages to all nodes, or to only balance messages
+ to nodes which have consumers with no or matching selectors. See the user manual for more details.</li>
+ JMS Queues implement point-to-point message where each message is only ever consumed by a
+ maximum of one consumer.
+ <pre>
+ <code>
+ for (int i = 0; i < numMessages; i += 2)
+ {
+ TextMessage message0 = (TextMessage)consumer0.receive(5000);
+
+ System.out.println("Got message: " + message0.getText() + " from node 0");
+
+ TextMessage message1 = (TextMessage)consumer1.receive(5000);
+
+ System.out.println("Got message: " + message1.getText() + " from node 1");
+ }
+ </code>
+ </pre>
+
+ <li>And finally (no pun intended), <b>always</b> remember to close your JMS resources after use, in a <code>finally</code> block. Closing a JMS connection will automatically close all of its sessions, consumers, producer and browser objects</li>
+
+ <pre>
+ <code>
+ finally
+ {
+ if (connection0 != null)
+ {
+ connection0.close();
+ }
+
+ if (connection1 != null)
+ {
+ connection1.close();
+ }
+ }
+ </code>
+ </pre>
+
+ </ol>
+ </body>
+</html>
\ No newline at end of file
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server0/client-jndi.properties
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server0/client-jndi.properties (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server0/client-jndi.properties 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:1099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-beans.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-beans.xml (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-beans.xml 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+ <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+ <!-- JNDI server. Disable this if you don't want JNDI -->
+ <bean name="JNDIServer" class="org.jnp.server.Main">
+ <property name="namingInfo">
+ <inject bean="Naming"/>
+ </property>
+ <property name="port">1099</property>
+ <property name="bindAddress">localhost</property>
+ <property name="rmiPort">1098</property>
+ <property name="rmiBindAddress">localhost</property>
+ </bean>
+
+ <!-- MBean server -->
+ <bean name="MBeanServer" class="javax.management.MBeanServer">
+ <constructor factoryClass="java.lang.management.ManagementFactory"
+ factoryMethod="getPlatformMBeanServer"/>
+ </bean>
+
+ <!-- The core configuration -->
+ <bean name="Configuration" class="org.hornetq.core.config.impl.FileConfiguration"/>
+
+ <!-- The security manager -->
+ <bean name="HornetQSecurityManager" class="org.hornetq.core.security.impl.HornetQSecurityManagerImpl">
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The core server -->
+ <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="Configuration"/>
+ </parameter>
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>
+ <inject bean="HornetQSecurityManager"/>
+ </parameter>
+ </constructor>
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The JMS server -->
+ <bean name="JMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="HornetQServer"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+
+</deployment>
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-configuration.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-configuration.xml (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-configuration.xml 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,71 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
+ <clustered>true</clustered>
+
+ <!-- Connectors -->
+
+ <connectors>
+ <connector name="netty-connector">
+ <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5445" type="Integer"/>
+ </connector>
+ </connectors>
+
+ <!-- Acceptors -->
+ <acceptors>
+ <acceptor name="netty-acceptor">
+ <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5445" type="Integer"/>
+ </acceptor>
+ </acceptors>
+
+ <!-- Clustering configuration -->
+ <broadcast-groups>
+ <broadcast-group name="my-broadcast-group">
+ <group-address>231.7.7.7</group-address>
+ <group-port>9876</group-port>
+ <broadcast-period>100</broadcast-period>
+ <connector-ref connector-name="netty-connector"/>
+ </broadcast-group>
+ </broadcast-groups>
+
+ <discovery-groups>
+ <discovery-group name="my-discovery-group">
+ <group-address>231.7.7.7</group-address>
+ <group-port>9876</group-port>
+ <refresh-timeout>10000</refresh-timeout>
+ </discovery-group>
+ </discovery-groups>
+
+ <cluster-connections>
+ <cluster-connection name="my-cluster">
+ <address>jms</address>
+ <retry-interval>500</retry-interval>
+ <use-duplicate-detection>true</use-duplicate-detection>
+ <forward-when-no-consumers>true</forward-when-no-consumers>
+ <max-hops>1</max-hops>
+ <discovery-group-ref discovery-group-name="my-discovery-group"/>
+ </cluster-connection>
+ </cluster-connections>
+
+ <grouping-handler name="my-grouping-handler">
+ <type>LOCAL</type>
+ <address>jms</address>
+ </grouping-handler>
+
+ <!-- Other config -->
+
+ <security-settings>
+ <!--security for example queue-->
+ <security-setting match="jms.queue.exampleQueue">
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createTempQueue" roles="guest"/>
+ <permission type="deleteTempQueue" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+</configuration>
\ No newline at end of file
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-jms.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-jms.xml (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-jms.xml 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,17 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+ <!--the connection factory used by the example-->
+ <connection-factory name="ConnectionFactory">
+ <connector-ref connector-name="netty-connector"/>
+ <entries>
+ <entry name="ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <!--the queue used by the example-->
+ <queue name="exampleQueue">
+ <entry name="/queue/exampleQueue"/>
+ </queue>
+
+</configuration>
\ No newline at end of file
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-users.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-users.xml (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server0/hornetq-users.xml 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+ <!-- the default user. this is used where username is null-->
+ <defaultuser name="guest" password="guest">
+ <role name="guest"/>
+ </defaultuser>
+</configuration>
\ No newline at end of file
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server1/client-jndi.properties
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server1/client-jndi.properties (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server1/client-jndi.properties 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:2099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-beans.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-beans.xml (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-beans.xml 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+ <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+ <!-- JNDI server. Disable this if you don't want JNDI -->
+ <bean name="JNDIServer" class="org.jnp.server.Main">
+ <property name="namingInfo">
+ <inject bean="Naming"/>
+ </property>
+ <property name="port">2099</property>
+ <property name="bindAddress">localhost</property>
+ <property name="rmiPort">2098</property>
+ <property name="rmiBindAddress">localhost</property>
+ </bean>
+
+ <!-- MBean server -->
+ <bean name="MBeanServer" class="javax.management.MBeanServer">
+ <constructor factoryClass="java.lang.management.ManagementFactory"
+ factoryMethod="getPlatformMBeanServer"/>
+ </bean>
+
+ <!-- The core configuration -->
+ <bean name="Configuration" class="org.hornetq.core.config.impl.FileConfiguration"/>
+
+ <!-- The security manager -->
+ <bean name="HornetQSecurityManager" class="org.hornetq.core.security.impl.HornetQSecurityManagerImpl">
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The core server -->
+ <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="Configuration"/>
+ </parameter>
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>
+ <inject bean="HornetQSecurityManager"/>
+ </parameter>
+ </constructor>
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The JMS server -->
+ <bean name="JMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="HornetQServer"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+</deployment>
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-configuration.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-configuration.xml (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-configuration.xml 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,70 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
+ <clustered>true</clustered>
+
+ <!-- Connectors -->
+ <connectors>
+ <connector name="netty-connector">
+ <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5446" type="Integer"/>
+ </connector>
+ </connectors>
+
+ <!-- Acceptors -->
+ <acceptors>
+ <acceptor name="netty-acceptor">
+ <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5446" type="Integer"/>
+ </acceptor>
+ </acceptors>
+
+ <!-- Clustering configuration -->
+ <broadcast-groups>
+ <broadcast-group name="my-broadcast-group">
+ <group-address>231.7.7.7</group-address>
+ <group-port>9876</group-port>
+ <broadcast-period>100</broadcast-period>
+ <connector-ref connector-name="netty-connector"/>
+ </broadcast-group>
+ </broadcast-groups>
+
+ <discovery-groups>
+ <discovery-group name="my-discovery-group">
+ <group-address>231.7.7.7</group-address>
+ <group-port>9876</group-port>
+ <refresh-timeout>10000</refresh-timeout>
+ </discovery-group>
+ </discovery-groups>
+
+ <cluster-connections>
+ <cluster-connection name="my-cluster">
+ <address>jms</address>
+ <retry-interval>500</retry-interval>
+ <use-duplicate-detection>true</use-duplicate-detection>
+ <forward-when-no-consumers>true</forward-when-no-consumers>
+ <max-hops>1</max-hops>
+ <discovery-group-ref discovery-group-name="my-discovery-group"/>
+ </cluster-connection>
+ </cluster-connections>
+
+ <grouping-handler name="my-grouping-handler">
+ <type>REMOTE</type>
+ <address>jms</address>
+ </grouping-handler>
+
+ <!-- Other config -->
+
+ <security-settings>
+ <!--security for example queue-->
+ <security-setting match="jms.queue.exampleQueue">
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createTempQueue" roles="guest"/>
+ <permission type="deleteTempQueue" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+</configuration>
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-jms.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-jms.xml (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-jms.xml 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,17 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+ <!--the connection factory used by the example-->
+ <connection-factory name="ConnectionFactory">
+ <connector-ref connector-name="netty-connector"/>
+ <entries>
+ <entry name="ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <!--the queue used by the example-->
+ <queue name="exampleQueue">
+ <entry name="/queue/exampleQueue"/>
+ </queue>
+
+</configuration>
\ No newline at end of file
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-users.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-users.xml (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server1/hornetq-users.xml 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+ <!-- the default user. this is used where username is null-->
+ <defaultuser name="guest" password="guest">
+ <role name="guest"/>
+ </defaultuser>
+</configuration>
\ No newline at end of file
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server2/client-jndi.properties
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server2/client-jndi.properties (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server2/client-jndi.properties 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:3099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-beans.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-beans.xml (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-beans.xml 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+ <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+ <!-- JNDI server. Disable this if you don't want JNDI -->
+ <bean name="JNDIServer" class="org.jnp.server.Main">
+ <property name="namingInfo">
+ <inject bean="Naming"/>
+ </property>
+ <property name="port">3099</property>
+ <property name="bindAddress">localhost</property>
+ <property name="rmiPort">3098</property>
+ <property name="rmiBindAddress">localhost</property>
+ </bean>
+
+ <!-- MBean server -->
+ <bean name="MBeanServer" class="javax.management.MBeanServer">
+ <constructor factoryClass="java.lang.management.ManagementFactory"
+ factoryMethod="getPlatformMBeanServer"/>
+ </bean>
+
+ <!-- The core configuration -->
+ <bean name="Configuration" class="org.hornetq.core.config.impl.FileConfiguration"/>
+
+ <!-- The security manager -->
+ <bean name="HornetQSecurityManager" class="org.hornetq.core.security.impl.HornetQSecurityManagerImpl">
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The core server -->
+ <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="Configuration"/>
+ </parameter>
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>
+ <inject bean="HornetQSecurityManager"/>
+ </parameter>
+ </constructor>
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The JMS server -->
+ <bean name="JMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="HornetQServer"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+</deployment>
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-configuration.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-configuration.xml (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-configuration.xml 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,70 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
+ <clustered>true</clustered>
+
+ <!-- Connectors -->
+ <connectors>
+ <connector name="netty-connector">
+ <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5447" type="Integer"/>
+ </connector>
+ </connectors>
+
+ <!-- Acceptors -->
+ <acceptors>
+ <acceptor name="netty-acceptor">
+ <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5447" type="Integer"/>
+ </acceptor>
+ </acceptors>
+
+ <!-- Clustering configuration -->
+ <broadcast-groups>
+ <broadcast-group name="my-broadcast-group">
+ <group-address>231.7.7.7</group-address>
+ <group-port>9876</group-port>
+ <broadcast-period>100</broadcast-period>
+ <connector-ref connector-name="netty-connector"/>
+ </broadcast-group>
+ </broadcast-groups>
+
+ <discovery-groups>
+ <discovery-group name="my-discovery-group">
+ <group-address>231.7.7.7</group-address>
+ <group-port>9876</group-port>
+ <refresh-timeout>10000</refresh-timeout>
+ </discovery-group>
+ </discovery-groups>
+
+ <cluster-connections>
+ <cluster-connection name="my-cluster">
+ <address>jms</address>
+ <retry-interval>500</retry-interval>
+ <use-duplicate-detection>true</use-duplicate-detection>
+ <forward-when-no-consumers>true</forward-when-no-consumers>
+ <max-hops>1</max-hops>
+ <discovery-group-ref discovery-group-name="my-discovery-group"/>
+ </cluster-connection>
+ </cluster-connections>
+
+ <grouping-handler name="my-grouping-handler">
+ <type>REMOTE</type>
+ <address>jms</address>
+ </grouping-handler>
+
+ <!-- Other config -->
+
+ <security-settings>
+ <!--security for example queue-->
+ <security-setting match="jms.queue.exampleQueue">
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createTempQueue" roles="guest"/>
+ <permission type="deleteTempQueue" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+</configuration>
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-jms.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-jms.xml (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-jms.xml 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,17 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+ <!--the connection factory used by the example-->
+ <connection-factory name="ConnectionFactory">
+ <connector-ref connector-name="netty-connector"/>
+ <entries>
+ <entry name="ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <!--the queue used by the example-->
+ <queue name="exampleQueue">
+ <entry name="/queue/exampleQueue"/>
+ </queue>
+
+</configuration>
\ No newline at end of file
Added: branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-users.xml
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-users.xml (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/server2/hornetq-users.xml 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+ <!-- the default user. this is used where username is null-->
+ <defaultuser name="guest" password="guest">
+ <role name="guest"/>
+ </defaultuser>
+</configuration>
\ No newline at end of file
Copied: branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/jms/example/ClusteredGroupingExample.java (from rev 7977, branches/hornetq_grouping/examples/jms/clustered-queue/src/org/hornetq/jms/example/ClusteredQueueExample.java)
===================================================================
--- branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/jms/example/ClusteredGroupingExample.java (rev 0)
+++ branches/hornetq_grouping/examples/jms/clustered-grouping/src/org/hornetq/jms/example/ClusteredGroupingExample.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.jms.example;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.InitialContext;
+
+import org.hornetq.common.example.HornetQExample;
+import org.hornetq.jms.client.HornetQMessage;
+
+/**
+ * A simple example that demonstrates server side load-balancing of messages between the queue instances on different
+ * nodes of the cluster.
+ *
+ * @author <a href="tim.fox(a)jboss.com>Tim Fox</a>
+ */
+public class ClusteredGroupingExample extends HornetQExample
+{
+ public static void main(String[] args)
+ {
+ new ClusteredGroupingExample().run(args);
+ }
+
+ public boolean runExample() throws Exception
+ {
+ Thread.sleep(5000);
+ Connection connection0 = null;
+
+ Connection connection1 = null;
+
+ Connection connection2 = null;
+
+ InitialContext ic0 = null;
+
+ InitialContext ic1 = null;
+
+ InitialContext ic2 = null;
+
+ try
+ {
+ // Step 1. Get an initial context for looking up JNDI from server 0
+ ic0 = getContext(0);
+
+ // Step 2. Look-up the JMS Queue object from JNDI
+ Queue queue = (Queue)ic0.lookup("/queue/exampleQueue");
+
+ // Step 3. Look-up a JMS Connection Factory object from JNDI on server 0
+ ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
+
+ // Step 4. Get an initial context for looking up JNDI from server 1
+ ic1 = getContext(1);
+
+ // Step 5. Look-up a JMS Connection Factory object from JNDI on server 1
+ ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
+
+ // Step 4. Get an initial context for looking up JNDI from server 1
+ ic2 = getContext(2);
+
+ // Step 5. Look-up a JMS Connection Factory object from JNDI on server 1
+ ConnectionFactory cf2 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
+
+ // Step 6. We create a JMS Connection connection0 which is a connection to server 0
+ connection0 = cf0.createConnection();
+
+ // Step 7. We create a JMS Connection connection1 which is a connection to server 1
+ connection1 = cf1.createConnection();
+
+ // Step 7. We create a JMS Connection connection1 which is a connection to server 1
+ connection2 = cf2.createConnection();
+
+ // Step 8. We create a JMS Session on server 0
+ Session session0 = connection0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Step 9. We create a JMS Session on server 1
+ Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Session session2 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Step 10. We start the connections to ensure delivery occurs on them
+ connection0.start();
+
+ connection1.start();
+
+ connection2.start();
+
+ // Step 11. We create JMS MessageConsumer objects on server 0 and server 1
+ MessageConsumer consumer = session0.createConsumer(queue);
+
+
+ // Step 12. We create a JMS MessageProducer object on server 0
+ MessageProducer producer0 = session0.createProducer(queue);
+
+ MessageProducer producer1 = session1.createProducer(queue);
+
+ MessageProducer producer2 = session2.createProducer(queue);
+
+ // Step 13. We send some messages to server 0
+
+ final int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage message = session0.createTextMessage("This is text message " + i);
+
+ message.setStringProperty(HornetQMessage.JMSXGROUPID, "Group-0");
+
+ producer0.send(message);
+
+ producer1.send(message);
+
+ producer2.send(message);
+
+ System.out.println("Sent messages: " + message.getText());
+ }
+
+ // Step 14. We now consume those messages on *both* server 0 and server 1.
+ // We note the messages have been distributed between servers in a round robin fashion
+ // JMS Queues implement point-to-point message where each message is only ever consumed by a
+ // maximum of one consumer
+
+ for (int i = 0; i < numMessages; i += 2)
+ {
+ TextMessage message0 = (TextMessage)consumer.receive(5000);
+
+ System.out.println("Got message: " + message0.getText() + " from node 0");
+
+ }
+
+ return true;
+ }
+ finally
+ {
+ // Step 15. Be sure to close our resources!
+
+ if (connection0 != null)
+ {
+ connection0.close();
+ }
+
+ if (connection1 != null)
+ {
+ connection1.close();
+ }
+
+ if (ic0 != null)
+ {
+ ic0.close();
+ }
+
+ if (ic1 != null)
+ {
+ ic1.close();
+ }
+ }
+ }
+
+}
\ No newline at end of file
Modified: branches/hornetq_grouping/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- branches/hornetq_grouping/src/config/common/schema/hornetq-configuration.xsd 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/config/common/schema/hornetq-configuration.xsd 2009-09-24 09:58:07 UTC (rev 7987)
@@ -128,6 +128,8 @@
</xsd:sequence>
</xsd:complexType>
</xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="grouping-handler" type="groupingHandlerType">
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="paging-directory" type="xsd:string">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="bindings-directory" type="xsd:string">
@@ -383,6 +385,21 @@
</xsd:restriction>
</xsd:simpleType>
+ <xsd:complexType name="groupingHandlerType">
+ <xsd:sequence>
+ <xsd:element maxOccurs="1" minOccurs="1" name="type" type="groupingHandlerTypeType"/>
+ <xsd:element maxOccurs="1" minOccurs="1" name="address" type="xsd:string"/>
+ </xsd:sequence>
+ <xsd:attribute name="name" type="xsd:string" use="required"/>
+ </xsd:complexType>
+
+ <xsd:simpleType name="groupingHandlerTypeType">
+ <xsd:restriction base="xsd:string">
+ <xsd:enumeration value="LOCAL"/>
+ <xsd:enumeration value="REMOTE"/>
+ </xsd:restriction>
+ </xsd:simpleType>
+
<xsd:element name="security-settings">
<xsd:complexType>
<xsd:sequence>
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/config/Configuration.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/config/Configuration.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/config/Configuration.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -25,6 +25,7 @@
import org.hornetq.core.config.cluster.DivertConfiguration;
import org.hornetq.core.config.cluster.QueueConfiguration;
import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
import org.hornetq.utils.SimpleString;
/**
@@ -125,6 +126,10 @@
void setDiscoveryGroupConfigurations(Map<String, DiscoveryGroupConfiguration> configs);
+ List<GroupingHandlerConfiguration> getGroupingHandlerConfigurations();
+
+ void setGroupingHandlerConfigurationConfigurations(List<GroupingHandlerConfiguration> groupingHandlerConfiguration);
+
List<BridgeConfiguration> getBridgeConfigurations();
void setBridgeConfigurations(final List<BridgeConfiguration> configs);
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -30,6 +30,7 @@
import org.hornetq.core.config.cluster.DivertConfiguration;
import org.hornetq.core.config.cluster.QueueConfiguration;
import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
import org.hornetq.utils.SimpleString;
/**
@@ -216,6 +217,8 @@
protected Map<String, DiscoveryGroupConfiguration> discoveryGroupConfigurations = new LinkedHashMap<String, DiscoveryGroupConfiguration>();
+ protected List<GroupingHandlerConfiguration> groupingHandlerConfiguration = new ArrayList<GroupingHandlerConfiguration>();
+
// Paging related attributes ------------------------------------------------------------
protected String pagingDirectory = DEFAULT_PAGING_DIR;
@@ -465,6 +468,17 @@
this.backupConnectorName = backupConnectorName;
}
+ public List<GroupingHandlerConfiguration> getGroupingHandlerConfigurations()
+ {
+ return groupingHandlerConfiguration;
+ }
+
+ public void setGroupingHandlerConfigurationConfigurations(List<GroupingHandlerConfiguration> groupingHandlerConfiguration)
+ {
+ this.groupingHandlerConfiguration = groupingHandlerConfiguration;
+ }
+
+
public List<BridgeConfiguration> getBridgeConfigurations()
{
return bridgeConfigurations;
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -46,6 +46,7 @@
import org.hornetq.core.config.cluster.DivertConfiguration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.XMLUtil;
@@ -249,6 +250,15 @@
parseBridgeConfiguration(mfNode);
}
+ NodeList gaNodes = e.getElementsByTagName("grouping-handler");
+ System.out.println("gaNodes.getLength() = " + gaNodes.getLength());
+ for (int i = 0; i < gaNodes.getLength(); i++)
+ {
+ Element gaNode = (Element) gaNodes.item(i);
+
+ parseGroupingHandlerConfiguration(gaNode);
+ }
+
NodeList ccNodes = e.getElementsByTagName("cluster-connection");
for (int i = 0; i < ccNodes.getLength(); i++)
@@ -558,6 +568,20 @@
clusterConfigurations.add(config);
}
+ private void parseGroupingHandlerConfiguration(final Element node)
+ {
+ String name = node.getAttribute("name");
+ String type = getString(node, "type", null, NOT_NULL_OR_EMPTY);
+ String address = getString(node, "address",null, NOT_NULL_OR_EMPTY);
+ GroupingHandlerConfiguration arbitratorConfiguration =
+ new GroupingHandlerConfiguration(new SimpleString(name),
+ type.equals(GroupingHandlerConfiguration.TYPE.LOCAL.getType())? GroupingHandlerConfiguration.TYPE.LOCAL: GroupingHandlerConfiguration.TYPE.REMOTE,
+ new SimpleString(address));
+ System.out.println("arbitratorConfiguration = " + arbitratorConfiguration);
+ groupingHandlerConfiguration.add(arbitratorConfiguration);
+ }
+
+
private void parseBridgeConfiguration(final Element brNode)
{
String name = brNode.getAttribute("name");
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -19,7 +19,7 @@
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.group.Arbitrator;
+import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.SimpleString;
@@ -69,7 +69,7 @@
Object getNotificationLock();
- void addArbitrator(Arbitrator arbitrator);
+ void setGroupingHandler(GroupingHandler groupingHandler);
- Arbitrator getArbitrator();
+ GroupingHandler getGroupingHandler();
}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -34,7 +34,7 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.group.impl.Proposal;
import org.hornetq.core.server.group.impl.Response;
-import org.hornetq.core.server.group.Arbitrator;
+import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.utils.SimpleString;
@@ -284,96 +284,15 @@
if (!routed)
{
- Arbitrator groupingArbitrator = postOffice.getArbitrator();
+ GroupingHandler groupingGroupingHandler = postOffice.getGroupingHandler();
if (message.getProperty(MessageImpl.HDR_FROM_CLUSTER) != null)
{
routeFromCluster(message, tx);
}
- else if(groupingArbitrator != null && message.getProperty(MessageImpl.HDR_GROUP_ID)!= null)
+ else if(groupingGroupingHandler != null && message.getProperty(MessageImpl.HDR_GROUP_ID)!= null)
{
- SimpleString groupId = (SimpleString) message.getProperty(MessageImpl.HDR_GROUP_ID);
- Response resp = groupingArbitrator.propose(new Proposal(groupId, null));
- if(resp == null)
- {
- for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet())
- {
- SimpleString routingName = entry.getKey();
-
- List<Binding> bindings = entry.getValue();
- Binding chosen = null;
- Binding lowestPriorityBinding = null;
- int lowestPriority = Integer.MAX_VALUE;
- for (Binding binding : bindings)
- {
- boolean bindingIsHighAcceptPriority = binding.isHighAcceptPriority(message);
- int distance = binding.getDistance();
- if((distance < lowestPriority))
- {
- lowestPriorityBinding = binding;
- lowestPriority = distance;
- if(bindingIsHighAcceptPriority)
- {
- chosen = binding;
- }
- }
- }
- if(chosen == null)
- {
- chosen = lowestPriorityBinding;
- }
- resp = groupingArbitrator.propose(new Proposal(groupId, chosen.getClusterName()));
- if(!resp.getChosen().equals(chosen.getClusterName()))
- {
- for (Binding binding : bindings)
- {
- if (binding.getClusterName().equals(resp.getChosen()))
- {
- chosen = binding;
- break;
- }
- }
- }
-
- if( chosen != null )
- {
- System.out.println("sending message" + message.getProperty("count_prop") + " to " + chosen.getClusterName());
- chosen.willRoute(message);
- chosen.getBindable().preroute(message, tx);
- chosen.getBindable().route(message, tx);
- }
- }
- }
- else
- {
- for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet())
- {
- SimpleString routingName = entry.getKey();
-
- List<Binding> bindings = entry.getValue();
- Binding chosen = null;
- for (Binding binding : bindings)
- {
- if(binding.getClusterName().equals(resp.getChosen()))
- {
- chosen = binding;
- break;
- }
- }
- if( chosen != null)
- {
- System.out.println("found sending message" + message.getProperty("count_prop") + " to " + chosen.getClusterName());
- chosen.willRoute(message);
- chosen.getBindable().preroute(message, tx);
- chosen.getBindable().route(message, tx);
- }
- else
- {
- System.out.println("BindingsImpl.route");
- throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "queue " + resp.getChosen() + " has been removed cannot deliver message, queues should not be removed when grouping is used");
- }
- }
- }
+ routeUsingStrictOrdering(message, tx, groupingGroupingHandler);
}
else
{
@@ -511,6 +430,93 @@
}
}
+ private void routeUsingStrictOrdering(ServerMessage message, Transaction tx, GroupingHandler groupingGroupingHandler)
+ throws Exception
+ {
+ SimpleString groupId = (SimpleString) message.getProperty(MessageImpl.HDR_GROUP_ID);
+ Response resp = groupingGroupingHandler.propose(new Proposal(groupId, null));
+ if(resp == null)
+ {
+ for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet())
+ {
+ SimpleString routingName = entry.getKey();
+
+ List<Binding> bindings = entry.getValue();
+ Binding chosen = null;
+ Binding lowestPriorityBinding = null;
+ int lowestPriority = Integer.MAX_VALUE;
+ for (Binding binding : bindings)
+ {
+ boolean bindingIsHighAcceptPriority = binding.isHighAcceptPriority(message);
+ int distance = binding.getDistance();
+ if((distance < lowestPriority))
+ {
+ lowestPriorityBinding = binding;
+ lowestPriority = distance;
+ if(bindingIsHighAcceptPriority)
+ {
+ chosen = binding;
+ }
+ }
+ }
+ if(chosen == null)
+ {
+ chosen = lowestPriorityBinding;
+ }
+ resp = groupingGroupingHandler.propose(new Proposal(groupId, chosen.getClusterName()));
+ if(!resp.getChosen().equals(chosen.getClusterName()))
+ {
+ for (Binding binding : bindings)
+ {
+ if (binding.getClusterName().equals(resp.getChosen()))
+ {
+ chosen = binding;
+ break;
+ }
+ }
+ }
+
+ if( chosen != null )
+ {
+ System.out.println("sending message" + message.getProperty("count_prop") + " to " + chosen.getClusterName());
+ chosen.willRoute(message);
+ chosen.getBindable().preroute(message, tx);
+ chosen.getBindable().route(message, tx);
+ }
+ }
+ }
+ else
+ {
+ for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet())
+ {
+ SimpleString routingName = entry.getKey();
+
+ List<Binding> bindings = entry.getValue();
+ Binding chosen = null;
+ for (Binding binding : bindings)
+ {
+ if(binding.getClusterName().equals(resp.getChosen()))
+ {
+ chosen = binding;
+ break;
+ }
+ }
+ if( chosen != null)
+ {
+ System.out.println("found sending message" + message.getProperty("count_prop") + " to " + chosen.getClusterName());
+ chosen.willRoute(message);
+ chosen.getBindable().preroute(message, tx);
+ chosen.getBindable().route(message, tx);
+ }
+ else
+ {
+ System.out.println("BindingsImpl.route");
+ throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "queue " + resp.getChosen() + " has been removed cannot deliver message, queues should not be removed when grouping is used");
+ }
+ }
+ }
+ }
+
private final int incrementPos(int pos, int length)
{
pos++;
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -50,7 +50,7 @@
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.QueueFactory;
import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.group.Arbitrator;
+import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -125,7 +125,7 @@
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
- private Arbitrator groupingArbitrator;
+ private GroupingHandler groupingGroupingHandler;
public PostOfficeImpl(final HornetQServer server,
final StorageManager storageManager,
@@ -743,14 +743,14 @@
}
- public void addArbitrator(Arbitrator arbitrator)
+ public void setGroupingHandler(GroupingHandler groupingHandler)
{
- groupingArbitrator = arbitrator;
+ groupingGroupingHandler = groupingHandler;
}
- public Arbitrator getArbitrator()
+ public GroupingHandler getGroupingHandler()
{
- return groupingArbitrator;
+ return groupingGroupingHandler;
}
public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -638,10 +638,10 @@
}
SimpleString val = (SimpleString) message.getProperty(ManagementHelper.HDR_PROPOSAL_VALUE);
Integer hops = (Integer) message.getProperty(ManagementHelper.HDR_DISTANCE);
- Response response = postOffice.getArbitrator().receive(new Proposal(type, val), hops + 1);
+ Response response = postOffice.getGroupingHandler().receive(new Proposal(type, val), hops + 1);
if(response != null)
{
- postOffice.getArbitrator().send(response, 0);
+ postOffice.getGroupingHandler().send(response, 0);
}
}
@@ -656,8 +656,8 @@
SimpleString alt = (SimpleString) message.getProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE);
Integer hops = (Integer) message.getProperty(ManagementHelper.HDR_DISTANCE);
Response response = new Response(type, val, alt);
- postOffice.getArbitrator().proposed(response);
- postOffice.getArbitrator().send(response, hops + 1);
+ postOffice.getGroupingHandler().proposed(response);
+ postOffice.getGroupingHandler().send(response, hops + 1);
}
private synchronized void clearBindings() throws Exception
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -40,6 +40,10 @@
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
+import org.hornetq.core.server.group.impl.LocalGroupingHandler;
+import org.hornetq.core.server.group.impl.RemoteGroupingHandler;
+import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.server.cluster.Bridge;
import org.hornetq.core.server.cluster.BroadcastGroup;
import org.hornetq.core.server.cluster.ClusterConnection;
@@ -150,6 +154,11 @@
{
deployClusterConnection(config);
}
+
+ for (GroupingHandlerConfiguration config : configuration.getGroupingHandlerConfigurations())
+ {
+ deployGroupingHandlerConfigurations(config);
+ }
started = true;
}
@@ -486,6 +495,21 @@
bridge.start();
}
+ private synchronized void deployGroupingHandlerConfigurations(final GroupingHandlerConfiguration config) throws Exception
+ {
+ GroupingHandler groupingHandler;
+ if (config.getType() == GroupingHandlerConfiguration.TYPE.LOCAL)
+ {
+ groupingHandler = new LocalGroupingHandler(managementService, config.getName(), config.getAddress(), scheduledExecutor);
+ }
+ else
+ {
+ groupingHandler = new RemoteGroupingHandler(managementService, config.getName(), config.getAddress());
+ }
+ log.info("deploying grouping handler: " + groupingHandler);
+ postOffice.setGroupingHandler(groupingHandler);
+ }
+
private synchronized void deployClusterConnection(final ClusterConnectionConfiguration config) throws Exception
{
if (config.getName() == null)
Deleted: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Arbitrator.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Arbitrator.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Arbitrator.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -1,35 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.server.group;
-
-import org.hornetq.utils.SimpleString;
-import org.hornetq.core.server.group.impl.Proposal;
-import org.hornetq.core.server.group.impl.Response;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- */
-public interface Arbitrator
-{
- SimpleString getName();
-
- Response propose(Proposal proposal) throws Exception;
-
- void proposed(Response response) throws Exception;
-
- void send(Response response, int distance) throws Exception;
-
- Response receive(Proposal proposal, int distance) throws Exception;
-
- Response rePropose(Proposal proposal) throws Exception;
-}
Copied: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java (from rev 7977, branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Arbitrator.java)
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java (rev 0)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/GroupingHandler.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group;
+
+import org.hornetq.utils.SimpleString;
+import org.hornetq.core.server.group.impl.Proposal;
+import org.hornetq.core.server.group.impl.Response;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public interface GroupingHandler
+{
+ SimpleString getName();
+
+ Response propose(Proposal proposal) throws Exception;
+
+ void proposed(Response response) throws Exception;
+
+ void send(Response response, int distance) throws Exception;
+
+ Response receive(Proposal proposal, int distance) throws Exception;
+
+ Response rePropose(Proposal proposal) throws Exception;
+}
Deleted: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/ArbitratorConfiguration.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/ArbitratorConfiguration.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/ArbitratorConfiguration.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -1,67 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.server.group.impl;
-
-import org.hornetq.utils.SimpleString;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- */
-public class ArbitratorConfiguration
-{
- private final SimpleString name;
-
- private final TYPE type;
-
- private final SimpleString address;
-
- public ArbitratorConfiguration(final SimpleString name, final TYPE type, SimpleString address)
- {
- this.type = type;
- this.name = name;
- this.address = address;
- }
-
- public SimpleString getName()
- {
- return name;
- }
-
- public TYPE getType()
- {
- return type;
- }
-
- public SimpleString getAddress()
- {
- return address;
- }
-
- public enum TYPE
- {
- LOCAL("LOCAL"),
- REMOTE("REMOTE");
-
- private String type;
-
- TYPE(String type)
- {
- this.type = type;
- }
-
- public String getType()
- {
- return type;
- }
- }
-}
Copied: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java (from rev 7977, branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/ArbitratorConfiguration.java)
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java (rev 0)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class GroupingHandlerConfiguration
+{
+ private final SimpleString name;
+
+ private final TYPE type;
+
+ private final SimpleString address;
+
+ public GroupingHandlerConfiguration(final SimpleString name, final TYPE type, SimpleString address)
+ {
+ this.type = type;
+ this.name = name;
+ this.address = address;
+ }
+
+ public SimpleString getName()
+ {
+ return name;
+ }
+
+ public TYPE getType()
+ {
+ return type;
+ }
+
+ public SimpleString getAddress()
+ {
+ return address;
+ }
+
+ public enum TYPE
+ {
+ LOCAL("LOCAL"),
+ REMOTE("REMOTE");
+
+ private String type;
+
+ TYPE(String type)
+ {
+ this.type = type;
+ }
+
+ public String getType()
+ {
+ return type;
+ }
+ }
+}
Deleted: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalArbitrator.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalArbitrator.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalArbitrator.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -1,129 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.server.group.impl;
-
-import org.hornetq.core.management.NotificationType;
-import org.hornetq.core.management.Notification;
-import org.hornetq.core.management.ManagementService;
-import org.hornetq.core.client.management.impl.ManagementHelper;
-import org.hornetq.core.postoffice.BindingType;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.server.group.Arbitrator;
-import org.hornetq.utils.SimpleString;
-import org.hornetq.utils.TypedProperties;
-import org.hornetq.utils.ConcurrentHashSet;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- */
-public class LocalArbitrator implements Arbitrator
-{
- private static Logger log = Logger.getLogger(LocalArbitrator.class);
-
- private ConcurrentHashMap<SimpleString, Object> map = new ConcurrentHashMap<SimpleString, Object>();
-
- private final SimpleString name;
-
- private final ManagementService managementService;
-
- private SimpleString address;
-
- private ScheduledExecutorService scheduledExecutor;
-
- private ConcurrentHashSet<SimpleString> reProposals = new ConcurrentHashSet<SimpleString>();
-
- public LocalArbitrator(final ManagementService managementService, final SimpleString name, final SimpleString address, ScheduledExecutorService scheduledExecutor)
- {
- this.managementService = managementService;
- this.name = name;
- this.address = address;
- this.scheduledExecutor = scheduledExecutor;
- }
-
- public SimpleString getName()
- {
- return name;
- }
-
-
- public Response propose(Proposal proposal) throws Exception
- {
- if(proposal.getProposal() == null)
- {
- Object original = map.get(proposal.getProposalType());
- return original == null?null:new Response(proposal.getProposalType(), original);
- }
- Response response = new Response(proposal.getProposalType(), proposal.getProposal());
- if (map.putIfAbsent(response.getResponseType(), response.getChosen()) == null)
- {
- log.info("accepted proposal for " + proposal.getProposalType() + " with value " + proposal.getProposal());
- return response;
- }
- else
- {
- log.info("denied proposal for " + proposal.getProposalType() + " with value " + proposal.getProposal());
- return new Response(proposal.getProposalType(), proposal.getProposal(), map.get(proposal.getProposalType()));
- }
- }
-
- public void proposed(Response response) throws Exception
- {
- }
-
- public void send(Response response, int distance) throws Exception
- {
- Object value = response.getAlternative() != null ? response.getAlternative() : response.getOriginal();
- log.info("sending proposal response for " + response.getResponseType() + " with value " + value);
- TypedProperties props = new TypedProperties();
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, response.getResponseType());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)response.getOriginal());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE, (SimpleString)response.getAlternative());
- props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
- props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
- props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
- Notification notification = new Notification(null, NotificationType.PROPOSAL_RESPONSE, props);
- managementService.sendNotification(notification);
- }
-
- public Response receive(Proposal proposal, int distance) throws Exception
- {
- return propose(proposal);
- }
-
- public Response rePropose(final Proposal proposal) throws Exception
- {
- if(reProposals.addIfAbsent(proposal.getProposalType()))
- {
- Response response = new Response(proposal.getProposalType(), proposal.getProposal());
- map.replace(proposal.getProposalType(), response);
- send(response, 0);
- scheduledExecutor.schedule(new Runnable()
- {
- public void run()
- {
- reProposals.remove(proposal.getProposalType());
- }
- }, 2000, TimeUnit.MILLISECONDS);
- return response;
- }
- else
- {
- return new Response(proposal.getProposalType(), map.get(proposal.getProposalType()));
- }
- }
-}
-
Copied: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java (from rev 7977, branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalArbitrator.java)
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java (rev 0)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.core.management.NotificationType;
+import org.hornetq.core.management.Notification;
+import org.hornetq.core.management.ManagementService;
+import org.hornetq.core.client.management.impl.ManagementHelper;
+import org.hornetq.core.postoffice.BindingType;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.utils.SimpleString;
+import org.hornetq.utils.TypedProperties;
+import org.hornetq.utils.ConcurrentHashSet;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class LocalGroupingHandler implements GroupingHandler
+{
+ private static Logger log = Logger.getLogger(LocalGroupingHandler.class);
+
+ private ConcurrentHashMap<SimpleString, Object> map = new ConcurrentHashMap<SimpleString, Object>();
+
+ private final SimpleString name;
+
+ private final ManagementService managementService;
+
+ private SimpleString address;
+
+ private ScheduledExecutorService scheduledExecutor;
+
+ private ConcurrentHashSet<SimpleString> reProposals = new ConcurrentHashSet<SimpleString>();
+
+ public LocalGroupingHandler(final ManagementService managementService, final SimpleString name, final SimpleString address, ScheduledExecutorService scheduledExecutor)
+ {
+ this.managementService = managementService;
+ this.name = name;
+ this.address = address;
+ this.scheduledExecutor = scheduledExecutor;
+ }
+
+ public SimpleString getName()
+ {
+ return name;
+ }
+
+
+ public Response propose(Proposal proposal) throws Exception
+ {
+ if(proposal.getProposal() == null)
+ {
+ Object original = map.get(proposal.getProposalType());
+ return original == null?null:new Response(proposal.getProposalType(), original);
+ }
+ Response response = new Response(proposal.getProposalType(), proposal.getProposal());
+ if (map.putIfAbsent(response.getResponseType(), response.getChosen()) == null)
+ {
+ log.info("accepted proposal for " + proposal.getProposalType() + " with value " + proposal.getProposal());
+ return response;
+ }
+ else
+ {
+ log.info("denied proposal for " + proposal.getProposalType() + " with value " + proposal.getProposal());
+ return new Response(proposal.getProposalType(), proposal.getProposal(), map.get(proposal.getProposalType()));
+ }
+ }
+
+ public void proposed(Response response) throws Exception
+ {
+ }
+
+ public void send(Response response, int distance) throws Exception
+ {
+ Object value = response.getAlternative() != null ? response.getAlternative() : response.getOriginal();
+ log.info("sending proposal response for " + response.getResponseType() + " with value " + value);
+ TypedProperties props = new TypedProperties();
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, response.getResponseType());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)response.getOriginal());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE, (SimpleString)response.getAlternative());
+ props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
+ props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
+ props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
+ Notification notification = new Notification(null, NotificationType.PROPOSAL_RESPONSE, props);
+ managementService.sendNotification(notification);
+ }
+
+ public Response receive(Proposal proposal, int distance) throws Exception
+ {
+ return propose(proposal);
+ }
+
+ public Response rePropose(final Proposal proposal) throws Exception
+ {
+ if(reProposals.addIfAbsent(proposal.getProposalType()))
+ {
+ Response response = new Response(proposal.getProposalType(), proposal.getProposal());
+ map.replace(proposal.getProposalType(), response);
+ send(response, 0);
+ scheduledExecutor.schedule(new Runnable()
+ {
+ public void run()
+ {
+ reProposals.remove(proposal.getProposalType());
+ }
+ }, 2000, TimeUnit.MILLISECONDS);
+ return response;
+ }
+ else
+ {
+ return new Response(proposal.getProposalType(), map.get(proposal.getProposalType()));
+ }
+ }
+}
+
Deleted: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteArbitrator.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteArbitrator.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteArbitrator.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -1,136 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.server.group.impl;
-
-import org.hornetq.core.management.NotificationType;
-import org.hornetq.core.management.Notification;
-import org.hornetq.core.management.ManagementService;
-import org.hornetq.core.client.management.impl.ManagementHelper;
-import org.hornetq.core.postoffice.BindingType;
-import org.hornetq.core.server.group.Arbitrator;
-import org.hornetq.utils.SimpleString;
-import org.hornetq.utils.TypedProperties;
-
-import java.util.logging.Logger;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- */
-public class RemoteArbitrator implements Arbitrator
-{
- private static Logger log = Logger.getLogger(RemoteArbitrator.class.getName());
-
- private final SimpleString name;
-
- private final ManagementService managementService;
-
- private final SimpleString address;
-
- private Map<SimpleString, Response> responses = new HashMap<SimpleString, Response>();
-
- private final Lock lock = new ReentrantLock();
-
- private final Condition sendCondition = lock.newCondition();
-
- private int waitTime = 1000;
-
- public RemoteArbitrator(final ManagementService managementService, final SimpleString name, final SimpleString address)
- {
- this.name = name;
- this.address = address;
- this.managementService = managementService;
- }
-
- public SimpleString getName()
- {
- return name;
- }
-
- public Response propose(final Proposal proposal) throws Exception
- {
- Response response = responses.get(proposal.getProposalType());
- if( response != null)
- {
- return response;
- }
- if (proposal.getProposal() == null)
- {
- return null;
- }
- try
- {
- lock.lock();
- TypedProperties props = new TypedProperties();
- log.info("sending proposal for " + proposal.getProposalType() + " with value " + proposal.getProposal());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getProposalType());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)proposal.getProposal());
- props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
- props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
- props.putIntProperty(ManagementHelper.HDR_DISTANCE, 0);
- Notification notification = new Notification(null, NotificationType.PROPOSAL, props);
- managementService.sendNotification(notification);
- sendCondition.await(waitTime, TimeUnit.MILLISECONDS);
- }
- finally
- {
- lock.unlock();
- }
- return responses.get(proposal.getProposalType());
- }
-
- public void proposed(Response response) throws Exception
- {
- Object value = response.getAlternative() != null ? response.getAlternative() : response.getOriginal();
- log.info("received proposal response for " + response.getResponseType() + " with value " + value);
- try
- {
- lock.lock();
- responses.put(response.getResponseType(), response);
- sendCondition.signal();
- }
- finally
- {
- lock.unlock();
- }
- }
-
- public Response receive(Proposal proposal, int distance) throws Exception
- {
- TypedProperties props = new TypedProperties();
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getProposalType());
- props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)proposal.getProposal());
- props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
- props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
- props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
- Notification notification = new Notification(null, NotificationType.PROPOSAL, props);
- managementService.sendNotification(notification);
- return null;
- }
-
- public void send(Response response, int distance) throws Exception
- {
- }
-
- public Response rePropose(Proposal proposal)
- {
- return null;
- }
-
-}
-
Copied: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java (from rev 7977, branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteArbitrator.java)
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java (rev 0)
+++ branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.core.management.NotificationType;
+import org.hornetq.core.management.Notification;
+import org.hornetq.core.management.ManagementService;
+import org.hornetq.core.client.management.impl.ManagementHelper;
+import org.hornetq.core.postoffice.BindingType;
+import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.utils.SimpleString;
+import org.hornetq.utils.TypedProperties;
+
+import java.util.logging.Logger;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class RemoteGroupingHandler implements GroupingHandler
+{
+ private static Logger log = Logger.getLogger(RemoteGroupingHandler.class.getName());
+
+ private final SimpleString name;
+
+ private final ManagementService managementService;
+
+ private final SimpleString address;
+
+ private Map<SimpleString, Response> responses = new HashMap<SimpleString, Response>();
+
+ private final Lock lock = new ReentrantLock();
+
+ private final Condition sendCondition = lock.newCondition();
+
+ private int waitTime = 1000;
+
+ public RemoteGroupingHandler(final ManagementService managementService, final SimpleString name, final SimpleString address)
+ {
+ this.name = name;
+ this.address = address;
+ this.managementService = managementService;
+ }
+
+ public SimpleString getName()
+ {
+ return name;
+ }
+
+ public Response propose(final Proposal proposal) throws Exception
+ {
+ Response response = responses.get(proposal.getProposalType());
+ if( response != null)
+ {
+ return response;
+ }
+ if (proposal.getProposal() == null)
+ {
+ return null;
+ }
+ try
+ {
+ lock.lock();
+ TypedProperties props = new TypedProperties();
+ log.info("sending proposal for " + proposal.getProposalType() + " with value " + proposal.getProposal());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getProposalType());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)proposal.getProposal());
+ props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
+ props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
+ props.putIntProperty(ManagementHelper.HDR_DISTANCE, 0);
+ Notification notification = new Notification(null, NotificationType.PROPOSAL, props);
+ managementService.sendNotification(notification);
+ sendCondition.await(waitTime, TimeUnit.MILLISECONDS);
+ response = responses.get(proposal.getProposalType());
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ if(response == null)
+ {
+ throw new IllegalStateException("no response received from group handler for " + proposal.getProposalType());
+ }
+ return response;
+ }
+
+ public void proposed(Response response) throws Exception
+ {
+ Object value = response.getAlternative() != null ? response.getAlternative() : response.getOriginal();
+ log.info("received proposal response for " + response.getResponseType() + " with value " + value);
+ try
+ {
+ lock.lock();
+ responses.put(response.getResponseType(), response);
+ sendCondition.signal();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ public Response receive(Proposal proposal, int distance) throws Exception
+ {
+ TypedProperties props = new TypedProperties();
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getProposalType());
+ props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)proposal.getProposal());
+ props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
+ props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
+ props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
+ Notification notification = new Notification(null, NotificationType.PROPOSAL, props);
+ managementService.sendNotification(notification);
+ return null;
+ }
+
+ public void send(Response response, int distance) throws Exception
+ {
+ }
+
+ public Response rePropose(Proposal proposal)
+ {
+ return null;
+ }
+
+}
+
Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -45,10 +45,10 @@
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.JournalType;
-import org.hornetq.core.server.group.impl.ArbitratorConfiguration;
-import org.hornetq.core.server.group.impl.LocalArbitrator;
-import org.hornetq.core.server.group.impl.RemoteArbitrator;
-import org.hornetq.core.server.group.Arbitrator;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
+import org.hornetq.core.server.group.impl.LocalGroupingHandler;
+import org.hornetq.core.server.group.impl.RemoteGroupingHandler;
+import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.cluster.RemoteQueueBinding;
import org.hornetq.integration.transports.netty.TransportConstants;
@@ -457,18 +457,18 @@
}
- protected void setUpGroupArbitrator(ArbitratorConfiguration.TYPE type, int node)
+ protected void setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE type, int node)
{
- Arbitrator arbitrator;
- if(type == ArbitratorConfiguration.TYPE.LOCAL)
+ GroupingHandler groupingHandler;
+ if(type == GroupingHandlerConfiguration.TYPE.LOCAL)
{
- arbitrator = new LocalArbitrator(servers[node].getManagementService(), new SimpleString("grouparbitrator"), new SimpleString("queues"), null);
+ groupingHandler = new LocalGroupingHandler(servers[node].getManagementService(), new SimpleString("grouparbitrator"), new SimpleString("queues"), null);
}
else
{
- arbitrator = new RemoteArbitrator(servers[node].getManagementService(), new SimpleString("grouparbitrator"), new SimpleString("queues"));
+ groupingHandler = new RemoteGroupingHandler(servers[node].getManagementService(), new SimpleString("grouparbitrator"), new SimpleString("queues"));
}
- this.servers[node].getPostOffice().addArbitrator(arbitrator);
+ this.servers[node].getPostOffice().setGroupingHandler(groupingHandler);
}
protected void send(int node, String address, int numMessages, boolean durable, String filterVal) throws Exception
Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -13,7 +13,7 @@
package org.hornetq.tests.integration.cluster.distribution;
import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.core.server.group.impl.ArbitratorConfiguration;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.utils.SimpleString;
@@ -39,9 +39,9 @@
try
{
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -95,9 +95,9 @@
try
{
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -154,9 +154,9 @@
try
{
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -216,9 +216,9 @@
try
{
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -278,9 +278,9 @@
try
{
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -340,9 +340,9 @@
try
{
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -400,9 +400,9 @@
try
{
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -490,9 +490,9 @@
try
{
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
- setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -526,13 +526,23 @@
sendInRange(2, "queues.testaddress", 10, 20, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ sendInRange(0, "queues.testaddress", 20, 30, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+ startServers(1);
+ setupSessionFactory(1, isNetty());
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+ addConsumer(1, 1, "queue0", null);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
verifyReceiveAllInRange(10, 20, 1);
- sendInRange(0, "queues.testaddress", 20, 30, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
-
verifyReceiveAllInRange(20, 30, 1);
System.out.println("*****************************************************************************");
}
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
finally
{
//closeAllConsumers();
Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
===================================================================
--- branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2009-09-24 09:25:44 UTC (rev 7986)
+++ branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2009-09-24 09:58:07 UTC (rev 7987)
@@ -23,7 +23,7 @@
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.group.Arbitrator;
+import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.SimpleString;
@@ -151,11 +151,11 @@
}
- public void addArbitrator(Arbitrator arbitrator)
+ public void setGroupingHandler(GroupingHandler groupingHandler)
{
}
- public Arbitrator getArbitrator()
+ public GroupingHandler getGroupingHandler()
{
return null;
}
15 years, 3 months
JBoss hornetq SVN: r7986 - in branches/Branch_Replication_Changes: src/config/common/schema and 20 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-09-24 05:25:44 -0400 (Thu, 24 Sep 2009)
New Revision: 7986
Added:
branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSConnectionFactoryControlImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSTopicControlImpl.java
Removed:
branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/ConnectionFactoryControlImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/TopicControlImpl.java
branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/jms/server/JMSServerStartStopWithReplicationTest.java
Modified:
branches/Branch_Replication_Changes/docs/user-manual/en/configuration-index.xml
branches/Branch_Replication_Changes/src/config/common/schema/hornetq-jms.xsd
branches/Branch_Replication_Changes/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/config/impl/FileConfiguration.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/HornetQServer.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/DistributorImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSQueueControlImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java
branches/Branch_Replication_Changes/tests/config/ConfigurationTest-full-config.xml
branches/Branch_Replication_Changes/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java
branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java
branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor.java
branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor2.java
branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor3.java
branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
replication changes
Modified: branches/Branch_Replication_Changes/docs/user-manual/en/configuration-index.xml
===================================================================
--- branches/Branch_Replication_Changes/docs/user-manual/en/configuration-index.xml 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/docs/user-manual/en/configuration-index.xml 2009-09-24 09:25:44 UTC (rev 7986)
@@ -325,14 +325,6 @@
<entry>true</entry>
</row>
<row>
- <entry><link linkend="queue.activation.timeout"
- >queue-activation-timeout</link></entry>
- <entry>Long</entry>
- <entry>after failover occurs, this timeout specifies how long (in ms) to
- wait for consumers to re-attach before starting delivery</entry>
- <entry>30000</entry>
- </row>
- <row>
<entry><link linkend="server.scheduled.thread.pool"
>scheduled-thread-pool-max-size</link></entry>
<entry>Integer</entry>
Modified: branches/Branch_Replication_Changes/src/config/common/schema/hornetq-jms.xsd
===================================================================
--- branches/Branch_Replication_Changes/src/config/common/schema/hornetq-jms.xsd 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/config/common/schema/hornetq-jms.xsd 2009-09-24 09:25:44 UTC (rev 7986)
@@ -102,6 +102,9 @@
<xsd:element name="reconnect-attempts" type="xsd:int"
maxOccurs="1" minOccurs="0">
</xsd:element>
+ <xsd:element name="use-reattach" type="xsd:boolean"
+ maxOccurs="1" minOccurs="0">
+ </xsd:element>
<xsd:element name="failover-on-server-shutdown" type="xsd:boolean"
maxOccurs="1" minOccurs="0">
</xsd:element>
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -861,12 +861,8 @@
return true;
}
- log.info("session handling failover");
-
boolean ok = false;
- log.info("Failover occurring");
-
// Need to stop all consumers outside the lock
for (ClientConsumerInternal consumer : consumers.values())
{
@@ -882,21 +878,13 @@
consumer.clearAtFailover();
}
- log.info("stopped consumers");
-
// We lock the channel to prevent any packets being sent during the failover process
channel.lock();
- log.info("got lock");
-
try
{
- log.info("transferring connection");
-
channel.transferConnection(backupConnection);
- log.info("transferred connection");
-
remotingConnection = backupConnection;
Packet request = new CreateSessionMessage(name,
@@ -913,15 +901,10 @@
Channel channel1 = backupConnection.getChannel(1, -1, false);
- log.info("sending create session");
-
CreateSessionResponseMessage response = (CreateSessionResponseMessage)channel1.sendBlocking(request);
- log.info("got response from create session");
-
if (response.isCreated())
{
- log.info("craeted ok");
// Session was created ok
// Now we need to recreate the consumers
@@ -959,7 +942,7 @@
conn.write(buffer, false);
}
}
-
+
if ((!autoCommitAcks || !autoCommitSends) && workDone)
{
// Session is transacted - set for rollback only
@@ -990,18 +973,14 @@
}
ok = true;
-
- log.info("session created ok");
}
else
{
- log.info("not created ok");
// This means the server we failed onto is not ready to take new sessions - perhaps it hasn't actually
// failed over
}
// We cause any blocking calls to return - since they won't get responses.
- log.info("calling returnblocking");
channel.returnBlocking();
}
catch (Throwable t)
@@ -1012,7 +991,7 @@
{
channel.unlock();
}
-
+
return ok;
}
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -551,9 +551,9 @@
// until failover is complete
boolean serverShutdown = me.getCode() == HornetQException.DISCONNECTED;
-
+
boolean attemptFailoverOrReconnect = (backupConnectorFactory != null || reconnectAttempts != 0) && (failoverOnServerShutdown || !serverShutdown);
-
+
if (attemptFailoverOrReconnect)
{
lockAllChannel1s();
@@ -784,6 +784,8 @@
ok = false;
}
}
+
+ log.info("Reconnected ok");
}
return ok;
@@ -792,7 +794,7 @@
private RemotingConnection getConnectionWithRetry(final int initialRefCount, final int reconnectAttempts)
{
long interval = retryInterval;
-
+
int count = 0;
while (true)
@@ -1121,7 +1123,7 @@
public void run()
{
conn.fail(new HornetQException(HornetQException.DISCONNECTED,
- "The connection was exitLoop by the server"));
+ "The connection was disconnected because of server shutdown"));
}
});
}
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -92,7 +92,7 @@
}
URL url = getClass().getClassLoader().getResource(configurationUrl);
- log.info("Loading server configuration from " + url);
+ log.debug("Loading server configuration from " + url);
Reader reader = new InputStreamReader(url.openStream());
String xml = org.hornetq.utils.XMLUtil.readerToString(reader);
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -125,7 +125,7 @@
try
{
- log.info("Deploying " + url + " for " + deployer.getClass().getSimpleName());
+ log.debug("Deploying " + url + " for " + deployer.getClass().getSimpleName());
deployer.deploy(url);
}
catch (Exception e)
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -32,7 +32,7 @@
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
-
+
private final Bridge bridge;
private final BridgeConfiguration configuration;
@@ -41,8 +41,7 @@
// Constructors --------------------------------------------------
- public BridgeControlImpl(final Bridge bridge, final BridgeConfiguration configuration)
- throws Exception
+ public BridgeControlImpl(final Bridge bridge, final BridgeConfiguration configuration) throws Exception
{
super(BridgeControl.class);
this.bridge = bridge;
@@ -54,10 +53,10 @@
public String[] getConnectorPair() throws Exception
{
String[] pair = new String[2];
-
+
pair[0] = configuration.getConnectorPair().a;
pair[1] = configuration.getConnectorPair().b != null ? configuration.getConnectorPair().b : null;
-
+
return pair;
}
@@ -70,7 +69,7 @@
{
return configuration.getQueueName();
}
-
+
public String getDiscoveryGroupName()
{
return configuration.getDiscoveryGroupName();
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -389,15 +389,12 @@
public String[] listRemoteAddresses()
{
- log.info("listing remote addresses");
Set<RemotingConnection> connections = remotingService.getConnections();
String[] remoteAddresses = new String[connections.size()];
int i = 0;
for (RemotingConnection connection : connections)
{
- log.info("connection " + connection + " is on server");
-
remoteAddresses[i++] = connection.getRemoteAddress();
}
return remoteAddresses;
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -169,16 +169,16 @@
}
public HornetQServerControlImpl registerServer(final PostOffice postOffice,
- final StorageManager storageManager,
- final Configuration configuration,
- final HierarchicalRepository<AddressSettings> addressSettingsRepository,
- final HierarchicalRepository<Set<Role>> securityRepository,
- final ResourceManager resourceManager,
- final RemotingService remotingService,
- final HornetQServer messagingServer,
- final QueueFactory queueFactory,
- final ScheduledExecutorService scheduledThreadPool,
- final boolean backup) throws Exception
+ final StorageManager storageManager,
+ final Configuration configuration,
+ final HierarchicalRepository<AddressSettings> addressSettingsRepository,
+ final HierarchicalRepository<Set<Role>> securityRepository,
+ final ResourceManager resourceManager,
+ final RemotingService remotingService,
+ final HornetQServer messagingServer,
+ final QueueFactory queueFactory,
+ final ScheduledExecutorService scheduledThreadPool,
+ final boolean backup) throws Exception
{
this.postOffice = postOffice;
this.addressSettingsRepository = addressSettingsRepository;
@@ -191,12 +191,12 @@
messageCounterManager.reschedule(configuration.getMessageCounterSamplePeriod());
messagingServerControl = new HornetQServerControlImpl(postOffice,
- configuration,
- resourceManager,
- remotingService,
- messagingServer,
- messageCounterManager,
- broadcaster);
+ configuration,
+ resourceManager,
+ remotingService,
+ messagingServer,
+ messageCounterManager,
+ broadcaster);
ObjectName objectName = ObjectNames.getHornetQServerObjectName();
registerInJMX(objectName, messagingServerControl);
registerInRegistry(ResourceNames.CORE_SERVER, messagingServerControl);
@@ -298,7 +298,7 @@
public void unregisterAcceptors()
{
- List<String> acceptors = new ArrayList<String>();
+ List<String> acceptors = new ArrayList<String>();
for (String resourceName : registry.keySet())
{
if (resourceName.startsWith(ResourceNames.CORE_ACCEPTOR))
@@ -306,7 +306,7 @@
acceptors.add(resourceName);
}
}
-
+
for (String acceptor : acceptors)
{
String name = acceptor.substring(ResourceNames.CORE_ACCEPTOR.length());
@@ -320,7 +320,7 @@
}
}
}
-
+
public synchronized void unregisterAcceptor(final String name) throws Exception
{
ObjectName objectName = ObjectNames.getAcceptorObjectName(name);
@@ -478,7 +478,7 @@
}
private Set<ObjectName> registeredNames = new HashSet<ObjectName>();
-
+
public void registerInJMX(final ObjectName objectName, final Object managedResource) throws Exception
{
if (!jmxManagementEnabled)
@@ -595,14 +595,15 @@
}
if (!unexpectedResourceNames.isEmpty())
{
- log.warn("On ManagementService stop, there are " + unexpectedResourceNames.size() + " unexpected registered MBeans");
+ log.warn("On ManagementService stop, there are " + unexpectedResourceNames.size() +
+ " unexpected registered MBeans");
}
for (ObjectName on : this.registeredNames)
{
try
{
- mbeanServer.unregisterMBean(on);
+ mbeanServer.unregisterMBean(on);
}
catch (Exception ignore)
{
@@ -611,16 +612,19 @@
}
}
- messageCounterManager.stop();
+ if (messageCounterManager != null)
+ {
+ messageCounterManager.stop();
- messageCounterManager.resetAllCounters();
+ messageCounterManager.resetAllCounters();
- messageCounterManager.resetAllCounterHistories();
+ messageCounterManager.resetAllCounterHistories();
- messageCounterManager.clear();
-
+ messageCounterManager.clear();
+ }
+
registeredNames.clear();
-
+
started = false;
}
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -403,7 +403,7 @@
}
for (Bindable bindable : chosen)
- {
+ {
bindable.route(message, tx);
}
}
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -85,8 +85,6 @@
private volatile boolean started;
- // private volatile boolean backup;
-
private final ManagementService managementService;
private final Reaper reaperRunnable = new Reaper();
@@ -127,7 +125,7 @@
final ManagementService managementService,
final long reaperPeriod,
final int reaperPriority,
- final boolean enableWildCardRouting,
+ final boolean enableWildCardRouting,
final int idCacheSize,
final boolean persistIDCache,
final ExecutorFactory orderedExecutorFactory,
@@ -182,7 +180,7 @@
// This is to avoid thread leakages where the Reaper would run beyong the life cycle of the PostOffice
started = true;
- startExpiryScanner();
+ startExpiryScanner();
}
public synchronized void stop() throws Exception
@@ -344,8 +342,7 @@
if (redistributionDelay != -1)
{
- queue.addRedistributor(redistributionDelay,
- redistributorExecutorFactory.getExecutor());
+ queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
}
}
}
@@ -415,8 +412,7 @@
if (redistributionDelay != -1)
{
- queue.addRedistributor(redistributionDelay,
- redistributorExecutorFactory.getExecutor());
+ queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
}
}
}
@@ -870,7 +866,7 @@
log.warn("Reaper thread being restarted");
closed = false;
}
-
+
// The reaper thread should be finished case the PostOffice is gone
// This is to avoid leaks on PostOffice between stops and starts
while (PostOfficeImpl.this.isStarted())
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/HornetQServer.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/HornetQServer.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -63,7 +63,11 @@
Version getVersion();
HornetQServerControlImpl getHornetQServerControl();
+
+ void registerActivateCallback(ActivateCallback callback);
+ void unregisterActivateCallback(ActivateCallback callback);
+
ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastReceivedCommandID) throws Exception;
CreateSessionResponseMessage createSession(String name,
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -241,7 +241,7 @@
}
private void cancelRefs() throws Exception
- {
+ {
MessageReference ref;
LinkedList<MessageReference> list = new LinkedList<MessageReference>();
@@ -251,10 +251,17 @@
list.addFirst(ref);
}
+ Queue queue = null;
for (MessageReference ref2 : list)
{
- ref2.getQueue().cancel(ref2);
+ queue = ref2.getQueue();
+ queue.cancel(ref2);
}
+
+ if (queue != null)
+ {
+ queue.deliverAsync(executor);
+ }
}
public void stop() throws Exception
@@ -495,10 +502,116 @@
{
if (started)
{
- executor.execute(new FailRunnable());
+ //executor.execute(new FailRunnable());
+
+ try
+ {
+ cancelRefs();
+
+ //setupNotificationConsumer();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to handle failure", e);
+ }
}
}
+
+ private ClientConsumer notifConsumer;
+
+ // TODO - we should move this code to the ClusterConnectorImpl - and just execute it when the bridge
+ // connection is opened and closed - we can use
+ // a callback to tell us that
+ private void setupNotificationConsumer() throws Exception
+ {
+ if (flowRecord != null)
+ {
+ if (notifConsumer != null)
+ {
+ try
+ {
+ notifConsumer.close();
+
+ notifConsumer = null;
+ }
+ catch (HornetQException e)
+ {
+ log.error("Failed to close consumer", e);
+ }
+ }
+
+ // Get the queue data
+ // Create a queue to catch the notifications - the name must be deterministic on live and backup, but
+ // different each time this is called
+ // Otherwise it may already exist if server is restarted before it has been deleted on backup
+
+ String qName = "notif." + nodeUUID.toString() + "." + name.toString();
+
+ SimpleString notifQueueName = new SimpleString(qName);
+
+ SimpleString filter = new SimpleString(ManagementHelper.HDR_BINDING_TYPE + "<>" +
+ BindingType.DIVERT.toInt() +
+ " AND " +
+ ManagementHelper.HDR_NOTIFICATION_TYPE +
+ " IN ('" +
+ NotificationType.BINDING_ADDED +
+ "','" +
+ NotificationType.BINDING_REMOVED +
+ "','" +
+ NotificationType.CONSUMER_CREATED +
+ "','" +
+ NotificationType.CONSUMER_CLOSED +
+ "') AND " +
+ ManagementHelper.HDR_DISTANCE +
+ "<" +
+ flowRecord.getMaxHops() +
+ " AND (" +
+ ManagementHelper.HDR_ADDRESS +
+ " LIKE '" +
+ flowRecord.getAddress() +
+ "%')");
+
+ // The queue can't be temporary, since if the node with the bridge crashes then is restarted quickly
+ // it might get deleted on the target when it does connection cleanup
+
+ // When the backup activates the queue might already exist, so we catch this and ignore
+ try
+ {
+ session.createQueue(managementNotificationAddress, notifQueueName, filter, false);
+ }
+ catch (HornetQException me)
+ {
+ if (me.getCode() == HornetQException.QUEUE_EXISTS)
+ {
+ // Ok
+ }
+ else
+ {
+ throw me;
+ }
+ }
+
+ notifConsumer = session.createConsumer(notifQueueName);
+
+ notifConsumer.setMessageHandler(flowRecord);
+
+ session.start();
+
+ ClientMessage message = session.createClientMessage(false);
+
+ ManagementHelper.putOperationInvocation(message,
+ ResourceNames.CORE_SERVER,
+ "sendQueueInfoToQueue",
+ notifQueueName.toString(),
+ flowRecord.getAddress());
+
+ ClientProducer prod = session.createProducer(managementAddress);
+
+ prod.send(message);
+ }
+ }
+
private synchronized boolean createObjects()
{
if (!started)
@@ -507,9 +620,7 @@
}
try
- {
- queue.addConsumer(BridgeImpl.this);
-
+ {
csf = null;
if (discoveryAddress != null)
{
@@ -540,92 +651,22 @@
session.setSendAcknowledgementHandler(BridgeImpl.this);
- // TODO - we should move this code to the ClusterConnectorImpl - and just execute it when the bridge
- // connection is opened and closed - we can use
- // a callback to tell us that
- if (flowRecord != null)
- {
- // Get the queue data
-
- // Create a queue to catch the notifications - the name must be deterministic on live and backup, but
- // different each time this is called
- // Otherwise it may already exist if server is restarted before it has been deleted on backup
-
- String qName = "notif." + nodeUUID.toString() + "." + name.toString();
-
- SimpleString notifQueueName = new SimpleString(qName);
-
- SimpleString filter = new SimpleString(ManagementHelper.HDR_BINDING_TYPE + "<>" +
- BindingType.DIVERT.toInt() +
- " AND " +
- ManagementHelper.HDR_NOTIFICATION_TYPE +
- " IN ('" +
- NotificationType.BINDING_ADDED +
- "','" +
- NotificationType.BINDING_REMOVED +
- "','" +
- NotificationType.CONSUMER_CREATED +
- "','" +
- NotificationType.CONSUMER_CLOSED +
- "') AND " +
- ManagementHelper.HDR_DISTANCE +
- "<" +
- flowRecord.getMaxHops() +
- " AND (" +
- ManagementHelper.HDR_ADDRESS +
- " LIKE '" +
- flowRecord.getAddress() +
- "%')");
-
- // The queue can't be temporary, since if the node with the bridge crashes then is restarted quickly
- // it might get deleted on the target when it does connection cleanup
-
- // When the backup activates the queue might already exist, so we catch this and ignore
- try
- {
- session.createQueue(managementNotificationAddress, notifQueueName, filter, false);
- }
- catch (HornetQException me)
- {
- if (me.getCode() == HornetQException.QUEUE_EXISTS)
- {
- // Ok
- }
- else
- {
- throw me;
- }
- }
-
- ClientConsumer notifConsumer = session.createConsumer(notifQueueName);
-
- notifConsumer.setMessageHandler(flowRecord);
-
- session.start();
-
- ClientMessage message = session.createClientMessage(false);
-
- ManagementHelper.putOperationInvocation(message,
- ResourceNames.CORE_SERVER,
- "sendQueueInfoToQueue",
- notifQueueName.toString(),
- flowRecord.getAddress());
-
- ClientProducer prod = session.createProducer(managementAddress);
-
- prod.send(message);
- }
-
+ setupNotificationConsumer();
+
active = true;
+
+ queue.addConsumer(BridgeImpl.this);
queue.deliverAsync(executor);
+
+ log.info("Bridge " + name + " is now connected to destination ");
return true;
}
catch (Exception e)
{
- log.warn("Unable to connect. Bridge is now disabled.", e);
-
+ log.warn("Bridge " + name + " is unable to connect to destination. It will be disabled.");
+
return false;
}
}
@@ -667,54 +708,56 @@
}
}
- private class FailRunnable implements Runnable
- {
- public void run()
- {
- synchronized (BridgeImpl.this)
- {
- if (!started)
- {
- return;
- }
+// private class FailRunnable implements Runnable
+// {
+// public void run()
+// {
+// synchronized (BridgeImpl.this)
+// {
+//
+// if (!started)
+// {
+// return;
+// }
+//
+// if (flowRecord != null)
+// {
+// try
+// {
+// // flowRecord.reset();
+// }
+// catch (Exception e)
+// {
+// log.error("Failed to reset", e);
+// }
+// }
+//
+// active = false;
+// }
+//
+// try
+// {
+// queue.removeConsumer(BridgeImpl.this);
+//
+// session.cleanUp();
+//
+// cancelRefs();
+//
+// csf.close();
+// }
+// catch (Exception e)
+// {
+// log.error("Failed to stop", e);
+// }
+//
+// if (!createObjects())
+// {
+// started = false;
+// }
+// }
+// }
+// }
- if (flowRecord != null)
- {
- try
- {
- // flowRecord.reset();
- }
- catch (Exception e)
- {
- log.error("Failed to reset", e);
- }
- }
-
- active = false;
- }
-
- try
- {
- queue.removeConsumer(BridgeImpl.this);
-
- session.cleanUp();
-
- cancelRefs();
-
- csf.close();
- }
- catch (Exception e)
- {
- log.error("Failed to stop", e);
- }
-
- if (!createObjects())
- {
- started = false;
- }
- }
- }
-
private class CreateObjectsRunnable implements Runnable
{
public synchronized void run()
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/DistributorImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/DistributorImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/DistributorImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -15,6 +15,7 @@
import java.util.ArrayList;
import java.util.List;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.Consumer;
import org.hornetq.core.server.Distributor;
@@ -23,6 +24,8 @@
*/
public abstract class DistributorImpl implements Distributor
{
+ private static final Logger log = Logger.getLogger(DistributorImpl.class);
+
protected final List<Consumer> consumers = new ArrayList<Consumer>();
public void addConsumer(Consumer consumer)
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -75,6 +75,7 @@
import org.hornetq.core.security.Role;
import org.hornetq.core.security.SecurityStore;
import org.hornetq.core.security.impl.SecurityStoreImpl;
+import org.hornetq.core.server.ActivateCallback;
import org.hornetq.core.server.Divert;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.MemoryManager;
@@ -179,17 +180,19 @@
private Deployer securityDeployer;
private final Map<String, ServerSession> sessions = new ConcurrentHashMap<String, ServerSession>();
-
+
private final Object initialiseLock = new Object();
private boolean initialised;
-
+
private int managementConnectorID;
private static AtomicInteger managementConnectorSequence = new AtomicInteger(0);
-
+
private ConnectionManager replicatingConnectionManager;
+ private final Set<ActivateCallback> activateCallbacks = new HashSet<ActivateCallback>();
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -273,6 +276,8 @@
// so it can be initialised by the live node
remotingService.start();
+ started = true;
+
log.info("HornetQ Server version " + getVersion().getFullVersion() + " started");
}
@@ -334,16 +339,26 @@
managementService.stop();
- storageManager.stop();
+ if (storageManager != null)
+ {
+ storageManager.stop();
+ }
if (securityManager != null)
{
securityManager.stop();
}
-
- resourceManager.stop();
- postOffice.stop();
+ if (resourceManager != null)
+ {
+ resourceManager.stop();
+ }
+
+ if (postOffice != null)
+ {
+ postOffice.stop();
+ }
+
// Need to shutdown pools before shutting down paging manager to make sure everything is written ok
List<Runnable> tasks = scheduledPool.shutdownNow();
@@ -369,7 +384,10 @@
scheduledPool = null;
threadPool = null;
- pagingManager.stop();
+ if (pagingManager != null)
+ {
+ pagingManager.stop();
+ }
memoryManager.stop();
@@ -390,7 +408,7 @@
initialised = false;
uuid = null;
nodeID = null;
-
+
log.info("HornetQ Server version " + getVersion().getFullVersion() + " stopped");
}
@@ -487,7 +505,7 @@
}
public CreateSessionResponseMessage createSession(final String name,
- final long channelID,
+ final long channelID,
final String username,
final String password,
final int minLargeMessageSize,
@@ -498,7 +516,7 @@
final boolean preAcknowledge,
final boolean xa,
final int sendWindowSize) throws Exception
- {
+ {
if (version.getIncrementingVersion() != incrementingVersion)
{
log.warn("Client with version " + incrementingVersion +
@@ -511,11 +529,11 @@
"interoperate properly");
return null;
}
-
+
if (!checkActivate())
{
- //Backup server is not ready to accept connections
-
+ // Backup server is not ready to accept connections
+
return new CreateSessionResponseMessage(false, version.getIncrementingVersion());
}
@@ -532,7 +550,7 @@
Channel channel = connection.getChannel(channelID, sendWindowSize, false);
- final ServerSessionImpl session = new ServerSessionImpl(name,
+ final ServerSessionImpl session = new ServerSessionImpl(name,
username,
password,
minLargeMessageSize,
@@ -552,7 +570,7 @@
queueFactory,
this,
configuration.getManagementAddress());
-
+
sessions.put(name, session);
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session);
@@ -602,120 +620,119 @@
}
}
-// public void initialiseBackup(final UUID theUUID, final long liveUniqueID) throws Exception
-// {
-// if (theUUID == null)
-// {
-// throw new IllegalArgumentException("node id is null");
-// }
-//
-// synchronized (initialiseLock)
-// {
-// if (initialised)
-// {
-// throw new IllegalStateException("Server is already initialised");
-// }
-//
-// this.uuid = theUUID;
-//
-// this.nodeID = new SimpleString(uuid.toString());
-//
-// initialisePart2();
-//
-// long backupID = storageManager.getCurrentUniqueID();
-//
-// if (liveUniqueID != backupID)
-// {
-// initialised = false;
-//
-// throw new IllegalStateException("Live and backup unique ids different (" + liveUniqueID +
-// ":" +
-// backupID +
-// "). You're probably trying to restart a live backup pair after a crash");
-// }
-//
-// log.info("Backup server is now operational");
-// }
-// }
-
-// private boolean setupReplicatingConnection() throws Exception
-// {
-// String backupConnectorName = configuration.getBackupConnectorName();
-//
-// if (backupConnectorName != null)
-// {
-// TransportConfiguration backupConnector = configuration.getConnectorConfigurations().get(backupConnectorName);
-//
-// if (backupConnector == null)
-// {
-// log.warn("connector with name '" + backupConnectorName + "' is not defined in the configuration.");
-// }
-// else
-// {
-// replicatingConnectionManager = new ConnectionManagerImpl(null,
-// backupConnector,
-// null,
-// false,
-// 1,
-// ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
-// ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
-// ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
-// 0,
-// 1.0d,
-// 0,
-// threadPool,
-// scheduledPool);
-//
-// replicatingConnection = replicatingConnectionManager.getConnection(1);
-//
-// if (replicatingConnection != null)
-// {
-// replicatingChannel = replicatingConnection.getChannel(2, -1, false);
-//
-// replicatingConnection.addFailureListener(new FailureListener()
-// {
-// public void connectionFailed(HornetQException me)
-// {
-// replicatingChannel.executeOutstandingDelayedResults();
-// }
-// });
-//
-// // First time we get channel we send a message down it informing the backup of our node id -
-// // backup and live must have the same node id
-//
-// Packet packet = new ReplicateStartupInfoMessage(uuid, storageManager.getCurrentUniqueID());
-//
-// final Future future = new Future();
-//
-// replicatingChannel.replicatePacket(packet, 1, new Runnable()
-// {
-// public void run()
-// {
-// future.run();
-// }
-// });
-//
-// // This may take a while especially if the journal is large
-// boolean ok = future.await(60000);
-//
-// if (!ok)
-// {
-// throw new IllegalStateException("Timed out waiting for response from backup for initialisation");
-// }
-// }
-// else
-// {
-// log.warn("Backup server MUST be started before live server. Initialisation will not proceed.");
-//
-// return false;
-// }
-// }
-// }
-//
-// return true;
-// }
-
-
+ // public void initialiseBackup(final UUID theUUID, final long liveUniqueID) throws Exception
+ // {
+ // if (theUUID == null)
+ // {
+ // throw new IllegalArgumentException("node id is null");
+ // }
+ //
+ // synchronized (initialiseLock)
+ // {
+ // if (initialised)
+ // {
+ // throw new IllegalStateException("Server is already initialised");
+ // }
+ //
+ // this.uuid = theUUID;
+ //
+ // this.nodeID = new SimpleString(uuid.toString());
+ //
+ // initialisePart2();
+ //
+ // long backupID = storageManager.getCurrentUniqueID();
+ //
+ // if (liveUniqueID != backupID)
+ // {
+ // initialised = false;
+ //
+ // throw new IllegalStateException("Live and backup unique ids different (" + liveUniqueID +
+ // ":" +
+ // backupID +
+ // "). You're probably trying to restart a live backup pair after a crash");
+ // }
+ //
+ // log.info("Backup server is now operational");
+ // }
+ // }
+
+ // private boolean setupReplicatingConnection() throws Exception
+ // {
+ // String backupConnectorName = configuration.getBackupConnectorName();
+ //
+ // if (backupConnectorName != null)
+ // {
+ // TransportConfiguration backupConnector = configuration.getConnectorConfigurations().get(backupConnectorName);
+ //
+ // if (backupConnector == null)
+ // {
+ // log.warn("connector with name '" + backupConnectorName + "' is not defined in the configuration.");
+ // }
+ // else
+ // {
+ // replicatingConnectionManager = new ConnectionManagerImpl(null,
+ // backupConnector,
+ // null,
+ // false,
+ // 1,
+ // ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+ // ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ // ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
+ // 0,
+ // 1.0d,
+ // 0,
+ // threadPool,
+ // scheduledPool);
+ //
+ // replicatingConnection = replicatingConnectionManager.getConnection(1);
+ //
+ // if (replicatingConnection != null)
+ // {
+ // replicatingChannel = replicatingConnection.getChannel(2, -1, false);
+ //
+ // replicatingConnection.addFailureListener(new FailureListener()
+ // {
+ // public void connectionFailed(HornetQException me)
+ // {
+ // replicatingChannel.executeOutstandingDelayedResults();
+ // }
+ // });
+ //
+ // // First time we get channel we send a message down it informing the backup of our node id -
+ // // backup and live must have the same node id
+ //
+ // Packet packet = new ReplicateStartupInfoMessage(uuid, storageManager.getCurrentUniqueID());
+ //
+ // final Future future = new Future();
+ //
+ // replicatingChannel.replicatePacket(packet, 1, new Runnable()
+ // {
+ // public void run()
+ // {
+ // future.run();
+ // }
+ // });
+ //
+ // // This may take a while especially if the journal is large
+ // boolean ok = future.await(60000);
+ //
+ // if (!ok)
+ // {
+ // throw new IllegalStateException("Timed out waiting for response from backup for initialisation");
+ // }
+ // }
+ // else
+ // {
+ // log.warn("Backup server MUST be started before live server. Initialisation will not proceed.");
+ //
+ // return false;
+ // }
+ // }
+ // }
+ //
+ // return true;
+ // }
+
public HornetQServerControlImpl getHornetQServerControl()
{
return messagingServerControl;
@@ -827,6 +844,16 @@
postOffice.removeBinding(queueName);
}
+ public synchronized void registerActivateCallback(final ActivateCallback callback)
+ {
+ activateCallbacks.add(callback);
+ }
+
+ public synchronized void unregisterActivateCallback(final ActivateCallback callback)
+ {
+ activateCallbacks.remove(callback);
+ }
+
public ExecutorFactory getExecutorFactory()
{
return executorFactory;
@@ -852,26 +879,37 @@
// Private
// --------------------------------------------------------------------------------------
+ private synchronized void callActivateCallbacks()
+ {
+ for (ActivateCallback callback : activateCallbacks)
+ {
+ callback.activated();
+ }
+ }
+
private synchronized boolean checkActivate() throws Exception
- {
+ {
if (configuration.isBackup())
{
- //Handle backup server activation
-
+ // Handle backup server activation
+
if (configuration.isSharedStore())
{
- //Complete the startup procedure
-
+ // Complete the startup procedure
+
+ log.info("Activating server");
+
configuration.setBackup(false);
-
- initialisePart2();
+
+ initialisePart2();
}
else
{
- //just load journal
+ // TODO
+ // just load journal
}
}
-
+
return true;
}
@@ -951,7 +989,7 @@
managementService,
configuration.getMessageExpiryScanPeriod(),
configuration.getMessageExpiryThreadPriority(),
- configuration.isWildcardRoutingEnabled(),
+ configuration.isWildcardRoutingEnabled(),
configuration.getIDCacheSize(),
configuration.isPersistIDCache(),
executorFactory,
@@ -1024,6 +1062,10 @@
queueDeployer.start();
}
+ // We need to call this here, this gives any dependent server a chance to deploy its own destinations
+ // this needs to be done before clustering is initialised
+ callActivateCallbacks();
+
// Deply any pre-defined diverts
deployDiverts();
@@ -1036,7 +1078,7 @@
scheduledPool,
managementService,
configuration,
- uuid,
+ uuid,
configuration.isBackup());
clusterManager.start();
@@ -1050,9 +1092,9 @@
pagingManager.resumeDepages();
final ServerInfo dumper = new ServerInfo(this, pagingManager);
-
+
long dumpInfoInterval = configuration.getServerDumpInterval();
-
+
if (dumpInfoInterval > 0)
{
scheduledPool.scheduleWithFixedDelay(new Runnable()
@@ -1063,10 +1105,8 @@
}
}, 0, dumpInfoInterval, TimeUnit.MILLISECONDS);
}
-
+
initialised = true;
-
- started = true;
}
private void deployQueuesFromConfiguration() throws Exception
@@ -1079,7 +1119,7 @@
config.isDurable());
}
}
-
+
private void loadJournal() throws Exception
{
List<QueueBindingInfo> queueBindingInfos = new ArrayList<QueueBindingInfo>();
@@ -1294,5 +1334,5 @@
}
// Inner classes
- // --------------------------------------------------------------------------------
+ // --------------------------------------------------------------------------------
}
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -1260,6 +1260,7 @@
promptDelivery = false;
return;
}
+
continue;
}
else
@@ -1307,7 +1308,7 @@
}
HandleStatus status = handle(reference, consumer);
-
+
if (status == HandleStatus.HANDLED)
{
if (iterator == null)
@@ -1335,6 +1336,7 @@
{
groups.remove(consumer);
}
+
continue;
}
}
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -1377,11 +1377,19 @@
}
public void handleReceiveConsumerCredits(final SessionConsumerFlowCreditMessage packet)
- {
- try
+ {
+ ServerConsumer consumer = consumers.get(packet.getConsumerID());
+
+ if (consumer == null)
{
- consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
+ log.error("There is no consumer with id " + packet.getConsumerID());
+ return;
}
+
+ try
+ {
+ consumer.receiveCredits(packet.getCredits());
+ }
catch (Exception e)
{
log.error("Failed to receive credits " + this.server.getConfiguration().isBackup(), e);
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -654,7 +654,17 @@
{
sessionFactory.setReconnectAttempts(reconnectAttempts);
}
+
+ public synchronized boolean isUseReattach()
+ {
+ return sessionFactory.isUseReattach();
+ }
+ public synchronized void setUseReattach(boolean reattach)
+ {
+ sessionFactory.setUseReattach(reattach);
+ }
+
public synchronized boolean isFailoverOnServerShutdown()
{
return sessionFactory.isFailoverOnServerShutdown();
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -133,7 +133,7 @@
{
return;
}
-
+
if (!contextSet)
{
context = new InitialContext();
@@ -141,6 +141,8 @@
deploymentManager = new FileDeploymentManager(server.getConfiguration().getFileDeployerScanPeriod());
+ server.registerActivateCallback(this);
+
server.start();
started = true;
Deleted: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/ConnectionFactoryControlImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/ConnectionFactoryControlImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/ConnectionFactoryControlImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -1,178 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.jms.server.management.impl;
-
-import java.util.List;
-
-import javax.management.NotCompliantMBeanException;
-
-import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.server.management.ConnectionFactoryControl;
-
-/**
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- *
- * @version <tt>$Revision$</tt>
- *
- */
-public class ConnectionFactoryControlImpl implements ConnectionFactoryControl
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private final HornetQConnectionFactory cf;
-
- private final List<String> bindings;
-
- private final String name;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public ConnectionFactoryControlImpl(final HornetQConnectionFactory cf, final String name, final List<String> bindings) throws NotCompliantMBeanException
- {
- this.cf = cf;
- this.name = name;
- this.bindings = bindings;
- }
-
- // Public --------------------------------------------------------
-
- // ManagedConnectionFactoryMBean implementation ------------------
-
- public List<String> getBindings()
- {
- return bindings;
- }
-
- public String getClientID()
- {
- return cf.getClientID();
- }
-
- public long getClientFailureCheckPeriod()
- {
- return cf.getClientFailureCheckPeriod();
- }
-
- public long getCallTimeout()
- {
- return cf.getCallTimeout();
- }
-
- public int getConsumerMaxRate()
- {
- return cf.getConsumerMaxRate();
- }
-
- public int getConsumerWindowSize()
- {
- return cf.getConsumerWindowSize();
- }
-
- public int getProducerMaxRate()
- {
- return cf.getProducerMaxRate();
- }
-
- public int getProducerWindowSize()
- {
- return cf.getProducerWindowSize();
- }
-
- public int getDupsOKBatchSize()
- {
- return cf.getDupsOKBatchSize();
- }
-
- public boolean isBlockOnAcknowledge()
- {
- return cf.isBlockOnAcknowledge();
- }
-
- public boolean isBlockOnNonPersistentSend()
- {
- return cf.isBlockOnNonPersistentSend();
- }
-
- public boolean isBlockOnPersistentSend()
- {
- return cf.isBlockOnPersistentSend();
- }
-
- public boolean isPreAcknowledge()
- {
- return cf.isPreAcknowledge();
- }
-
- public String getName()
- {
- return name;
- }
-
- public long getConnectionTTL()
- {
- return cf.getConnectionTTL();
- }
-
- public int getMaxConnections()
- {
- return cf.getMaxConnections();
- }
-
- public int getReconnectAttempts()
- {
- return cf.getReconnectAttempts();
- }
-
- public boolean isFailoverOnNodeShutdown()
- {
- return cf.isFailoverOnServerShutdown();
- }
-
- public long getMinLargeMessageSize()
- {
- return cf.getMinLargeMessageSize();
- }
-
- public long getRetryInterval()
- {
- return cf.getRetryInterval();
- }
-
- public double getRetryIntervalMultiplier()
- {
- return cf.getRetryIntervalMultiplier();
- }
-
- public long getTransactionBatchSize()
- {
- return cf.getTransactionBatchSize();
- }
-
- public boolean isAutoGroup()
- {
- return cf.isAutoGroup();
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Copied: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSConnectionFactoryControlImpl.java (from rev 7946, branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/ConnectionFactoryControlImpl.java)
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSConnectionFactoryControlImpl.java (rev 0)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSConnectionFactoryControlImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.server.management.impl;
+
+import java.util.List;
+
+import javax.management.NotCompliantMBeanException;
+import javax.management.StandardMBean;
+
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.management.ConnectionFactoryControl;
+
+/**
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class JMSConnectionFactoryControlImpl extends StandardMBean implements ConnectionFactoryControl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final HornetQConnectionFactory cf;
+
+ private final List<String> bindings;
+
+ private final String name;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public JMSConnectionFactoryControlImpl(final HornetQConnectionFactory cf, final String name, final List<String> bindings) throws NotCompliantMBeanException
+ {
+ super(ConnectionFactoryControl.class);
+ this.cf = cf;
+ this.name = name;
+ this.bindings = bindings;
+ }
+
+ // Public --------------------------------------------------------
+
+ // ManagedConnectionFactoryMBean implementation ------------------
+
+ public List<String> getBindings()
+ {
+ return bindings;
+ }
+
+ public String getClientID()
+ {
+ return cf.getClientID();
+ }
+
+ public long getClientFailureCheckPeriod()
+ {
+ return cf.getClientFailureCheckPeriod();
+ }
+
+ public long getCallTimeout()
+ {
+ return cf.getCallTimeout();
+ }
+
+ public int getConsumerMaxRate()
+ {
+ return cf.getConsumerMaxRate();
+ }
+
+ public int getConsumerWindowSize()
+ {
+ return cf.getConsumerWindowSize();
+ }
+
+ public int getProducerMaxRate()
+ {
+ return cf.getProducerMaxRate();
+ }
+
+ public int getProducerWindowSize()
+ {
+ return cf.getProducerWindowSize();
+ }
+
+ public int getDupsOKBatchSize()
+ {
+ return cf.getDupsOKBatchSize();
+ }
+
+ public boolean isBlockOnAcknowledge()
+ {
+ return cf.isBlockOnAcknowledge();
+ }
+
+ public boolean isBlockOnNonPersistentSend()
+ {
+ return cf.isBlockOnNonPersistentSend();
+ }
+
+ public boolean isBlockOnPersistentSend()
+ {
+ return cf.isBlockOnPersistentSend();
+ }
+
+ public boolean isPreAcknowledge()
+ {
+ return cf.isPreAcknowledge();
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public long getConnectionTTL()
+ {
+ return cf.getConnectionTTL();
+ }
+
+ public int getMaxConnections()
+ {
+ return cf.getMaxConnections();
+ }
+
+ public int getReconnectAttempts()
+ {
+ return cf.getReconnectAttempts();
+ }
+
+ public boolean isFailoverOnNodeShutdown()
+ {
+ return cf.isFailoverOnServerShutdown();
+ }
+
+ public long getMinLargeMessageSize()
+ {
+ return cf.getMinLargeMessageSize();
+ }
+
+ public long getRetryInterval()
+ {
+ return cf.getRetryInterval();
+ }
+
+ public double getRetryIntervalMultiplier()
+ {
+ return cf.getRetryIntervalMultiplier();
+ }
+
+ public long getTransactionBatchSize()
+ {
+ return cf.getTransactionBatchSize();
+ }
+
+ public boolean isAutoGroup()
+ {
+ return cf.isAutoGroup();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -106,7 +106,7 @@
{
ObjectName objectName = ObjectNames.getJMSTopicObjectName(topic.getTopicName());
AddressControl addressControl = (AddressControl)managementService.getResource(ResourceNames.CORE_ADDRESS + topic.getAddress());
- TopicControlImpl control = new TopicControlImpl(topic, addressControl, jndiBinding, managementService);
+ JMSTopicControlImpl control = new JMSTopicControlImpl(topic, addressControl, jndiBinding, managementService);
managementService.registerInJMX(objectName, control);
managementService.registerInRegistry(ResourceNames.JMS_TOPIC + topic.getTopicName(), control);
}
@@ -123,7 +123,7 @@
final List<String> bindings) throws Exception
{
ObjectName objectName = ObjectNames.getConnectionFactoryObjectName(name);
- ConnectionFactoryControlImpl control = new ConnectionFactoryControlImpl(connectionFactory, name, bindings);
+ JMSConnectionFactoryControlImpl control = new JMSConnectionFactoryControlImpl(connectionFactory, name, bindings);
managementService.registerInJMX(objectName, control);
managementService.registerInRegistry(ResourceNames.JMS_CONNECTION_FACTORY + name, control);
}
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSQueueControlImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSQueueControlImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSQueueControlImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -15,6 +15,8 @@
import java.util.Map;
+import javax.management.StandardMBean;
+
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.MessageCounterInfo;
@@ -34,7 +36,7 @@
* @version <tt>$Revision$</tt>
*
*/
-public class JMSQueueControlImpl implements JMSQueueControl
+public class JMSQueueControlImpl extends StandardMBean implements JMSQueueControl
{
// Constants -----------------------------------------------------
@@ -57,7 +59,8 @@
*/
public static String createFilterFromJMSSelector(final String selectorStr) throws HornetQException
{
- return (selectorStr == null || selectorStr.trim().length() == 0) ? null : SelectorTranslator.convertToHornetQFilterString(selectorStr);
+ return (selectorStr == null || selectorStr.trim().length() == 0) ? null
+ : SelectorTranslator.convertToHornetQFilterString(selectorStr);
}
private static String createFilterForJMSMessageID(String jmsMessageID) throws Exception
@@ -71,7 +74,7 @@
for (int i = 0; i < messages.length; i++)
{
Map<String, Object> message = messages[i];
- array.put(new JSONObject(message));
+ array.put(new JSONObject(message));
}
return array.toString();
}
@@ -79,10 +82,11 @@
// Constructors --------------------------------------------------
public JMSQueueControlImpl(final HornetQQueue managedQueue,
- final QueueControl coreQueueControl,
- final String jndiBinding,
- final MessageCounter counter)
+ final QueueControl coreQueueControl,
+ final String jndiBinding,
+ final MessageCounter counter) throws Exception
{
+ super(JMSQueueControl.class);
this.managedQueue = managedQueue;
this.coreQueueControl = coreQueueControl;
this.binding = jndiBinding;
@@ -187,10 +191,10 @@
String filter = createFilterFromJMSSelector(filterStr);
Map<String, Object>[] coreMessages = coreQueueControl.listMessages(filter);
- Map<String, Object>[] jmsMessages = new Map[coreMessages.length];
-
+ Map<String, Object>[] jmsMessages = new Map[coreMessages.length];
+
int i = 0;
-
+
for (Map<String, Object> coreMessage : coreMessages)
{
Map<String, Object> jmsMessage = HornetQMessage.coreMaptoJMSMap(coreMessage);
@@ -203,7 +207,7 @@
throw new IllegalStateException(e.getMessage());
}
}
-
+
public String listMessagesAsJSON(String filter) throws Exception
{
return toJSON(listMessages(filter));
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -25,6 +25,7 @@
import javax.management.NotificationEmitter;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
+import javax.management.StandardMBean;
import org.hornetq.core.client.management.impl.ManagementHelper;
import org.hornetq.core.config.TransportConfiguration;
@@ -39,7 +40,7 @@
* @version <tt>$Revision$</tt>
*
*/
-public class JMSServerControlImpl implements JMSServerControl, NotificationEmitter
+public class JMSServerControlImpl extends StandardMBean implements JMSServerControl, NotificationEmitter
{
// Constants -----------------------------------------------------
@@ -132,8 +133,9 @@
// Constructors --------------------------------------------------
- public JMSServerControlImpl(final JMSServerManager server)
+ public JMSServerControlImpl(final JMSServerManager server) throws Exception
{
+ super(JMSServerControl.class);
this.server = server;
broadcaster = new NotificationBroadcasterSupport();
}
Copied: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSTopicControlImpl.java (from rev 7946, branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/TopicControlImpl.java)
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSTopicControlImpl.java (rev 0)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSTopicControlImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -0,0 +1,354 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.server.management.impl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import javax.management.StandardMBean;
+
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.management.AddressControl;
+import org.hornetq.core.management.HornetQServerControl;
+import org.hornetq.core.management.ManagementService;
+import org.hornetq.core.management.QueueControl;
+import org.hornetq.core.management.ResourceNames;
+import org.hornetq.jms.HornetQTopic;
+import org.hornetq.jms.client.HornetQMessage;
+import org.hornetq.jms.client.SelectorTranslator;
+import org.hornetq.jms.server.management.TopicControl;
+import org.hornetq.utils.Pair;
+import org.hornetq.utils.json.JSONArray;
+import org.hornetq.utils.json.JSONObject;
+
+/**
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class JMSTopicControlImpl extends StandardMBean implements TopicControl
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(JMSTopicControlImpl.class);
+
+ // Attributes ----------------------------------------------------
+
+ private final HornetQTopic managedTopic;
+
+ private final String binding;
+
+ private AddressControl addressControl;
+
+ private ManagementService managementService;
+
+ // Static --------------------------------------------------------
+
+ public static String createFilterFromJMSSelector(final String selectorStr) throws HornetQException
+ {
+ return (selectorStr == null || selectorStr.trim().length() == 0) ? null
+ : SelectorTranslator.convertToHornetQFilterString(selectorStr);
+ }
+
+ // Constructors --------------------------------------------------
+
+ public JMSTopicControlImpl(final HornetQTopic topic,
+ final AddressControl addressControl,
+ final String jndiBinding,
+ final ManagementService managementService) throws Exception
+ {
+ super(TopicControl.class);
+ this.managedTopic = topic;
+ this.addressControl = addressControl;
+ this.binding = jndiBinding;
+ this.managementService = managementService;
+ }
+
+ // TopicControlMBean implementation ------------------------------
+
+ public String getName()
+ {
+ return managedTopic.getName();
+ }
+
+ public boolean isTemporary()
+ {
+ return managedTopic.isTemporary();
+ }
+
+ public String getAddress()
+ {
+ return managedTopic.getAddress();
+ }
+
+ public String getJNDIBinding()
+ {
+ return binding;
+ }
+
+ public int getMessageCount()
+ {
+ return getMessageCount(DurabilityType.ALL);
+ }
+
+ public int getDurableMessageCount()
+ {
+ return getMessageCount(DurabilityType.DURABLE);
+ }
+
+ public int getNonDurableMessageCount()
+ {
+ return getMessageCount(DurabilityType.NON_DURABLE);
+ }
+
+ public int getSubscriptionCount()
+ {
+ return getQueues(DurabilityType.ALL).size();
+ }
+
+ public int getDurableSubscriptionCount()
+ {
+ return getQueues(DurabilityType.DURABLE).size();
+ }
+
+ public int getNonDurableSubscriptionCount()
+ {
+ return getQueues(DurabilityType.NON_DURABLE).size();
+ }
+
+ public Object[] listAllSubscriptions()
+ {
+ return listSubscribersInfos(DurabilityType.ALL);
+ }
+
+ public String listAllSubscriptionsAsJSON() throws Exception
+ {
+ return listSubscribersInfosAsJSON(DurabilityType.ALL);
+ }
+
+ public Object[] listDurableSubscriptions()
+ {
+ return listSubscribersInfos(DurabilityType.DURABLE);
+ }
+
+ public String listDurableSubscriptionsAsJSON() throws Exception
+ {
+ return listSubscribersInfosAsJSON(DurabilityType.DURABLE);
+ }
+
+ public Object[] listNonDurableSubscriptions()
+ {
+ return listSubscribersInfos(DurabilityType.NON_DURABLE);
+ }
+
+ public String listNonDurableSubscriptionsAsJSON() throws Exception
+ {
+ return listSubscribersInfosAsJSON(DurabilityType.NON_DURABLE);
+ }
+
+ public Map<String, Object>[] listMessagesForSubscription(final String queueName) throws Exception
+ {
+ QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queueName);
+ if (coreQueueControl == null)
+ {
+ throw new IllegalArgumentException("No subscriptions with name " + queueName);
+ }
+
+ Map<String, Object>[] coreMessages = coreQueueControl.listMessages(null);
+
+ Map<String, Object>[] jmsMessages = new Map[coreMessages.length];
+
+ int i = 0;
+
+ for (Map<String, Object> coreMessage : coreMessages)
+ {
+ jmsMessages[i++] = HornetQMessage.coreMaptoJMSMap(coreMessage);
+ }
+ return jmsMessages;
+ }
+
+ public String listMessagesForSubscriptionAsJSON(String queueName) throws Exception
+ {
+ return JMSQueueControlImpl.toJSON(listMessagesForSubscription(queueName));
+ }
+
+ public int countMessagesForSubscription(final String clientID, final String subscriptionName, final String filterStr) throws Exception
+ {
+ String queueName = HornetQTopic.createQueueNameForDurableSubscription(clientID, subscriptionName);
+ QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queueName);
+ if (coreQueueControl == null)
+ {
+ throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID);
+ }
+ String filter = createFilterFromJMSSelector(filterStr);
+ return coreQueueControl.listMessages(filter).length;
+ }
+
+ public int removeMessages(String filterStr) throws Exception
+ {
+ String filter = createFilterFromJMSSelector(filterStr);
+ int count = 0;
+ String[] queues = addressControl.getQueueNames();
+ for (String queue : queues)
+ {
+ QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue);
+ count += coreQueueControl.removeMessages(filter);
+ }
+
+ return count;
+ }
+
+ public void dropDurableSubscription(String clientID, String subscriptionName) throws Exception
+ {
+ String queueName = HornetQTopic.createQueueNameForDurableSubscription(clientID, subscriptionName);
+ QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queueName);
+ if (coreQueueControl == null)
+ {
+ throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID);
+ }
+ HornetQServerControl serverControl = (HornetQServerControl)managementService.getResource(ResourceNames.CORE_SERVER);
+ serverControl.destroyQueue(queueName);
+ }
+
+ public void dropAllSubscriptions() throws Exception
+ {
+ HornetQServerControl serverControl = (HornetQServerControl)managementService.getResource(ResourceNames.CORE_SERVER);
+ String[] queues = addressControl.getQueueNames();
+ for (String queue : queues)
+ {
+ serverControl.destroyQueue(queue);
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ private Object[] listSubscribersInfos(final DurabilityType durability)
+ {
+ List<QueueControl> queues = getQueues(durability);
+ List<Object[]> subInfos = new ArrayList<Object[]>(queues.size());
+
+ for (QueueControl queue : queues)
+ {
+ String clientID = null;
+ String subName = null;
+
+ if (queue.isDurable())
+ {
+ Pair<String, String> pair = HornetQTopic.decomposeQueueNameForDurableSubscription(queue.getName()
+ .toString());
+ clientID = pair.a;
+ subName = pair.b;
+ }
+
+ String filter = queue.getFilter() != null ? queue.getFilter() : null;
+
+ Object[] subscriptionInfo = new Object[6];
+ subscriptionInfo[0] = queue.getName();
+ subscriptionInfo[1] = clientID;
+ subscriptionInfo[2] = subName;
+ subscriptionInfo[3] = queue.isDurable();
+ subscriptionInfo[4] = queue.getMessageCount();
+
+ subInfos.add(subscriptionInfo);
+ }
+ return subInfos.toArray(new Object[subInfos.size()]);
+ }
+
+ private String listSubscribersInfosAsJSON(final DurabilityType durability) throws Exception
+ {
+ List<QueueControl> queues = getQueues(durability);
+ JSONArray array = new JSONArray();
+
+ for (QueueControl queue : queues)
+ {
+ String clientID = null;
+ String subName = null;
+
+ if (queue.isDurable())
+ {
+ Pair<String, String> pair = HornetQTopic.decomposeQueueNameForDurableSubscription(queue.getName()
+ .toString());
+ clientID = pair.a;
+ subName = pair.b;
+ }
+
+ String filter = queue.getFilter() != null ? queue.getFilter() : null;
+
+ JSONObject info = new JSONObject();
+ info.put("queueName", queue.getName());
+ info.put("clientID", clientID);
+ info.put("selector", filter);
+ info.put("name", subName);
+ info.put("durable", queue.isDurable());
+ info.put("messageCount", queue.getMessageCount());
+ array.put(info);
+ }
+
+ return array.toString();
+ }
+
+ private int getMessageCount(final DurabilityType durability)
+ {
+ List<QueueControl> queues = getQueues(durability);
+ int count = 0;
+ for (QueueControl queue : queues)
+ {
+ count += queue.getMessageCount();
+ }
+ return count;
+ }
+
+ private List<QueueControl> getQueues(final DurabilityType durability)
+ {
+ try
+ {
+ List<QueueControl> matchingQueues = new ArrayList<QueueControl>();
+ String[] queues = addressControl.getQueueNames();
+ for (String queue : queues)
+ {
+ QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue);
+
+ // Ignore the "special" subscription
+ if (!coreQueueControl.getName().equals(addressControl.getAddress()))
+ {
+ if (durability == DurabilityType.ALL || (durability == DurabilityType.DURABLE && coreQueueControl.isDurable()) ||
+ (durability == DurabilityType.NON_DURABLE && !coreQueueControl.isDurable()))
+ {
+ matchingQueues.add(coreQueueControl);
+ }
+ }
+ }
+ return matchingQueues;
+ }
+ catch (Exception e)
+ {
+ return Collections.emptyList();
+ }
+ }
+
+ // Inner classes -------------------------------------------------
+
+ private enum DurabilityType
+ {
+ ALL, DURABLE, NON_DURABLE
+ }
+}
Deleted: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/TopicControlImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/TopicControlImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/TopicControlImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -1,348 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.jms.server.management.impl;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.management.AddressControl;
-import org.hornetq.core.management.HornetQServerControl;
-import org.hornetq.core.management.ManagementService;
-import org.hornetq.core.management.QueueControl;
-import org.hornetq.core.management.ResourceNames;
-import org.hornetq.jms.HornetQTopic;
-import org.hornetq.jms.client.HornetQMessage;
-import org.hornetq.jms.client.SelectorTranslator;
-import org.hornetq.jms.server.management.TopicControl;
-import org.hornetq.utils.Pair;
-import org.hornetq.utils.json.JSONArray;
-import org.hornetq.utils.json.JSONObject;
-
-/**
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- *
- * @version <tt>$Revision$</tt>
- *
- */
-public class TopicControlImpl implements TopicControl
-{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(TopicControlImpl.class);
-
- // Attributes ----------------------------------------------------
-
- private final HornetQTopic managedTopic;
-
- private final String binding;
-
- private AddressControl addressControl;
-
- private ManagementService managementService;
-
- // Static --------------------------------------------------------
-
- public static String createFilterFromJMSSelector(final String selectorStr) throws HornetQException
- {
- return (selectorStr == null || selectorStr.trim().length() == 0) ? null : SelectorTranslator.convertToHornetQFilterString(selectorStr);
- }
-
- // Constructors --------------------------------------------------
-
- public TopicControlImpl(final HornetQTopic topic,
- final AddressControl addressControl,
- final String jndiBinding,
- final ManagementService managementService)
- {
- this.managedTopic = topic;
- this.addressControl = addressControl;
- this.binding = jndiBinding;
- this.managementService = managementService;
- }
-
- // TopicControlMBean implementation ------------------------------
-
- public String getName()
- {
- return managedTopic.getName();
- }
-
- public boolean isTemporary()
- {
- return managedTopic.isTemporary();
- }
-
- public String getAddress()
- {
- return managedTopic.getAddress();
- }
-
- public String getJNDIBinding()
- {
- return binding;
- }
-
- public int getMessageCount()
- {
- return getMessageCount(DurabilityType.ALL);
- }
-
- public int getDurableMessageCount()
- {
- return getMessageCount(DurabilityType.DURABLE);
- }
-
- public int getNonDurableMessageCount()
- {
- return getMessageCount(DurabilityType.NON_DURABLE);
- }
-
- public int getSubscriptionCount()
- {
- return getQueues(DurabilityType.ALL).size();
- }
-
- public int getDurableSubscriptionCount()
- {
- return getQueues(DurabilityType.DURABLE).size();
- }
-
- public int getNonDurableSubscriptionCount()
- {
- return getQueues(DurabilityType.NON_DURABLE).size();
- }
-
- public Object[] listAllSubscriptions()
- {
- return listSubscribersInfos(DurabilityType.ALL);
- }
-
- public String listAllSubscriptionsAsJSON() throws Exception
- {
- return listSubscribersInfosAsJSON(DurabilityType.ALL);
- }
-
- public Object[] listDurableSubscriptions()
- {
- return listSubscribersInfos(DurabilityType.DURABLE);
- }
-
- public String listDurableSubscriptionsAsJSON() throws Exception
- {
- return listSubscribersInfosAsJSON(DurabilityType.DURABLE);
- }
-
- public Object[] listNonDurableSubscriptions()
- {
- return listSubscribersInfos(DurabilityType.NON_DURABLE);
- }
-
- public String listNonDurableSubscriptionsAsJSON() throws Exception
- {
- return listSubscribersInfosAsJSON(DurabilityType.NON_DURABLE);
- }
-
- public Map<String, Object>[] listMessagesForSubscription(final String queueName) throws Exception
- {
- QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queueName);
- if (coreQueueControl == null)
- {
- throw new IllegalArgumentException("No subscriptions with name " + queueName);
- }
-
- Map<String, Object>[] coreMessages = coreQueueControl.listMessages(null);
-
- Map<String, Object>[] jmsMessages = new Map[coreMessages.length];
-
- int i = 0;
-
- for (Map<String, Object> coreMessage : coreMessages)
- {
- jmsMessages[i++] = HornetQMessage.coreMaptoJMSMap(coreMessage);
- }
- return jmsMessages;
- }
-
- public String listMessagesForSubscriptionAsJSON(String queueName) throws Exception
- {
- return JMSQueueControlImpl.toJSON(listMessagesForSubscription(queueName));
- }
-
- public int countMessagesForSubscription(final String clientID, final String subscriptionName, final String filterStr) throws Exception
- {
- String queueName = HornetQTopic.createQueueNameForDurableSubscription(clientID, subscriptionName);
- QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queueName);
- if (coreQueueControl == null)
- {
- throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID);
- }
- String filter = createFilterFromJMSSelector(filterStr);
- return coreQueueControl.listMessages(filter).length;
- }
-
- public int removeMessages(String filterStr) throws Exception
- {
- String filter = createFilterFromJMSSelector(filterStr);
- int count = 0;
- String[] queues = addressControl.getQueueNames();
- for (String queue : queues)
- {
- QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue);
- count += coreQueueControl.removeMessages(filter);
- }
-
- return count;
- }
-
- public void dropDurableSubscription(String clientID, String subscriptionName) throws Exception
- {
- String queueName = HornetQTopic.createQueueNameForDurableSubscription(clientID, subscriptionName);
- QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queueName);
- if (coreQueueControl == null)
- {
- throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID);
- }
- HornetQServerControl serverControl = (HornetQServerControl)managementService.getResource(ResourceNames.CORE_SERVER);
- serverControl.destroyQueue(queueName);
- }
-
- public void dropAllSubscriptions() throws Exception
- {
- HornetQServerControl serverControl = (HornetQServerControl)managementService.getResource(ResourceNames.CORE_SERVER);
- String[] queues = addressControl.getQueueNames();
- for (String queue : queues)
- {
- serverControl.destroyQueue(queue);
- }
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- private Object[] listSubscribersInfos(final DurabilityType durability)
- {
- List<QueueControl> queues = getQueues(durability);
- List<Object[]> subInfos = new ArrayList<Object[]>(queues.size());
-
- for (QueueControl queue : queues)
- {
- String clientID = null;
- String subName = null;
-
- if (queue.isDurable())
- {
- Pair<String, String> pair = HornetQTopic.decomposeQueueNameForDurableSubscription(queue.getName().toString());
- clientID = pair.a;
- subName = pair.b;
- }
-
- String filter = queue.getFilter() != null ? queue.getFilter() : null;
-
- Object[] subscriptionInfo = new Object[6];
- subscriptionInfo[0] = queue.getName();
- subscriptionInfo[1] = clientID;
- subscriptionInfo[2] = subName;
- subscriptionInfo[3] = queue.isDurable();
- subscriptionInfo[4] = queue.getMessageCount();
-
- subInfos.add(subscriptionInfo);
- }
- return subInfos.toArray(new Object[subInfos.size()]);
- }
-
- private String listSubscribersInfosAsJSON(final DurabilityType durability) throws Exception
- {
- List<QueueControl> queues = getQueues(durability);
- JSONArray array = new JSONArray();
-
- for (QueueControl queue : queues)
- {
- String clientID = null;
- String subName = null;
-
- if (queue.isDurable())
- {
- Pair<String, String> pair = HornetQTopic.decomposeQueueNameForDurableSubscription(queue.getName().toString());
- clientID = pair.a;
- subName = pair.b;
- }
-
- String filter = queue.getFilter() != null ? queue.getFilter() : null;
-
- JSONObject info = new JSONObject();
- info.put("queueName", queue.getName());
- info.put("clientID", clientID);
- info.put("selector", filter);
- info.put("name", subName);
- info.put("durable", queue.isDurable());
- info.put("messageCount", queue.getMessageCount());
- array.put(info);
- }
-
- return array.toString();
- }
-
- private int getMessageCount(final DurabilityType durability)
- {
- List<QueueControl> queues = getQueues(durability);
- int count = 0;
- for (QueueControl queue : queues)
- {
- count += queue.getMessageCount();
- }
- return count;
- }
-
- private List<QueueControl> getQueues(final DurabilityType durability)
- {
- try
- {
- List<QueueControl> matchingQueues = new ArrayList<QueueControl>();
- String[] queues = addressControl.getQueueNames();
- for (String queue : queues)
- {
- QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue);
-
- // Ignore the "special" subscription
- if (!coreQueueControl.getName().equals(addressControl.getAddress()))
- {
- if (durability == DurabilityType.ALL || (durability == DurabilityType.DURABLE && coreQueueControl.isDurable()) ||
- (durability == DurabilityType.NON_DURABLE && !coreQueueControl.isDurable()))
- {
- matchingQueues.add(coreQueueControl);
- }
- }
- }
- return matchingQueues;
- }
- catch (Exception e)
- {
- return Collections.emptyList();
- }
- }
-
- // Inner classes -------------------------------------------------
-
- private enum DurabilityType
- {
- ALL, DURABLE, NON_DURABLE
- }
-}
Modified: branches/Branch_Replication_Changes/tests/config/ConfigurationTest-full-config.xml
===================================================================
--- branches/Branch_Replication_Changes/tests/config/ConfigurationTest-full-config.xml 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/config/ConfigurationTest-full-config.xml 2009-09-24 09:25:44 UTC (rev 7986)
@@ -25,8 +25,7 @@
<message-expiry-scan-period>10111213</message-expiry-scan-period>
<message-expiry-thread-priority>8</message-expiry-thread-priority>
<id-cache-size>127</id-cache-size>
- <persist-id-cache>true</persist-id-cache>
- <queue-activation-timeout>12456</queue-activation-timeout>
+ <persist-id-cache>true</persist-id-cache>
<backup>true</backup>
<shared-store>true</shared-store>
<persist-delivery-count-before-delivery>true</persist-delivery-count-before-delivery>
Modified: branches/Branch_Replication_Changes/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
===================================================================
--- branches/Branch_Replication_Changes/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -120,7 +120,9 @@
protected void tearDown() throws Exception
{
super.tearDown();
+
getJmsServerManager().destroyConnectionFactory("testsuitecf");
+
cf = null;
assertRemainingMessages(0);
Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -31,6 +31,7 @@
import org.hornetq.core.config.cluster.BroadcastGroupConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
@@ -49,6 +50,8 @@
*/
public class SessionFactoryTest extends ServiceTestBase
{
+ private static final Logger log = Logger.getLogger(SessionFactoryTest.class);
+
private final String groupAddress = "230.1.2.3";
private final int groupPort = 8765;
@@ -62,14 +65,14 @@
private TransportConfiguration backupTC;
protected void tearDown() throws Exception
- {
+ {
if (liveService != null && liveService.isStarted())
- {
+ {
liveService.stop();
- }
+ }
if (backupService != null && backupService.isStarted())
- {
- liveService.stop();
+ {
+ backupService.stop();
}
liveService = null;
backupService = null;
@@ -106,7 +109,7 @@
{
try
{
- startLiveAndBackup();
+ startLiveAndBackup();
ClientSessionFactory cf = new ClientSessionFactoryImpl();
assertFactoryParams(cf,
null,
@@ -136,7 +139,7 @@
ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL,
ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
ClientSessionFactoryImpl.DEFAULT_RECONNECT_ATTEMPTS,
- ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN);
+ ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN);
try
{
ClientSession session = cf.createSession(false, true, true);
@@ -144,8 +147,9 @@
}
catch (HornetQException e)
{
+ e.printStackTrace();
// Ok
- }
+ }
final List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
Pair<TransportConfiguration, TransportConfiguration> pair0 = new Pair<TransportConfiguration, TransportConfiguration>(this.liveTC,
this.backupTC);
@@ -854,10 +858,12 @@
{
if (liveService.isStarted())
{
+ log.info("stopping live");
liveService.stop();
}
if (backupService.isStarted())
{
+ log.info("stopping backup");
backupService.stop();
}
}
Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -50,8 +50,7 @@
{
private static final Logger log = Logger.getLogger(BridgeReconnectTest.class);
-
- //Fail bridge and reconnecting immediately
+ // Fail bridge and reconnecting immediately
public void testFailoverAndReconnectImmediately() throws Exception
{
Map<String, Object> server0Params = new HashMap<String, Object>();
@@ -94,16 +93,16 @@
final long retryInterval = 50;
final double retryIntervalMultiplier = 1d;
final int reconnectAttempts = 1;
-
+
Pair<String, String> connectorPair = new Pair<String, String>(server1tc.getName(), server2tc.getName());
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
queueName0,
forwardAddress,
- null,
null,
+ null,
retryInterval,
- retryIntervalMultiplier,
+ retryIntervalMultiplier,
reconnectAttempts,
true,
false,
@@ -151,7 +150,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = session0.createClientMessage(false);
+ ClientMessage message = session0.createClientMessage(true);
message.putIntProperty(propKey, i);
prod0.send(message);
@@ -175,7 +174,7 @@
assertEquals(0, server1.getRemotingService().getConnections().size());
assertEquals(0, service2.getRemotingService().getConnections().size());
}
-
+
// Fail bridge and attempt failover a few times before succeeding
public void testFailoverAndReconnectAfterAFewTries() throws Exception
{
@@ -225,8 +224,8 @@
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
queueName0,
forwardAddress,
- null,
null,
+ null,
retryInterval,
retryIntervalMultiplier,
reconnectAttempts,
@@ -272,7 +271,7 @@
InVMConnector.numberOfFailures = reconnectAttempts - 1;
forwardingConnection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
- forwardingConnection = getForwardingConnection(bridge);
+ forwardingConnection = getForwardingConnection(bridge);
forwardingConnection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
final int numMessages = 10;
@@ -305,7 +304,7 @@
assertEquals(0, server1.getRemotingService().getConnections().size());
assertEquals(0, service2.getRemotingService().getConnections().size());
}
-
+
// Fail bridge and reconnect same node, no backup specified
public void testReconnectSameNode() throws Exception
{
@@ -344,8 +343,8 @@
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
queueName0,
forwardAddress,
- null,
null,
+ null,
retryInterval,
retryIntervalMultiplier,
reconnectAttempts,
@@ -389,7 +388,7 @@
InVMConnector.numberOfFailures = reconnectAttempts - 1;
forwardingConnection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
- forwardingConnection = getForwardingConnection(bridge);
+ forwardingConnection = getForwardingConnection(bridge);
forwardingConnection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
final int numMessages = 10;
@@ -420,7 +419,7 @@
assertEquals(0, server0.getRemotingService().getConnections().size());
assertEquals(0, server1.getRemotingService().getConnections().size());
}
-
+
public void testShutdownServerCleanlyAndReconnectSameNode() throws Exception
{
Map<String, Object> server0Params = new HashMap<String, Object>();
@@ -458,8 +457,8 @@
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
queueName0,
forwardAddress,
- null,
null,
+ null,
retryInterval,
retryIntervalMultiplier,
reconnectAttempts,
@@ -491,7 +490,7 @@
server1.stop();
server1.start();
-
+
ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
ClientSession session1 = csf1.createSession(false, true, true);
@@ -527,7 +526,7 @@
assertEquals(0, server0.getRemotingService().getConnections().size());
assertEquals(0, server1.getRemotingService().getConnections().size());
}
-
+
public void testFailoverThenFailAgainAndReconnect() throws Exception
{
Map<String, Object> server0Params = new HashMap<String, Object>();
@@ -564,8 +563,8 @@
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
queueName0,
forwardAddress,
- null,
null,
+ null,
retryInterval,
retryIntervalMultiplier,
reconnectAttempts,
@@ -626,13 +625,13 @@
assertNotNull(r1);
assertEquals(i, r1.getProperty(propKey));
}
-
- //Fail again - should reconnect
+
+ // Fail again - should reconnect
forwardingConnection = ((BridgeImpl)bridge).getForwardingConnection();
InVMConnector.failOnCreateConnection = true;
InVMConnector.numberOfFailures = reconnectAttempts - 1;
forwardingConnection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session0.createClientMessage(false);
@@ -657,24 +656,24 @@
assertEquals(0, server0.getRemotingService().getConnections().size());
assertEquals(0, server1.getRemotingService().getConnections().size());
}
-
+
private RemotingConnection getForwardingConnection(final Bridge bridge) throws Exception
{
long start = System.currentTimeMillis();
-
+
do
{
RemotingConnection forwardingConnection = ((BridgeImpl)bridge).getForwardingConnection();
-
+
if (forwardingConnection != null)
{
return forwardingConnection;
}
-
+
Thread.sleep(10);
}
while (System.currentTimeMillis() - start < 50000);
-
+
throw new IllegalStateException("Failed to get forwarding connection");
}
Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -224,11 +224,11 @@
forwardAddress,
null,
null,
- 1000,
+ 500,
1d,
-1,
- false,
true,
+ true,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -322,7 +322,11 @@
sf1.close();
+ log.info("stopping server 1");
+
server1.stop();
+
+ log.info("stopped server 1");
for (int i = 0; i < numMessages; i++)
{
@@ -332,8 +336,12 @@
producer0.send(message);
}
+
+ log.info("sent some more messages");
server1.start();
+
+ log.info("started server1");
sf1 = new ClientSessionFactoryImpl(server1tc);
@@ -342,6 +350,8 @@
consumer1 = session1.createConsumer(queueName1);
session1.start();
+
+ log.info("started session");
for (int i = 0; i < numMessages; i++)
{
Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -13,7 +13,6 @@
package org.hornetq.tests.integration.cluster.bridge;
-import java.util.HashMap;
import java.util.Map;
import org.hornetq.core.config.Configuration;
@@ -35,29 +34,6 @@
*/
public abstract class BridgeTestBase extends UnitTestCase
{
- protected HornetQServer createHornetQServerNIO(final int id, final Map<String, Object> params)
- {
- return createHornetQServerNIO(id, params, false);
- }
-
- protected HornetQServer createHornetQServerNIO(final int id,
- final Map<String, Object> params,
- final boolean backup)
- {
- Configuration serviceConf = new ConfigurationImpl();
- serviceConf.setClustered(true);
- serviceConf.setSecurityEnabled(false);
- serviceConf.setBackup(backup);
- serviceConf.setJournalMinFiles(2);
- serviceConf.setJournalFileSize(100 * 1024);
- params.put(TransportConstants.SERVER_ID_PROP_NAME, id);
- serviceConf.getAcceptorConfigurations()
- .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory",
- params));
- HornetQServer service = HornetQ.newHornetQServer(serviceConf);
- return service;
- }
-
protected HornetQServer createHornetQServer(final int id, final Map<String, Object> params)
{
return createHornetQServer(id, params, false);
@@ -69,16 +45,18 @@
serviceConf.setClustered(true);
serviceConf.setSecurityEnabled(false);
serviceConf.setBackup(backup);
+ serviceConf.setSharedStore(true);
+ serviceConf.setBindingsDirectory(getBindingsDir(id, false));
+ serviceConf.setJournalMinFiles(2);
+ serviceConf.setJournalDirectory(getJournalDir(id, false));
+ serviceConf.setPagingDirectory(getPageDir(id, false));
+ serviceConf.setLargeMessagesDirectory(getLargeMessagesDir(id, false));
+
params.put(TransportConstants.SERVER_ID_PROP_NAME, id);
serviceConf.getAcceptorConfigurations()
- .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory",
- params));
- HornetQServer service = HornetQ.newHornetQServer(serviceConf, false);
+ .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory", params));
+ HornetQServer service = HornetQ.newHornetQServer(serviceConf, true);
return service;
}
- protected HornetQServer createHornetQServer(final int id)
- {
- return this.createHornetQServer(id, new HashMap<String, Object>());
- }
}
Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -1421,8 +1421,13 @@
closeSessionFactory(3);
stopServers(0, 3);
+ log.info("stopped servers");
startServers(3, 0);
+
+ log.info("restarted servers");
+
+ Thread.sleep(2000);
setupSessionFactory(0, isNetty());
setupSessionFactory(3, isNetty());
Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -32,15 +32,12 @@
{
if (packet.getType() == PacketImpl.SESS_SEND)
{
- try
- {
- Thread.sleep(2000);
- }
- catch (Exception e)
- {
- }
+ //Lose the send
+ return false;
}
-
- return true;
+ else
+ {
+ return true;
+ }
}
}
Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor2.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor2.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor2.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -18,6 +18,7 @@
import org.hornetq.core.remoting.Interceptor;
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
/**
* A DelayInterceptor2
@@ -32,16 +33,15 @@
public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
{
- try
+ if (packet.getType() == PacketImpl.NULL_RESPONSE)
{
- Thread.sleep(2000);
+ //Lose the response from the commit
+
+ return false;
}
- catch (InterruptedException e)
+ else
{
+ return true;
}
-
- log.info("proceeding");
-
- return true;
}
}
Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor3.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor3.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor3.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -35,16 +35,12 @@
{
if (packet.getType() == PacketImpl.SESS_COMMIT)
{
- log.info("got sess commit, delaying");
- try
- {
- Thread.sleep(2000);
- }
- catch (Exception e)
- {
- }
+ //lose the commit
+ return false;
}
-
- return false;
+ else
+ {
+ return true;
+ }
}
}
Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -472,8 +472,6 @@
RemotingConnection conn = ((ClientSessionInternal)session2).getConnection();
- log.info("Failing connection**");
-
// Simulate failure on connection
conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
@@ -483,16 +481,12 @@
assertTrue(ok);
- log.info("** creating the consumer");
-
consumer = session2.createConsumer(ADDRESS);
for (int i = numMessages / 2; i < numMessages; i++)
{
ClientMessage message = consumer.receive(1000);
- log.info("got message " + message);
-
assertNotNull(message);
assertEquals("message" + i, message.getBody().readString());
@@ -1173,9 +1167,7 @@
assertTrue(ok);
- log.info("closing session");
session.close();
- log.info("closed session");
sf = new ClientSessionFactoryImpl(getConnectorTransportConfiguration(false));
@@ -1477,8 +1469,6 @@
assertTrue(ok);
- log.info("after failover");
-
for (int i = 0; i < numMessages; i++)
{
// Only the persistent messages will survive
@@ -1487,8 +1477,6 @@
{
ClientMessage message = consumer.receive(1000);
- log.info("got message " + i);
-
assertNotNull(message);
assertEquals("message" + i, message.getBody().readString());
@@ -1788,9 +1776,7 @@
sf.setBlockOnPersistentSend(true);
sf.setBlockOnAcknowledge(true);
- log.info("creating session");
final ClientSession session = sf.createSession(false, false);
- log.info("created session");
session.createQueue(ADDRESS, ADDRESS, null, true);
@@ -1840,23 +1826,18 @@
sf.addInterceptor(interceptor);
session.commit();
-
- log.info("Initial commit succeeded");
}
catch (HornetQException e)
{
if (e.getCode() == HornetQException.UNBLOCKED)
{
- log.info("commit unblocked");
-
// Ok - now we retry the commit after removing the interceptor
sf.removeInterceptor(interceptor);
try
{
- log.info("retrying commit");
- session.commit();
+ session.commit();
}
catch (HornetQException e2)
{
@@ -1958,9 +1939,7 @@
sf.setBlockOnPersistentSend(true);
sf.setBlockOnAcknowledge(true);
- log.info("creating session");
final ClientSession session = sf.createSession(false, false);
- log.info("created session");
session.createQueue(ADDRESS, ADDRESS, null, true);
@@ -2002,15 +1981,11 @@
server0Service.getRemotingService().addInterceptor(interceptor);
session.commit();
-
- log.info("Initial commit succeeded");
}
catch (HornetQException e)
{
if (e.getCode() == HornetQException.UNBLOCKED)
{
- log.info("commit unblocked");
-
// Ok - now we retry the commit after removing the interceptor
server0Service.getRemotingService().removeInterceptor(interceptor);
Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -17,6 +17,7 @@
import java.util.Map;
import javax.jms.Connection;
+import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
@@ -79,12 +80,12 @@
public void testAutomaticFailover() throws Exception
{
HornetQConnectionFactory jbcf = new HornetQConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"),
- new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- backupParams));
-
+ new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
+ backupParams));
+
jbcf.setBlockOnPersistentSend(true);
jbcf.setBlockOnNonPersistentSend(true);
-
+
Connection conn = jbcf.createConnection();
MyExceptionListener listener = new MyExceptionListener();
@@ -99,7 +100,7 @@
SimpleString jmsQueueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + "myqueue");
- coreSession.createQueue(jmsQueueName, jmsQueueName, null, false);
+ coreSession.createQueue(jmsQueueName, jmsQueueName, null, true);
Queue queue = sess.createQueue("myqueue");
@@ -107,6 +108,8 @@
MessageProducer producer = sess.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
MessageConsumer consumer = sess.createConsumer(queue);
for (int i = 0; i < numMessages; i++)
@@ -137,19 +140,20 @@
conn.close();
- assertNull(listener.e);
+ assertNotNull(listener.e);
+
+ assertTrue(me == listener.e.getCause());
}
public void testManualFailover() throws Exception
{
HornetQConnectionFactory jbcfLive = new HornetQConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
-
+
jbcfLive.setBlockOnNonPersistentSend(true);
jbcfLive.setBlockOnPersistentSend(true);
-
HornetQConnectionFactory jbcfBackup = new HornetQConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- backupParams));
+ backupParams));
jbcfBackup.setBlockOnNonPersistentSend(true);
jbcfBackup.setBlockOnPersistentSend(true);
@@ -167,7 +171,7 @@
SimpleString jmsQueueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + "myqueue");
- coreSessionLive.createQueue(jmsQueueName, jmsQueueName, null, false);
+ coreSessionLive.createQueue(jmsQueueName, jmsQueueName, null, true);
Queue queue = sessLive.createQueue("myqueue");
@@ -200,11 +204,8 @@
Connection connBackup = jbcfBackup.createConnection();
- log.info("creating session on backup");
Session sessBackup = connBackup.createSession(false, Session.AUTO_ACKNOWLEDGE);
- log.info("created on backup");
-
MessageConsumer consumerBackup = sessBackup.createConsumer(queue);
connBackup.start();
@@ -238,24 +239,29 @@
backupConf.setSecurityEnabled(false);
backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
backupConf.getAcceptorConfigurations()
- .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory",
- backupParams));
+ .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory", backupParams));
backupConf.setBackup(true);
- backupService = HornetQ.newHornetQServer(backupConf, false);
+ backupConf.setSharedStore(true);
+ backupConf.setBindingsDirectory(getBindingsDir());
+ backupConf.setJournalMinFiles(2);
+ backupConf.setJournalDirectory(getJournalDir());
+ backupConf.setPagingDirectory(getPageDir());
+ backupConf.setLargeMessagesDirectory(getLargeMessagesDir());
+ backupService = HornetQ.newHornetQServer(backupConf, true);
backupService.start();
Configuration liveConf = new ConfigurationImpl();
liveConf.setSecurityEnabled(false);
liveConf.getAcceptorConfigurations()
.add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory"));
- Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
- TransportConfiguration backupTC = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- backupParams,
- "backup-connector");
- connectors.put(backupTC.getName(), backupTC);
- liveConf.setConnectorConfigurations(connectors);
- liveConf.setBackupConnectorName(backupTC.getName());
- liveService = HornetQ.newHornetQServer(liveConf, false);
+ liveConf.setSharedStore(true);
+ liveConf.setBindingsDirectory(getBindingsDir());
+ liveConf.setJournalMinFiles(2);
+ liveConf.setJournalDirectory(getJournalDir());
+ liveConf.setPagingDirectory(getPageDir());
+ liveConf.setLargeMessagesDirectory(getLargeMessagesDir());
+
+ liveService = HornetQ.newHornetQServer(liveConf, true);
liveService.start();
}
@@ -269,11 +275,11 @@
assertEquals(0, InVMRegistry.instance.size());
liveService = null;
-
+
backupService = null;
-
+
backupParams = null;
-
+
super.tearDown();
}
Deleted: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/jms/server/JMSServerStartStopWithReplicationTest.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/jms/server/JMSServerStartStopWithReplicationTest.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/jms/server/JMSServerStartStopWithReplicationTest.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -1,241 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.jms.server;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.hornetq.core.config.TransportConfiguration;
-import org.hornetq.core.config.impl.FileConfiguration;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.security.HornetQSecurityManager;
-import org.hornetq.core.security.impl.HornetQSecurityManagerImpl;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.impl.HornetQServerImpl;
-import org.hornetq.integration.transports.netty.NettyConnectorFactory;
-import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.server.JMSServerManager;
-import org.hornetq.jms.server.impl.JMSServerManagerImpl;
-import org.hornetq.tests.util.UnitTestCase;
-
-/**
- *
- * A JMSServerStartStopWithReplicationTest
- *
- * Make sure live backup pair can be stopped and started ok multiple times with predefined queues etc
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- *
- */
-public class JMSServerStartStopWithReplicationTest extends UnitTestCase
-{
- private static final Logger log = Logger.getLogger(JMSServerStartStopWithReplicationTest.class);
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private final Map<String, Object> backupParams = new HashMap<String, Object>();
-
- private JMSServerManager liveJMSServer;
-
- private JMSServerManager backupJMSServer;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testStopStartBackupBeforeLive() throws Exception
- {
- testStopStart1(true);
- }
-
- public void testStopStartLiveBeforeBackup() throws Exception
- {
- testStopStart1(false);
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- this.liveJMSServer = null;
- this.backupJMSServer = null;
-
- super.tearDown();
- }
-
- // Private -------------------------------------------------------
-
- private void testStopStart1(final boolean backupBeforeLive) throws Exception
- {
- final int numMessages = 5;
-
- for (int j = 0; j < numMessages; j++)
- {
- log.info("Iteration " + j);
-
- startBackup();
- startLive();
-
- HornetQConnectionFactory jbcf = new HornetQConnectionFactory(new TransportConfiguration(NettyConnectorFactory.class.getCanonicalName()),
- new TransportConfiguration(NettyConnectorFactory.class.getCanonicalName(),
- backupParams));
-
- jbcf.setBlockOnPersistentSend(true);
- jbcf.setBlockOnNonPersistentSend(true);
-
- Connection conn = jbcf.createConnection();
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Queue queue = sess.createQueue("myJMSQueue");
-
- MessageProducer producer = sess.createProducer(queue);
-
- TextMessage tm = sess.createTextMessage("message" + j);
-
- producer.send(tm);
-
- conn.close();
-
- jbcf.close();
-
- if (backupBeforeLive)
- {
- stopBackup();
- stopLive();
- }
- else
- {
- stopLive();
- stopBackup();
- }
- }
-
- startBackup();
- startLive();
-
- HornetQConnectionFactory jbcf = new HornetQConnectionFactory(new TransportConfiguration(NettyConnectorFactory.class.getCanonicalName()),
- new TransportConfiguration(NettyConnectorFactory.class.getCanonicalName(),
- backupParams));
-
- jbcf.setBlockOnPersistentSend(true);
- jbcf.setBlockOnNonPersistentSend(true);
-
- Connection conn = jbcf.createConnection();
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Queue queue = sess.createQueue("myJMSQueue");
-
- MessageConsumer consumer = sess.createConsumer(queue);
-
- conn.start();
-
- for (int i = 0; i < numMessages; i++)
- {
- TextMessage tm = (TextMessage)consumer.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- conn.close();
-
- jbcf.close();
-
- if (backupBeforeLive)
- {
- stopBackup();
- stopLive();
- }
- else
- {
- stopLive();
- stopBackup();
- }
- }
-
- private void stopLive() throws Exception
- {
- liveJMSServer.stop();
- }
-
- private void stopBackup() throws Exception
- {
- backupJMSServer.stop();
- }
-
- private void startLive() throws Exception
- {
- FileConfiguration fcLive = new FileConfiguration();
-
- fcLive.setConfigurationUrl("server-start-stop-live-config1.xml");
-
- fcLive.start();
-
- HornetQSecurityManager smLive = new HornetQSecurityManagerImpl();
-
- HornetQServer liveServer = new HornetQServerImpl(fcLive, smLive);
-
- liveJMSServer = new JMSServerManagerImpl(liveServer, "server-start-stop-live-jms-config1.xml");
-
- liveJMSServer.setContext(null);
-
- liveJMSServer.start();
- }
-
- private void startBackup() throws Exception
- {
- FileConfiguration fcBackup = new FileConfiguration();
-
- fcBackup.setConfigurationUrl("server-start-stop-backup-config1.xml");
-
- fcBackup.start();
-
- HornetQSecurityManager smBackup = new HornetQSecurityManagerImpl();
-
- HornetQServer liveServer = new HornetQServerImpl(fcBackup, smBackup);
-
- backupJMSServer = new JMSServerManagerImpl(liveServer, "server-start-stop-backup-jms-config1.xml");
-
- backupJMSServer.setContext(null);
-
- backupJMSServer.start();
- }
-
- // Inner classes -------------------------------------------------
-
-}
Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/util/UnitTestCase.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/util/UnitTestCase.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -270,7 +270,7 @@
}
}
- protected static Object checkBinding(Context context, String binding) throws Exception
+ protected static Object checkBinding(Context context, String binding) throws Exception
{
Object o = context.lookup(binding);
assertNotNull(o);
15 years, 3 months