[jboss-cvs] JBoss Messaging SVN: r5686 - in trunk: src/main/org/jboss/messaging/core/persistence and 13 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jan 22 06:16:47 EST 2009
Author: timfox
Date: 2009-01-22 06:16:47 -0500 (Thu, 22 Jan 2009)
New Revision: 5686
Added:
trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowBinding.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingFilter.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/QueueInfo.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java
Removed:
trunk/src/main/org/jboss/messaging/core/postoffice/DivertBinding.java
trunk/src/main/org/jboss/messaging/core/postoffice/QueueBinding.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBindingImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/LinkBindingImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/QueueBindingImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMuliThreadFailoverTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/management/impl/AddressControl.java
trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java
trunk/src/main/org/jboss/messaging/core/server/Bindable.java
trunk/src/main/org/jboss/messaging/core/server/Divert.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/schemas/jbm-configuration.xsd
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java
trunk/tests/src/org/jboss/messaging/tests/stress/failover/LargeMessageMultiThreadFailoverStressTest.java
trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/SimpleAddressManagerTest.java
Log:
more clustering, tweaks, fixes etc
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/AddressControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/AddressControl.java 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/AddressControl.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -87,7 +87,7 @@
for (int i = 0; i < bindings.getBindings().size(); i++)
{
Binding binding = bindings.getBindings().get(i);
- queueNames[i] = binding.getBindable().getUniqueName().toString();
+ queueNames[i] = binding.getUniqueName().toString();
}
return queueNames;
}
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -54,10 +54,10 @@
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.postoffice.QueueBinding;
-import org.jboss.messaging.core.postoffice.impl.QueueBindingImpl;
+import org.jboss.messaging.core.postoffice.impl.BindingImpl;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.RemotingService;
+import org.jboss.messaging.core.server.Bindable;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.QueueFactory;
@@ -297,7 +297,7 @@
if (postOffice.getBinding(sName) == null)
{
Queue queue = queueFactory.createQueue(-1, sName, null, true, false);
- QueueBinding binding = new QueueBindingImpl(sAddress, queue);
+ Binding binding = new BindingImpl(sAddress, sName, sName, queue, false, true);
storageManager.addQueueBinding(binding);
postOffice.addBinding(binding);
}
@@ -316,7 +316,7 @@
if (postOffice.getBinding(sName) == null)
{
Queue queue = queueFactory.createQueue(-1, sName, filter, durable, false);
- QueueBinding binding = new QueueBindingImpl(sAddress, queue);
+ Binding binding = new BindingImpl(sAddress, sName, sName, queue, false, true);
if (durable)
{
storageManager.addQueueBinding(binding);
Modified: trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -28,8 +28,8 @@
import javax.transaction.xa.Xid;
import org.jboss.messaging.core.paging.PageTransactionInfo;
+import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.postoffice.QueueBinding;
import org.jboss.messaging.core.server.LargeServerMessage;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.MessagingComponent;
@@ -110,7 +110,7 @@
// Bindings related operations
- void addQueueBinding(QueueBinding binding) throws Exception;
+ void addQueueBinding(Binding binding) throws Exception;
void deleteQueueBinding(long queueBindingID) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -60,8 +60,8 @@
import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
import org.jboss.messaging.core.persistence.QueueBindingInfo;
import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.postoffice.QueueBinding;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.remoting.impl.wireformat.XidCodecSupport;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
@@ -863,20 +863,20 @@
// Bindings operations
- public void addQueueBinding(final QueueBinding binding) throws Exception
+ public void addQueueBinding(final Binding binding) throws Exception
{
- Filter filter = binding.getQueue().getFilter();
+ Queue queue = (Queue)binding.getBindable();
+ Filter filter = queue.getFilter();
SimpleString filterString = filter == null ? null : filter.getFilterString();
- PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(binding.getBindable()
- .getUniqueName(),
+ PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(),
binding.getAddress(),
filterString);
long id = this.generateUniqueID();
- binding.getQueue().setPersistenceID(id);
+ queue.setPersistenceID(id);
bindingsJournal.appendAddRecord(id, QUEUE_BINDING_RECORD, bindingEncoding);
}
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -30,8 +30,8 @@
import org.jboss.messaging.core.paging.PageTransactionInfo;
import org.jboss.messaging.core.persistence.QueueBindingInfo;
import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.postoffice.QueueBinding;
import org.jboss.messaging.core.server.LargeServerMessage;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
@@ -58,7 +58,7 @@
private volatile boolean started;
- public void addQueueBinding(final QueueBinding queueBinding) throws Exception
+ public void addQueueBinding(final Binding queueBinding) throws Exception
{
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -18,14 +18,14 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.postoffice;
import org.jboss.messaging.core.server.Bindable;
+import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.util.SimpleString;
-
/**
*
* A Binding
@@ -36,8 +36,16 @@
public interface Binding
{
SimpleString getAddress();
-
- Bindable getBindable();
-
+
+ Bindable getBindable();
+
boolean isQueueBinding();
+
+ SimpleString getUniqueName();
+
+ SimpleString getRoutingName();
+
+ boolean accept(ServerMessage message) throws Exception;
+
+ boolean isExclusive();
}
Deleted: trunk/src/main/org/jboss/messaging/core/postoffice/DivertBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/DivertBinding.java 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/DivertBinding.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -1,40 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.postoffice;
-
-import org.jboss.messaging.core.server.Divert;
-
-/**
- * A DivertBinding
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 9 Jan 2009 18:28:42
- *
- *
- */
-public interface DivertBinding extends Binding
-{
- public Divert getDivert();
-}
Deleted: trunk/src/main/org/jboss/messaging/core/postoffice/QueueBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/QueueBinding.java 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/QueueBinding.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -1,40 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.postoffice;
-
-import org.jboss.messaging.core.server.Queue;
-
-/**
- * A QueueBinding
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 9 Jan 2009 17:18:44
- *
- *
- */
-public interface QueueBinding extends Binding
-{
- public Queue getQueue();
-}
Added: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingImpl.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -0,0 +1,109 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.postoffice.impl;
+
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.server.Bindable;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.util.SimpleString;
+
+/**
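+ * A BindingImpl: plain {@link Binding} that holds the address, unique name, routing name, bindable and flags given to its constructor, and delegates accept() to the bindable.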
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 21 Jan 2009 18:52:15
+ *
+ *
+ */
+public class BindingImpl implements Binding
+{
+ private final SimpleString address;
+
+ private final SimpleString uniqueName;
+
+ private final SimpleString routingName;
+
+ protected final Bindable bindable;
+
+ private final boolean exclusive;
+
+ private final boolean isQueue;
+
+ public BindingImpl(final SimpleString address,
+ final SimpleString uniqueName,
+ final SimpleString routingName,
+ final Bindable bindable,
+ final boolean exclusive,
+ final boolean isQueue)
+ {
+ this.address = address;
+
+ this.uniqueName = uniqueName;
+
+ this.routingName = routingName;
+
+ this.bindable = bindable;
+
+ this.exclusive = exclusive;
+
+ this.isQueue = isQueue;
+ }
+
+ public SimpleString getAddress()
+ {
+ return address;
+ }
+
+ public Bindable getBindable()
+ {
+ return bindable;
+ }
+
+ public boolean isQueueBinding()
+ {
+ return isQueue;
+ }
+
+ public boolean accept(final ServerMessage message) throws Exception
+ {
+ return bindable.accept(message);
+ }
+
+ public SimpleString getRoutingName()
+ {
+ return routingName;
+ }
+
+ public SimpleString getUniqueName()
+ {
+ return uniqueName;
+ }
+
+ public boolean isExclusive()
+ {
+ return exclusive;
+ }
+
+}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -67,13 +67,13 @@
public void addBinding(final Binding binding)
{
- if (binding.getBindable().isExclusive())
+ if (binding.isExclusive())
{
exclusiveBindings.add(binding);
}
else
{
- SimpleString routingName = binding.getBindable().getRoutingName();
+ SimpleString routingName = binding.getRoutingName();
List<Binding> bindings = routingNameBindingMap.get(routingName);
@@ -97,13 +97,13 @@
public void removeBinding(final Binding binding)
{
- if (binding.getBindable().isExclusive())
+ if (binding.isExclusive())
{
exclusiveBindings.remove(binding);
}
else
{
- SimpleString routingName = binding.getBindable().getRoutingName();
+ SimpleString routingName = binding.getRoutingName();
List<Binding> bindings = routingNameBindingMap.get(routingName);
@@ -189,11 +189,11 @@
pos = 0;
}
- if (binding.getBindable().accept(message))
+ if (binding.accept(message))
{
chosen.add(binding.getBindable());
- SimpleString headerName = MessageImpl.HDR_ROUTE_TO_PREFIX.concat(binding.getBindable().getRoutingName());
+ SimpleString headerName = MessageImpl.HDR_ROUTE_TO_PREFIX.concat(binding.getRoutingName());
message.putBooleanProperty(headerName, Boolean.valueOf(true));
Deleted: trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBindingImpl.java 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBindingImpl.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -1,72 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.postoffice.impl;
-
-import org.jboss.messaging.core.postoffice.DivertBinding;
-import org.jboss.messaging.core.server.Bindable;
-import org.jboss.messaging.core.server.Divert;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * A DivertBindingImpl
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 9 Jan 2009 15:45:09
- *
- *
- */
-public class DivertBindingImpl implements DivertBinding
-{
- private SimpleString address;
-
- private Divert divert;
-
- public DivertBindingImpl(final SimpleString address, final Divert divert)
- {
- this.address = address;
-
- this.divert = divert;
- }
-
- public SimpleString getAddress()
- {
- return address;
- }
-
- public Bindable getBindable()
- {
- return divert;
- }
-
- public Divert getDivert()
- {
- return divert;
- }
-
- public boolean isQueueBinding()
- {
- return false;
- }
-}
Added: trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowBinding.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowBinding.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -0,0 +1,70 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.postoffice.impl;
+
+import org.jboss.messaging.core.server.Bindable;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.cluster.impl.FlowBindingFilter;
+import org.jboss.messaging.util.SimpleString;
+
+/**
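+ * A FlowBinding: a non-exclusive, non-queue binding that accepts a message only when its FlowBindingFilter matches and the bindable also accepts it.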
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 21 Jan 2009 18:55:22
+ *
+ *
+ */
+public class FlowBinding extends BindingImpl
+{
+ private final FlowBindingFilter filter;
+
+ public FlowBinding(final SimpleString address,
+ final SimpleString uniqueName,
+ final SimpleString routingName,
+ final Bindable bindable,
+ final FlowBindingFilter filter)
+ {
+ super(address, uniqueName, routingName, bindable, false, false);
+
+ this.filter = filter;
+ }
+
+ public boolean accept(final ServerMessage message) throws Exception
+ {
+ if (filter.match(message))
+ {
+ return bindable.accept(message);
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public FlowBindingFilter getFilter()
+ {
+ return filter;
+ }
+}
Deleted: trunk/src/main/org/jboss/messaging/core/postoffice/impl/LinkBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/LinkBindingImpl.java 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/LinkBindingImpl.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -1,72 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.postoffice.impl;
-
-import org.jboss.messaging.core.postoffice.DivertBinding;
-import org.jboss.messaging.core.server.Bindable;
-import org.jboss.messaging.core.server.Divert;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * A LinkBindingImpl
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 9 Jan 2009 18:29:24
- *
- *
- */
-public class LinkBindingImpl implements DivertBinding
-{
- private final SimpleString address;
-
- private final Divert link;
-
- public LinkBindingImpl(final SimpleString address, final Divert link)
- {
- this.address = address;
-
- this.link = link;
- }
-
- public SimpleString getAddress()
- {
- return address;
- }
-
- public Bindable getBindable()
- {
- return link;
- }
-
- public boolean isQueueBinding()
- {
- return false;
- }
-
- public Divert getDivert()
- {
- return link;
- }
-}
Deleted: trunk/src/main/org/jboss/messaging/core/postoffice/impl/QueueBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/QueueBindingImpl.java 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/QueueBindingImpl.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -1,72 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.postoffice.impl;
-
-import org.jboss.messaging.core.postoffice.QueueBinding;
-import org.jboss.messaging.core.server.Bindable;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * A QueueBindingImpl
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 9 Jan 2009 15:45:09
- *
- *
- */
-public class QueueBindingImpl implements QueueBinding
-{
- private SimpleString address;
-
- private Queue queue;
-
- public QueueBindingImpl(final SimpleString address, final Queue queue)
- {
- this.address = address;
-
- this.queue = queue;
- }
-
- public SimpleString getAddress()
- {
- return address;
- }
-
- public Bindable getBindable()
- {
- return queue;
- }
-
- public Queue getQueue()
- {
- return queue;
- }
-
- public boolean isQueueBinding()
- {
- return true;
- }
-}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -50,7 +50,7 @@
public void addBinding(final Binding binding)
{
- if (nameMap.putIfAbsent(binding.getBindable().getUniqueName(), binding) != null)
+ if (nameMap.putIfAbsent(binding.getUniqueName(), binding) != null)
{
throw new IllegalStateException("Binding already exists " + binding);
}
@@ -157,7 +157,7 @@
for (Binding binding: bindings.getBindings())
{
- if (binding.getBindable().getUniqueName().equals(bindableName))
+ if (binding.getUniqueName().equals(bindableName))
{
theBinding = binding;
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -145,7 +145,7 @@
{
for (Binding b : bindings.getBindings())
{
- super.removeMapping(address, b.getBindable().getUniqueName());
+ super.removeMapping(address, b.getUniqueName());
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/Bindable.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Bindable.java 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/server/Bindable.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -23,7 +23,6 @@
package org.jboss.messaging.core.server;
import org.jboss.messaging.core.transaction.Transaction;
-import org.jboss.messaging.util.SimpleString;
/**
* A Bindable
@@ -38,12 +37,6 @@
{
void route(ServerMessage message, Transaction tx) throws Exception;
- SimpleString getUniqueName();
-
- SimpleString getRoutingName();
-
boolean accept(ServerMessage message) throws Exception;
-
- boolean isExclusive();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/Divert.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Divert.java 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/server/Divert.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -24,6 +24,7 @@
package org.jboss.messaging.core.server;
+
/**
* A Divert
*
@@ -35,5 +36,5 @@
*/
public interface Divert extends Bindable
{
- boolean isExclusive();
+ //boolean isExclusive(); // no longer declared here: exclusivity is now reported by Binding.isExclusive()
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -433,7 +433,8 @@
}
else
{
- dest = (SimpleString)message.getProperty(MessageImpl.HDR_ORIGINAL_DESTINATION);
+ //Preserve the original address
+ dest = message.getDestination();
}
producer.send(dest, message);
Added: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -0,0 +1,401 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.cluster.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.jboss.messaging.core.cluster.DiscoveryGroup;
+import org.jboss.messaging.core.cluster.DiscoveryListener;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.filter.impl.FilterImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.postoffice.impl.BindingImpl;
+import org.jboss.messaging.core.postoffice.impl.FlowBinding;
+import org.jboss.messaging.core.server.Bindable;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.QueueFactory;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.cluster.Bridge;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.util.ExecutorFactory;
+import org.jboss.messaging.util.Pair;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ *
+ * A ClusterImpl
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 21 Jan 2009 14:43:05
+ *
+ *
+ */
+public class ClusterImpl implements DiscoveryListener
+{
+ private static final Logger log = Logger.getLogger(ClusterImpl.class);
+
+ private final ExecutorFactory executorFactory;
+
+ private final StorageManager storageManager;
+
+ private final PostOffice postOffice;
+
+ private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
+
+ private final SimpleString name;
+
+ private final SimpleString address;
+
+ private final BridgeConfiguration bridgeConfig;
+
+ private final boolean useDuplicateDetection;
+
+ private final int maxHops;
+
+ private Map<Pair<TransportConfiguration, TransportConfiguration>, Bridge> bridges = new HashMap<Pair<TransportConfiguration, TransportConfiguration>, Bridge>();
+
+ private final DiscoveryGroup discoveryGroup;
+
+ private final ScheduledExecutorService scheduledExecutor;
+
+ private final QueueFactory queueFactory;
+
+ private volatile boolean started;
+
+ /*
+ * Constructor using static list of connectors.
+ *
+ * Stores the supplied collaborators, leaves discoveryGroup null and then
+ * immediately calls updateConnectors(connectors), which creates AND starts
+ * a bridge for every connector pair - i.e. bridges are running before
+ * start() has been called.
+ *
+ * NOTE(review): because discoveryGroup is null in this mode, start() only
+ * flips the started flag. After stop() the bridges are stopped but a later
+ * start() does not start them again - confirm whether restart is meant to
+ * be supported for statically configured clusters.
+ */
+ public ClusterImpl(final SimpleString name,
+ final SimpleString address,
+ final BridgeConfiguration bridgeConfig,
+ final boolean useDuplicateDetection,
+ final int maxHops,
+ final ExecutorFactory executorFactory,
+ final StorageManager storageManager,
+ final PostOffice postOffice,
+ final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+ final ScheduledExecutorService scheduledExecutor,
+ final QueueFactory queueFactory,
+ final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
+ {
+ this.name = name;
+
+ this.address = address;
+
+ this.bridgeConfig = bridgeConfig;
+
+ this.useDuplicateDetection = useDuplicateDetection;
+
+ this.maxHops = maxHops;
+
+ this.executorFactory = executorFactory;
+
+ this.storageManager = storageManager;
+
+ this.postOffice = postOffice;
+
+ this.queueSettingsRepository = queueSettingsRepository;
+
+ // No discovery in this mode - the connector list is fixed at construction time
+ this.discoveryGroup = null;
+
+ this.scheduledExecutor = scheduledExecutor;
+
+ this.queueFactory = queueFactory;
+
+ // Creates and starts one bridge per connector pair (see NOTE above)
+ this.updateConnectors(connectors);
+ }
+
+ /*
+  * Constructor using discovery to get connectors.
+  *
+  * Only records its arguments; no bridge is created here. The connector
+  * list is first read from the discovery group when start() is called.
+  */
+ public ClusterImpl(final SimpleString name,
+                    final SimpleString address,
+                    final BridgeConfiguration bridgeConfig,
+                    final boolean useDuplicateDetection,
+                    final int maxHops,
+                    final ExecutorFactory executorFactory,
+                    final StorageManager storageManager,
+                    final PostOffice postOffice,
+                    final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+                    final ScheduledExecutorService scheduledExecutor,
+                    final QueueFactory queueFactory,
+                    final DiscoveryGroup discoveryGroup) throws Exception
+ {
+    // Identity and routing configuration
+    this.name = name;
+    this.address = address;
+    this.bridgeConfig = bridgeConfig;
+    this.useDuplicateDetection = useDuplicateDetection;
+    this.maxHops = maxHops;
+
+    // Server collaborators
+    this.executorFactory = executorFactory;
+    this.storageManager = storageManager;
+    this.postOffice = postOffice;
+    this.queueSettingsRepository = queueSettingsRepository;
+    this.scheduledExecutor = scheduledExecutor;
+    this.queueFactory = queueFactory;
+
+    // Source of connector pairs, consulted on start() and on every change notification
+    this.discoveryGroup = discoveryGroup;
+ }
+
+ /**
+  * Starts the cluster; a no-op when it is already started.
+  * <p>
+  * When a discovery group is configured, the current connector list is
+  * applied first and this instance is then registered for change
+  * notifications.
+  */
+ public synchronized void start() throws Exception
+ {
+    if (!started)
+    {
+       if (discoveryGroup != null)
+       {
+          updateConnectors(discoveryGroup.getConnectors());
+
+          discoveryGroup.registerListener(this);
+       }
+
+       started = true;
+    }
+ }
+
+ /**
+  * Stops the cluster; a no-op when it is not started.
+  * <p>
+  * Unregisters from the discovery group (if any) and stops every bridge.
+  * The bridges stay in the map after being stopped.
+  */
+ public synchronized void stop() throws Exception
+ {
+    if (started)
+    {
+       if (discoveryGroup != null)
+       {
+          discoveryGroup.unregisterListener(this);
+       }
+
+       for (Bridge activeBridge : bridges.values())
+       {
+          activeBridge.stop();
+       }
+
+       started = false;
+    }
+ }
+
+ /**
+ * @return the current value of the started flag, which start() sets to
+ * true and stop() sets back to false
+ */
+ public boolean isStarted()
+ {
+ return started;
+ }
+
+ /**
+ * @return the name given to this cluster at construction time; it is also
+ * used as part of the name of each bridge queue
+ */
+ public SimpleString getName()
+ {
+ return name;
+ }
+
+ // DiscoveryListener implementation ------------------------------------------------------------------
+
+ /**
+  * Discovery callback: re-reads the connector list from the discovery group
+  * and reconciles the bridges with it. Failures are logged, not propagated.
+  */
+ public void connectorsChanged()
+ {
+    try
+    {
+       updateConnectors(discoveryGroup.getConnectors());
+    }
+    catch (Exception e)
+    {
+       log.error("Failed to update connectors", e);
+    }
+ }
+
+ /**
+  * Reconciles the running bridges with the supplied connector pairs.
+  * <p>
+  * Bridges whose connector pair is no longer listed are stopped and dropped
+  * from the map. For every listed pair that has no bridge yet, a durable
+  * queue is looked up (or created and persisted) and a bridge consuming from
+  * it is created and started.
+  *
+  * @param connectors the connector pairs that should currently have a bridge
+  * @throws Exception if stopping or starting a bridge, creating a queue or
+  *                   persisting its binding fails
+  */
+ private void updateConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
+ {
+    Set<Pair<TransportConfiguration, TransportConfiguration>> connectorSet = new HashSet<Pair<TransportConfiguration, TransportConfiguration>>(connectors);
+
+    Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Bridge>> iter = bridges.entrySet()
+                                                                                                     .iterator();
+
+    while (iter.hasNext())
+    {
+       Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Bridge> entry = iter.next();
+
+       if (!connectorSet.contains(entry.getKey()))
+       {
+          // Connector no longer there - we should remove and close it - we don't delete the queue though -
+          // it may have messages - it is up to the administrator to do this
+
+          entry.getValue().stop();
+
+          iter.remove();
+       }
+    }
+
+    // The filter is optional; wrapping a null String in a SimpleString would throw
+    final String filterString = bridgeConfig.getFilterString();
+
+    final SimpleString bridgeFilter = filterString == null ? null : new SimpleString(filterString);
+
+    for (Pair<TransportConfiguration, TransportConfiguration> connectorPair : connectors)
+    {
+       if (bridges.containsKey(connectorPair))
+       {
+          continue;
+       }
+
+       SimpleString queueName = new SimpleString("cluster." + name +
+                                                 "." +
+                                                 generateConnectorString(connectorPair.a) +
+                                                 "-" +
+                                                 (connectorPair.b == null ? "null"
+                                                                          : generateConnectorString(connectorPair.b)));
+
+       Binding queueBinding = postOffice.getBinding(queueName);
+
+       Queue queue;
+
+       if (queueBinding != null)
+       {
+          queue = (Queue)queueBinding.getBindable();
+       }
+       else
+       {
+          // The queue must carry the per-connector queueName (not the cluster name), otherwise every
+          // connector pair would share one queue name and the lookup above could never find the
+          // queue again after a restart
+          queue = queueFactory.createQueue(-1, queueName, null, true, false);
+
+          // Add binding in storage so the queue will get reloaded on startup and we can find it - it's never
+          // actually routed to at that address though
+
+          Binding storeBinding = new BindingImpl(queue.getName(), queue.getName(), queue.getName(), queue, false, true);
+
+          storageManager.addQueueBinding(storeBinding);
+       }
+
+       Bridge bridge = new BridgeImpl(queueName,
+                                      queue,
+                                      connectorPair,
+                                      executorFactory.getExecutor(),
+                                      bridgeConfig.getMaxBatchSize(),
+                                      bridgeConfig.getMaxBatchTime(),
+                                      bridgeFilter,
+                                      null,
+                                      storageManager,
+                                      scheduledExecutor,
+                                      null,
+                                      bridgeConfig.getRetryInterval(),
+                                      bridgeConfig.getRetryIntervalMultiplier(),
+                                      bridgeConfig.getMaxRetriesBeforeFailover(),
+                                      bridgeConfig.getMaxRetriesAfterFailover(),
+                                      false);
+
+       bridges.put(connectorPair, bridge);
+
+       bridge.start();
+    }
+ }
+
+ /**
+ * Applies queue information for the bridge of the given connector pair:
+ * creates a flow binding with a FlowBindingFilter built from the info when
+ * none is found, otherwise updates the filter of the existing flow binding.
+ *
+ * NOTE(review): unfinished - uniqueName is always null, so the flow binding
+ * is looked up and created with a null unique name. Confirm how the unique
+ * name is meant to be derived before this method is wired in.
+ *
+ * @throws IllegalArgumentException if there is no bridge for connectorPair
+ * @throws IllegalStateException if no binding is registered under the bridge's name
+ */
+ private void updateQueueInfo(final Pair<TransportConfiguration, TransportConfiguration> connectorPair, final QueueInfo info) throws Exception
+ {
+ Bridge bridge = this.bridges.get(connectorPair);
+
+ if (bridge == null)
+ {
+ throw new IllegalArgumentException("Cannot find bridge for " + connectorPair);
+ }
+
+ // Placeholder: the real unique name has not been decided yet
+ SimpleString uniqueName = null; // ?????
+
+ FlowBinding flowBinding = (FlowBinding)postOffice.getBinding(uniqueName);
+
+ if (flowBinding == null)
+ {
+ //TODO - can be optimised by storing the queue with the bridge in this class
+ Binding binding = postOffice.getBinding(bridge.getName());
+
+ if (binding == null)
+ {
+ throw new IllegalStateException("Cannot find queue with name " + bridge.getName());
+ }
+
+ Queue queue = (Queue)binding.getBindable();
+
+ FlowBindingFilter filter = new FlowBindingFilter(info);
+
+ // Bind the bridge's queue under the address and queue name reported in the info
+ flowBinding = new FlowBinding(new SimpleString(info.getAddress()), uniqueName, new SimpleString(info.getQueueName()), queue, filter);
+
+ postOffice.addBinding(flowBinding);
+ }
+ else
+ {
+ // Binding already exists - only its filter needs the new information
+ FlowBindingFilter filter = flowBinding.getFilter();
+
+ filter.updateInfo(info);
+ }
+ }
+
+ /**
+ * Not implemented yet: the body is empty.
+ */
+ private void removeQueueInfo()
+ {
+ //TODO
+ }
+
+ /**
+ * Returns str with every '.' replaced by '-'.
+ *
+ * Used when building bridge queue names from connector class names and
+ * parameters - presumably because '.' has special meaning in addresses;
+ * verify against the address manager.
+ */
+ private String replaceWildcardChars(final String str)
+ {
+ return str.replace('.', '-');
+ }
+
+ /**
+  * Builds a string identifying a connector configuration, in the form
+  * factoryClassName[?key=value[&key=value...]], with every '.' in the class
+  * name, keys and values replaced by '-'.
+  * <p>
+  * The result becomes part of a durable queue name, so it must be identical
+  * every time it is computed for the same configuration. Parameters are
+  * therefore emitted in sorted key order rather than in the iteration order
+  * of the parameter map, which is unspecified for hash-based maps.
+  *
+  * @param config the connector configuration to describe
+  * @return the generated identifier
+  */
+ private SimpleString generateConnectorString(final TransportConfiguration config) throws Exception
+ {
+    StringBuilder str = new StringBuilder(replaceWildcardChars(config.getFactoryClassName()));
+
+    Map<String, Object> params = config.getParams();
+
+    if (params != null)
+    {
+       // Sorted copy gives a stable ordering regardless of the backing map implementation
+       Map<String, Object> sortedParams = new java.util.TreeMap<String, Object>(params);
+
+       char separator = '?';
+
+       for (Map.Entry<String, Object> entry : sortedParams.entrySet())
+       {
+          // String.valueOf tolerates a null parameter value instead of throwing
+          str.append(separator)
+             .append(replaceWildcardChars(entry.getKey()))
+             .append('=')
+             .append(replaceWildcardChars(String.valueOf(entry.getValue())));
+
+          separator = '&';
+       }
+    }
+
+    return new SimpleString(str.toString());
+ }
+
+}
Added: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingFilter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingFilter.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingFilter.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -0,0 +1,122 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.cluster.impl;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.filter.impl.FilterImpl;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A FlowBindingFilter
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 21 Jan 2009 20:53:20
+ *
+ *
+ */
+public class FlowBindingFilter implements Filter
+{
+   /** Latest queue information; volatile because match() reads it without holding a lock. */
+   private volatile QueueInfo info;
+
+   /**
+    * Compiled filters keyed by their filter string.
+    * <p>
+    * A map is never modified once it has been assigned to this field:
+    * updateInfo() builds a replacement map and swaps the reference, so match()
+    * can iterate the map it read without racing against an update.
+    */
+   private volatile Map<String, Filter> filters = new HashMap<String, Filter>();
+
+   /**
+    * @param info the initial queue information
+    * @throws Exception if one of the filter strings cannot be turned into a filter
+    */
+   public FlowBindingFilter(final QueueInfo info) throws Exception
+   {
+      updateInfo(info);
+   }
+
+   /**
+    * Replaces the queue information and the set of filters derived from it.
+    * <p>
+    * Filters whose string is still listed are reused; filters whose string is
+    * no longer listed are dropped. If building a filter fails, neither the
+    * info nor the filters are changed.
+    *
+    * @param info the new queue information; a null filter list is treated as "no filters"
+    * @throws Exception if one of the filter strings cannot be turned into a filter
+    */
+   public synchronized void updateInfo(final QueueInfo info) throws Exception
+   {
+      final Map<String, Filter> current = filters;
+
+      final Map<String, Filter> updated = new HashMap<String, Filter>();
+
+      final List<String> filterStrings = info.getFilterStrings();
+
+      if (filterStrings != null)
+      {
+         for (String filterString : filterStrings)
+         {
+            Filter filter = current.get(filterString);
+
+            if (filter == null)
+            {
+               filter = new FilterImpl(new SimpleString(filterString));
+            }
+
+            // Only strings present in the new info make it into the new map, so stale
+            // filters disappear even when the list contains duplicates
+            updated.put(filterString, filter);
+         }
+      }
+
+      // Publish only after everything was built successfully
+      this.filters = updated;
+
+      this.info = info;
+   }
+
+   /**
+    * @return always null - this filter has no single filter string
+    */
+   public SimpleString getFilterString()
+   {
+      return null;
+   }
+
+   /**
+    * @return false when the queue has no consumers; otherwise true when there
+    *         are no filters or when at least one filter matches the message
+    */
+   public boolean match(final ServerMessage message)
+   {
+      if (info.getNumberOfConsumers() == 0)
+      {
+         return false;
+      }
+
+      // Read the reference once so the whole check runs against one consistent map
+      final Map<String, Filter> snapshot = filters;
+
+      if (snapshot.isEmpty())
+      {
+         return true;
+      }
+
+      for (Filter filter : snapshot.values())
+      {
+         if (filter.match(message))
+         {
+            return true;
+         }
+      }
+
+      return false;
+   }
+
+}
Added: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/QueueInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/QueueInfo.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/QueueInfo.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -0,0 +1,74 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.cluster.impl;
+
+import java.util.List;
+
+/**
+ * A QueueInfo
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 21 Jan 2009 20:55:06
+ *
+ *
+ */
+public class QueueInfo
+{
+   /** Name of the queue this information describes. */
+   private final String queueName;
+
+   /** Address the queue is associated with. */
+   private final String address;
+
+   /** Filter strings reported for the queue; the list is stored as given, not copied. */
+   private final List<String> filterStrings;
+
+   /** Number of consumers reported for the queue. */
+   private final int numberOfConsumers;
+
+   /**
+    * Creates a holder for the given values; all of them are stored as-is.
+    */
+   public QueueInfo(final String queueName, final String address, final List<String> filterStrings, final int numberOfConsumers)
+   {
+      this.numberOfConsumers = numberOfConsumers;
+      this.filterStrings = filterStrings;
+      this.address = address;
+      this.queueName = queueName;
+   }
+
+   /** @return the queue name passed to the constructor */
+   public String getQueueName()
+   {
+      return this.queueName;
+   }
+
+   /** @return the address passed to the constructor */
+   public String getAddress()
+   {
+      return this.address;
+   }
+
+   /** @return the same list instance that was passed to the constructor */
+   public List<String> getFilterStrings()
+   {
+      return this.filterStrings;
+   }
+
+   /** @return the consumer count passed to the constructor */
+   public int getNumberOfConsumers()
+   {
+      return this.numberOfConsumers;
+   }
+}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -43,13 +43,10 @@
import org.jboss.messaging.core.persistence.QueueBindingInfo;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.core.postoffice.DivertBinding;
import org.jboss.messaging.core.postoffice.DuplicateIDCache;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.postoffice.QueueBinding;
-import org.jboss.messaging.core.postoffice.impl.DivertBindingImpl;
+import org.jboss.messaging.core.postoffice.impl.BindingImpl;
import org.jboss.messaging.core.postoffice.impl.PostOfficeImpl;
-import org.jboss.messaging.core.postoffice.impl.QueueBindingImpl;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.RemotingConnection;
@@ -64,6 +61,7 @@
import org.jboss.messaging.core.security.Role;
import org.jboss.messaging.core.security.SecurityStore;
import org.jboss.messaging.core.security.impl.SecurityStoreImpl;
+import org.jboss.messaging.core.server.Bindable;
import org.jboss.messaging.core.server.Divert;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
@@ -291,11 +289,11 @@
true,
false);
- Binding binding = new QueueBindingImpl(queueBindingInfo.getAddress(), queue);
+ Binding binding = new BindingImpl(queueBindingInfo.getAddress(), queueBindingInfo.getQueueName(), queueBindingInfo.getQueueName(), queue, false, true);
queues.put(queueBindingInfo.getPersistenceID(), queue);
- postOffice.addBinding(binding);
+ postOffice.addBinding(binding);
}
Map<SimpleString, List<Pair<SimpleString, Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<SimpleString, Long>>>();
@@ -801,7 +799,7 @@
Queue queue = queueFactory.createQueue(-1, name, filter, config.isDurable(), false);
- QueueBinding queueBinding = new QueueBindingImpl(new SimpleString(config.getAddress()), queue);
+ Binding queueBinding = new BindingImpl(new SimpleString(config.getAddress()), name, name, queue, false, true);
binding = queueBinding;
@@ -870,7 +868,7 @@
pagingManager,
storageManager);
- DivertBinding binding = new DivertBindingImpl(sAddress, divert);
+ Binding binding = new BindingImpl(sAddress, sName, new SimpleString(config.getRoutingName()), divert, config.isExclusive(), false);
postOffice.addBinding(binding);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -361,8 +361,7 @@
else
{
ref.getQueue().acknowledge(tx, ref);
- // ref.acknowledge(tx, storageManager, postOffice, queueSettingsRepository);
-
+
// Del count is not actually updated in storage unless it's
// cancelled
ref.incrementDeliveryCount();
@@ -559,6 +558,8 @@
return HandleStatus.BUSY;
}
+ //TODO use a null or boolean check here for performance
+
// If there is a pendingLargeMessage we can't take another message
// This has to be checked inside the lock as the set to null is done inside the lock
if (pendingLargeMessagesCounter.get() > 0)
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -35,8 +35,7 @@
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.Bindings;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.postoffice.QueueBinding;
-import org.jboss.messaging.core.postoffice.impl.QueueBindingImpl;
+import org.jboss.messaging.core.postoffice.impl.BindingImpl;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.DelayedResult;
import org.jboss.messaging.core.remoting.FailureListener;
@@ -1475,13 +1474,11 @@
final Queue queue = queueFactory.createQueue(-1, name, filter, durable, temporary);
- binding = new QueueBindingImpl(address, queue);
+ binding = new BindingImpl(address, name, name, queue, false, true);
if (durable)
{
- QueueBinding queueBinding = (QueueBinding)binding;
-
- storageManager.addQueueBinding(queueBinding);
+ storageManager.addQueueBinding(binding);
}
postOffice.addBinding(binding);
@@ -1659,7 +1656,7 @@
{
if (binding.isQueueBinding())
{
- names.add(binding.getBindable().getUniqueName());
+ names.add(binding.getUniqueName());
}
}
}
Modified: trunk/src/schemas/jbm-configuration.xsd
===================================================================
--- trunk/src/schemas/jbm-configuration.xsd 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/src/schemas/jbm-configuration.xsd 2009-01-22 11:16:47 UTC (rev 5686)
@@ -187,10 +187,10 @@
<xsd:complexType>
<xsd:sequence>
<xsd:element ref="local-bind-address" maxOccurs="1"
- minOccurs="1">
+ minOccurs="0">
</xsd:element>
<xsd:element ref="local-bind-port" maxOccurs="1"
- minOccurs="1">
+ minOccurs="0">
</xsd:element>
<xsd:element ref="group-address" maxOccurs="1"
minOccurs="1">
@@ -228,26 +228,6 @@
</xsd:complexType>
</xsd:element>
- <xsd:element name="message-flow">
- <xsd:complexType>
- <xsd:sequence>
- <xsd:element name="address" type="xsd:string"></xsd:element>
- <xsd:choice>
- <xsd:element ref="discovery-group-ref" minOccurs="1"
- maxOccurs="1">
- </xsd:element>
- <xsd:element ref="connector-ref" minOccurs="1"
- maxOccurs="unbounded">
- </xsd:element>
- </xsd:choice>
- <xsd:element name="exclusive" type="xsd:boolean"
- minOccurs="0" maxOccurs="1">
- </xsd:element>
- </xsd:sequence>
- <xsd:attribute name="name" type="xsd:ID" use="required"></xsd:attribute>
- </xsd:complexType>
- </xsd:element>
-
<xsd:element name="discovery-group-ref">
<xsd:complexType>
<xsd:attribute name="discovery-group-name"
@@ -351,9 +331,6 @@
<xsd:element name="use-duplicate-detection"
type="xsd:boolean" maxOccurs="1" minOccurs="0">
</xsd:element>
- <xsd:element name="max-hops" type="xsd:int" maxOccurs="1"
- minOccurs="0">
- </xsd:element>
<xsd:choice>
<xsd:element name="connector-ref"
type="connector-refType" minOccurs="1" maxOccurs="1">
@@ -373,14 +350,17 @@
<xsd:complexType name="divertType">
<xsd:sequence>
- <xsd:element name="routing-name" type="xsd:string"
+ <xsd:element name="unique-name" type="xsd:string"
maxOccurs="1" minOccurs="1">
</xsd:element>
+ <xsd:element name="routing-name" type="xsd:string"
+ maxOccurs="1" minOccurs="0">
+ </xsd:element>
<xsd:element name="address" type="xsd:string" maxOccurs="1"
minOccurs="1">
</xsd:element>
<xsd:element name="forwarding-address" type="xsd:string"
- maxOccurs="1" minOccurs="0">
+ maxOccurs="1" minOccurs="1">
</xsd:element>
<xsd:element name="filter" type="filterType" maxOccurs="1"
minOccurs="0">
@@ -408,8 +388,6 @@
</xsd:attribute>
</xsd:complexType>
-
-
<xsd:complexType name="filterType">
<xsd:attribute name="string" type="xsd:string" use="required"></xsd:attribute>
</xsd:complexType>
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMuliThreadFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMuliThreadFailoverTest.java 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMuliThreadFailoverTest.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -1,148 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.cluster.failover;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
-import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
-import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
-
-/**
- * A LargeMessageMuliThreadFailoverTest
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- * Created Jan 18, 2009 4:52:09 PM
- *
- *
- */
-public class LargeMessageMuliThreadFailoverTest extends MultiThreadRandomFailoverTestBase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- protected ClientSessionFactoryInternal createSessionFactory()
- {
- ClientSessionFactoryInternal sf = super.createSessionFactory();
-
- sf.setMinLargeMessageSize(200);
-
- return sf;
-
- }
-
-
- @Override
- protected void start() throws Exception
- {
-
- deleteDirectory(new File(getTestDir()));
-
- Configuration backupConf = new ConfigurationImpl();
-
- backupConf.setJournalDirectory(getJournalDir(getTestDir() + "/backup"));
- backupConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/backup"));
- backupConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/backup"));
- backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
- backupConf.setJournalFileSize(100 * 1024);
-
- backupConf.setSecurityEnabled(false);
- backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-
- backupConf.getAcceptorConfigurations()
- .add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName(), backupParams));
- backupConf.setBackup(true);
-
- backupService = MessagingServiceImpl.newMessagingService(backupConf);
- backupService.start();
-
- Configuration liveConf = new ConfigurationImpl();
-
- liveConf.setJournalDirectory(getJournalDir(getTestDir() + "/live"));
- liveConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/live"));
- liveConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/live"));
- liveConf.setPagingDirectory(getPageDir(getTestDir() + "/live"));
-
- liveConf.setJournalFileSize(100 * 1024);
-
- liveConf.setSecurityEnabled(false);
- liveConf.getAcceptorConfigurations()
- .add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName()));
-
- Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
-
- TransportConfiguration backupTC = new TransportConfiguration(INVM_CONNECTOR_FACTORY,
- backupParams,
- "backup-connector");
- connectors.put(backupTC.getName(), backupTC);
- liveConf.setConnectorConfigurations(connectors);
- liveConf.setBackupConnectorName(backupTC.getName());
- liveService = MessagingServiceImpl.newMessagingService(liveConf);
-
- liveService.start();
-
- }
-
- @Override
- protected void setBody(final ClientMessage message) throws Exception
- {
- message.setBody(new ByteBufferWrapper(ByteBuffer.allocate(500)));
-
- }
-
- /* (non-Javadoc)
- * @see org.jboss.messaging.tests.integration.cluster.failover.MultiThreadRandomFailoverTestBase#checkSize(org.jboss.messaging.core.client.ClientMessage)
- */
- @Override
- protected boolean checkSize(ClientMessage message)
- {
- return 500 == message.getBodySize();
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Copied: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java (from rev 5683, trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMuliThreadFailoverTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -0,0 +1,148 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+
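+/**
+ * Variant of MultiThreadRandomFailoverTestBase that sends 500-byte message bodies through a
+ * session factory whose minimum large-message size is 200, using in-VM live and backup servers.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Jan 18, 2009 4:52:09 PM
+ *
+ */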
+public class LargeMessageMultiThreadFailoverTest extends MultiThreadRandomFailoverTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected ClientSessionFactoryInternal createSessionFactory()
+ {
+ ClientSessionFactoryInternal sf = super.createSessionFactory();
+ // 200 is below the 500-byte bodies built in setBody(); presumably this makes every test message a large message - confirm.
+ sf.setMinLargeMessageSize(200);
+
+ return sf;
+
+ }
+
+
+ @Override
+ protected void start() throws Exception
+ {
+ // Clears the test directory, then creates and starts the backup service followed by the live service.
+ deleteDirectory(new File(getTestDir()));
+ // Backup server: journal, large-message, bindings and paging directories are all derived from <testDir>/backup.
+ Configuration backupConf = new ConfigurationImpl();
+
+ backupConf.setJournalDirectory(getJournalDir(getTestDir() + "/backup"));
+ backupConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/backup"));
+ backupConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/backup"));
+ backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
+ backupConf.setJournalFileSize(100 * 1024);
+
+ backupConf.setSecurityEnabled(false);
+ backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+ // The backup's in-VM acceptor is configured with backupParams (SERVER_ID_PROP_NAME = 1).
+ backupConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName(), backupParams));
+ backupConf.setBackup(true);
+
+ backupService = MessagingServiceImpl.newMessagingService(backupConf);
+ backupService.start();
+ // Live server: same directory layout derived from <testDir>/live; its in-VM acceptor takes no extra params.
+ Configuration liveConf = new ConfigurationImpl();
+
+ liveConf.setJournalDirectory(getJournalDir(getTestDir() + "/live"));
+ liveConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/live"));
+ liveConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/live"));
+ liveConf.setPagingDirectory(getPageDir(getTestDir() + "/live"));
+
+ liveConf.setJournalFileSize(100 * 1024);
+
+ liveConf.setSecurityEnabled(false);
+ liveConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName()));
+ // Register a connector built from the same backupParams and name it as the live server's backup connector.
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+
+ TransportConfiguration backupTC = new TransportConfiguration(INVM_CONNECTOR_FACTORY,
+ backupParams,
+ "backup-connector");
+ connectors.put(backupTC.getName(), backupTC);
+ liveConf.setConnectorConfigurations(connectors);
+ liveConf.setBackupConnectorName(backupTC.getName());
+ liveService = MessagingServiceImpl.newMessagingService(liveConf);
+
+ liveService.start();
+
+ }
+
+ @Override
+ protected void setBody(final ClientMessage message) throws Exception
+ {
+ message.setBody(new ByteBufferWrapper(ByteBuffer.allocate(500))); // 500-byte buffer; checkSize() compares against this same size
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.tests.integration.cluster.failover.MultiThreadRandomFailoverTestBase#checkSize(org.jboss.messaging.core.client.ClientMessage)
+ */
+ @Override
+ protected boolean checkSize(ClientMessage message)
+ {
+ return 500 == message.getBodySize(); // 500 matches the buffer size allocated in setBody()
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -1243,7 +1243,7 @@
protected int getNumIterations()
{
- return 1;
+ return 20;
}
@Override
Modified: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -24,6 +24,7 @@
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.server.Bindable;
+import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.util.SimpleString;
/**
@@ -33,6 +34,30 @@
*/
public class FakeBinding implements Binding
{
+ public boolean accept(ServerMessage message) throws Exception
+ {
+ // TODO stub (auto-generated): ignores the message and always returns false.
+ return false;
+ }
+
+ public SimpleString getRoutingName()
+ {
+ // TODO stub (auto-generated): no routing name is tracked; always returns null.
+ return null;
+ }
+
+ public SimpleString getUniqueName()
+ {
+ // TODO stub (auto-generated): no unique name is tracked; always returns null.
+ return null;
+ }
+
+ public boolean isExclusive()
+ {
+ // TODO stub (auto-generated): always returns false.
+ return false;
+ }
+
private SimpleString address;
private Bindable bindable;
Modified: trunk/tests/src/org/jboss/messaging/tests/stress/failover/LargeMessageMultiThreadFailoverStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/failover/LargeMessageMultiThreadFailoverStressTest.java 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/failover/LargeMessageMultiThreadFailoverStressTest.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -23,7 +23,7 @@
package org.jboss.messaging.tests.stress.failover;
-import org.jboss.messaging.tests.integration.cluster.failover.LargeMessageMuliThreadFailoverTest;
+import org.jboss.messaging.tests.integration.cluster.failover.LargeMessageMultiThreadFailoverTest;
/**
* A LargeMessageMultiThreadFailoverStressTest
@@ -34,7 +34,7 @@
*
*
*/
-public class LargeMessageMultiThreadFailoverStressTest extends LargeMessageMuliThreadFailoverTest
+public class LargeMessageMultiThreadFailoverStressTest extends LargeMessageMultiThreadFailoverTest
{
// Constants -----------------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -75,83 +75,7 @@
// Public --------------------------------------------------------
- public void disabledtestFoo() throws Exception
- {
-
- final int numberOfInserts = 50000;
-
- // final int numberOfAddRefs = 10;
-
-// for (long i = 1; i <= 10000; i++)
-// {
-// if (i % 10000 == 0)
-// {
-// System.out.println("Append " + i);
-// }
-// impl.appendAddRecord(i, (byte)12, encoding);
-//
-// for (int j = 0; j < numberOfAddRefs; j++)
-// {
-// impl.appendUpdateRecord(i, (byte)12, updateEncoding);
-// }
-// }
-//
- System.out.println("Starting");
-
-// for (int numberOfAddRefs = 0; numberOfAddRefs < 10; numberOfAddRefs++)
-// {
- final int numberOfAddRefs = 1;
- while (true)
- {
- File file = new File(getTestDir());
- deleteDirectory(file);
- file.mkdirs();
-
- SequentialFileFactory factory = new AIOSequentialFileFactory(getTestDir());
- JournalImpl impl = new JournalImpl(10 * 1024 * 1024, 10, true, false, factory, "jbm", "jbm", 1000, 1000);
-
- impl.start();
-
- impl.load(dummyLoader);
-
- System.out.println("Loaded");
-
- SimpleEncoding encoding = new SimpleEncoding(1024, (byte)'f');
-
- SimpleEncoding updateEncoding = new SimpleEncoding(8, (byte)'f');
-
-
- long start = System.currentTimeMillis();
-
- for (long i = 1; i <= numberOfInserts; i++)
- {
- if (i % 10000 == 0)
- {
- System.out.println("Append " + i);
- }
- impl.appendAddRecord(i, (byte)12, encoding);
-
- for (int j = 0; j < numberOfAddRefs; j++)
- {
- impl.appendUpdateRecord(i, (byte)12, updateEncoding);
- }
- }
-
- long end = System.currentTimeMillis();
-
- double rate = 1000 * ((double)numberOfInserts) / (end - start);
- System.out.println("rate " + rate);
-
- impl.stop();
-
- }
-
-
-
- }
-
-
public void testInsertAndLoad() throws Exception
{
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/SimpleAddressManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/SimpleAddressManagerTest.java 2009-01-22 10:49:12 UTC (rev 5685)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/SimpleAddressManagerTest.java 2009-01-22 11:16:47 UTC (rev 5686)
@@ -21,16 +21,10 @@
*/
package org.jboss.messaging.tests.unit.core.postoffice.impl;
-import org.easymock.EasyMock;
import org.jboss.messaging.core.postoffice.AddressManager;
-import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.core.postoffice.BindingType;
-import org.jboss.messaging.core.postoffice.impl.QueueBindingImpl;
import org.jboss.messaging.core.postoffice.impl.SimpleAddressManager;
-import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.tests.unit.core.server.impl.fakes.FakeQueueFactory;
import org.jboss.messaging.tests.util.UnitTestCase;
-import org.jboss.messaging.util.SimpleString;
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
More information about the jboss-cvs-commits
mailing list