[Jboss-cvs] JBoss Messaging SVN: r1295 - in trunk: src/etc/server/default/deploy src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/server src/main/org/jboss/jms/server/destination src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/remoting src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/local src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests tests/etc tests/src/org/jboss/test/messaging/core tests/src/org/jboss/test/messaging/core/local tests/src/org/jboss/test/messaging/core/paging tests/src/org/jboss/test/messaging/core/plugin tests/src/org/jboss/test/messaging/jms tests/src/org/jboss/test/messaging/tools/jmx
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Sep 15 13:44:31 EDT 2006
Author: timfox
Date: 2006-09-15 13:44:02 -0400 (Fri, 15 Sep 2006)
New Revision: 1295
Removed:
trunk/tests/src/org/jboss/test/messaging/core/local/QueueWithFilterTest.java
Modified:
trunk/src/etc/server/default/deploy/remoting-service.xml
trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
trunk/src/main/org/jboss/jms/server/ServerPeer.java
trunk/src/main/org/jboss/jms/server/destination/ManagedTopic.java
trunk/src/main/org/jboss/jms/server/destination/QueueService.java
trunk/src/main/org/jboss/jms/server/destination/TopicService.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
trunk/src/main/org/jboss/jms/server/remoting/MessagingMarshallable.java
trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectInputStream.java
trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectOutputStream.java
trunk/src/main/org/jboss/jms/server/remoting/MessagingSerializationManager.java
trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
trunk/src/main/org/jboss/messaging/core/local/Queue.java
trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/Binding.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/BindingImpl.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredBindingImpl.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredQueue.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageHolder.java
trunk/tests/build.xml
trunk/tests/etc/container.xml
trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java
trunk/tests/src/org/jboss/test/messaging/core/local/NonRecoverableQueueTest.java
trunk/tests/src/org/jboss/test/messaging/core/local/RecoverableQueueTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_2PCTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_TTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_2PCTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_NTTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_TTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/PagingTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_2PCTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_NTTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_TTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_2PCTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_NTTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_TTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/SimplePostOfficeTest.java
trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
trunk/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
Log:
More clustering work - remoting fix, selectors
Modified: trunk/src/etc/server/default/deploy/remoting-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/remoting-service.xml 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/etc/server/default/deploy/remoting-service.xml 2006-09-15 17:44:02 UTC (rev 1295)
@@ -16,7 +16,8 @@
<invoker transport="socket">
<attribute name="marshaller" isParam="true">org.jboss.jms.server.remoting.JMSWireFormat</attribute>
<attribute name="unmarshaller" isParam="true">org.jboss.jms.server.remoting.JMSWireFormat</attribute>
- <attribute name="serializationtype" isParam="true">jboss</attribute>
+ <!-- Serialization type must be jms - do not change! -->
+ <attribute name="serializationtype" isParam="true">jms</attribute>
<attribute name="dataType" isParam="true">jms</attribute>
<attribute name="socket.check_connection" isParam="true">false</attribute>
<attribute name="timeout">0</attribute>
Modified: trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -170,12 +170,14 @@
return;
}
- //We explicitly associate the datatype "jms" with our customer SerializationManager
+ //We explicitly associate the datatype "jms" with our custom SerializationManager
//This is vital for performance reasons.
try
{
+// SerializationStreamFactory.setManagerClassName(
+// "jms", "org.jboss.remoting.serialization.impl.jboss.JBossSerializationManager");
SerializationStreamFactory.setManagerClassName(
- "jms", "org.jboss.remoting.serialization.impl.jboss.JBossSerializationManager");
+ "jms", "org.jboss.jms.server.remoting.MessagingSerializationManager");
}
catch (Exception e)
{
Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -762,8 +762,11 @@
{
// We explicitly associate the datatype "jms" with the java SerializationManager
// This is vital for performance reasons.
+// SerializationStreamFactory.setManagerClassName(
+// "jms", "org.jboss.remoting.serialization.impl.jboss.JBossSerializationManager");
SerializationStreamFactory.setManagerClassName(
- "jms", "org.jboss.remoting.serialization.impl.jboss.JBossSerializationManager");
+ "jms", "org.jboss.jms.server.remoting.MessagingSerializationManager");
+
JMSWireFormat wf = new JMSWireFormat();
Modified: trunk/src/main/org/jboss/jms/server/destination/ManagedTopic.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/ManagedTopic.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/jms/server/destination/ManagedTopic.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -142,6 +142,8 @@
while (iter.hasNext())
{
Binding binding = (Binding)iter.next();
+
+ String filterString = binding.getFilter() != null ? binding.getFilter().getFilterString() : null;
if (durable && binding.isDurable())
{
@@ -154,7 +156,7 @@
sb.append("\", clientID=\"");
sb.append(helper.getClientId());
sb.append("\", selector=\"");
- sb.append(binding.getSelector());
+ sb.append(filterString);
sb.append("\"\n");
}
else if (!durable && !binding.isDurable())
@@ -162,7 +164,7 @@
sb.append("Non-durable, subscriptionID=\"");
sb.append(binding.getChannelId());
sb.append("\", selector=\"");
- sb.append(binding.getSelector());
+ sb.append(filterString);
sb.append("\"\n");
}
}
Modified: trunk/src/main/org/jboss/jms/server/destination/QueueService.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/QueueService.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/jms/server/destination/QueueService.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -101,7 +101,7 @@
org.jboss.messaging.core.local.Queue q =
new org.jboss.messaging.core.local.Queue(binding.getChannelId(), ms, pm, true, true,
destination.getFullSize(), destination.getPageSize(), destination.getDownCacheSize(),
- executor, null);
+ executor);
q.load();
binding.setQueue(q);
binding.activate();
@@ -114,10 +114,10 @@
org.jboss.messaging.core.local.Queue q =
new org.jboss.messaging.core.local.Queue(idm.getId(), ms, pm, true, true,
destination.getFullSize(), destination.getPageSize(), destination.getDownCacheSize(),
- executor, null);
+ executor);
//Make a binding for this queue
- postOffice.bindQueue(destination.getName(), destination.getName(), q);
+ postOffice.bindQueue(destination.getName(), destination.getName(), null, q);
}
//push security update to the server
Modified: trunk/src/main/org/jboss/jms/server/destination/TopicService.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/TopicService.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/jms/server/destination/TopicService.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -82,7 +82,7 @@
org.jboss.messaging.core.local.Queue q =
new org.jboss.messaging.core.local.Queue(binding.getChannelId(), ms, pm, true, true,
destination.getFullSize(), destination.getPageSize(), destination.getDownCacheSize(),
- executor, null);
+ executor);
q.load();
binding.setQueue(q);
binding.activate();
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -192,10 +192,10 @@
mDest.getFullSize(),
mDest.getPageSize(),
mDest.getDownCacheSize(),
- executor, selector);
+ executor);
//Make a binding for this queue
- binding = topicPostOffice.bindQueue(new GUID().toString(), jmsDestination.getName(), q);
+ binding = topicPostOffice.bindQueue(new GUID().toString(), jmsDestination.getName(), selector, q);
}
else
{
@@ -229,10 +229,10 @@
mDest.getFullSize(),
mDest.getPageSize(),
mDest.getDownCacheSize(),
- executor, selector);
+ executor);
//Make a binding for this queue
- binding = topicPostOffice.bindQueue(name, jmsDestination.getName(), q);
+ binding = topicPostOffice.bindQueue(name, jmsDestination.getName(), selector, q);
}
else
{
@@ -246,11 +246,13 @@
// Changing a durable subscriber is equivalent to unsubscribing (deleting) the old
// one and creating a new one.
+ String filterString = binding.getFilter() != null ? binding.getFilter().getFilterString() : null;
+
boolean selectorChanged =
- (selectorString == null && binding.getSelector() != null) ||
- (binding.getSelector() == null && selectorString != null) ||
- (binding.getSelector() != null && selectorString != null &&
- !binding.getSelector().equals(selectorString));
+ (selectorString == null && filterString != null) ||
+ (filterString == null && selectorString != null) ||
+ (filterString != null && selectorString != null &&
+ !filterString.equals(selectorString));
if (trace) { log.trace("selector " + (selectorChanged ? "has" : "has NOT") + " changed"); }
@@ -274,10 +276,10 @@
mDest.getFullSize(),
mDest.getPageSize(),
mDest.getDownCacheSize(),
- executor, selector);
+ executor);
//Make a binding for this queue
- binding = topicPostOffice.bindQueue(name, jmsDestination.getName(), q);
+ binding = topicPostOffice.bindQueue(name, jmsDestination.getName(), selector, q);
}
}
}
@@ -572,10 +574,10 @@
QueuedExecutor executor = (QueuedExecutor)pool.get();
Queue q =
new Queue(idm.getId(), ms, pm, true, false, fullSize, pageSize, downCacheSize,
- executor, null);
+ executor);
//Make a binding for this queue
- queuePostOffice.bindQueue(dest.getName(), dest.getName(), q);
+ queuePostOffice.bindQueue(dest.getName(), dest.getName(), null, q);
}
}
catch (Throwable t)
Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -25,6 +25,8 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
@@ -48,9 +50,8 @@
import org.jboss.remoting.InvocationResponse;
import org.jboss.remoting.marshal.Marshaller;
import org.jboss.remoting.marshal.UnMarshaller;
-import org.jboss.remoting.marshal.serializable.SerializableMarshaller;
-import org.jboss.remoting.marshal.serializable.SerializableUnMarshaller;
import org.jboss.serial.io.JBossObjectInputStream;
+import org.jboss.serial.io.JBossObjectOutputStream;
/**
*
@@ -77,6 +78,8 @@
private static final long serialVersionUID = -7646123424863782043L;
private static final Logger log = Logger.getLogger(JMSWireFormat.class);
+
+ private static boolean usingJBossSerialization;
// The request codes - start from zero
@@ -99,46 +102,41 @@
// Static --------------------------------------------------------
+
+ public static void setUsingJBossSerialization(boolean b)
+ {
+ usingJBossSerialization = b;
+ }
// Attributes ----------------------------------------------------
- protected Marshaller serializableMarshaller;
- protected UnMarshaller serializableUnMarshaller;
-
protected boolean trace;
// Constructors --------------------------------------------------
public JMSWireFormat()
{
- serializableMarshaller = new SerializableMarshaller();
- serializableUnMarshaller = new SerializableUnMarshaller();
-
trace = log.isTraceEnabled();
}
+
+
// Marshaller implementation -------------------------------------
public void write(Object obj, OutputStream out) throws IOException
{
- DataOutputStream dos;
-
- if (out instanceof DataOutputStream)
+ //Sanity check
+ if (!(out instanceof MessagingObjectOutputStream))
{
- dos = (DataOutputStream)out;
+ throw new IllegalStateException("Must be MessagingObjectOutputStream");
}
- else
- {
- //TODO - We should get remoting to pass in a DataOutputStream
- //So we don't have to wrap it every time
- dos = new DataOutputStream(out);
- }
+ DataOutputStream dos = (DataOutputStream)(((MessagingObjectOutputStream)out).getUnderlyingStream());
+
handleVersion(obj, dos);
try
- {
-
+ {
if (obj instanceof InvocationRequest)
{
if (trace) { log.trace("writing InvocationRequest"); }
@@ -283,7 +281,7 @@
dos.write(SERIALIZED);
// Delegate to serialization to handle the wire format
- serializableMarshaller.write(obj, dos);
+ serialize(dos, obj);
if (trace) { log.trace("wrote using standard serialization"); }
}
@@ -311,7 +309,7 @@
dos.write(SERIALIZED);
//Delegate to serialization to handle the wire format
- serializableMarshaller.write(obj, dos);
+ serialize(dos, obj);
if (trace) { log.trace("wrote using standard serialization"); }
}
@@ -374,7 +372,7 @@
dos.write(SERIALIZED);
//Delegate to serialization to handle the wire format
- serializableMarshaller.write(obj, out);
+ serialize(dos, obj);
if (trace) { log.trace("wrote using standard serialization"); }
}
@@ -401,25 +399,13 @@
public Object read(InputStream in, Map map) throws IOException, ClassNotFoundException
{
- if (in instanceof JBossObjectInputStream)
+ // Sanity check
+ if (!(in instanceof MessagingObjectInputStream))
{
- // Need to explicitly set the classloader
- ((JBossObjectInputStream)in).
- setClassLoader(Thread.currentThread().getContextClassLoader());
+ throw new IllegalStateException("Must be MessagingObjectInputStream");
}
- DataInputStream dis;
-
- //TODO We should ensure that remoting always passes in a DataInputStream
- //This saves us wrapping it each time
- if (in instanceof DataInputStream)
- {
- dis = (DataInputStream)in;
- }
- else
- {
- dis = new DataInputStream(in);
- }
+ DataInputStream dis = (DataInputStream)(((MessagingObjectInputStream)in).getUnderlyingStream());
// First byte read is always version
@@ -437,7 +423,7 @@
case SERIALIZED:
{
// Delegate to serialization
- Object ret = serializableUnMarshaller.read(dis, map);
+ Object ret = deserialize(dis);
if (trace) { log.trace("read using standard serialization"); }
@@ -726,6 +712,42 @@
return mi;
}
+
+ private void serialize(OutputStream os, Object obj) throws Exception
+ {
+ ObjectOutputStream oos;
+
+ if (usingJBossSerialization)
+ {
+ oos = new JBossObjectOutputStream(os);
+ }
+ else
+ {
+ oos = new ObjectOutputStream(os);
+ }
+
+ oos.writeObject(obj);
+
+ oos.flush();
+ }
+
+ private Object deserialize(InputStream is) throws Exception
+ {
+ ObjectInputStream ois;
+
+ if (usingJBossSerialization)
+ {
+ ois = new JBossObjectInputStream(is);
+ }
+ else
+ {
+ ois = new ObjectInputStream(is);
+ }
+
+ Object obj = ois.readObject();
+
+ return obj;
+ }
// Inner classes -------------------------------------------------
}
Modified: trunk/src/main/org/jboss/jms/server/remoting/MessagingMarshallable.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/MessagingMarshallable.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/jms/server/remoting/MessagingMarshallable.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -21,6 +21,10 @@
*/
package org.jboss.jms.server.remoting;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
/**
*
@@ -31,19 +35,25 @@
*
* $Id$
*/
-public class MessagingMarshallable
+public class MessagingMarshallable implements Externalizable
{
// Constants -----------------------------------------------------
+ private static final long serialVersionUID = 4715063783844562048L;
+
// Static --------------------------------------------------------
// Attributes ----------------------------------------------------
-
+
protected byte version;
protected Object load;
// Constructors --------------------------------------------------
+ public MessagingMarshallable()
+ {
+ }
+
public MessagingMarshallable(byte version, Object load)
{
this.version = version;
@@ -67,6 +77,20 @@
return "MessagingMarshallable[" + version + ", " + load + "]";
}
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+ {
+ version = in.readByte();
+
+ load = in.readObject();
+ }
+
+ public void writeExternal(ObjectOutput out) throws IOException
+ {
+ out.writeByte(version);
+
+ out.writeObject(load);
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectInputStream.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectInputStream.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectInputStream.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -23,10 +23,15 @@
import java.io.IOException;
import java.io.InputStream;
+import java.io.InvalidObjectException;
+import java.io.NotActiveException;
import java.io.ObjectInputStream;
+import java.io.ObjectInputValidation;
/**
* A MessagingObjectInputStream
+ *
+ * See comment in MessagingSerializationManager
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision: 1.1 $</tt>
@@ -51,4 +56,149 @@
return in;
}
+ public int available() throws IOException
+ {
+ return in.available();
+ }
+
+ public void close() throws IOException
+ {
+ in.close();
+ }
+
+ public boolean equals(Object obj)
+ {
+ return in.equals(obj);
+ }
+
+ public int hashCode()
+ {
+ return in.hashCode();
+ }
+
+ public void mark(int readlimit)
+ {
+ in.mark(readlimit);
+ }
+
+ public boolean markSupported()
+ {
+ return in.markSupported();
+ }
+
+ public int read() throws IOException
+ {
+ return in.read();
+ }
+
+ public int read(byte[] b, int off, int len) throws IOException
+ {
+ return in.read(b, off, len);
+ }
+
+ public int read(byte[] b) throws IOException
+ {
+ return in.read(b);
+ }
+
+ public void reset() throws IOException
+ {
+ in.reset();
+ }
+
+ public long skip(long n) throws IOException
+ {
+ return in.skip(n);
+ }
+
+ public String toString()
+ {
+ return in.toString();
+ }
+
+ public void defaultReadObject() throws IOException, ClassNotFoundException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean readBoolean() throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public byte readByte() throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public char readChar() throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public double readDouble() throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public GetField readFields() throws IOException, ClassNotFoundException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public float readFloat() throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void readFully(byte[] buf, int off, int len) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void readFully(byte[] buf) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public int readInt() throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public long readLong() throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public short readShort() throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public Object readUnshared() throws IOException, ClassNotFoundException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public int readUnsignedByte() throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public int readUnsignedShort() throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public String readUTF() throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void registerValidation(ObjectInputValidation obj, int prio) throws NotActiveException, InvalidObjectException
+ {
+ throw new UnsupportedOperationException();
+ }
+
}
Modified: trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectOutputStream.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectOutputStream.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectOutputStream.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -27,6 +27,8 @@
/**
* A MessagingObjectOutputStream
+ *
+ * See comment in MessagingSerializationManager
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision: 1.1 $</tt>
@@ -49,4 +51,119 @@
return out;
}
+ public void close() throws IOException
+ {
+ out.close();
+ }
+
+ public boolean equals(Object obj)
+ {
+ return out.equals(obj);
+ }
+
+ public void flush() throws IOException
+ {
+ out.flush();
+ }
+
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+ out.write(b, off, len);
+ }
+
+ public void write(byte[] b) throws IOException
+ {
+ out.write(b);
+ }
+
+ public void write(int b) throws IOException
+ {
+ out.write(b);
+ }
+
+ public void defaultWriteObject() throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public PutField putFields() throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void reset() throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void useProtocolVersion(int version) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void writeBoolean(boolean val) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void writeByte(int val) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void writeBytes(String str) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void writeChar(int val) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void writeChars(String str) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void writeDouble(double val) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void writeFields() throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void writeFloat(float val) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void writeInt(int val) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void writeLong(long val) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void writeShort(int val) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void writeUnshared(Object obj) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void writeUTF(String str) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
}
Modified: trunk/src/main/org/jboss/jms/server/remoting/MessagingSerializationManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/MessagingSerializationManager.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/jms/server/remoting/MessagingSerializationManager.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -21,17 +21,28 @@
*/
package org.jboss.jms.server.remoting;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
+import org.jboss.logging.Logger;
import org.jboss.remoting.serialization.IMarshalledValue;
import org.jboss.remoting.serialization.SerializationManager;
/**
* A MessagingSerializationManager
+ *
+ * This class and the related ObjectInputStream and ObjectOutputStream classes
+ * are a hack to work around a limitiation of JBoss remoting whereby it always assumes
+ * the createInput and createOutput methods always return an ObjectInput/OutputStream
+ * For the purposes of messaging we want the marshaller and the server and client invokers
+ * to use the underlying stream instead, since we do not want all the extra crap that the object input/output
+ * streams add to the stream (headers)
+ * This should really be fixed properly in remoting
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision: 1.1 $</tt>
@@ -41,6 +52,9 @@
*/
public class MessagingSerializationManager extends SerializationManager
{
+ private static final Logger log = Logger.getLogger(MessagingSerializationManager.class);
+
+
public IMarshalledValue createdMarshalledValue(Object arg0) throws IOException
{
throw new UnsupportedOperationException();
@@ -48,12 +62,12 @@
public ObjectInputStream createInput(InputStream in, ClassLoader cl) throws IOException
{
- return new MessagingObjectInputStream(in);
+ return new MessagingObjectInputStream(new DataInputStream(in));
}
public ObjectOutputStream createOutput(OutputStream out) throws IOException
{
- return new MessagingObjectOutputStream(out);
+ return new MessagingObjectOutputStream(new DataOutputStream(out));
}
public IMarshalledValue createMarshalledValueForClone(Object arg0) throws IOException
Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -167,37 +167,8 @@
{
return handleInternal(sender, ref, tx, true);
}
- }
-
- public Delivery handleDontPersist(DeliveryObserver sender, MessageReference ref, Transaction tx)
- {
- checkClosed();
+ }
- Future result = new Future();
-
- if (tx == null)
- {
- try
- {
- // Instead of executing directly, we add the handle request to the event queue.
- // Since remoting doesn't currently handle non blocking IO, we still have to wait for the
- // result, but when remoting does, we can use a full SEDA approach and get even better
- // throughput.
- this.executor.execute(new HandleRunnable(result, sender, ref, false));
- }
- catch (InterruptedException e)
- {
- log.warn("Thread interrupted", e);
- }
-
- return (Delivery)result.getResult();
- }
- else
- {
- return handleInternal(sender, ref, tx, false);
- }
- }
-
// DeliveryObserver implementation --------------------------
public void acknowledge(Delivery d, Transaction tx) throws Throwable
@@ -1037,24 +1008,26 @@
{
// by default a noop
}
-
- // Private -------------------------------------------------------
-
- private void checkClosed()
+
+ protected void checkClosed()
{
if (router == null)
{
throw new IllegalStateException(this + " closed");
}
}
+
+ // Private -------------------------------------------------------
+
+
// Inner classes -------------------------------------------------
- private class DeliveryRunnable implements Runnable
+ protected class DeliveryRunnable implements Runnable
{
Future result;
- DeliveryRunnable(Future result)
+ public DeliveryRunnable(Future result)
{
this.result = result;
}
@@ -1070,7 +1043,7 @@
}
}
- private class HandleRunnable implements Runnable
+ protected class HandleRunnable implements Runnable
{
Future result;
@@ -1080,7 +1053,7 @@
boolean persist;
- HandleRunnable(Future result, DeliveryObserver sender, MessageReference ref, boolean persist)
+ public HandleRunnable(Future result, DeliveryObserver sender, MessageReference ref, boolean persist)
{
this.result = result;
this.sender = sender;
Modified: trunk/src/main/org/jboss/messaging/core/local/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/local/Queue.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/messaging/core/local/Queue.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -22,14 +22,9 @@
package org.jboss.messaging.core.local;
import org.jboss.logging.Logger;
-import org.jboss.messaging.core.Delivery;
-import org.jboss.messaging.core.DeliveryObserver;
-import org.jboss.messaging.core.Filter;
-import org.jboss.messaging.core.MessageReference;
import org.jboss.messaging.core.PagingChannel;
import org.jboss.messaging.core.plugin.contract.MessageStore;
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
-import org.jboss.messaging.core.tx.Transaction;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
@@ -65,38 +60,19 @@
// Static --------------------------------------------------------
// Attributes ----------------------------------------------------
-
- protected Filter filter;
-
-
+
// Constructors --------------------------------------------------
public Queue(long id, MessageStore ms, PersistenceManager pm,
boolean acceptReliableMessages, boolean recoverable,
- int fullSize, int pageSize, int downCacheSize, QueuedExecutor executor,
- Filter filter)
+ int fullSize, int pageSize, int downCacheSize, QueuedExecutor executor)
{
super(id, ms, pm, acceptReliableMessages, recoverable, fullSize, pageSize, downCacheSize, executor);
router = new RoundRobinPointToPointRouter();
-
- this.filter = filter;
}
- // Channel implementation ----------------------------------------
-
- public Delivery handle(DeliveryObserver sender, MessageReference ref, Transaction tx)
- {
- //If the queue has a Filter we do not accept any Message references that do not
- //match the Filter
- if (filter != null && !filter.accept(ref))
- {
- if (trace) { log.trace(this + " not accepting " + ref); }
- return null;
- }
-
- return super.handle(sender, ref, tx);
- }
+ // Channel implementation ----------------------------------------
// Public --------------------------------------------------------
@@ -105,11 +81,6 @@
return "Queue[" + getChannelID() + "]";
}
- public Filter getFilter()
- {
- return filter;
- }
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -21,6 +21,7 @@
*/
package org.jboss.messaging.core.plugin.contract;
+import org.jboss.messaging.core.Filter;
import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredBinding;
import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredQueue;
@@ -46,7 +47,7 @@
* @return
* @throws Exception
*/
- ClusteredBinding bindClusteredQueue(String queueName, String condition, ClusteredQueue queue) throws Exception;
+ ClusteredBinding bindClusteredQueue(String queueName, String condition, Filter filter, ClusteredQueue queue) throws Exception;
/**
* Unbind a clustered queue from the post office
Modified: trunk/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -23,6 +23,7 @@
import java.util.List;
+import org.jboss.messaging.core.Filter;
import org.jboss.messaging.core.MessageReference;
import org.jboss.messaging.core.local.Queue;
import org.jboss.messaging.core.plugin.postoffice.Binding;
@@ -53,11 +54,12 @@
* @param queueName The unique name of the queue
* @param condition The condition to be used when routing references
* @param noLocal
+ * @param filter The filter to use
* @param queue
* @return
* @throws Exception
*/
- Binding bindQueue(String queueName, String condition, Queue queue) throws Exception;
+ Binding bindQueue(String queueName, String condition, Filter filter, Queue queue) throws Exception;
/**
* Unbind a queue from the post office
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/Binding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/Binding.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/Binding.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -21,6 +21,7 @@
*/
package org.jboss.messaging.core.plugin.postoffice;
+import org.jboss.messaging.core.Filter;
import org.jboss.messaging.core.local.Queue;
/**
@@ -42,7 +43,7 @@
Queue getQueue();
- String getSelector();
+ Filter getFilter();
long getChannelId();
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/BindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/BindingImpl.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/BindingImpl.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -24,6 +24,8 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import org.jboss.jms.selector.Selector;
+import org.jboss.messaging.core.Filter;
import org.jboss.messaging.core.local.Queue;
import org.jboss.messaging.util.StreamUtils;
@@ -49,7 +51,7 @@
private boolean active;
- private String selector;
+ private Filter filter;
private long channelId;
@@ -59,7 +61,7 @@
{
}
- public BindingImpl(String nodeId, String queueName, String condition, String selector,
+ public BindingImpl(String nodeId, String queueName, String condition, Filter filter,
long channelId, boolean durable)
{
this.nodeId = nodeId;
@@ -68,7 +70,7 @@
this.condition = condition;
- this.selector = selector;
+ this.filter = filter;
this.channelId = channelId;
@@ -113,9 +115,9 @@
return active;
}
- public String getSelector()
+ public Filter getFilter()
{
- return selector;
+ return filter;
}
public long getChannelId()
@@ -148,8 +150,11 @@
active = in.readBoolean();
- selector = (String)StreamUtils.readObject(in, false);
+ String filterString = (String)StreamUtils.readObject(in, false);
+ //TODO we need to generalise this
+ filter = new Selector(filterString);
+
channelId = in.readLong();
durable = in.readBoolean();
@@ -165,7 +170,7 @@
out.writeBoolean(active);
- StreamUtils.writeObject(out, selector, false, false);
+ StreamUtils.writeObject(out, filter.getFilterString(), false, false);
out.writeLong(channelId);
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -24,6 +24,7 @@
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
+import java.sql.Types;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -36,8 +37,10 @@
import javax.sql.DataSource;
import javax.transaction.TransactionManager;
+import org.jboss.jms.selector.Selector;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.Filter;
import org.jboss.messaging.core.MessageReference;
import org.jboss.messaging.core.local.Queue;
import org.jboss.messaging.core.plugin.JDBCSupport;
@@ -121,7 +124,7 @@
// PostOffice implementation ---------------------------------------
- public Binding bindQueue(String queueName, String condition, Queue queue) throws Exception
+ public Binding bindQueue(String queueName, String condition, Filter filter, Queue queue) throws Exception
{
if (queueName == null)
{
@@ -153,11 +156,9 @@
}
boolean durable = queue.isRecoverable();
-
- String filter = queue.getFilter() == null ? null : queue.getFilter().getFilterString();
binding = createBinding(nodeId, queueName, condition, filter,
- queue.getChannelID(), durable);
+ queue.getChannelID(), durable);
binding.setQueue(queue);
@@ -327,13 +328,18 @@
if (binding.isActive() && binding.getNodeId().equals(this.nodeId))
{
//It's a local binding so we pass the message on to the subscription
- Queue subscription = binding.getQueue();
-
- Delivery del = subscription.handle(null, ref, tx);
+ Queue queue = binding.getQueue();
- if (del != null && del.isSelectorAccepted())
+ Filter filter = binding.getFilter();
+
+ if (filter != null && filter.accept(ref))
{
- routed = true;
+ Delivery del = queue.handle(null, ref, tx);
+
+ if (del != null && del.isSelectorAccepted())
+ {
+ routed = true;
+ }
}
}
}
@@ -355,9 +361,9 @@
// Protected -----------------------------------------------------
- protected Binding createBinding(String nodeId, String queueName, String condition, String filter,
- long channelId, boolean durable)
- {
+ protected Binding createBinding(String nodeId, String queueName, String condition, Filter filter,
+ long channelId, boolean durable) throws Exception
+ {
return new BindingImpl(nodeId, queueName, condition, filter,
channelId, durable);
}
@@ -396,12 +402,21 @@
conn = ds.getConnection();
ps = conn.prepareStatement(getSQLStatement("INSERT_BINDING"));
+
+ String filterString = binding.getFilter() == null ? null : binding.getFilter().getFilterString();
ps.setString(1, this.officeName);
ps.setString(2, this.nodeId);
ps.setString(3, binding.getQueueName());
ps.setString(4, binding.getCondition());
- ps.setString(5, binding.getSelector());
+ if (filterString != null)
+ {
+ ps.setString(5, filterString);
+ }
+ else
+ {
+ ps.setNull(5, Types.VARCHAR);
+ }
ps.setLong(6, binding.getChannelId());
ps.executeUpdate();;
@@ -488,13 +503,25 @@
String selector = rs.getString(4);
+ if (rs.wasNull())
+ {
+ selector = null;
+ }
+
long channelId = rs.getLong(5);
//We don't load the actual queue - this is because we don't know the paging params until
//activation time
- Binding binding = createBinding(nodeId, queueName, condition, selector, channelId, true);
+ //TODO we need to generalise selector creation
+ Selector filter = null;
+ if (selector != null)
+ {
+ filter = new Selector(selector);
+ }
+ Binding binding = createBinding(nodeId, queueName, condition, filter, channelId, true);
+
list.add(binding);
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -24,7 +24,6 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import org.jboss.logging.Logger;
import org.jboss.messaging.util.Streamable;
/**
@@ -38,9 +37,7 @@
*
*/
abstract class ClusterRequest implements Streamable
-{
- private static final Logger log = Logger.getLogger(ClusterRequest.class);
-
+{
/*
* Factory method
*/
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredBindingImpl.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredBindingImpl.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -24,6 +24,7 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import org.jboss.messaging.core.Filter;
import org.jboss.messaging.core.plugin.postoffice.BindingImpl;
/**
@@ -46,9 +47,9 @@
{
}
- public ClusteredBindingImpl(String nodeId, String queueName, String condition, String selector, long channelId, boolean durable)
+ public ClusteredBindingImpl(String nodeId, String queueName, String condition, Filter filter, long channelId, boolean durable)
{
- super(nodeId, queueName, condition, selector, channelId, durable);
+ super(nodeId, queueName, condition, filter, channelId, durable);
}
public double getConsumptionRate()
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -36,8 +36,10 @@
import javax.sql.DataSource;
import javax.transaction.TransactionManager;
+import org.jboss.jms.selector.Selector;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.Filter;
import org.jboss.messaging.core.MessageReference;
import org.jboss.messaging.core.local.Queue;
import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
@@ -277,16 +279,14 @@
// PostOffice implementation ---------------------------------------
- public ClusteredBinding bindClusteredQueue(String queueName, String condition, ClusteredQueue queue) throws Exception
+ public ClusteredBinding bindClusteredQueue(String queueName, String condition, Filter filter, ClusteredQueue queue) throws Exception
{
- ClusteredBinding binding = (ClusteredBinding)super.bindQueue(queueName, condition, queue);
+ ClusteredBinding binding = (ClusteredBinding)super.bindQueue(queueName, condition, filter, queue);
boolean durable = queue.isRecoverable();
- String filter = queue.getFilter() == null ? null : queue.getFilter().getFilterString();
-
BindRequest request =
- new BindRequest(nodeId, queueName, condition, filter,
+ new BindRequest(nodeId, queueName, condition, filter == null ? null : filter.getFilterString(),
binding.getChannelId(), durable);
syncSendRequest(request);
@@ -410,33 +410,38 @@
throw new IllegalStateException("No bindings in list!");
}
- if (binding.getNodeId().equals(this.nodeId))
+ Filter filter = binding.getFilter();
+
+ if (filter != null && filter.accept(ref))
{
- //It's a local binding so we pass the message on to the queue
- Queue queue = binding.getQueue();
-
- Delivery del = queue.handle(null, ref, tx);
+ if (binding.getNodeId().equals(this.nodeId))
+ {
+ //It's a local binding so we pass the message on to the queue
+ Queue queue = binding.getQueue();
- if (del != null && del.isSelectorAccepted())
+ Delivery del = queue.handle(null, ref, tx);
+
+ if (del != null && del.isSelectorAccepted())
+ {
+ routed = true;
+ }
+ }
+ else
{
+ //It's a binding on a different office instance on the cluster
+ numberRemote++;
+
+ if (ref.isReliable() && binding.isDurable())
+ {
+ //Insert the reference into the database
+
+ //TODO perhaps we should do this via a stub queue class
+ pm.addReference(binding.getChannelId(), ref, tx);
+ }
+
routed = true;
}
- }
- else
- {
- //It's a binding on a different office instance on the cluster
- numberRemote++;
-
- if (ref.isReliable() && binding.isDurable())
- {
- //Insert the reference into the database
-
- //TODO perhaps we should do this via a stub queue class
- pm.addReference(binding.getChannelId(), ref, tx);
- }
-
- routed = true;
- }
+ }
}
//Now we've sent the message to any local queues, we might also need
@@ -522,7 +527,7 @@
throw new IllegalArgumentException(this.nodeId + "Binding already exists for node Id " + nodeId + " queue name " + queueName);
}
- binding = new ClusteredBindingImpl(nodeId, queueName, condition, filterString,
+ binding = new ClusteredBindingImpl(nodeId, queueName, condition, new Selector(filterString),
channelID, durable);
binding.activate();
@@ -585,7 +590,7 @@
throw new IllegalStateException("Cannot find binding for queue name " + queueName);
}
- Queue queue = binding.getQueue();
+ ClusteredQueue queue = (ClusteredQueue)binding.getQueue();
Iterator iter = messages.iterator();
@@ -599,7 +604,7 @@
ref = ms.reference(msg);
- queue.handleDontPersist(null, ref, null);
+ queue.handleFromCluster(null, ref, null);
}
finally
{
@@ -663,11 +668,11 @@
if (handle)
{
//It's a local binding so we pass the message on to the subscription
- Queue subscription = binding.getQueue();
+ ClusteredQueue queue = (ClusteredQueue)binding.getQueue();
//TODO instead of adding a new method on the channel
//we should set a header and use the same method
- subscription.handleDontPersist(null, ref, null);
+ queue.handleFromCluster(null, ref, null);
}
}
}
@@ -849,34 +854,37 @@
{
ClusteredBinding bb = (ClusteredBinding)iter.next();
- ClusteredQueue q = (ClusteredQueue)bb.getQueue();
-
- //We don't bother sending the stat if there is less than STATS_DIFFERENCE_MARGIN_PERCENT % difference
-
- double newRate = q.getGrowthRate();
-
- int newMessageCount = q.messageCount();
-
- boolean sendStats = decideToSendStats(bb.getConsumptionRate(), newRate);
-
- if (!sendStats)
- {
- sendStats = decideToSendStats(bb.getMessageCount(), newMessageCount);
- }
-
- if (sendStats)
- {
- bb.setConsumptionRate(newRate);
- bb.setMessageCount(newMessageCount);
+ if (bb.isActive())
+ {
+ ClusteredQueue q = (ClusteredQueue)bb.getQueue();
- if (stats == null)
+ //We don't bother sending the stat if there is less than STATS_DIFFERENCE_MARGIN_PERCENT % difference
+
+ double newRate = q.getGrowthRate();
+
+ int newMessageCount = q.messageCount();
+
+ boolean sendStats = decideToSendStats(bb.getConsumptionRate(), newRate);
+
+ if (!sendStats)
{
- stats = new ArrayList();
+ sendStats = decideToSendStats(bb.getMessageCount(), newMessageCount);
}
- QueueStats qs = new QueueStats(bb.getQueueName(), newRate, newMessageCount);
- stats.add(qs);
- }
+ if (sendStats)
+ {
+ bb.setConsumptionRate(newRate);
+ bb.setMessageCount(newMessageCount);
+
+ if (stats == null)
+ {
+ stats = new ArrayList();
+ }
+ QueueStats qs = new QueueStats(bb.getQueueName(), newRate, newMessageCount);
+
+ stats.add(qs);
+ }
+ }
}
}
}
@@ -961,11 +969,11 @@
// Protected ---------------------------------------------------------------------------------------
- protected Binding createBinding(String nodeId, String queueName, String condition, String filter,
- long channelId, boolean durable)
+ protected Binding createBinding(String nodeId, String queueName, String condition, Filter filter,
+ long channelId, boolean durable) throws Exception
{
return new ClusteredBindingImpl(nodeId, queueName, condition, filter,
- channelId, durable);
+ channelId, durable);
}
protected void loadBindings() throws Exception
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredQueue.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredQueue.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -25,12 +25,14 @@
import java.util.List;
import org.jboss.messaging.core.Delivery;
-import org.jboss.messaging.core.Filter;
+import org.jboss.messaging.core.DeliveryObserver;
import org.jboss.messaging.core.MessageReference;
import org.jboss.messaging.core.SimpleDelivery;
import org.jboss.messaging.core.local.Queue;
import org.jboss.messaging.core.plugin.contract.MessageStore;
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
+import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.messaging.util.Future;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
@@ -56,10 +58,11 @@
private volatile int numberConsumed;
- public ClusteredQueue(long id, MessageStore ms, PersistenceManager pm, boolean acceptReliableMessages, boolean recoverable, int fullSize, int pageSize, int downCacheSize, QueuedExecutor executor, Filter filter)
+ public ClusteredQueue(long id, MessageStore ms, PersistenceManager pm, boolean acceptReliableMessages,
+ boolean recoverable, int fullSize, int pageSize, int downCacheSize, QueuedExecutor executor)
{
super(id, ms, pm, acceptReliableMessages, recoverable, fullSize, pageSize,
- downCacheSize, executor, filter);
+ downCacheSize, executor);
lastTime = System.currentTimeMillis();
@@ -115,7 +118,35 @@
}
}
}
+
+ /*
+ * This is the same as the normal handle() method on the Channel except it doesn't
+ * persist the message even if it is persistent - this is because persistent messages
+ * are always persisted on the sending node before sending.
+ */
+ public Delivery handleFromCluster(DeliveryObserver sender, MessageReference ref, Transaction tx)
+ throws Exception
+ {
+ checkClosed();
+
+ Future result = new Future();
+ if (tx == null)
+ {
+ // Instead of executing directly, we add the handle request to the event queue.
+ // Since remoting doesn't currently handle non blocking IO, we still have to wait for the
+ // result, but when remoting does, we can use a full SEDA approach and get even better
+ // throughput.
+ this.executor.execute(new HandleRunnable(result, sender, ref, false));
+
+ return (Delivery)result.getResult();
+ }
+ else
+ {
+ return handleInternal(sender, ref, tx, false);
+ }
+ }
+
protected void addReferenceInMemory(MessageReference ref) throws Exception
{
super.addReferenceInMemory(ref);
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageHolder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageHolder.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageHolder.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -25,7 +25,6 @@
import java.io.DataOutputStream;
import java.util.Map;
-import org.jboss.logging.Logger;
import org.jboss.messaging.core.Message;
import org.jboss.messaging.core.message.MessageFactory;
import org.jboss.messaging.util.StreamUtils;
@@ -42,8 +41,6 @@
*/
class MessageHolder implements Streamable
{
- private static final Logger log = Logger.getLogger(MessageHolder.class);
-
private String routingKey;
private Message message;
Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/build.xml 2006-09-15 17:44:02 UTC (rev 1295)
@@ -58,14 +58,14 @@
-->
<property name="functional.tests.database" value="hsqldb"/>
- <property name="functional.tests.serialization" value="jboss"/>
+ <property name="functional.tests.serialization" value="jms"/>
<!--
Stress tests.
-->
<property name="stress.tests.database" value="mysql"/>
- <property name="stress.tests.serialization" value="jboss"/>
+ <property name="stress.tests.serialization" value="jms"/>
<!--
Project paths.
@@ -880,7 +880,7 @@
<property name="test.execution.classpath.file" value=".test.execution.classpath"/>
<target name="get-test-execution-classpath" depends="init">
- <pathconvert refid="test.execution.classpath"
+ <pathconvert refid="test.execution.classpath"
property="test.execution.classpath.unix"/>
<echo message="${test.execution.classpath.unix}" file="${test.execution.classpath.file}"/>
</target>
Modified: trunk/tests/etc/container.xml
===================================================================
--- trunk/tests/etc/container.xml 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/etc/container.xml 2006-09-15 17:44:02 UTC (rev 1295)
@@ -55,8 +55,10 @@
precedence. Supported values: "jboss" and "java".
-->
- <serialization-type>jboss</serialization-type>
+ <!-- Must ALWAYS be jms -->
+ <serialization-type>jms</serialization-type>
+
</container>
Modified: trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -120,26 +120,26 @@
public Delivery handle(DeliveryObserver observer, MessageReference ref, Transaction tx)
{
- log.info(this + " got routable:" + ref);
+ log.trace(this + " got routable:" + ref);
try
{
if (ref == null)
{
- log.info("Receiver [" + name + "] is rejecting a null reference");
+ log.trace("Receiver [" + name + "] is rejecting a null reference");
return null;
}
if (SELECTOR_REJECTING.equals(state))
{
- log.info(this + " is rejecting message since doesn't match selector");
+ log.trace(this + " is rejecting message since doesn't match selector");
return new SimpleDelivery(null, null, true, false);
}
if (REJECTING.equals(state))
{
- log.info(this + " is rejecting reference " + ref);
+ log.trace(this + " is rejecting reference " + ref);
return null;
}
@@ -150,10 +150,10 @@
"THE BEHAVIOUR OF A BROKEN RECEIVER");
}
- log.info("State is:" + state);
+ log.trace("State is:" + state);
boolean done = ACKING.equals(state) ? true : false;
- log.info(this + " is " + (done ? "ACKing" : "NACKing") + " message " + ref);
+ log.trace(this + " is " + (done ? "ACKing" : "NACKing") + " message " + ref);
Message m = ref.getMessage();
@@ -162,7 +162,7 @@
if (immediateAsynchronousAcknowledgment)
{
- log.info("simulating an asynchronous ACK that arrives before we return the delivery to channel");
+ log.trace("simulating an asynchronous ACK that arrives before we return the delivery to channel");
try
{
delivery.acknowledge(null);
@@ -210,7 +210,7 @@
log.error("No channel, cannot request messages");
return;
}
- log.info("receiver explicitely requesting message from the channel");
+ log.trace("receiver explicitely requesting message from the channel");
channel.deliver(true);
}
@@ -292,7 +292,7 @@
Message m = (Message)o[0];
if (m == r)
{
- log.info("*** found it");
+ log.trace("*** found it");
d = (Delivery)o[1];
touple = o;
break;
@@ -311,7 +311,7 @@
d.acknowledge(tx);
- log.info(this + " acknowledged " + r);
+ log.trace(this + " acknowledged " + r);
// make sure I get rid of message if the transaction is rolled back
if (tx != null)
@@ -349,7 +349,7 @@
d.cancel();
- log.info(this + " cancelled " + r);
+ log.trace(this + " cancelled " + r);
}
Modified: trunk/tests/src/org/jboss/test/messaging/core/local/NonRecoverableQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/local/NonRecoverableQueueTest.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/local/NonRecoverableQueueTest.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -55,8 +55,7 @@
{
super.setUp();
- queue = new Queue(1, ms, pm, true, false, 100, 20, 10, new QueuedExecutor(),
- null);
+ queue = new Queue(1, ms, pm, true, false, 100, 20, 10, new QueuedExecutor());
}
public void tearDown() throws Exception
@@ -72,8 +71,7 @@
public void recoverChannel() throws Exception
{
- queue = new Queue(1, ms, pm, true, false, 100, 20, 10, new QueuedExecutor(),
- null);
+ queue = new Queue(1, ms, pm, true, false, 100, 20, 10, new QueuedExecutor());
}
// Public --------------------------------------------------------
Deleted: trunk/tests/src/org/jboss/test/messaging/core/local/QueueWithFilterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/local/QueueWithFilterTest.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/local/QueueWithFilterTest.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -1,158 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.messaging.core.local;
-
-import org.jboss.messaging.core.Filter;
-import org.jboss.messaging.core.Message;
-import org.jboss.messaging.core.MessageReference;
-import org.jboss.messaging.core.Routable;
-import org.jboss.messaging.core.local.Queue;
-import org.jboss.messaging.core.message.CoreMessage;
-import org.jboss.messaging.core.plugin.JDBCPersistenceManager;
-import org.jboss.messaging.core.plugin.SimpleMessageStore;
-import org.jboss.messaging.core.plugin.contract.MessageStore;
-import org.jboss.messaging.core.plugin.contract.PersistenceManager;
-import org.jboss.test.messaging.MessagingTestCase;
-import org.jboss.test.messaging.tools.jmx.ServiceContainer;
-
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
-/**
- *
- * A QueueWithFilterTest.
- *
- * @author <a href="tim.fox at jboss.com">Tim Fox</a>
- * @version $Revision: 1237 $
- *
- * $Id: queueWithFilterTest.java 1237 2006-08-30 18:36:36Z timfox $
- */
-public class QueueWithFilterTest extends MessagingTestCase
-{
- public QueueWithFilterTest(String name)
- {
- super(name);
- }
-
- protected PersistenceManager pm;
-
- protected MessageStore ms;
-
- protected ServiceContainer sc;
-
- // ChannelTestBase overrides ------------------------------------
-
- public void setUp() throws Exception
- {
- super.setUp();
-
- sc = new ServiceContainer("all,-remoting,-security");
- sc.start();
-
- pm =
- new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(), null,
- true, true, true, 100);
- pm.start();
-
- ms = new SimpleMessageStore();
- ms.start();
-
- log.debug("setup done");
- }
-
- public void tearDown() throws Exception
- {
- pm.stop();
-
- ms.stop();
-
- sc.stop();
-
- super.tearDown();
- }
-
-
- public void testWithFilter()
- {
- Filter f = new SimpleFilter(3);
-
- Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(),
- f);
-
- Message m1 = new CoreMessage(1, false, 0, 0, (byte)0, null, null, 0);
- Message m2 = new CoreMessage(2, false, 0, 0, (byte)0, null, null, 0);
- Message m3 = new CoreMessage(3, false, 0, 0, (byte)0, null, null, 0);
- Message m4 = new CoreMessage(4, false, 0, 0, (byte)0, null, null, 0);
- Message m5 = new CoreMessage(5, false, 0, 0, (byte)0, null, null, 0);
-
- assertNull(queue.handle(null, ms.reference(m1), null));
- assertNull(queue.handle(null, ms.reference(m2), null));
- assertNotNull(queue.handle(null, ms.reference(m3), null));
- assertNull(queue.handle(null, ms.reference(m4), null));
- assertNull(queue.handle(null, ms.reference(m5), null));
-
- }
-
- public void testWithoutFilter()
- {
- Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(),
- null);
-
- Message m1 = new CoreMessage(1, false, 0, 0, (byte)0, null, null, 0);
- Message m2 = new CoreMessage(2, false, 0, 0, (byte)0, null, null, 0);
- Message m3 = new CoreMessage(3, false, 0, 0, (byte)0, null, null, 0);
- Message m4 = new CoreMessage(4, false, 0, 0, (byte)0, null, null, 0);
- Message m5 = new CoreMessage(5, false, 0, 0, (byte)0, null, null, 0);
-
- assertNotNull(queue.handle(null, ms.reference(m1), null));
- assertNotNull(queue.handle(null, ms.reference(m2), null));
- assertNotNull(queue.handle(null, ms.reference(m3), null));
- assertNotNull(queue.handle(null, ms.reference(m4), null));
- assertNotNull(queue.handle(null, ms.reference(m5), null));
-
- }
-
- public void crashChannel() throws Exception
- {
- }
-
- public void recoverChannel() throws Exception
- {
- }
-
- class SimpleFilter implements Filter
- {
- private long value;
- SimpleFilter(long value)
- {
- this.value = value;
- }
- public boolean accept(Routable routable)
- {
- MessageReference ref = (MessageReference)routable;
- return ref.getMessageID() == value;
- }
- public String getFilterString()
- {
- return null;
- }
- }
-}
Modified: trunk/tests/src/org/jboss/test/messaging/core/local/RecoverableQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/local/RecoverableQueueTest.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/local/RecoverableQueueTest.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -55,8 +55,7 @@
{
super.setUp();
- queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(),
- null);
+ queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
}
public void tearDown() throws Exception
@@ -72,8 +71,7 @@
public void recoverChannel() throws Exception
{
- queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(),
- null);
+ queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
}
// Public --------------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_2PCTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_2PCTest.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_2PCTest.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -64,9 +64,9 @@
public void testChannelShareNP_2PC() throws Throwable
{
- Queue queue1 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+ Queue queue1 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
- Queue queue2 = new Queue(2, ms, pm, true, true, 50, 10, 5, new QueuedExecutor(), null);
+ Queue queue2 = new Queue(2, ms, pm, true, true, 50, 10, 5, new QueuedExecutor());
Message[] msgs = new Message[150];
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -63,9 +63,9 @@
public void test1() throws Throwable
{
- Queue queue1 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+ Queue queue1 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
- Queue queue2 = new Queue(2, ms, pm, true, true, 50, 10, 5, new QueuedExecutor(), null);
+ Queue queue2 = new Queue(2, ms, pm, true, true, 50, 10, 5, new QueuedExecutor());
Message[] msgs = new Message[150];
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_TTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_TTest.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_TTest.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -64,9 +64,9 @@
public void testChannelShareNP_Transactional() throws Throwable
{
- Queue queue1 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+ Queue queue1 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
- Queue queue2 = new Queue(2, ms, pm, true, true, 50, 10, 5, new QueuedExecutor(), null);
+ Queue queue2 = new Queue(2, ms, pm, true, true, 50, 10, 5, new QueuedExecutor());
Message[] msgs = new Message[150];
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_2PCTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_2PCTest.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_2PCTest.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -59,9 +59,9 @@
public void test1() throws Throwable
{
- Queue queue1 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+ Queue queue1 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
- Queue queue2 = new Queue(2, ms, pm, true, true, 50, 10, 5, new QueuedExecutor(), null);
+ Queue queue2 = new Queue(2, ms, pm, true, true, 50, 10, 5, new QueuedExecutor());
Message[] msgs = new Message[150];
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_NTTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_NTTest.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_NTTest.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -63,9 +63,9 @@
public void test1() throws Throwable
{
- Queue queue1 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+ Queue queue1 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
- Queue queue2 = new Queue(2, ms, pm, true, true, 50, 10, 5, new QueuedExecutor(), null);
+ Queue queue2 = new Queue(2, ms, pm, true, true, 50, 10, 5, new QueuedExecutor());
Message[] msgs = new Message[150];
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_TTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_TTest.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_TTest.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -63,9 +63,9 @@
public void test1() throws Throwable
{
- Queue queue1 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+ Queue queue1 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
- Queue queue2 = new Queue(2, ms, pm, true, true, 50, 10, 5, new QueuedExecutor(), null);
+ Queue queue2 = new Queue(2, ms, pm, true, true, 50, 10, 5, new QueuedExecutor());
Message[] msgs = new Message[150];
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/PagingTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/PagingTest.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/PagingTest.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -63,7 +63,7 @@
public void testPaging() throws Exception
{
- Queue p = new Queue(0, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+ Queue p = new Queue(0, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
CoreMessage m = null;
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_2PCTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_2PCTest.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_2PCTest.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -66,7 +66,7 @@
public void test1() throws Throwable
{
- Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+ Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
Message[] msgs = new Message[241];
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_NTTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_NTTest.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_NTTest.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -65,7 +65,7 @@
public void test1() throws Throwable
{
- Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+ Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
Message[] msgs = new Message[241];
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_TTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_TTest.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_TTest.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -66,7 +66,7 @@
public void test1() throws Throwable
{
- Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+ Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
Message[] msgs = new Message[241];
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_2PCTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_2PCTest.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_2PCTest.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -66,7 +66,7 @@
public void test1() throws Throwable
{
- Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+ Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
Message[] msgs = new Message[241];
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_NTTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_NTTest.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_NTTest.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -65,7 +65,7 @@
public void test1() throws Throwable
{
- Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+ Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
Message[] msgs = new Message[241];
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_TTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_TTest.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_TTest.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -66,7 +66,7 @@
public void test1() throws Throwable
{
- Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+ Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
Message[] msgs = new Message[241];
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -66,7 +66,7 @@
public void testRecoverableQueueCrash() throws Throwable
{
- Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+ Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
Message[] msgs = new Message[200];
@@ -126,7 +126,7 @@
ms = new SimpleMessageStore();
ms.start();
- Queue queue2 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+ Queue queue2 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
queue2.load();
@@ -155,7 +155,7 @@
{
//Non recoverable queue - eg temporary queue
- Queue queue = new Queue(1, ms, pm, true, false, 100, 20, 10, new QueuedExecutor(), null);
+ Queue queue = new Queue(1, ms, pm, true, false, 100, 20, 10, new QueuedExecutor());
Message[] msgs = new Message[200];
@@ -215,7 +215,7 @@
ms = new SimpleMessageStore();
ms.start();
- Queue queue2 = new Queue(1, ms, pm, true, false, 100, 20, 10, new QueuedExecutor(), null);
+ Queue queue2 = new Queue(1, ms, pm, true, false, 100, 20, 10, new QueuedExecutor());
queue2.load();
@@ -242,7 +242,7 @@
{
//Non recoverable queue - eg temporary queue
- Queue queue = new Queue(1, ms, pm, true, false, 100, 20, 10, new QueuedExecutor(), null);
+ Queue queue = new Queue(1, ms, pm, true, false, 100, 20, 10, new QueuedExecutor());
Message[] msgs = new Message[200];
@@ -306,7 +306,7 @@
public void testQueueReloadWithSmallerFullSize() throws Throwable
{
- Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+ Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
Message[] msgs = new Message[150];
@@ -360,7 +360,7 @@
//Reload the queue with a smaller fullSize
- Queue queue2 = new Queue(1, ms, pm, true, false, 50, 20, 10, new QueuedExecutor(), null);
+ Queue queue2 = new Queue(1, ms, pm, true, false, 50, 20, 10, new QueuedExecutor());
queue2.load();
@@ -404,7 +404,7 @@
public void testReloadWithLargerFullSize() throws Throwable
{
- Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+ Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
Message[] msgs = new Message[150];
@@ -458,7 +458,7 @@
//Reload the queue with a smaller fullSize
- Queue queue2 = new Queue(1, ms, pm, true, false, 130, 20, 10, new QueuedExecutor(), null);
+ Queue queue2 = new Queue(1, ms, pm, true, false, 130, 20, 10, new QueuedExecutor());
queue2.load();
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -28,6 +28,7 @@
import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
import org.jboss.messaging.core.plugin.postoffice.Binding;
import org.jboss.messaging.core.plugin.postoffice.cluster.BasicRedistributionPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredBinding;
import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredPostOfficeImpl;
import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredQueue;
import org.jboss.messaging.core.plugin.postoffice.cluster.FavourLocalRoutingPolicy;
@@ -93,12 +94,12 @@
//Add a couple of bindings
- ClusteredQueue queue1 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ ClusteredQueue queue1 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding1 =
- office1.bindClusteredQueue("sub1", "topic1", queue1);
- ClusteredQueue queue2 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ office1.bindClusteredQueue("sub1", "topic1", null, queue1);
+ ClusteredQueue queue2 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding2 =
- office1.bindClusteredQueue("sub2", "topic1", queue2);
+ office1.bindClusteredQueue("sub2", "topic1", null, queue2);
//Start another office - make sure it picks up the bindings from the first node
@@ -113,9 +114,9 @@
//Add another binding on node 2
- ClusteredQueue queue3 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ ClusteredQueue queue3 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding3 =
- office2.bindClusteredQueue("sub3", "topic1", queue3);
+ office2.bindClusteredQueue("sub3", "topic1", null, queue3);
//Make sure both nodes pick it up
@@ -137,9 +138,9 @@
//Add another binding on node 1
- ClusteredQueue queue4 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ ClusteredQueue queue4 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding4 =
- office2.bindClusteredQueue("sub4", "topic1", queue4);
+ office2.bindClusteredQueue("sub4", "topic1", null, queue4);
// Make sure both nodes pick it up
@@ -196,9 +197,9 @@
//Add another binding on node 3
- ClusteredQueue queue5 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ ClusteredQueue queue5 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding5 =
- office3.bindClusteredQueue("sub5", "topic1", queue5);
+ office3.bindClusteredQueue("sub5", "topic1", null, queue5);
// Make sure all nodes pick it up
@@ -228,13 +229,13 @@
//Add a durable and a non durable binding on node 1
- ClusteredQueue queue6 = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ ClusteredQueue queue6 = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding6 =
- office1.bindClusteredQueue("sub6", "topic1", queue6);
+ office1.bindClusteredQueue("sub6", "topic1", null, queue6);
- ClusteredQueue queue7 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ ClusteredQueue queue7 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding7 =
- office1.bindClusteredQueue("sub7", "topic1", queue7);
+ office1.bindClusteredQueue("sub7", "topic1", null, queue7);
// Make sure all nodes pick them up
@@ -406,6 +407,53 @@
clusteredTransactionalRoute(false);
}
+ public final void testRedistribute() throws Exception
+ {
+ ClusteredPostOffice office1 = null;
+
+ ClusteredPostOffice office2 = null;
+
+ try
+ {
+ office1 = createClusteredPostOffice("node1", "testgroup");
+ office2 = createClusteredPostOffice("node2", "testgroup");
+
+ ClusteredQueue queue1 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
+ ClusteredBinding binding1 = office1.bindClusteredQueue("queue1", "queue1", null, queue1);
+
+ ClusteredQueue queue2 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
+ ClusteredBinding binding2 = office2.bindClusteredQueue("queue1", "queue1", null, queue1);
+
+ final int NUM_MESSAGES = 10;
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ Message msg = CoreMessageFactory.createCoreMessage(i, false, null);
+ MessageReference ref = ms.reference(msg);
+ boolean routed = office1.route(ref, "queue1", null);
+ }
+
+ List msgs = queue1.browse();
+ assertEquals(NUM_MESSAGES, msgs.size());
+ msgs = queue2.browse();
+ assertEquals(0, msgs.size());
+
+
+
+ }
+ finally
+ {
+ if (office1 != null)
+ {
+ office1.stop();
+ }
+
+ if (office2 != null)
+ {
+ office2.stop();
+ }
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -427,53 +475,53 @@
ClusteredQueue[] queues = new ClusteredQueue[16];
Binding[] bindings = new Binding[16];
- queues[0] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[0] = office1.bindClusteredQueue("sub1", "topic1", queues[0]);
+ queues[0] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[0] = office1.bindClusteredQueue("sub1", "topic1", null, queues[0]);
- queues[1] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[1] = office1.bindClusteredQueue("sub2", "topic1", queues[1]);
+ queues[1] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[1] = office1.bindClusteredQueue("sub2", "topic1", null, queues[1]);
- queues[2] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[2] = office2.bindClusteredQueue("sub3", "topic1", queues[2]);
+ queues[2] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[2] = office2.bindClusteredQueue("sub3", "topic1", null, queues[2]);
- queues[3] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[3] = office2.bindClusteredQueue("sub4", "topic1", queues[3]);
+ queues[3] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[3] = office2.bindClusteredQueue("sub4", "topic1", null, queues[3]);
- queues[4] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[4] = office2.bindClusteredQueue("sub5", "topic1", queues[4]);
+ queues[4] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[4] = office2.bindClusteredQueue("sub5", "topic1", null, queues[4]);
- queues[5] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[5] = office1.bindClusteredQueue("sub6", "topic1", queues[5]);
+ queues[5] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[5] = office1.bindClusteredQueue("sub6", "topic1", null, queues[5]);
- queues[6] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[6] = office1.bindClusteredQueue("sub7", "topic1", queues[6]);
+ queues[6] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[6] = office1.bindClusteredQueue("sub7", "topic1", null, queues[6]);
- queues[7] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[7] = office1.bindClusteredQueue("sub8", "topic1", queues[7]);
+ queues[7] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[7] = office1.bindClusteredQueue("sub8", "topic1", null, queues[7]);
- queues[8] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[8] = office1.bindClusteredQueue("sub9", "topic2", queues[8]);
+ queues[8] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[8] = office1.bindClusteredQueue("sub9", "topic2", null, queues[8]);
- queues[9] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[9] = office1.bindClusteredQueue("sub10", "topic2", queues[9]);
+ queues[9] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[9] = office1.bindClusteredQueue("sub10", "topic2", null, queues[9]);
- queues[10] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[10] = office2.bindClusteredQueue("sub11", "topic2", queues[10]);
+ queues[10] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[10] = office2.bindClusteredQueue("sub11", "topic2", null, queues[10]);
- queues[11] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[11] = office2.bindClusteredQueue("sub12", "topic2", queues[11]);
+ queues[11] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[11] = office2.bindClusteredQueue("sub12", "topic2", null, queues[11]);
- queues[12] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[12] = office2.bindClusteredQueue("sub13", "topic2", queues[12]);
+ queues[12] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[12] = office2.bindClusteredQueue("sub13", "topic2", null, queues[12]);
- queues[13] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[13] = office1.bindClusteredQueue("sub14", "topic2", queues[13]);
+ queues[13] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[13] = office1.bindClusteredQueue("sub14", "topic2", null, queues[13]);
- queues[14] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[14] = office1.bindClusteredQueue("sub15", "topic2", queues[14]);
+ queues[14] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[14] = office1.bindClusteredQueue("sub15", "topic2", null, queues[14]);
- queues[15] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[15] = office1.bindClusteredQueue("sub16", "topic2", queues[15]);
+ queues[15] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[15] = office1.bindClusteredQueue("sub16", "topic2", null, queues[15]);
SimpleReceiver[] receivers = new SimpleReceiver[16];
@@ -589,53 +637,53 @@
ClusteredQueue[] queues = new ClusteredQueue[16];
Binding[] bindings = new Binding[16];
- queues[0] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[0] = office1.bindClusteredQueue("sub1", "topic1", queues[0]);
+ queues[0] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[0] = office1.bindClusteredQueue("sub1", "topic1", null, queues[0]);
- queues[1] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[1] = office1.bindClusteredQueue("sub2", "topic1", queues[1]);
+ queues[1] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[1] = office1.bindClusteredQueue("sub2", "topic1", null, queues[1]);
- queues[2] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[2] = office2.bindClusteredQueue("sub3", "topic1", queues[2]);
+ queues[2] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[2] = office2.bindClusteredQueue("sub3", "topic1", null, queues[2]);
- queues[3] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[3] = office2.bindClusteredQueue("sub4", "topic1", queues[3]);
+ queues[3] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[3] = office2.bindClusteredQueue("sub4", "topic1", null, queues[3]);
- queues[4] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[4] = office2.bindClusteredQueue("sub5", "topic1", queues[4]);
+ queues[4] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[4] = office2.bindClusteredQueue("sub5", "topic1", null, queues[4]);
- queues[5] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[5] = office1.bindClusteredQueue("sub6", "topic1", queues[5]);
+ queues[5] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[5] = office1.bindClusteredQueue("sub6", "topic1", null, queues[5]);
- queues[6] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[6] = office1.bindClusteredQueue("sub7", "topic1", queues[6]);
+ queues[6] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[6] = office1.bindClusteredQueue("sub7", "topic1", null, queues[6]);
- queues[7] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[7] = office1.bindClusteredQueue("sub8", "topic1", queues[7]);
+ queues[7] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[7] = office1.bindClusteredQueue("sub8", "topic1", null, queues[7]);
- queues[8] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[8] = office1.bindClusteredQueue("sub9", "topic2", queues[8]);
+ queues[8] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[8] = office1.bindClusteredQueue("sub9", "topic2", null, queues[8]);
- queues[9] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[9] = office1.bindClusteredQueue("sub10", "topic2", queues[9]);
+ queues[9] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[9] = office1.bindClusteredQueue("sub10", "topic2", null, queues[9]);
- queues[10] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[10] = office2.bindClusteredQueue("sub11", "topic2", queues[10]);
+ queues[10] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[10] = office2.bindClusteredQueue("sub11", "topic2", null, queues[10]);
- queues[11] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[11] = office2.bindClusteredQueue("sub12", "topic2", queues[11]);
+ queues[11] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[11] = office2.bindClusteredQueue("sub12", "topic2", null, queues[11]);
- queues[12] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[12] = office2.bindClusteredQueue("sub13", "topic2", queues[12]);
+ queues[12] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[12] = office2.bindClusteredQueue("sub13", "topic2", null, queues[12]);
- queues[13] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[13] = office1.bindClusteredQueue("sub14", "topic2", queues[13]);
+ queues[13] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[13] = office1.bindClusteredQueue("sub14", "topic2", null, queues[13]);
- queues[14] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[14] = office1.bindClusteredQueue("sub15", "topic2", queues[14]);
+ queues[14] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[14] = office1.bindClusteredQueue("sub15", "topic2", null, queues[14]);
- queues[15] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
- bindings[15] = office1.bindClusteredQueue("sub16", "topic2", queues[15]);
+ queues[15] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());
+ bindings[15] = office1.bindClusteredQueue("sub16", "topic2", null, queues[15]);
SimpleReceiver[] receivers = new SimpleReceiver[16];
@@ -1183,7 +1231,7 @@
null, true, nodeId, "Clustered", ms, groupName,
JGroupsUtil.getControlStackProperties(50, 1),
JGroupsUtil.getDataStackProperties(50, 1),
- tr, pm, 5000, 5000, routingPolicy, redistPolicy, 5000);
+ tr, pm, 5000, 5000, routingPolicy, redistPolicy, 1000);
postOffice.start();
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/SimplePostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/SimplePostOfficeTest.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/SimplePostOfficeTest.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -146,15 +146,15 @@
Filter filter1 = new Selector("x = 'cheese'");
Filter filter2 = new Selector("y = 'bread'");
- Queue queue1 = new Queue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ Queue queue1 = new Queue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding1 =
- office1.bindQueue("durableQueue", "condition1", queue1);
+ office1.bindQueue("durableQueue", "condition1", null, queue1);
//Binding twice with the same name should fail
try
{
- Binding bindFail = office1.bindQueue("durableQueue", "condition1", queue1);
+ Binding bindFail = office1.bindQueue("durableQueue", "condition1", null, queue1);
fail();
}
catch (IllegalArgumentException e)
@@ -163,9 +163,9 @@
}
//Bind one non durable
- Queue queue2 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ Queue queue2 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding2 =
- office1.bindQueue("nonDurableQueue", "condition2", queue2);
+ office1.bindQueue("nonDurableQueue", "condition2", null, queue2);
//Check they're there
@@ -247,37 +247,37 @@
{
office = createPostOffice();
- Queue queue1 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ Queue queue1 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding1 =
- office.bindQueue("queue1", "condition1", queue1);
+ office.bindQueue("queue1", "condition1", null, queue1);
- Queue queue2 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ Queue queue2 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding2 =
- office.bindQueue("queue2", "condition1", queue2);
+ office.bindQueue("queue2", "condition1", null, queue2);
- Queue queue3 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ Queue queue3 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding3 =
- office.bindQueue("queue3", "condition1", queue3);
+ office.bindQueue("queue3", "condition1", null, queue3);
- Queue queue4 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ Queue queue4 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding4 =
- office.bindQueue("queue4", "condition1", queue4);
+ office.bindQueue("queue4", "condition1", null, queue4);
- Queue queue5 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ Queue queue5 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding5 =
- office.bindQueue("queue5", "condition2", queue5);
+ office.bindQueue("queue5", "condition2", null, queue5);
- Queue queue6 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ Queue queue6 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding6 =
- office.bindQueue("queue6", "condition2", queue6);
+ office.bindQueue("queue6", "condition2", null, queue6);
- Queue queue7 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ Queue queue7 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding7 =
- office.bindQueue("queue7", "condition2", queue7);
+ office.bindQueue("queue7", "condition2", null, queue7);
- Queue queue8 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ Queue queue8 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding8 =
- office.bindQueue("queue8", "condition2", queue8);
+ office.bindQueue("queue8", "condition2", null, queue8);
List bindings = office.listBindingsForCondition("dummy");
@@ -353,29 +353,29 @@
postOffice = createPostOffice();
- Queue queue1 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ Queue queue1 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding1 =
- postOffice.bindQueue("queue1", "topic1", queue1);
+ postOffice.bindQueue("queue1", "topic1", null, queue1);
- Queue queue2 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ Queue queue2 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding2 =
- postOffice.bindQueue("queue2", "topic1", queue2);
+ postOffice.bindQueue("queue2", "topic1", null, queue2);
- Queue queue3 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ Queue queue3 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding3 =
- postOffice.bindQueue("queue3", "topic1", queue3);
+ postOffice.bindQueue("queue3", "topic1", null, queue3);
- Queue queue4 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ Queue queue4 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding4 =
- postOffice.bindQueue("queue4", "topic2", queue4);
+ postOffice.bindQueue("queue4", "topic2", null, queue4);
- Queue queue5 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ Queue queue5 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding5 =
- postOffice.bindQueue("queue5", "topic2", queue5);
+ postOffice.bindQueue("queue5", "topic2", null, queue5);
- Queue queue6 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ Queue queue6 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding6 =
- postOffice.bindQueue("queue6", "topic2", queue6);
+ postOffice.bindQueue("queue6", "topic2", null, queue6);
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
queue1.add(receiver1);
@@ -494,9 +494,9 @@
{
postOffice = createPostOffice();
- Queue queue1 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ Queue queue1 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding1 =
- postOffice.bindQueue("queue1", "condition1", queue1);
+ postOffice.bindQueue("queue1", "condition1", null, queue1);
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);;
queue1.add(receiver1);
@@ -538,29 +538,29 @@
{
postOffice = createPostOffice();
- Queue queue1 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ Queue queue1 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding1 =
- postOffice.bindQueue("queue1", "topic1", queue1);
+ postOffice.bindQueue("queue1", "topic1", null, queue1);
- Queue queue2 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ Queue queue2 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding2 =
- postOffice.bindQueue("queue2", "topic1", queue2);
+ postOffice.bindQueue("queue2", "topic1", null, queue2);
- Queue queue3 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ Queue queue3 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding3 =
- postOffice.bindQueue("queue3", "topic1", queue3);
+ postOffice.bindQueue("queue3", "topic1", null, queue3);
- Queue queue4 = new Queue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ Queue queue4 = new Queue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding4 =
- postOffice.bindQueue("queue4", "topic2", queue4);
+ postOffice.bindQueue("queue4", "topic2", null, queue4);
- Queue queue5 = new Queue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ Queue queue5 = new Queue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding5 =
- postOffice.bindQueue("queue5", "topic2", queue5);
+ postOffice.bindQueue("queue5", "topic2", null, queue5);
- Queue queue6 = new Queue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ Queue queue6 = new Queue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding6 =
- postOffice.bindQueue("queue6", "topic2", queue6);
+ postOffice.bindQueue("queue6", "topic2", null, queue6);
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);;
queue1.add(receiver1);
@@ -701,13 +701,13 @@
{
postOffice = createPostOffice();
- Queue queue1 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ Queue queue1 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding1 =
- postOffice.bindQueue("queue1", "topic1", queue1);
+ postOffice.bindQueue("queue1", "topic1", null, queue1);
- Queue queue2 = new Queue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+ Queue queue2 = new Queue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());
Binding binding2 =
- postOffice.bindQueue("queue2", "topic1", queue2);
+ postOffice.bindQueue("queue2", "topic1", null, queue2);
SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);;
queue1.add(receiver1);
@@ -962,8 +962,10 @@
protected void assertEquivalent(Binding binding1, Binding binding2)
{
assertEquals(binding1.getNodeId(), binding2.getNodeId());
- assertEquals(binding1.getQueueName(), binding2.getQueueName());
- assertEquals(binding1.getSelector(), binding2.getSelector());
+ assertEquals(binding1.getQueueName(), binding2.getQueueName());
+ String selector1 = binding1.getFilter() != null ? binding1.getFilter().getFilterString() : null;
+ String selector2 = binding2.getFilter() != null ? binding2.getFilter().getFilterString() : null;
+ assertEquals(selector1, selector2);
assertEquals(binding1.getChannelId(), binding2.getChannelId());
assertEquals(binding1.isDurable(), binding2.isDurable());
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -272,32 +272,26 @@
final int NUM_MESSAGES = 20;
//Send some messages
- log.info("********* SENDING MSGS");
for (int i = 0; i < NUM_MESSAGES; i++)
{
Message m = producerSess.createMessage();
producer.send(m);
}
- log.info("*********** SENT MSGSSGS");
-
+
assertRemainingMessages(0);
producerSess.rollback();
//Send some messages
- log.info("********* SENDING MOREMSGS");
for (int i = 0; i < NUM_MESSAGES; i++)
{
Message m = producerSess.createMessage();
producer.send(m);
}
- log.info("********* SENT MORE MSGS");
assertRemainingMessages(0);
- log.info("COMMITTING");
producerSess.commit();
- log.info("COMMITTED");
-
+
assertRemainingMessages(NUM_MESSAGES);
log.trace("Sent messages");
Modified: trunk/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -259,7 +259,7 @@
"<invoker transport=\"socket\">\n" +
"<attribute name=\"marshaller\" isParam=\"true\">org.jboss.jms.server.remoting.JMSWireFormat</attribute>\n" +
"<attribute name=\"unmarshaller\" isParam=\"true\">org.jboss.jms.server.remoting.JMSWireFormat</attribute>\n" +
- "<attribute name=\"serializationtype\" isParam=\"true\">jboss</attribute>\n" +
+ "<attribute name=\"serializationtype\" isParam=\"true\">jms</attribute>\n" +
"<attribute name=\"dataType\" isParam=\"true\">jms</attribute>\n" +
"<attribute name=\"serverBindPort\">" + port +"</attribute>\n" +
"<attribute name=\"socket.check_connection\" isParam=\"true\">false</attribute>\n" +
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java 2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java 2006-09-15 17:44:02 UTC (rev 1295)
@@ -79,6 +79,7 @@
import org.jboss.tm.usertx.client.ServerVMClientUserTransaction;
import org.jboss.remoting.InvokerLocator;
import org.jboss.remoting.ServerInvocationHandler;
+import org.jboss.remoting.serialization.SerializationStreamFactory;
/**
@@ -972,10 +973,13 @@
private void startRemoting(boolean multiplex) throws Exception
{
+ SerializationStreamFactory.setManagerClassName(
+ "jms", "org.jboss.jms.server.remoting.MessagingSerializationManager");
+
RemotingJMXWrapper mbean;
String serializationType = config.getSerializationType();
-
+
String params = "/?marshaller=org.jboss.jms.server.remoting.JMSWireFormat&" +
"unmarshaller=org.jboss.jms.server.remoting.JMSWireFormat&" +
"serializationtype=" + serializationType + "&" +
More information about the jboss-cvs-commits
mailing list