[jboss-cvs] JBoss Messaging SVN: r6049 - in trunk: src/main/org/jboss/messaging/core/management/impl and 14 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Mar 9 10:53:42 EDT 2009
Author: timfox
Date: 2009-03-09 10:53:41 -0400 (Mon, 09 Mar 2009)
New Revision: 6049
Added:
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateQueueMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateAcknowledgeMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteBindingAddedMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteBindingRemovedMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteConsumerAddedMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteConsumerRemovedMessage.java
Removed:
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateClusterConnectionUpdate.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeleteQueueMessage.java
trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
trunk/src/main/org/jboss/messaging/core/server/cluster/Bridge.java
trunk/src/main/org/jboss/messaging/core/server/cluster/BroadcastGroup.java
trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterConnection.java
trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
trunk/src/main/org/jboss/messaging/utils/TypedProperties.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerSessionPacketHandlerTest.java
Log:
clustered backup
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -59,7 +59,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionFailoverCompleteMessage;
@@ -270,7 +270,7 @@
{
checkClosed();
- SessionCreateQueueMessage request = new SessionCreateQueueMessage(address, queueName, filterString, durable, temp);
+ CreateQueueMessage request = new CreateQueueMessage(address, queueName, filterString, durable, temp);
channel.sendBlocking(request);
}
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -551,7 +551,7 @@
TypedProperties notifProps;
if (notification.getProperties() != null)
{
- notifProps = notification.getProperties();
+ notifProps = new TypedProperties(notification.getProperties());
}
else
{
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -224,6 +224,10 @@
}
addressManager.clear();
+
+ queueInfos.clear();
+
+ transientIDs.clear();
started = false;
}
@@ -361,7 +365,7 @@
.toString());
long redistributionDelay = addressSettings.getRedistributionDelay();
-
+
if (redistributionDelay != -1)
{
queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
@@ -431,7 +435,7 @@
.toString());
long redistributionDelay = addressSettings.getRedistributionDelay();
-
+
if (redistributionDelay != -1)
{
queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
@@ -441,6 +445,10 @@
break;
}
+ default:
+ {
+ throw new IllegalArgumentException("Invalid type " + type.toInt());
+ }
}
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -86,7 +86,6 @@
public void clear()
{
- // destinations.clear();
nameMap.clear();
mappings.clear();
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -31,8 +31,8 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATE_QUEUE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_DELETE_QUEUE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.DELETE_QUEUE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FAILOVER_COMPLETE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
@@ -106,7 +106,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionFailoverCompleteMessage;
@@ -678,12 +678,12 @@
packet = new SessionQueueQueryResponseMessage();
break;
}
- case SESS_CREATE_QUEUE:
+ case CREATE_QUEUE:
{
- packet = new SessionCreateQueueMessage();
+ packet = new CreateQueueMessage();
break;
}
- case SESS_DELETE_QUEUE:
+ case DELETE_QUEUE:
{
packet = new SessionDeleteQueueMessage();
break;
@@ -888,6 +888,8 @@
private CommandConfirmationHandler commandConfirmationHandler;
private int responseActionCount;
+
+ private boolean playedResponsesOnFailure;
public void setCommandConfirmationHandler(final CommandConfirmationHandler handler)
{
@@ -1158,6 +1160,19 @@
public void executeOutstandingDelayedResults()
{
+ //Execute on different thread to avoid deadlock
+
+ new Thread()
+ {
+ public void run()
+ {
+ doExecuteOutstandingDelayedResults();
+ }
+ }.start();
+ }
+
+ private void doExecuteOutstandingDelayedResults()
+ {
synchronized (replicationLock)
{
// Execute all the response actions now
@@ -1328,8 +1343,6 @@
}
}
- private boolean playedResponsesOnFailure;
-
// This will never get called concurrently by more than one thread
// TODO it's not ideal synchronizing this since it forms a contention point with replication
@@ -1454,8 +1467,11 @@
if (packet.isResponse())
{
response = packet;
+
confirm(packet);
+
lock.lock();
+
try
{
sendCondition.signal();
Copied: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateQueueMessage.java (from rev 6035, trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueMessage.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateQueueMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateQueueMessage.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -0,0 +1,147 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+
+ * @version <tt>$Revision$</tt>
+ */
+public class CreateQueueMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private SimpleString address;
+ private SimpleString queueName;
+ private SimpleString filterString;
+ private boolean durable;
+ private boolean temporary;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public CreateQueueMessage(final SimpleString address, final SimpleString queueName,
+ final SimpleString filterString, final boolean durable, final boolean temporary)
+ {
+ super(CREATE_QUEUE);
+
+ this.address = address;
+ this.queueName = queueName;
+ this.filterString = filterString;
+ this.durable = durable;
+ this.temporary = temporary;
+ }
+
+ public CreateQueueMessage()
+ {
+ super(CREATE_QUEUE);
+ }
+
+ // Public --------------------------------------------------------
+
+ @Override
+ public String toString()
+ {
+ StringBuffer buff = new StringBuffer(getParentString());
+ buff.append(", address=" + address);
+ buff.append(", queueName=" + queueName);
+ buff.append(", filterString=" + filterString);
+ buff.append(", durable=" + durable);
+ buff.append(", temporary=" + temporary);
+ buff.append("]");
+ return buff.toString();
+ }
+
+ public SimpleString getAddress()
+ {
+ return address;
+ }
+
+ public SimpleString getQueueName()
+ {
+ return queueName;
+ }
+
+ public SimpleString getFilterString()
+ {
+ return filterString;
+ }
+
+ public boolean isDurable()
+ {
+ return durable;
+ }
+
+ public boolean isTemporary()
+ {
+ return temporary;
+ }
+
+ public void encodeBody(final MessagingBuffer buffer)
+ {
+ buffer.writeSimpleString(address);
+ buffer.writeSimpleString(queueName);
+ buffer.writeNullableSimpleString(filterString);
+ buffer.writeBoolean(durable);
+ buffer.writeBoolean(temporary);
+ }
+
+ public void decodeBody(final MessagingBuffer buffer)
+ {
+ address = buffer.readSimpleString();
+ queueName = buffer.readSimpleString();
+ filterString = buffer.readNullableSimpleString();
+ durable = buffer.readBoolean();
+ temporary = buffer.readBoolean();
+ }
+
+ public boolean equals(Object other)
+ {
+ if (other instanceof CreateQueueMessage == false)
+ {
+ return false;
+ }
+
+ CreateQueueMessage r = (CreateQueueMessage)other;
+
+ return super.equals(other) && r.address.equals(this.address) &&
+ r.queueName.equals(this.queueName) &&
+ (r.filterString == null ? this.filterString == null : r.filterString.equals(this.filterString)) &&
+ r.durable == this.durable &&
+ r.temporary == this.temporary;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -67,7 +67,12 @@
public static final byte REATTACH_SESSION_RESP = 33;
public static final byte REPLICATE_CREATESESSION = 34;
+
+ public static final byte CREATE_QUEUE = 35;
+ public static final byte DELETE_QUEUE = 36;
+
+
// Session
public static final byte SESS_CREATECONSUMER = 40;
@@ -83,10 +88,6 @@
public static final byte SESS_QUEUEQUERY_RESP = 46;
- public static final byte SESS_CREATE_QUEUE = 47;
-
- public static final byte SESS_DELETE_QUEUE = 48;
-
public static final byte SESS_BINDINGQUERY = 49;
public static final byte SESS_BINDINGQUERY_RESP = 50;
@@ -145,12 +146,18 @@
//Replication
- public static final byte SESS_REPLICATE_DELIVERY = 91;
+ public static final byte SESS_REPLICATE_DELIVERY = 90;
- public static final byte REPLICATE_UPDATE_CONNECTORS = 92;
+ public static final byte REPLICATE_ADD_REMOTE_QUEUE_BINDING = 91;
- public static final byte REPLICATE_NOTIFICATION = 93;
+ public static final byte REPLICATE_REMOVE_REMOTE_QUEUE_BINDING = 92;
+ public static final byte REPLICATE_ADD_REMOTE_CONSUMER = 93;
+
+ public static final byte REPLICATE_REMOVE_REMOTE_CONSUMER = 94;
+
+ public static final byte REPLICATE_ACKNOWLEDGE = 95;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueMessage.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueMessage.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -1,147 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
-
- * @version <tt>$Revision$</tt>
- */
-public class SessionCreateQueueMessage extends PacketImpl
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private SimpleString address;
- private SimpleString queueName;
- private SimpleString filterString;
- private boolean durable;
- private boolean temporary;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public SessionCreateQueueMessage(final SimpleString address, final SimpleString queueName,
- final SimpleString filterString, final boolean durable, final boolean temporary)
- {
- super(SESS_CREATE_QUEUE);
-
- this.address = address;
- this.queueName = queueName;
- this.filterString = filterString;
- this.durable = durable;
- this.temporary = temporary;
- }
-
- public SessionCreateQueueMessage()
- {
- super(SESS_CREATE_QUEUE);
- }
-
- // Public --------------------------------------------------------
-
- @Override
- public String toString()
- {
- StringBuffer buff = new StringBuffer(getParentString());
- buff.append(", address=" + address);
- buff.append(", queueName=" + queueName);
- buff.append(", filterString=" + filterString);
- buff.append(", durable=" + durable);
- buff.append(", temporary=" + temporary);
- buff.append("]");
- return buff.toString();
- }
-
- public SimpleString getAddress()
- {
- return address;
- }
-
- public SimpleString getQueueName()
- {
- return queueName;
- }
-
- public SimpleString getFilterString()
- {
- return filterString;
- }
-
- public boolean isDurable()
- {
- return durable;
- }
-
- public boolean isTemporary()
- {
- return temporary;
- }
-
- public void encodeBody(final MessagingBuffer buffer)
- {
- buffer.writeSimpleString(address);
- buffer.writeSimpleString(queueName);
- buffer.writeNullableSimpleString(filterString);
- buffer.writeBoolean(durable);
- buffer.writeBoolean(temporary);
- }
-
- public void decodeBody(final MessagingBuffer buffer)
- {
- address = buffer.readSimpleString();
- queueName = buffer.readSimpleString();
- filterString = buffer.readNullableSimpleString();
- durable = buffer.readBoolean();
- temporary = buffer.readBoolean();
- }
-
- public boolean equals(Object other)
- {
- if (other instanceof SessionCreateQueueMessage == false)
- {
- return false;
- }
-
- SessionCreateQueueMessage r = (SessionCreateQueueMessage)other;
-
- return super.equals(other) && r.address.equals(this.address) &&
- r.queueName.equals(this.queueName) &&
- (r.filterString == null ? this.filterString == null : r.filterString.equals(this.filterString)) &&
- r.durable == this.durable &&
- r.temporary == this.temporary;
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeleteQueueMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeleteQueueMessage.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeleteQueueMessage.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -44,14 +44,14 @@
public SessionDeleteQueueMessage(final SimpleString queueName)
{
- super(SESS_DELETE_QUEUE);
+ super(DELETE_QUEUE);
this.queueName = queueName;
}
public SessionDeleteQueueMessage()
{
- super(SESS_DELETE_QUEUE);
+ super(DELETE_QUEUE);
}
// Public --------------------------------------------------------
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateAcknowledgeMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateAcknowledgeMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateAcknowledgeMessage.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -0,0 +1,89 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat.replication;
+
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ *
+ * A ReplicateAcknowledgeMessage
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 4 Mar 2009 18:36:30
+ *
+ *
+ */
+public class ReplicateAcknowledgeMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ //TODO - use queue id not name for smaller packet size
+ private SimpleString uniqueName;
+
+ private long messageID;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ReplicateAcknowledgeMessage(final SimpleString uniqueName, final long messageID)
+ {
+ super(REPLICATE_ACKNOWLEDGE);
+
+ this.uniqueName = uniqueName;
+
+ this.messageID = messageID;
+ }
+
+ // Public --------------------------------------------------------
+
+ public ReplicateAcknowledgeMessage()
+ {
+ super(REPLICATE_ACKNOWLEDGE);
+ }
+
+ public void encodeBody(final MessagingBuffer buffer)
+ {
+ buffer.writeSimpleString(uniqueName);
+ buffer.writeLong(messageID);
+ }
+
+ public void decodeBody(final MessagingBuffer buffer)
+ {
+ uniqueName = buffer.readSimpleString();
+ messageID = buffer.readLong();
+ }
+
+ public SimpleString getUniqueName()
+ {
+ return uniqueName;
+ }
+
+ public long getMessageID()
+ {
+ return messageID;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateClusterConnectionUpdate.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateClusterConnectionUpdate.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateClusterConnectionUpdate.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -1,138 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
- * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
- * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
- * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
- * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
- * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.wireformat.replication;
-
-import java.util.List;
-
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.utils.Pair;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- *
- * A ReplicateClusterConnectionUpdate
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 4 Mar 2009 17:46:53
- *
- *
- */
-public class ReplicateClusterConnectionUpdate extends PacketImpl
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private SimpleString clusterConnectionName;
-
- private List<Pair<TransportConfiguration, TransportConfiguration>> connectors;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public ReplicateClusterConnectionUpdate(SimpleString clusterConnectionName, List<Pair<TransportConfiguration, TransportConfiguration>> connectors)
- {
- super(REPLICATE_UPDATE_CONNECTORS);
-
- this.clusterConnectionName = clusterConnectionName;
-
- this.connectors = connectors;
- }
-
- public ReplicateClusterConnectionUpdate()
- {
- super(REPLICATE_UPDATE_CONNECTORS);
- }
-
- // Public --------------------------------------------------------
-
- public SimpleString getClusterConnectionName()
- {
- return clusterConnectionName;
- }
-
- public List<Pair<TransportConfiguration, TransportConfiguration>> getConnectors()
- {
- return connectors;
- }
-
- public void encodeBody(final MessagingBuffer buffer)
- {
- buffer.writeSimpleString(clusterConnectionName);
-
- buffer.writeInt(connectors.size());
-
- for (Pair<TransportConfiguration, TransportConfiguration> connectorPair : connectors)
- {
- connectorPair.a.encode(buffer);
-
- if (connectorPair.b != null)
- {
- buffer.writeBoolean(true);
-
- connectorPair.b.encode(buffer);
- }
- else
- {
- buffer.writeBoolean(false);
- }
- }
- }
-
- public void decodeBody(final MessagingBuffer buffer)
- {
- clusterConnectionName = buffer.readSimpleString();
-
- int size = buffer.readInt();
-
- for (int i = 0; i < size; i++)
- {
- TransportConfiguration connector = new TransportConfiguration();
-
- connector.decode(buffer);
-
- boolean existsBackup = buffer.readBoolean();
-
- TransportConfiguration backupConnector = null;
-
- if (existsBackup)
- {
- backupConnector = new TransportConfiguration();
-
- backupConnector.decode(buffer);
- }
-
- Pair<TransportConfiguration, TransportConfiguration> connectorPair = new Pair<TransportConfiguration, TransportConfiguration>(connector,
- backupConnector);
-
- connectors.add(connectorPair);
- }
- }
-
- public boolean isRequiresConfirmations()
- {
- return false;
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteBindingAddedMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteBindingAddedMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteBindingAddedMessage.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -0,0 +1,154 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat.replication;
+
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ *
+ * A ReplicateRemoteBindingAddedMessage
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 4 Mar 2009 18:36:30
+ *
+ *
+ */
+public class ReplicateRemoteBindingAddedMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private SimpleString clusterConnectionName;
+
+ private SimpleString address;
+
+ private SimpleString uniqueName;
+
+ private SimpleString routingName;
+
+ private int remoteQueueID;
+
+ private SimpleString filterString;
+
+ private SimpleString sfQueueName;
+
+ private int distance;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ReplicateRemoteBindingAddedMessage(SimpleString clusterConnectionName,
+ SimpleString address,
+ SimpleString uniqueName,
+ SimpleString routingName,
+ int remoteQueueID,
+ SimpleString filterString,
+ SimpleString sfQueueName,
+ int distance)
+ {
+ super(REPLICATE_ADD_REMOTE_QUEUE_BINDING);
+
+ this.clusterConnectionName = clusterConnectionName;
+ this.address = address;
+ this.uniqueName = uniqueName;
+ this.routingName = routingName;
+ this.remoteQueueID = remoteQueueID;
+ this.filterString = filterString;
+ this.sfQueueName = sfQueueName;
+ this.distance = distance;
+ }
+
+ // Public --------------------------------------------------------
+
+ public ReplicateRemoteBindingAddedMessage()
+ {
+ super(REPLICATE_ADD_REMOTE_QUEUE_BINDING);
+ }
+
+ public void encodeBody(final MessagingBuffer buffer)
+ {
+ buffer.writeSimpleString(clusterConnectionName);
+ buffer.writeSimpleString(address);
+ buffer.writeSimpleString(uniqueName);
+ buffer.writeSimpleString(routingName);
+ buffer.writeInt(remoteQueueID);
+ buffer.writeNullableSimpleString(filterString);
+ buffer.writeSimpleString(sfQueueName);
+ buffer.writeInt(distance);
+ }
+
+ public void decodeBody(final MessagingBuffer buffer)
+ {
+ clusterConnectionName = buffer.readSimpleString();
+ address = buffer.readSimpleString();
+ uniqueName = buffer.readSimpleString();
+ routingName = buffer.readSimpleString();
+ remoteQueueID = buffer.readInt();
+ filterString = buffer.readNullableSimpleString();
+ sfQueueName = buffer.readSimpleString();
+ distance = buffer.readInt();
+ }
+
+ public SimpleString getClusterConnectionName()
+ {
+ return clusterConnectionName;
+ }
+
+ public SimpleString getAddress()
+ {
+ return address;
+ }
+
+ public SimpleString getUniqueName()
+ {
+ return uniqueName;
+ }
+
+ public SimpleString getRoutingName()
+ {
+ return routingName;
+ }
+
+ public int getRemoteQueueID()
+ {
+ return remoteQueueID;
+ }
+
+ public SimpleString getFilterString()
+ {
+ return filterString;
+ }
+
+ public SimpleString getSfQueueName()
+ {
+ return sfQueueName;
+ }
+
+ public int getDistance()
+ {
+ return distance;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteBindingRemovedMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteBindingRemovedMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteBindingRemovedMessage.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -0,0 +1,77 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat.replication;
+
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ *
+ * A ReplicateRemoteBindingRemovedMessage
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 4 Mar 2009 18:36:30
+ *
+ *
+ */
+public class ReplicateRemoteBindingRemovedMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private SimpleString uniqueName;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ReplicateRemoteBindingRemovedMessage(SimpleString uniqueName)
+ {
+ super(REPLICATE_REMOVE_REMOTE_QUEUE_BINDING);
+
+ this.uniqueName = uniqueName;
+ }
+
+ // Public --------------------------------------------------------
+
+ public ReplicateRemoteBindingRemovedMessage()
+ {
+ super(REPLICATE_REMOVE_REMOTE_QUEUE_BINDING);
+ }
+
+ public void encodeBody(final MessagingBuffer buffer)
+ {
+ buffer.writeSimpleString(uniqueName);
+ }
+
+ public void decodeBody(final MessagingBuffer buffer)
+ {
+ uniqueName = buffer.readSimpleString();
+ }
+
+ public SimpleString getUniqueName()
+ {
+ return uniqueName;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteConsumerAddedMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteConsumerAddedMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteConsumerAddedMessage.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -0,0 +1,90 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat.replication;
+
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ *
+ * A ReplicateRemoteConsumerAddedMessage
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 4 Mar 2009 18:36:30
+ *
+ *
+ */
+public class ReplicateRemoteConsumerAddedMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private SimpleString uniqueBindingName;
+
+ private SimpleString filterString;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ReplicateRemoteConsumerAddedMessage(SimpleString uniqueBindingName, SimpleString filterString)
+ {
+ super(REPLICATE_ADD_REMOTE_CONSUMER);
+
+ this.uniqueBindingName = uniqueBindingName;
+
+ this.filterString = filterString;
+ }
+
+ // Public --------------------------------------------------------
+
+ public ReplicateRemoteConsumerAddedMessage()
+ {
+ super(REPLICATE_ADD_REMOTE_CONSUMER);
+ }
+
+ public void encodeBody(final MessagingBuffer buffer)
+ {
+ buffer.writeSimpleString(uniqueBindingName);
+
+ buffer.writeNullableSimpleString(filterString);
+ }
+
+ public void decodeBody(final MessagingBuffer buffer)
+ {
+ uniqueBindingName = buffer.readSimpleString();
+
+ filterString = buffer.readNullableSimpleString();
+ }
+
+ public SimpleString getUniqueBindingName()
+ {
+ return uniqueBindingName;
+ }
+
+ public SimpleString getFilterString()
+ {
+ return filterString;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteConsumerRemovedMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteConsumerRemovedMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateRemoteConsumerRemovedMessage.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -0,0 +1,90 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat.replication;
+
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ *
+ * A ReplicateRemoteConsumerRemovedMessage
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 4 Mar 2009 18:36:30
+ *
+ *
+ */
+public class ReplicateRemoteConsumerRemovedMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private SimpleString uniqueBindingName;
+
+ private SimpleString filterString;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ReplicateRemoteConsumerRemovedMessage(SimpleString uniqueBindingName, SimpleString filterString)
+ {
+ super(REPLICATE_REMOVE_REMOTE_CONSUMER);
+
+ this.uniqueBindingName = uniqueBindingName;
+
+ this.filterString = filterString;
+ }
+
+ // Public --------------------------------------------------------
+
+ public ReplicateRemoteConsumerRemovedMessage()
+ {
+ super(REPLICATE_REMOVE_REMOTE_CONSUMER);
+ }
+
+ public void encodeBody(final MessagingBuffer buffer)
+ {
+ buffer.writeSimpleString(uniqueBindingName);
+
+ buffer.writeNullableSimpleString(filterString);
+ }
+
+ public void decodeBody(final MessagingBuffer buffer)
+ {
+ uniqueBindingName = buffer.readSimpleString();
+
+ filterString = buffer.readNullableSimpleString();
+ }
+
+ public SimpleString getUniqueBindingName()
+ {
+ return uniqueBindingName;
+ }
+
+ public SimpleString getFilterString()
+ {
+ return filterString;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -85,26 +85,24 @@
boolean xa,
int sendWindowSize) throws Exception;
- CreateSessionResponseMessage replicateCreateSession(String name,
- long channelID,
- long originalSessionID,
- String username,
- String password,
- int minLargeMessageSize,
- int incrementingVersion,
- RemotingConnection remotingConnection,
- boolean autoCommitSends,
- boolean autoCommitAcks,
- boolean preAcknowledge,
- boolean xa,
- int sendWindowSize) throws Exception;
-
- void updateClusterConnectionConnectors(SimpleString clusterConnectionName, List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception;
-
+ void replicateCreateSession(String name,
+ long channelID,
+ long originalSessionID,
+ String username,
+ String password,
+ int minLargeMessageSize,
+ int incrementingVersion,
+ RemotingConnection remotingConnection,
+ boolean autoCommitSends,
+ boolean autoCommitAcks,
+ boolean preAcknowledge,
+ boolean xa,
+ int sendWindowSize) throws Exception;
+
void removeSession(String name) throws Exception;
-
+
ServerSession getSession(String name);
-
+
Set<ServerSession> getSessions();
boolean isStarted();
@@ -122,14 +120,14 @@
ResourceManager getResourceManager();
List<ServerSession> getSessions(String connectionID);
-
+
ClusterManager getClusterManager();
-
+
QueueFactory getQueueFactory();
-
+
SimpleString getNodeID();
-
+
UUID getUUID();
-
+
Channel getReplicatingChannel();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -32,7 +32,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
@@ -114,7 +114,7 @@
void handleStop(Packet packet);
- void handleCreateQueue(SessionCreateQueueMessage packet);
+ void handleCreateQueue(CreateQueueMessage packet);
void handleDeleteQueue(SessionDeleteQueueMessage packet);
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/Bridge.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/Bridge.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/Bridge.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -48,5 +48,7 @@
Transformer getTransformer();
- boolean isUseDuplicateDetection();
+ boolean isUseDuplicateDetection();
+
+ void activate();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/BroadcastGroup.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/BroadcastGroup.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/BroadcastGroup.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -47,4 +47,6 @@
int size();
void broadcastConnectors() throws Exception;
+
+ void activate();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterConnection.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterConnection.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -20,14 +20,9 @@
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-
package org.jboss.messaging.core.server.cluster;
-import java.util.List;
-
-import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.server.MessagingComponent;
-import org.jboss.messaging.utils.Pair;
import org.jboss.messaging.utils.SimpleString;
/**
@@ -42,6 +37,14 @@
public interface ClusterConnection extends MessagingComponent
{
SimpleString getName();
+
+ void handleReplicatedAddBinding(SimpleString address,
+ SimpleString uniqueName,
+ SimpleString routingName,
+ int queueID,
+ SimpleString filterString,
+ SimpleString queueName,
+ int distance) throws Exception;
- void handleReplicatedUpdateConnectors(List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception;
+ void activate();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -43,4 +43,6 @@
Set<ClusterConnection> getClusterConnections();
ClusterConnection getClusterConnection(SimpleString name);
+
+ void activate();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -64,8 +64,11 @@
import org.jboss.messaging.core.message.Message;
import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.postoffice.BindingType;
+import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.FailureListener;
+import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateAcknowledgeMessage;
import org.jboss.messaging.core.security.impl.SecurityStoreImpl;
import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.MessageReference;
@@ -147,6 +150,10 @@
private final String clusterPassword;
+ private Channel replicatingChannel;
+
+ private boolean activated;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -169,7 +176,9 @@
final boolean useDuplicateDetection,
final SimpleString managementAddress,
final SimpleString managementNotificationAddress,
- final String clusterPassword) throws Exception
+ final String clusterPassword,
+ final Channel replicatingChannel,
+ final boolean activated) throws Exception
{
this(nodeUUID,
name,
@@ -188,7 +197,9 @@
managementAddress,
managementNotificationAddress,
clusterPassword,
- null);
+ null,
+ replicatingChannel,
+ activated);
}
public BridgeImpl(final UUID nodeUUID,
@@ -208,7 +219,9 @@
final SimpleString managementAddress,
final SimpleString managementNotificationAddress,
final String clusterPassword,
- final MessageFlowRecord flowRecord) throws Exception
+ final MessageFlowRecord flowRecord,
+ final Channel replicatingChannel,
+ final boolean activated) throws Exception
{
this.nodeUUID = nodeUUID;
@@ -254,6 +267,10 @@
this.clusterPassword = clusterPassword;
this.flowRecord = flowRecord;
+
+ this.replicatingChannel = replicatingChannel;
+
+ this.activated = activated;
}
public synchronized void start() throws Exception
@@ -265,7 +282,10 @@
started = true;
- executor.execute(new CreateObjectsRunnable());
+ if (activated)
+ {
+ executor.execute(new CreateObjectsRunnable());
+ }
}
private void cancelRefs() throws Exception
@@ -304,89 +324,214 @@
this.waitForRunnablesToComplete();
}
- private class StopRunnable implements Runnable
+ public boolean isStarted()
{
- public void run()
- {
- try
- {
- synchronized (BridgeImpl.this)
- {
- if (!started)
- {
- return;
- }
+ return started;
+ }
- if (session != null)
- {
- session.close();
- }
+ public synchronized void activate()
+ {
+ replicatingChannel = null;
+
+ activated = true;
- started = false;
+ executor.execute(new CreateObjectsRunnable());
+ }
- active = false;
+ public SimpleString getName()
+ {
+ return name;
+ }
- }
+ public Queue getQueue()
+ {
+ return queue;
+ }
- queue.removeConsumer(BridgeImpl.this);
+ public Filter getFilter()
+ {
+ return filter;
+ }
- cancelRefs();
- }
- catch (Exception e)
- {
- log.error("Failed to stop bridge", e);
- }
+ public SimpleString getForwardingAddress()
+ {
+ return forwardingAddress;
+ }
+
+ public Transformer getTransformer()
+ {
+ return transformer;
+ }
+
+ public boolean isUseDuplicateDetection()
+ {
+ return useDuplicateDetection;
+ }
+
+ // For testing only
+ public RemotingConnection getForwardingConnection()
+ {
+ if (session == null)
+ {
+ return null;
}
+ else
+ {
+ return ((ClientSessionImpl)session).getConnection();
+ }
}
- private class FailRunnable implements Runnable
+ // SendAcknowledgementHandler implementation ---------------------
+
+ public void sendAcknowledged(final Message message)
{
- public void run()
+ try
{
- synchronized (BridgeImpl.this)
+ final MessageReference ref = refs.poll();
+
+ if (ref != null)
{
- if (!started)
+ if (replicatingChannel == null)
{
- return;
+ // Acknowledge when we know send has been processed on the server
+ ref.getQueue().acknowledge(ref);
}
+ else
+ {
+ Packet packet = new ReplicateAcknowledgeMessage(name, ref.getMessage().getMessageID());
- if (flowRecord != null)
+ replicatingChannel.replicatePacket(packet, 2, new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ ref.getQueue().acknowledge(ref);
+ }
+ catch (Exception e)
+ {
+ log.info("Failed to ack", e);
+ }
+ }
+ });
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ log.info("Failed to ack", e);
+ }
+ }
+
+ // Consumer implementation ---------------------------------------
+
+ public HandleStatus handle(final MessageReference ref) throws Exception
+ {
+ if (filter != null && !filter.match(ref.getMessage()))
+ {
+ return HandleStatus.NO_MATCH;
+ }
+
+ if (!active)
+ {
+ return HandleStatus.BUSY;
+ }
+
+ synchronized (this)
+ {
+ ref.getQueue().referenceHandled();
+
+ ServerMessage message = ref.getMessage();
+
+ refs.add(ref);
+
+ if (flowRecord != null)
+ {
+ // We make a shallow copy of the message, then we strip out the unwanted routing id headers and leave
+ // only
+ // the one pertinent for the destination node - this is important since different queues on different
+ // nodes could have same queue ids
+ // Note we must copy since same message may get routed to other nodes which require different headers
+ message = message.copy();
+
+ // TODO - we can optimise this
+
+ Set<SimpleString> propNames = new HashSet<SimpleString>(message.getPropertyNames());
+
+ byte[] queueIds = (byte[])message.getProperty(idsHeaderName);
+
+ for (SimpleString propName : propNames)
{
- try
+ if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS))
{
- flowRecord.reset();
+ message.removeProperty(propName);
}
- catch (Exception e)
- {
- log.error("Failed to reset", e);
- }
}
- active = false;
+ message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
+
+ message.putBooleanProperty(MessageImpl.HDR_FROM_CLUSTER, Boolean.TRUE);
}
- try
+ if (useDuplicateDetection && !message.containsProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID))
{
- queue.removeConsumer(BridgeImpl.this);
+ // If we are using duplicate detection and there's not already a duplicate detection header, then
+ // we add a header composed of the persistent node id and the message id, which makes it globally unique
+ // between restarts.
+ // If you use a cluster connection then a guid based duplicate id will be used since it is added *before*
+ // the
+ // message goes into the store and forward queue.
+ // But with this technique it also works when the messages don't already have such a header in them in the
+ // queue.
+ byte[] bytes = new byte[24];
- session.cleanUp();
+ ByteBuffer bb = ByteBuffer.wrap(bytes);
- cancelRefs();
+ bb.put(nodeUUID.asBytes());
- csf.close();
+ bb.putLong(message.getMessageID());
+
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, bytes);
}
- catch (Exception e)
+
+ if (transformer != null)
{
- log.error("Failed to stop", e);
+ message = transformer.transform(message);
}
- if (!createObjects())
+ SimpleString dest;
+
+ if (forwardingAddress != null)
{
- started = false;
+ dest = forwardingAddress;
}
+ else
+ {
+ // Preserve the original address
+ dest = message.getDestination();
+ }
+
+ producer.send(dest, message);
+
+ return HandleStatus.HANDLED;
}
}
+ // FailureListener implementation --------------------------------
+
+ public boolean connectionFailed(final MessagingException me)
+ {
+ fail();
+
+ return true;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
private void waitForRunnablesToComplete()
{
// Wait for any create objects runnable to complete
@@ -532,183 +677,91 @@
}
}
- public boolean isStarted()
- {
- return started;
- }
+ // Inner classes -------------------------------------------------
- public SimpleString getName()
+ private class StopRunnable implements Runnable
{
- return name;
- }
+ public void run()
+ {
+ try
+ {
+ synchronized (BridgeImpl.this)
+ {
+ if (!started)
+ {
+ return;
+ }
- public Queue getQueue()
- {
- return queue;
- }
+ if (session != null)
+ {
+ session.close();
+ }
- public Filter getFilter()
- {
- return filter;
- }
+ started = false;
- public SimpleString getForwardingAddress()
- {
- return forwardingAddress;
- }
+ active = false;
- public Transformer getTransformer()
- {
- return transformer;
- }
+ }
- public boolean isUseDuplicateDetection()
- {
- return useDuplicateDetection;
- }
+ queue.removeConsumer(BridgeImpl.this);
- // For testing only
- public RemotingConnection getForwardingConnection()
- {
- if (session == null)
- {
- return null;
- }
- else
- {
- return ((ClientSessionImpl)session).getConnection();
- }
- }
-
- // SendAcknowledgementHandler implementation ---------------------
-
- public void sendAcknowledged(final Message message)
- {
- try
- {
- MessageReference ref = refs.poll();
-
- if (ref != null)
+ cancelRefs();
+ }
+ catch (Exception e)
{
- // Acknowledge when we know send has been processed on the server
- ref.getQueue().acknowledge(ref);
+ log.error("Failed to stop bridge", e);
}
}
- catch (Exception e)
- {
- log.info("Failed to ack", e);
- }
}
- // Consumer implementation ---------------------------------------
-
- public HandleStatus handle(final MessageReference ref) throws Exception
+ private class FailRunnable implements Runnable
{
- if (filter != null && !filter.match(ref.getMessage()))
+ public void run()
{
- return HandleStatus.NO_MATCH;
- }
-
- if (!active)
- {
- return HandleStatus.BUSY;
- }
-
- synchronized (this)
- {
- ref.getQueue().referenceHandled();
-
- ServerMessage message = ref.getMessage();
-
- refs.add(ref);
-
- if (flowRecord != null)
+ synchronized (BridgeImpl.this)
{
- // We make a shallow copy of the message, then we strip out the unwanted routing id headers and leave
- // only
- // the one pertinent for the destination node - this is important since different queues on different
- // nodes could have same queue ids
- // Note we must copy since same message may get routed to other nodes which require different headers
- message = message.copy();
+ if (!started)
+ {
+ return;
+ }
- // TODO - we can optimise this
-
- Set<SimpleString> propNames = new HashSet<SimpleString>(message.getPropertyNames());
-
- byte[] queueIds = (byte[])message.getProperty(idsHeaderName);
-
- for (SimpleString propName : propNames)
+ if (flowRecord != null)
{
- if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS))
+ try
{
- message.removeProperty(propName);
+ flowRecord.reset();
}
+ catch (Exception e)
+ {
+ log.error("Failed to reset", e);
+ }
}
- message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
-
- message.putBooleanProperty(MessageImpl.HDR_FROM_CLUSTER, Boolean.TRUE);
+ active = false;
}
- if (useDuplicateDetection && !message.containsProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID))
+ try
{
- //If we are using duplicate detection and there's not already a duplicate detection header, then
- //we add a header composed of the persistent node id and the message id, which makes it globally unique
- //between restarts.
- //If you use a cluster connection then a guid based duplicate id will be used since it is added *before* the
- //message goes into the store and forward queue.
- //But with this technique it also works when the messages don't already have such a header in them in the queue.
- byte[] bytes = new byte[24];
+ queue.removeConsumer(BridgeImpl.this);
- ByteBuffer bb = ByteBuffer.wrap(bytes);
+ session.cleanUp();
- bb.put(nodeUUID.asBytes());
+ cancelRefs();
- bb.putLong(message.getMessageID());
-
- message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, bytes);
+ csf.close();
}
-
- if (transformer != null)
+ catch (Exception e)
{
- message = transformer.transform(message);
+ log.error("Failed to stop", e);
}
- SimpleString dest;
-
- if (forwardingAddress != null)
+ if (!createObjects())
{
- dest = forwardingAddress;
+ started = false;
}
- else
- {
- // Preserve the original address
- dest = message.getDestination();
- }
-
- producer.send(dest, message);
-
- return HandleStatus.HANDLED;
}
}
- // FailureListener implementation --------------------------------
-
- public boolean connectionFailed(final MessagingException me)
- {
- fail();
-
- return true;
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
private class CreateObjectsRunnable implements Runnable
{
public synchronized void run()
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -22,7 +22,6 @@
package org.jboss.messaging.core.server.cluster.impl;
-import java.io.ByteArrayOutputStream;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
@@ -66,12 +65,15 @@
private boolean started;
private ScheduledFuture<?> future;
+
+ private boolean active;
public BroadcastGroupImpl(final String nodeID,
final String name,
final int localPort,
final InetAddress groupAddress,
- final int groupPort) throws Exception
+ final int groupPort,
+ final boolean active) throws Exception
{
this.nodeID = nodeID;
@@ -82,6 +84,10 @@
this.groupAddress = groupAddress;
this.groupPort = groupPort;
+
+ this.active = active;
+
+ log.info("created broadcast group active "+ active);
}
public synchronized void start() throws Exception
@@ -145,9 +151,19 @@
{
return connectorPairs.size();
}
+
+ public synchronized void activate()
+ {
+ active = true;
+ }
public synchronized void broadcastConnectors() throws Exception
{
+ if (!active)
+ {
+ return;
+ }
+
MessagingBuffer buff = ChannelBuffers.dynamicBuffer(4096);
buff.writeString(nodeID);
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -22,6 +22,8 @@
package org.jboss.messaging.core.server.cluster.impl;
+import static org.jboss.messaging.core.management.NotificationType.CONSUMER_CLOSED;
+import static org.jboss.messaging.core.management.NotificationType.CONSUMER_CREATED;
import static org.jboss.messaging.core.postoffice.impl.PostOfficeImpl.HDR_RESET_QUEUE_DATA;
import java.util.HashMap;
@@ -48,7 +50,10 @@
import org.jboss.messaging.core.postoffice.impl.LocalQueueBinding;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateClusterConnectionUpdate;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingAddedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingRemovedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerAddedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerRemovedMessage;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.QueueFactory;
import org.jboss.messaging.core.server.cluster.Bridge;
@@ -109,13 +114,15 @@
private final int maxHops;
private final UUID nodeUUID;
-
+
private final Channel replicatingChannel;
+ private final List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors;
+
private boolean backup;
private volatile boolean started;
-
+
/*
* Constructor using static list of connectors
*/
@@ -172,10 +179,12 @@
this.maxHops = maxHops;
this.nodeUUID = nodeUUID;
-
+
this.replicatingChannel = replicatingChannel;
+
+ this.backup = backup;
- this.backup = backup;
+ this.staticConnectors = connectors;
this.updateConnectors(connectors);
}
@@ -236,10 +245,12 @@
this.maxHops = maxHops;
this.nodeUUID = nodeUUID;
-
+
this.replicatingChannel = replicatingChannel;
+
+ this.backup = backup;
- this.backup = backup;
+ this.staticConnectors = null;
}
public synchronized void start() throws Exception
@@ -286,21 +297,37 @@
{
return name;
}
-
- public synchronized void handleReplicatedUpdateConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
+
+ public synchronized void activate()
{
- if (!backup)
+ backup = false;
+
+ if (discoveryGroup != null)
{
- return;
+ connectorsChanged();
}
-
- updateConnectors(connectors);
+ else
+ {
+ try
+ {
+ updateConnectors(staticConnectors);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to update connectors", e);
+ }
+ }
}
// DiscoveryListener implementation ------------------------------------------------------------------
public synchronized void connectorsChanged()
{
+ if (backup)
+ {
+ return;
+ }
+
try
{
List<Pair<TransportConfiguration, TransportConfiguration>> connectors = discoveryGroup.getConnectors();
@@ -312,34 +339,10 @@
log.error("Failed to update connectors", e);
}
}
-
+
private void updateConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
{
- if (replicatingChannel == null)
- {
- doUpdateConnectors(connectors);
- }
- else
- {
- Packet packet = new ReplicateClusterConnectionUpdate(name, connectors);
-
- Runnable action = new Runnable()
- {
- public void run()
- {
- try
- {
- doUpdateConnectors(connectors);
- }
- catch (Exception e)
- {
- log.error("Failed to update connectors", e);
- }
- }
- };
-
- replicatingChannel.replicatePacket(packet, 1, action);
- }
+ doUpdateConnectors(connectors);
}
private void doUpdateConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
@@ -387,7 +390,9 @@
// Add binding in storage so the queue will get reloaded on startup and we can find it - it's never
// actually routed to at that address though
- Binding storeBinding = new LocalQueueBinding(queue.getName(), queue, new SimpleString(nodeUUID.toString()));
+ Binding storeBinding = new LocalQueueBinding(queue.getName(),
+ queue,
+ new SimpleString(nodeUUID.toString()));
storageManager.addQueueBinding(storeBinding);
}
@@ -411,7 +416,9 @@
managementService.getManagementAddress(),
managementService.getManagementNotificationAddress(),
managementService.getClusterPassword(),
- record);
+ record,
+ replicatingChannel,
+ !backup);
record.setBridge(bridge);
@@ -513,6 +520,7 @@
clearBindings();
}
+
public void onMessage(final ClientMessage message)
{
try
@@ -538,177 +546,339 @@
NotificationType ntype = NotificationType.valueOf(type.toString());
- Integer distance = (Integer)message.getProperty(ManagementHelper.HDR_DISTANCE);
-
- if (distance == null)
- {
- throw new IllegalStateException("distance is null");
- }
-
switch (ntype.toInt())
{
case NotificationType.BINDING_ADDED_INDEX:
{
+ doBindingAdded(message, replicatingChannel);
- SimpleString queueAddress = (SimpleString)message.getProperty(ManagementHelper.HDR_ADDRESS);
+ break;
+ }
+ case NotificationType.BINDING_REMOVED_INDEX:
+ {
+ doBindingRemoved(message, replicatingChannel);
- if (queueAddress == null)
- {
- throw new IllegalStateException("queueAddress is null");
- }
+ break;
+ }
+ case NotificationType.CONSUMER_CREATED_INDEX:
+ {
+ doConsumerCreated(message, replicatingChannel);
- SimpleString clusterName = (SimpleString)message.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
+ break;
+ }
+ case NotificationType.CONSUMER_CLOSED_INDEX:
+ {
+ doConsumerClosed(message, replicatingChannel);
- if (clusterName == null)
- {
- throw new IllegalStateException("clusterName is null");
- }
+ break;
+ }
+ default:
+ {
+ throw new IllegalArgumentException("Invalid type " + ntype.toInt());
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to handle message", e);
+ }
+ }
- SimpleString routingName = (SimpleString)message.getProperty(ManagementHelper.HDR_ROUTING_NAME);
+ private void clearBindings() throws Exception
+ {
+ for (RemoteQueueBinding binding : bindings.values())
+ {
+ postOffice.removeBinding(binding.getUniqueName());
+ }
- if (routingName == null)
- {
- throw new IllegalStateException("routingName is null");
- }
+ bindings.clear();
+ }
+
+ private void doBindingAdded(final ClientMessage message, final Channel replChannel) throws Exception
+ {
+ Integer distance = (Integer)message.getProperty(ManagementHelper.HDR_DISTANCE);
- SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
+ if (distance == null)
+ {
+ throw new IllegalStateException("distance is null");
+ }
- Integer queueID = (Integer)message.getProperty(ManagementHelper.HDR_BINDING_ID);
+ SimpleString queueAddress = (SimpleString)message.getProperty(ManagementHelper.HDR_ADDRESS);
- if (queueID == null)
- {
- throw new IllegalStateException("queueID is null");
- }
+ if (queueAddress == null)
+ {
+ throw new IllegalStateException("queueAddress is null");
+ }
- RemoteQueueBinding binding = new RemoteQueueBindingImpl(queueAddress,
- clusterName,
- routingName,
- queueID,
- filterString,
- queue,
- useDuplicateDetection,
- bridge.getName(),
- distance + 1);
+ SimpleString clusterName = (SimpleString)message.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
- bindings.put(clusterName, binding);
+ if (clusterName == null)
+ {
+ throw new IllegalStateException("clusterName is null");
+ }
- if (postOffice.getBinding(clusterName) != null)
- {
- // Sanity check - this means the binding has already been added via another bridge, probably max
- // hops is too high
- // or there are multiple cluster connections for the same address
+ SimpleString routingName = (SimpleString)message.getProperty(ManagementHelper.HDR_ROUTING_NAME);
- log.warn("Remoting queue binding " + clusterName +
- " has already been bound in the post office. Most likely cause for this is you have a loop " +
- "in your cluster due to cluster max-hops being too large or you have multiple cluster connections to the same nodes using overlapping addresses");
+ if (routingName == null)
+ {
+ throw new IllegalStateException("routingName is null");
+ }
- return;
- }
+ SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
- postOffice.addBinding(binding);
+ Integer queueID = (Integer)message.getProperty(ManagementHelper.HDR_BINDING_ID);
- Bindings theBindings = postOffice.getBindingsForAddress(queueAddress);
-
- theBindings.setRouteWhenNoConsumers(routeWhenNoConsumers);
-
- break;
- }
- case NotificationType.BINDING_REMOVED_INDEX:
+ if (queueID == null)
+ {
+ throw new IllegalStateException("queueID is null");
+ }
+
+ if (replChannel != null)
+ {
+ Packet packet = new ReplicateRemoteBindingAddedMessage(name, queueAddress, clusterName, routingName, queueID, filterString, queue.getName(), distance + 1);
+
+ replChannel.replicatePacket(packet, 2, new Runnable()
+ {
+ public void run()
{
- SimpleString clusterName = (SimpleString)message.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
-
- if (clusterName == null)
+ try
{
- throw new IllegalStateException("clusterName is null");
+ doBindingAdded(message, null);
}
-
- RemoteQueueBinding binding = bindings.remove(clusterName);
-
- if (binding == null)
+ catch (Exception e)
{
- throw new IllegalStateException("Cannot find binding for queue " + clusterName);
+ log.error("Failed to add remote queue binding", e);
}
+ }
+ });
+ }
+ else
+ {
+ RemoteQueueBinding binding = new RemoteQueueBindingImpl(queueAddress,
+ clusterName,
+ routingName,
+ queueID,
+ filterString,
+ queue,
+ useDuplicateDetection,
+ bridge.getName(),
+ distance + 1);
+
+ bindings.put(clusterName, binding);
+
+ if (postOffice.getBinding(clusterName) != null)
+ {
+ // Sanity check - this means the binding has already been added via another bridge, probably max
+ // hops is too high
+ // or there are multiple cluster connections for the same address
+
+ log.warn("Remoting queue binding " + clusterName +
+ " has already been bound in the post office. Most likely cause for this is you have a loop " +
+ "in your cluster due to cluster max-hops being too large or you have multiple cluster connections to the same nodes using overlapping addresses");
+
+ return;
+ }
+
+ postOffice.addBinding(binding);
+
+ Bindings theBindings = postOffice.getBindingsForAddress(queueAddress);
+
+ theBindings.setRouteWhenNoConsumers(routeWhenNoConsumers);
+ }
+ }
- postOffice.removeBinding(binding.getUniqueName());
+ private void doBindingRemoved(final ClientMessage message, final Channel replChannel) throws Exception
+ {
+ SimpleString clusterName = (SimpleString)message.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
- break;
- }
- case NotificationType.CONSUMER_CREATED_INDEX:
+ if (clusterName == null)
+ {
+ throw new IllegalStateException("clusterName is null");
+ }
+
+ if (replChannel != null)
+ {
+ Packet packet = new ReplicateRemoteBindingRemovedMessage(clusterName);
+
+ replChannel.replicatePacket(packet, 2, new Runnable()
+ {
+ public void run()
{
- SimpleString clusterName = (SimpleString)message.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
-
- if (clusterName == null)
+ try
{
- throw new IllegalStateException("clusterName is null");
+ doBindingRemoved(message, null);
}
-
- SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
-
- RemoteQueueBinding binding = bindings.get(clusterName);
-
- if (binding == null)
+ catch (Exception e)
{
- throw new IllegalStateException("Cannot find binding for " + clusterName);
+ log.error("Failed to remove remote queue binding", e);
}
+ }
+ });
+ }
+ else
+ {
+ RemoteQueueBinding binding = bindings.remove(clusterName);
+
+ if (binding == null)
+ {
+ throw new IllegalStateException("Cannot find binding for queue " + clusterName);
+ }
+
+ postOffice.removeBinding(binding.getUniqueName());
+ }
+ }
- binding.addConsumer(filterString);
+ private void doConsumerCreated(final ClientMessage message, final Channel replChannel) throws Exception
+ {
+ Integer distance = (Integer)message.getProperty(ManagementHelper.HDR_DISTANCE);
- message.putIntProperty(ManagementHelper.HDR_DISTANCE, distance + 1);
+ if (distance == null)
+ {
+ throw new IllegalStateException("distance is null");
+ }
- // Need to propagate the consumer add
- Notification notification = new Notification(ntype, message.getProperties());
+ SimpleString clusterName = (SimpleString)message.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
- managementService.sendNotification(notification);
+ if (clusterName == null)
+ {
+ throw new IllegalStateException("clusterName is null");
+ }
- break;
- }
- case NotificationType.CONSUMER_CLOSED_INDEX:
+ SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
+
+ if (replChannel != null)
+ {
+ Packet packet = new ReplicateRemoteConsumerAddedMessage(clusterName, filterString);
+
+ replChannel.replicatePacket(packet, 2, new Runnable()
+ {
+ public void run()
{
- SimpleString clusterName = (SimpleString)message.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
-
- if (clusterName == null)
+ try
{
- throw new IllegalStateException("clusterName is null");
+ doConsumerCreated(message, null);
}
-
- SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
-
- RemoteQueueBinding binding = bindings.get(clusterName);
-
- if (binding == null)
+ catch (Exception e)
{
- throw new IllegalStateException("Cannot find binding for " + clusterName);
+ log.error("Failed to add remote consumer", e);
}
+ }
+ });
+ }
+ else
+ {
+ RemoteQueueBinding binding = bindings.get(clusterName);
+
+ if (binding == null)
+ {
+ throw new IllegalStateException("Cannot find binding for " + clusterName);
+ }
+
+ binding.addConsumer(filterString);
+
+ message.putIntProperty(ManagementHelper.HDR_DISTANCE, distance + 1);
+
+ // Need to propagate the consumer add
+ Notification notification = new Notification(CONSUMER_CREATED, message.getProperties());
+
+ managementService.sendNotification(notification);
+ }
+ }
- binding.removeConsumer(filterString);
+ private void doConsumerClosed(final ClientMessage message, final Channel replChannel) throws Exception
+ {
+ Integer distance = (Integer)message.getProperty(ManagementHelper.HDR_DISTANCE);
- message.putIntProperty(ManagementHelper.HDR_DISTANCE, distance + 1);
+ if (distance == null)
+ {
+ throw new IllegalStateException("distance is null");
+ }
- // Need to propagate the consumer close
- Notification notification = new Notification(ntype, message.getProperties());
+ SimpleString clusterName = (SimpleString)message.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
- managementService.sendNotification(notification);
+ if (clusterName == null)
+ {
+ throw new IllegalStateException("clusterName is null");
+ }
- break;
+ SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
+
+ if (replChannel != null)
+ {
+ Packet packet = new ReplicateRemoteConsumerRemovedMessage(clusterName, filterString);
+
+ replChannel.replicatePacket(packet, 2, new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ doConsumerClosed(message, null);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to remove remote consumer", e);
+ }
}
+ });
+ }
+ else
+ {
+ RemoteQueueBinding binding = bindings.get(clusterName);
+
+ if (binding == null)
+ {
+ throw new IllegalStateException("Cannot find binding for " + clusterName);
}
+
+ binding.removeConsumer(filterString);
+
+ message.putIntProperty(ManagementHelper.HDR_DISTANCE, distance + 1);
+
+ // Need to propagate the consumer close
+ Notification notification = new Notification(CONSUMER_CLOSED, message.getProperties());
+
+ managementService.sendNotification(notification);
}
- catch (Exception e)
- {
- log.error("Failed to handle message", e);
- }
}
- private void clearBindings() throws Exception
+
+ }
+
+
+
+ public void handleReplicatedAddBinding(final SimpleString address,
+ final SimpleString uniqueName,
+ final SimpleString routingName,
+ final int queueID,
+ final SimpleString filterString,
+ final SimpleString queueName,
+ final int distance) throws Exception
+ {
+ Binding queueBinding = postOffice.getBinding(queueName);
+
+ if (queueBinding == null)
{
- for (RemoteQueueBinding binding : bindings.values())
- {
- postOffice.removeBinding(binding.getUniqueName());
- }
-
- bindings.clear();
+ throw new IllegalStateException("Cannot find s & f queue " + queueName);
}
+ Queue queue = (Queue)queueBinding.getBindable();
+
+ RemoteQueueBinding binding = new RemoteQueueBindingImpl(address,
+ uniqueName,
+ routingName,
+ queueID,
+ filterString,
+ queue,
+ useDuplicateDetection,
+ queueName,
+ distance);
+
+ postOffice.addBinding(binding);
+
+ Bindings theBindings = postOffice.getBindingsForAddress(address);
+
+ theBindings.setRouteWhenNoConsumers(routeWhenNoConsumers);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -47,6 +47,7 @@
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.QueueFactory;
import org.jboss.messaging.core.server.cluster.Bridge;
@@ -94,8 +95,12 @@
private final QueueFactory queueFactory;
private final UUID nodeUUID;
+
+ private Channel replicatingChannel;
private volatile boolean started;
+
+ private boolean backup;
public ClusterManagerImpl(final org.jboss.messaging.utils.ExecutorFactory executorFactory,
final StorageManager storageManager,
@@ -104,7 +109,9 @@
final ManagementService managementService,
final Configuration configuration,
final QueueFactory queueFactory,
- final UUID nodeUUID)
+ final UUID nodeUUID,
+ final Channel replicatingChannel,
+ final boolean backup)
{
this.executorFactory = executorFactory;
@@ -121,6 +128,10 @@
this.queueFactory = queueFactory;
this.nodeUUID = nodeUUID;
+
+ this.replicatingChannel = replicatingChannel;
+
+ this.backup = backup;
}
public synchronized void start() throws Exception
@@ -212,6 +223,28 @@
{
return clusters.get(name.toString());
}
+
+ public synchronized void activate()
+ {
+ for (BroadcastGroup bg: broadcastGroups.values())
+ {
+ bg.activate();
+ }
+
+ for (Bridge bridge: bridges.values())
+ {
+ bridge.activate();
+ }
+
+ for (ClusterConnection cc: clusters.values())
+ {
+ cc.activate();
+ }
+
+ replicatingChannel = null;
+
+ backup = false;
+ }
private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration config) throws Exception
{
@@ -229,7 +262,8 @@
config.getName(),
config.getLocalBindPort(),
groupAddress,
- config.getGroupPort());
+ config.getGroupPort(),
+ !backup);
for (Pair<String, String> connectorInfo : config.getConnectorInfos())
{
@@ -383,6 +417,8 @@
Pair<TransportConfiguration, TransportConfiguration> pair = new Pair<TransportConfiguration, TransportConfiguration>(connector,
backupConnector);
+ log.info("deploying bridge, backup is " + backup);
+
bridge = new BridgeImpl(nodeUUID,
new SimpleString(config.getName()),
queue,
@@ -399,7 +435,9 @@
config.isUseDuplicateDetection(),
managementService.getManagementAddress(),
managementService.getManagementNotificationAddress(),
- managementService.getClusterPassword());
+ managementService.getClusterPassword(),
+ replicatingChannel,
+ !backup);
bridges.put(config.getName(), bridge);
@@ -481,8 +519,8 @@
connectors,
config.getMaxHops(),
nodeUUID,
- null,
- false);
+ replicatingChannel,
+ backup);
}
else
{
@@ -511,8 +549,8 @@
dg,
config.getMaxHops(),
nodeUUID,
- null,
- false);
+ replicatingChannel,
+ backup);
}
managementService.registerCluster(clusterConnection, config);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -380,7 +380,9 @@
managementService,
configuration,
queueFactory,
- uuid);
+ uuid,
+ getReplicatingChannel(),
+ configuration.isBackup());
clusterManager.start();
}
@@ -442,7 +444,6 @@
postOffice = null;
securityRepository = null;
securityStore = null;
- addressSettingsRepository.clear();
scheduledExecutor.shutdown();
queueFactory = null;
resourceManager = null;
@@ -581,6 +582,11 @@
configuration.setBackup(false);
remotingService.setBackup(false);
+
+ if (clusterManager != null)
+ {
+ clusterManager.activate();
+ }
}
connection.activate();
@@ -645,34 +651,34 @@
}
}
- public CreateSessionResponseMessage replicateCreateSession(final String name,
- final long replicatedChannelID,
- final long originalChannelID,
- final String username,
- final String password,
- final int minLargeMessageSize,
- final int incrementingVersion,
- final RemotingConnection connection,
- final boolean autoCommitSends,
- final boolean autoCommitAcks,
- final boolean preAcknowledge,
- final boolean xa,
- final int sendWindowSize) throws Exception
+ public void replicateCreateSession(final String name,
+ final long replicatedChannelID,
+ final long originalChannelID,
+ final String username,
+ final String password,
+ final int minLargeMessageSize,
+ final int incrementingVersion,
+ final RemotingConnection connection,
+ final boolean autoCommitSends,
+ final boolean autoCommitAcks,
+ final boolean preAcknowledge,
+ final boolean xa,
+ final int sendWindowSize) throws Exception
{
- return doCreateSession(name,
- replicatedChannelID,
- originalChannelID,
- username,
- password,
- minLargeMessageSize,
- incrementingVersion,
- connection,
- autoCommitSends,
- autoCommitAcks,
- preAcknowledge,
- xa,
- sendWindowSize,
- true);
+ doCreateSession(name,
+ replicatedChannelID,
+ originalChannelID,
+ username,
+ password,
+ minLargeMessageSize,
+ incrementingVersion,
+ connection,
+ autoCommitSends,
+ autoCommitAcks,
+ preAcknowledge,
+ xa,
+ sendWindowSize,
+ true);
}
public CreateSessionResponseMessage createSession(final String name,
@@ -717,19 +723,6 @@
return sessions.get(name);
}
- public void updateClusterConnectionConnectors(final SimpleString clusterConnectionName,
- final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
- {
- ClusterConnection cc = clusterManager.getClusterConnection(clusterConnectionName);
-
- if (cc == null)
- {
- throw new IllegalStateException("Cannot find cluster connection with name " + clusterConnectionName);
- }
-
- cc.handleReplicatedUpdateConnectors(connectors);
- }
-
public List<ServerSession> getSessions(final String connectionID)
{
Set<Entry<String, ServerSession>> sessionEntries = sessions.entrySet();
@@ -1099,7 +1092,14 @@
if (conn != null)
{
- conn.fail(me);
+ // Execute on different thread to avoid deadlocks
+ new Thread()
+ {
+ public void run()
+ {
+ conn.fail(me);
+ }
+ }.start();
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -13,22 +13,33 @@
package org.jboss.messaging.core.server.impl;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_CREATESESSION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_UPDATE_CONNECTORS;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.ReplicateCreateSessionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateClusterConnectionUpdate;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateAcknowledgeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingAddedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingRemovedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerAddedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerRemovedMessage;
+import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.cluster.ClusterConnection;
+import org.jboss.messaging.core.server.cluster.RemoteQueueBinding;
/**
* A packet handler for all packets that need to be handled at the server level
@@ -90,22 +101,63 @@
break;
}
- case REPLICATE_UPDATE_CONNECTORS:
+ case CREATE_QUEUE:
{
- ReplicateClusterConnectionUpdate request = (ReplicateClusterConnectionUpdate)packet;
-
- handleClusterConnectionUpdate(request);
-
+ // Create queue can also be fielded here in the case of a replicated store and forward queue creation
+
+ CreateQueueMessage request = (CreateQueueMessage)packet;
+
+ handleCreateQueue(request);
+
break;
-
}
+ case PacketImpl.REPLICATE_ADD_REMOTE_QUEUE_BINDING:
+ {
+ ReplicateRemoteBindingAddedMessage request = (ReplicateRemoteBindingAddedMessage)packet;
+
+ handleAddRemoteQueueBinding(request);
+
+ break;
+ }
+ case PacketImpl.REPLICATE_REMOVE_REMOTE_QUEUE_BINDING:
+ {
+ ReplicateRemoteBindingRemovedMessage request = (ReplicateRemoteBindingRemovedMessage)packet;
+
+ handleRemoveRemoteQueueBinding(request);
+
+ break;
+ }
+ case PacketImpl.REPLICATE_ADD_REMOTE_CONSUMER:
+ {
+ ReplicateRemoteConsumerAddedMessage request = (ReplicateRemoteConsumerAddedMessage)packet;
+
+ handleAddRemoteConsumer(request);
+
+ break;
+ }
+ case PacketImpl.REPLICATE_REMOVE_REMOTE_CONSUMER:
+ {
+ ReplicateRemoteConsumerRemovedMessage request = (ReplicateRemoteConsumerRemovedMessage)packet;
+
+ handleRemoveRemoteConsumer(request);
+
+ break;
+ }
+ case PacketImpl.REPLICATE_ACKNOWLEDGE:
+ {
+ ReplicateAcknowledgeMessage request = (ReplicateAcknowledgeMessage)packet;
+
+ handleReplicateAcknowledge(request);
+
+ break;
+ }
default:
{
log.error("Invalid packet " + packet);
}
}
}
-
+
private void doHandleCreateSession(final CreateSessionMessage request, final long oppositeChannelID)
{
Packet response;
@@ -138,7 +190,7 @@
response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
}
}
-
+
channel1.send(response);
}
@@ -179,41 +231,28 @@
private void handleReplicateCreateSession(final ReplicateCreateSessionMessage request)
{
- Packet response;
-
try
{
- response = server.replicateCreateSession(request.getName(),
- request.getReplicatedSessionChannelID(),
- request.getOriginalSessionChannelID(),
- request.getUsername(),
- request.getPassword(),
- request.getMinLargeMessageSize(),
- request.getVersion(),
- connection,
- request.isAutoCommitSends(),
- request.isAutoCommitAcks(),
- request.isPreAcknowledge(),
- request.isXA(),
- request.getWindowSize());
+ server.replicateCreateSession(request.getName(),
+ request.getReplicatedSessionChannelID(),
+ request.getOriginalSessionChannelID(),
+ request.getUsername(),
+ request.getPassword(),
+ request.getMinLargeMessageSize(),
+ request.getVersion(),
+ connection,
+ request.isAutoCommitSends(),
+ request.isAutoCommitAcks(),
+ request.isPreAcknowledge(),
+ request.isXA(),
+ request.getWindowSize());
}
catch (Exception e)
{
log.error("Failed to handle replicate create session", e);
-
- if (e instanceof MessagingException)
- {
- response = new MessagingExceptionMessage((MessagingException)e);
- }
- else
- {
- response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
- }
}
-
- channel1.send(response);
}
-
+
private void handleReattachSession(final ReattachSessionMessage request)
{
Packet response;
@@ -235,15 +274,15 @@
response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
}
}
-
+
channel1.send(response);
}
-
- private void handleClusterConnectionUpdate(final ReplicateClusterConnectionUpdate request)
+
+ private void handleCreateQueue(final CreateQueueMessage request)
{
try
{
- server.updateClusterConnectionConnectors(request.getClusterConnectionName(), request.getConnectors());
+ server.getServerManagement().createQueue(request.getAddress().toString(), request.getQueueName().toString());
}
catch (Exception e)
{
@@ -251,4 +290,108 @@
}
}
+ private void handleAddRemoteQueueBinding(final ReplicateRemoteBindingAddedMessage request)
+ {
+ ClusterConnection cc = server.getClusterManager().getClusterConnection(request.getClusterConnectionName());
+
+ if (cc == null)
+ {
+ throw new IllegalStateException("No cluster connection found with name " + request.getClusterConnectionName());
+ }
+
+ try
+ {
+ cc.handleReplicatedAddBinding(request.getAddress(),
+ request.getUniqueName(),
+ request.getRoutingName(),
+ request.getRemoteQueueID(),
+ request.getFilterString(),
+ request.getSfQueueName(),
+ request.getDistance());
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to handle add remote queue binding", e);
+ }
+ }
+
+ private void handleRemoveRemoteQueueBinding(final ReplicateRemoteBindingRemovedMessage request)
+ {
+ try
+ {
+ Binding binding = server.getPostOffice().removeBinding(request.getUniqueName());
+
+ if (binding == null)
+ {
+ throw new IllegalStateException("Cannot find binding to remove " + request.getUniqueName());
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to handle remove remote queue binding", e);
+ }
+ }
+
+ private void handleAddRemoteConsumer(final ReplicateRemoteConsumerAddedMessage request)
+ {
+ RemoteQueueBinding binding = (RemoteQueueBinding)server.getPostOffice()
+ .getBinding(request.getUniqueBindingName());
+
+ if (binding == null)
+ {
+ throw new IllegalStateException("Cannot find binding to remove " + request.getUniqueBindingName());
+ }
+
+ try
+ {
+ binding.addConsumer(request.getFilterString());
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to handle add remote consumer", e);
+ }
+ }
+
+ private void handleRemoveRemoteConsumer(final ReplicateRemoteConsumerRemovedMessage request)
+ {
+ RemoteQueueBinding binding = (RemoteQueueBinding)server.getPostOffice()
+ .getBinding(request.getUniqueBindingName());
+
+ if (binding == null)
+ {
+ throw new IllegalStateException("Cannot find binding to remove " + request.getUniqueBindingName());
+ }
+
+ try
+ {
+ binding.removeConsumer(request.getFilterString());
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to handle remove remote consumer", e);
+ }
+ }
+
+ private void handleReplicateAcknowledge(final ReplicateAcknowledgeMessage request)
+ {
+ Binding binding = server.getPostOffice().getBinding(request.getUniqueName());
+
+ if (binding == null)
+ {
+ throw new IllegalStateException("Cannot find binding " + request.getUniqueName());
+ }
+
+ try
+ {
+ Queue queue = (Queue)binding.getBindable();
+
+ MessageReference ref = queue.removeFirstReference(request.getMessageID());
+
+ queue.acknowledge(ref);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to handle remove remote consumer", e);
+ }
+ }
}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -43,7 +43,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
@@ -371,7 +371,7 @@
}
}
- public void handleCreateQueue(final SessionCreateQueueMessage packet)
+ public void handleCreateQueue(final CreateQueueMessage packet)
{
if (replicatingChannel == null)
{
@@ -1179,7 +1179,7 @@
boolean browseOnly = packet.isBrowseOnly();
Packet response = null;
-
+
try
{
Binding binding = postOffice.getBinding(name);
@@ -1284,7 +1284,7 @@
}
- private void doHandleCreateQueue(final SessionCreateQueueMessage packet)
+ private void doHandleCreateQueue(final CreateQueueMessage packet)
{
SimpleString address = packet.getAddress();
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -18,8 +18,8 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATE_QUEUE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_DELETE_QUEUE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.DELETE_QUEUE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FAILOVER_COMPLETE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
@@ -55,7 +55,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
@@ -120,13 +120,13 @@
session.handleCreateConsumer(request);
break;
}
- case SESS_CREATE_QUEUE:
+ case CREATE_QUEUE:
{
- SessionCreateQueueMessage request = (SessionCreateQueueMessage)packet;
+ CreateQueueMessage request = (CreateQueueMessage)packet;
session.handleCreateQueue(request);
break;
}
- case SESS_DELETE_QUEUE:
+ case DELETE_QUEUE:
{
SessionDeleteQueueMessage request = (SessionDeleteQueueMessage)packet;
session.handleDeleteQueue(request);
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -166,8 +166,8 @@
configuration);
this.useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME,
- TransportConstants.DEFAULT_USE_INVM,
- configuration);
+ TransportConstants.DEFAULT_USE_INVM,
+ configuration);
this.host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME,
TransportConstants.DEFAULT_HOST,
configuration);
@@ -218,7 +218,7 @@
bossExecutor = Executors.newCachedThreadPool(new org.jboss.messaging.utils.JBMThreadFactory("jbm-netty-acceptor-boss-threads"));
workerExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("jbm-netty-acceptor-worker-threads"));
- if(useInvm)
+ if (useInvm)
{
channelFactory = new DefaultLocalServerChannelFactory();
}
@@ -296,7 +296,7 @@
for (String h : hosts)
{
SocketAddress address;
- if(useInvm)
+ if (useInvm)
{
address = new LocalAddress(h);
System.out.println("address = " + address);
@@ -312,7 +312,7 @@
public synchronized void stop()
{
-
+
if (channelFactory == null)
{
return;
@@ -389,7 +389,7 @@
{
throw new IllegalArgumentException("Connection already exists with id " + connection.getID());
}
-
+
listener.connectionCreated(connection);
}
@@ -397,12 +397,12 @@
{
if (connections.remove(connectionID) != null)
{
- //Execute on different thread to avoid deadlocks
+ // Execute on different thread to avoid deadlocks
new Thread()
{
public void run()
{
- listener.connectionDestroyed(connectionID);
+ listener.connectionDestroyed(connectionID);
}
}.start();
}
@@ -410,7 +410,15 @@
public void connectionException(final Object connectionID, final MessagingException me)
{
- listener.connectionException(connectionID, me);
+ // Execute on different thread to avoid deadlocks
+ new Thread()
+ {
+ public void run()
+ {
+ listener.connectionException(connectionID, me);
+ }
+ }.start();
+
}
}
}
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -582,7 +582,14 @@
public void connectionException(final Object connectionID, final MessagingException me)
{
- listener.connectionException(connectionID, me);
+ // Execute on different thread to avoid deadlocks
+ new Thread()
+ {
+ public void run()
+ {
+ listener.connectionException(connectionID, me);
+ }
+ }.start();
}
}
Modified: trunk/src/main/org/jboss/messaging/utils/TypedProperties.java
===================================================================
--- trunk/src/main/org/jboss/messaging/utils/TypedProperties.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/src/main/org/jboss/messaging/utils/TypedProperties.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -336,7 +336,7 @@
properties.clear();
}
}
-
+
// Private ------------------------------------------------------------------------------------
private void checkCreateProperties()
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -157,7 +157,7 @@
}
}
- //log.info(node + " binding count " + bindingCount + " consumer Count " + totConsumers);
+ // log.info(node + " binding count " + bindingCount + " consumer Count " + totConsumers);
if (bindingCount == count && totConsumers == consumerCount)
{
@@ -165,7 +165,7 @@
return;
}
- Thread.sleep(10);
+ Thread.sleep(100);
}
while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
@@ -396,7 +396,7 @@
if (message != null)
{
- log.info("Consumer " + consumerIDs[i] + " received message " + message.getProperty(COUNT_PROP));
+ //log.info("Consumer " + consumerIDs[i] + " received message " + message.getProperty(COUNT_PROP));
}
}
while (message != null);
@@ -515,13 +515,13 @@
ClientMessage message;
do
{
- message = holder.consumer.receive(200);
+ message = holder.consumer.receive(1000);
if (message != null)
{
int count = (Integer)message.getProperty(COUNT_PROP);
- //log.info("consumer " + consumerIDs[i] + " received message " + count);
+ // log.info("consumer " + consumerIDs[i] + " received message " + count);
assertFalse(counts.contains(count));
@@ -1057,7 +1057,7 @@
}
protected void stopServers(int... nodes) throws Exception
- {
+ {
for (int i = 0; i < nodes.length; i++)
{
if (services[nodes[i]].isStarted())
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -69,7 +69,7 @@
final int groupPort = 6745;
final int timeout = 500;
- BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort);
+ BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
bg.start();
@@ -114,7 +114,7 @@
final int groupPort = 6745;
final int timeout = 500;
- BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort);
+ BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
bg.start();
@@ -181,7 +181,7 @@
String nodeID = randomString();
- BroadcastGroup bg = new BroadcastGroupImpl(nodeID, randomString(), -1, groupAddress, groupPort);
+ BroadcastGroup bg = new BroadcastGroupImpl(nodeID, randomString(), -1, groupAddress, groupPort, true);
bg.start();
@@ -263,7 +263,7 @@
final int groupPort = 6745;
final int timeout = 500;
- BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort);
+ BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
bg.start();
@@ -299,7 +299,7 @@
final int groupPort = 6745;
final int timeout = 500;
- BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort);
+ BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
bg.start();
@@ -343,13 +343,13 @@
final int timeout = 500;
- BroadcastGroup bg1 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress1, groupPort1);
+ BroadcastGroup bg1 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress1, groupPort1, true);
bg1.start();
- BroadcastGroup bg2 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress2, groupPort2);
+ BroadcastGroup bg2 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress2, groupPort2, true);
bg2.start();
- BroadcastGroup bg3 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress3, groupPort3);
+ BroadcastGroup bg3 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress3, groupPort3, true);
bg3.start();
TransportConfiguration live1 = generateTC();
@@ -428,7 +428,7 @@
final int groupPort = 6745;
final int timeout = 500;
- BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort);
+ BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
bg.start();
@@ -471,7 +471,7 @@
final int groupPort = 6745;
final int timeout = 500;
- BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort);
+ BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
bg.start();
@@ -583,13 +583,13 @@
final int groupPort = 6745;
final int timeout = 500;
- BroadcastGroup bg1 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort);
+ BroadcastGroup bg1 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
bg1.start();
- BroadcastGroup bg2 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort);
+ BroadcastGroup bg2 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
bg2.start();
- BroadcastGroup bg3 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort);
+ BroadcastGroup bg3 = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
bg3.start();
TransportConfiguration live1 = generateTC();
@@ -823,7 +823,7 @@
final int groupPort = 6745;
final int timeout = 500;
- BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort);
+ BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), -1, groupAddress, groupPort, true);
bg.start();
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -61,7 +61,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
@@ -107,7 +107,7 @@
boolean durable = randomBoolean();
boolean temporary = randomBoolean();
- SessionCreateQueueMessage request = new SessionCreateQueueMessage(address, queueName, null, durable, false);
+ CreateQueueMessage request = new CreateQueueMessage(address, queueName, null, durable, false);
// SimpleString version
expect(channel.sendBlocking(request)).andReturn(new NullResponseMessage());
@@ -128,7 +128,7 @@
verifyMocks();
// with temporary
- request = new SessionCreateQueueMessage(address, queueName, null, durable, temporary);
+ request = new CreateQueueMessage(address, queueName, null, durable, temporary);
resetMocks();
@@ -142,7 +142,7 @@
// full methods
resetMocks();
- request = new SessionCreateQueueMessage(address, queueName, filterString, durable, temporary);
+ request = new CreateQueueMessage(address, queueName, filterString, durable, temporary);
expect(channel.sendBlocking(request)).andReturn(new NullResponseMessage());
replayMocks();
@@ -154,7 +154,7 @@
// full methods with String
resetMocks();
- request = new SessionCreateQueueMessage(address, queueName, filterString, durable, temporary);
+ request = new CreateQueueMessage(address, queueName, filterString, durable, temporary);
expect(channel.sendBlocking(request)).andReturn(new NullResponseMessage());
replayMocks();
@@ -596,7 +596,7 @@
// CommandManager cm = EasyMock.createStrictMock(CommandManager.class);
// ConnectionRegistry reg = EasyMock.createStrictMock(ConnectionRegistry.class);
//
- // SessionCreateQueueMessage request = new SessionCreateQueueMessage(new SimpleString("blah"), new
+ // CreateQueueMessage request = new CreateQueueMessage(new SimpleString("blah"), new
// SimpleString("hagshg"),
// new SimpleString("jhjhs"), false, false);
//
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerSessionPacketHandlerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerSessionPacketHandlerTest.java 2009-03-09 14:45:38 UTC (rev 6048)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerSessionPacketHandlerTest.java 2009-03-09 14:53:41 UTC (rev 6049)
@@ -76,7 +76,7 @@
//
// public void testCreateQueue() throws Exception
// {
-// SessionCreateQueueMessage request = new SessionCreateQueueMessage(queueName, queueName, filterString, true, true);
+// CreateQueueMessage request = new CreateQueueMessage(queueName, queueName, filterString, true, true);
// session.createQueue(queueName, queueName, filterString, true, true);
// long responseTargetID = 1212;
// request.setResponseTargetID(responseTargetID);
More information about the jboss-cvs-commits
mailing list