[Jboss-cvs] JBoss Messaging SVN: r1274 - in trunk: src/main/org/jboss/jms/message src/main/org/jboss/jms/server/remoting src/main/org/jboss/messaging/core/message src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/util tests/src/org/jboss/test/messaging/core/plugin
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun Sep 10 13:04:30 EDT 2006
Author: timfox
Date: 2006-09-10 13:04:21 -0400 (Sun, 10 Sep 2006)
New Revision: 1274
Added:
trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectInputStream.java
trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectOutputStream.java
trunk/src/main/org/jboss/jms/server/remoting/MessagingSerializationManager.java
trunk/src/main/org/jboss/messaging/util/StreamUtils.java
Modified:
trunk/src/main/org/jboss/jms/message/JBossMapMessage.java
trunk/src/main/org/jboss/jms/message/JBossMessage.java
trunk/src/main/org/jboss/jms/message/JBossStreamMessage.java
trunk/src/main/org/jboss/jms/server/remoting/CountingOutputStream.java
trunk/src/main/org/jboss/messaging/core/message/MessageSupport.java
trunk/src/main/org/jboss/messaging/core/message/RoutableSupport.java
trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java
Log:
More clustering work
Modified: trunk/src/main/org/jboss/jms/message/JBossMapMessage.java
===================================================================
--- trunk/src/main/org/jboss/jms/message/JBossMapMessage.java 2006-09-10 11:53:54 UTC (rev 1273)
+++ trunk/src/main/org/jboss/jms/message/JBossMapMessage.java 2006-09-10 17:04:21 UTC (rev 1274)
@@ -35,6 +35,7 @@
import javax.jms.MessageFormatException;
import org.jboss.jms.destination.JBossDestination;
+import org.jboss.messaging.util.StreamUtils;
import org.jboss.util.Primitives;
/**
@@ -504,21 +505,14 @@
protected void writePayloadExternal(ObjectOutput out, Serializable thePayload) throws IOException
{
- writeMap(out, ((Map)getPayload()), true);
+ StreamUtils.writeMap(out, ((Map)getPayload()), true);
}
protected Serializable readPayloadExternal(ObjectInput in, int length)
throws IOException, ClassNotFoundException
{
- Map m = readMap(in, true);
- if (!(m instanceof HashMap))
- {
- return new HashMap(m);
- }
- else
- {
- return (HashMap)m;
- }
+ HashMap m = StreamUtils.readMap(in, true);
+ return m;
}
// Private -------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/message/JBossMessage.java
===================================================================
--- trunk/src/main/org/jboss/jms/message/JBossMessage.java 2006-09-10 11:53:54 UTC (rev 1273)
+++ trunk/src/main/org/jboss/jms/message/JBossMessage.java 2006-09-10 17:04:21 UTC (rev 1274)
@@ -28,8 +28,8 @@
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
-import java.util.Iterator;
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
@@ -50,6 +50,7 @@
import org.jboss.jms.destination.JBossTopic;
import org.jboss.jms.util.MessagingJMSException;
import org.jboss.messaging.core.message.MessageSupport;
+import org.jboss.messaging.util.StreamUtils;
import org.jboss.util.Primitives;
import org.jboss.util.Strings;
@@ -74,16 +75,22 @@
// Constants -----------------------------------------------------
private static final long serialVersionUID = 8341387096828690976L;
-
+
public static final byte TYPE = 0;
- private static final int QUEUE = 1;
+ private static final byte NULL = 0;
- private static final int TOPIC = 2;
+ private static final byte STRING = 1;
- private static final int TEMP_QUEUE = 3;
+ private static final byte BYTES = 2;
- private static final int TEMP_TOPIC = 4;
+ private static final byte QUEUE = 3;
+
+ private static final byte TOPIC = 4;
+
+ private static final byte TEMP_QUEUE = 5;
+
+ private static final byte TEMP_TOPIC = 6;
// Static --------------------------------------------------------
@@ -949,7 +956,7 @@
out.writeUTF(jmsType);
}
- writeMap(out, properties, true);
+ StreamUtils.writeMap(out, properties, true);
if (correlationID == null && correlationIDBytes == null)
{
@@ -991,7 +998,7 @@
jmsType = in.readUTF();
}
- Map m = readMap(in, true);
+ Map m = StreamUtils.readMap(in, true);
if (!(m instanceof HashMap))
{
properties = new HashMap(m);
Modified: trunk/src/main/org/jboss/jms/message/JBossStreamMessage.java
===================================================================
--- trunk/src/main/org/jboss/jms/message/JBossStreamMessage.java 2006-09-10 11:53:54 UTC (rev 1273)
+++ trunk/src/main/org/jboss/jms/message/JBossStreamMessage.java 2006-09-10 17:04:21 UTC (rev 1274)
@@ -36,6 +36,7 @@
import javax.jms.StreamMessage;
import org.jboss.jms.destination.JBossDestination;
+import org.jboss.messaging.util.StreamUtils;
import org.jboss.util.Primitives;
/**
@@ -695,21 +696,14 @@
protected void writePayloadExternal(ObjectOutput out, Serializable thePayload) throws IOException
{
- writeList(out, (List)thePayload);
+ StreamUtils.writeList(out, (List)thePayload);
}
protected Serializable readPayloadExternal(ObjectInput in, int length)
throws IOException, ClassNotFoundException
{
- List l = readList(in);
- if (!(l instanceof ArrayList))
- {
- return new ArrayList(l);
- }
- else
- {
- return (ArrayList)l;
- }
+ ArrayList l = StreamUtils.readList(in);
+ return l;
}
// Private -------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/server/remoting/CountingOutputStream.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/CountingOutputStream.java 2006-09-10 11:53:54 UTC (rev 1273)
+++ trunk/src/main/org/jboss/jms/server/remoting/CountingOutputStream.java 2006-09-10 17:04:21 UTC (rev 1274)
@@ -32,7 +32,7 @@
*
* Counts how many bytes written, used as a diagnostic tool
* during development when we want to see exactly what's being written,
- * typically to sockets OuputStreams.
+ * typically to socket output streams.
*
* @author <a href="tim.fox at jboss.com">Tim Fox</a>
* @version 1.1
Added: trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectInputStream.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectInputStream.java 2006-09-10 11:53:54 UTC (rev 1273)
+++ trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectInputStream.java 2006-09-10 17:04:21 UTC (rev 1274)
@@ -0,0 +1,54 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.server.remoting;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+
+/**
+ * A MessagingObjectInputStream
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class MessagingObjectInputStream extends ObjectInputStream
+{
+ private InputStream in;
+
+ public MessagingObjectInputStream(InputStream in) throws IOException, SecurityException
+ {
+ super(in);
+
+ this.in = in;
+
+ }
+
+ public InputStream getUnderlyingStream()
+ {
+ return in;
+ }
+
+}
Added: trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectOutputStream.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectOutputStream.java 2006-09-10 11:53:54 UTC (rev 1273)
+++ trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectOutputStream.java 2006-09-10 17:04:21 UTC (rev 1274)
@@ -0,0 +1,52 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.server.remoting;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+
+/**
+ * A MessagingObjectOutputStream
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class MessagingObjectOutputStream extends ObjectOutputStream
+{
+ private OutputStream out;
+
+ public MessagingObjectOutputStream(OutputStream out) throws IOException
+ {
+ super(out);
+ this.out = out;
+ }
+
+ public OutputStream getUnderlyingStream()
+ {
+ return out;
+ }
+
+}
Added: trunk/src/main/org/jboss/jms/server/remoting/MessagingSerializationManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/MessagingSerializationManager.java 2006-09-10 11:53:54 UTC (rev 1273)
+++ trunk/src/main/org/jboss/jms/server/remoting/MessagingSerializationManager.java 2006-09-10 17:04:21 UTC (rev 1274)
@@ -0,0 +1,73 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.server.remoting;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+
+import org.jboss.remoting.serialization.IMarshalledValue;
+import org.jboss.remoting.serialization.SerializationManager;
+
+/**
+ * A MessagingSerializationManager
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class MessagingSerializationManager extends SerializationManager
+{
+ public IMarshalledValue createdMarshalledValue(Object arg0) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public ObjectInputStream createInput(InputStream in, ClassLoader cl) throws IOException
+ {
+ return new MessagingObjectInputStream(in);
+ }
+
+ public ObjectOutputStream createOutput(OutputStream out) throws IOException
+ {
+ return new MessagingObjectOutputStream(out);
+ }
+
+ public IMarshalledValue createMarshalledValueForClone(Object arg0) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public Object receiveObject(InputStream arg0, ClassLoader arg1) throws IOException, ClassNotFoundException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void sendObject(ObjectOutputStream arg0, Object arg1) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+}
Modified: trunk/src/main/org/jboss/messaging/core/message/MessageSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/MessageSupport.java 2006-09-10 11:53:54 UTC (rev 1273)
+++ trunk/src/main/org/jboss/messaging/core/message/MessageSupport.java 2006-09-10 17:04:21 UTC (rev 1274)
@@ -30,6 +30,7 @@
import java.util.Map;
import org.jboss.messaging.core.Message;
+import org.jboss.messaging.util.StreamUtils;
import org.jboss.serial.io.JBossObjectInputStream;
import org.jboss.serial.io.JBossObjectOutputStream;
@@ -338,7 +339,7 @@
*/
protected void writePayloadExternal(ObjectOutput out, Serializable thePayload) throws IOException
{
- internalWriteObject(out, thePayload, true, true);
+ StreamUtils.writeObject(out, thePayload, true, true);
}
/**
@@ -347,7 +348,7 @@
protected Serializable readPayloadExternal(ObjectInput in, int length)
throws IOException, ClassNotFoundException
{
- return internalReadObject(in, true);
+ return (Serializable)StreamUtils.readObject(in, true);
}
/**
Modified: trunk/src/main/org/jboss/messaging/core/message/RoutableSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/RoutableSupport.java 2006-09-10 11:53:54 UTC (rev 1273)
+++ trunk/src/main/org/jboss/messaging/core/message/RoutableSupport.java 2006-09-10 17:04:21 UTC (rev 1274)
@@ -26,17 +26,13 @@
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Routable;
-import org.jboss.messaging.util.SafeUTF;
-import org.jboss.util.Primitives;
+import org.jboss.messaging.util.StreamUtils;
/**
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
@@ -52,293 +48,12 @@
private static final Logger log = Logger.getLogger(RoutableSupport.class);
- /** A byte property */
- protected static final byte BYTE = 0;
- /** A short property */
- protected static final byte SHORT = 1;
- /** An integer property */
- protected static final byte INT = 2;
- /** A long property */
- protected static final byte LONG = 3;
- /** A float property */
- protected static final byte FLOAT = 4;
- /** A double property */
- protected static final byte DOUBLE = 5;
- /** A boolean property */
- protected static final byte BOOLEAN = 6;
- /** A string property */
- protected static final byte STRING = 7;
- /** An object property */
- protected static final byte OBJECT = 8;
- /** A null property */
- protected static final byte NULL = 9;
-
- protected static final byte BYTES = 10;
-
- protected static final byte MAP = 11;
-
- protected static final byte LIST = 12;
+
// Static --------------------------------------------------------
- public static void writeList(ObjectOutput out, List list) throws IOException
- {
- out.writeInt(list.size());
- Iterator iter = list.iterator();
- while (iter.hasNext())
- {
- Object value = iter.next();
- if (value != null && !(value instanceof Serializable))
- {
- throw new IOException("Object in List must be serializable: " + value);
- }
- internalWriteObject(out, (Serializable)value, false, false);
- }
- }
- public static List readList(ObjectInput in) throws ClassNotFoundException, IOException
- {
- int size = in.readInt();
- ArrayList list = new ArrayList(size);
- for (int i = 0; i < size; i++)
- {
- Object obj = internalReadObject(in, false);
- list.add(obj);
- }
- return list;
- }
- protected static Serializable internalReadObject(ObjectInput in, boolean longStrings) throws IOException, ClassNotFoundException
- {
- byte type = in.readByte();
- Serializable value = null;
- switch (type)
- {
- case BYTE :
- value = new Byte(in.readByte());
- break;
- case SHORT :
- value = new Short(in.readShort());
- break;
- case INT :
- value = new Integer(in.readInt());
- break;
- case LONG :
- value = new Long(in.readLong());
- break;
- case FLOAT :
- value = new Float(in.readFloat());
- break;
- case DOUBLE :
- value = new Double(in.readDouble());
- break;
- case BOOLEAN :
- value = Primitives.valueOf(in.readBoolean());
- break;
- case STRING :
- if (longStrings)
- {
- //We cope with >= 64K Strings
- value = SafeUTF.instance.safeReadUTF(in);
- }
- else
- {
- //Limited to < 64K Strings
- value = in.readUTF();
- }
- break;
- case BYTES :
- int size = in.readInt();
- byte[] bytes = new byte[size];
- in.readFully(bytes);
- return bytes;
- case MAP:
- {
- Map m = readMap(in, false);
- if (m instanceof HashMap)
- {
- return (HashMap)m;
- }
- else
- {
- return new HashMap(m);
- }
- }
- case LIST:
- {
- List l = readList(in);
- if (l instanceof ArrayList)
- {
- return (ArrayList)l;
- }
- else
- {
- return new ArrayList(l);
- }
- }
- case NULL:
- value = null;
- break;
- default :
- value = (Serializable)in.readObject();
- }
- return value;
- }
-
- protected static void internalWriteObject(ObjectOutput out, Serializable value,
- boolean containerTypes, boolean longStrings) throws IOException
- {
- // We cheat with some often used types - more efficient than using object serialization
- if (value == null)
- {
- out.writeByte(NULL);
- }
- else if (value instanceof String)
- {
- out.writeByte(STRING);
- if (longStrings)
- {
- //We can cope with >=64K Strings
- SafeUTF.instance.safeWriteUTF(out, (String)value);
- }
- else
- {
- //Limited to < 64K Strings
- out.writeUTF((String)value);
- }
- }
- else if (value instanceof Integer)
- {
- out.writeByte(INT);
- out.writeInt(((Integer) value).intValue());
- }
- else if (value instanceof Boolean)
- {
- out.writeByte(BOOLEAN);
- out.writeBoolean(((Boolean) value).booleanValue());
- }
- else if (value instanceof Byte)
- {
- out.writeByte(BYTE);
- out.writeByte(((Byte) value).byteValue());
- }
- else if (value instanceof Short)
- {
- out.writeByte(SHORT);
- out.writeShort(((Short) value).shortValue());
- }
- else if (value instanceof Long)
- {
- out.writeByte(LONG);
- out.writeLong(((Long) value).longValue());
- }
- else if (value instanceof Float)
- {
- out.writeByte(FLOAT);
- out.writeFloat(((Float) value).floatValue());
- }
- else if (value instanceof Double)
- {
- out.writeByte(DOUBLE);
- out.writeDouble(((Double) value).doubleValue());
- }
- else if (value instanceof byte[])
- {
- out.writeByte(BYTES);
- byte[] bytes = (byte[])value;
- out.writeInt(bytes.length);
- out.write(bytes);
- }
- else if (containerTypes && value instanceof ArrayList)
- {
- out.write(LIST);
- writeList(out, (List)value);
- }
- else if (containerTypes && value instanceof HashMap)
- {
- out.write(MAP);
- writeMap(out, (Map)value, false);
- }
- else
- {
- // Default to standard serialization
- out.writeByte(OBJECT);
- out.writeObject(value);
- }
- }
-
- public static void writeMap(ObjectOutput out, Map map, boolean stringKeys) throws IOException
- {
- if (map.isEmpty())
- {
- out.writeByte(NULL);
- }
- else
- {
- out.writeByte(MAP);
- Set entrySet = map.entrySet();
- out.writeInt(entrySet.size());
- for (Iterator it = entrySet.iterator(); it.hasNext(); )
- {
- Map.Entry me = (Map.Entry)it.next();
-
- //Write the key
- if (stringKeys)
- {
- out.writeUTF((String)me.getKey());
- }
- else
- {
- if (!(me.getKey() instanceof Serializable))
- {
- throw new IOException("Key in map must be Serializable: " + me.getKey());
- }
- internalWriteObject(out, (Serializable)me.getKey(), false, false);
- }
-
- Object value = me.getValue();
- if (value != null && !(value instanceof Serializable))
- {
- throw new IOException("Value in map must be Serializable: " + value);
- }
-
- // write the value
- internalWriteObject(out, (Serializable)value, false, false);
- }
- }
- }
-
- public static Map readMap(ObjectInput in, boolean stringKeys) throws IOException, ClassNotFoundException
- {
- byte b = in.readByte();
- if (b == NULL)
- {
- return new HashMap();
- }
- else
- {
- int size = in.readInt();
- HashMap m = new HashMap(size);
- for (int i = 0; i < size; i++)
- {
- Object key;
- if (stringKeys)
- {
- key = in.readUTF();
- }
- else
- {
- key = internalReadObject(in, false);
- }
-
- Object value = internalReadObject(in, false);
-
- m.put(key, value);
- }
- return m;
- }
- }
-
// Attributes ----------------------------------------------------
private boolean trace = log.isTraceEnabled();
@@ -535,7 +250,7 @@
out.writeBoolean(reliable);
out.writeLong(expiration);
out.writeLong(timestamp);
- writeMap(out, headers, true);
+ StreamUtils.writeMap(out, headers, true);
out.writeBoolean(redelivered);
out.writeByte(priority);
out.writeInt(deliveryCount);
@@ -547,7 +262,7 @@
reliable = in.readBoolean();
expiration = in.readLong();
timestamp = in.readLong();
- Map m = readMap(in, true);
+ Map m = StreamUtils.readMap(in, true);
if (!(m instanceof HashMap))
{
headers = new HashMap(m);
Modified: trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2006-09-10 11:53:54 UTC (rev 1273)
+++ trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2006-09-10 17:04:21 UTC (rev 1274)
@@ -54,12 +54,12 @@
import org.jboss.messaging.core.message.CoreMessage;
import org.jboss.messaging.core.message.MessageFactory;
import org.jboss.messaging.core.message.MessageSupport;
-import org.jboss.messaging.core.message.RoutableSupport;
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.messaging.core.tx.TxCallback;
import org.jboss.messaging.core.tx.XidImpl;
import org.jboss.messaging.util.JDBCUtil;
+import org.jboss.messaging.util.StreamUtils;
import org.jboss.messaging.util.Util;
import org.jboss.serial.io.JBossObjectInputStream;
import org.jboss.serial.io.JBossObjectOutputStream;
@@ -3436,7 +3436,7 @@
oos = new JBossObjectOutputStream(bos);
- RoutableSupport.writeMap(oos, map, true);
+ StreamUtils.writeMap(oos, map, true);
return bos.toByteArray();
}
@@ -3464,7 +3464,7 @@
ois = new JBossObjectInputStream(bis);
- Map m = RoutableSupport.readMap(ois, true);
+ Map m = StreamUtils.readMap(ois, true);
HashMap map;
if (!(m instanceof HashMap))
{
Added: trunk/src/main/org/jboss/messaging/util/StreamUtils.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/StreamUtils.java 2006-09-10 11:53:54 UTC (rev 1273)
+++ trunk/src/main/org/jboss/messaging/util/StreamUtils.java 2006-09-10 17:04:21 UTC (rev 1274)
@@ -0,0 +1,315 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.util;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.jboss.util.Primitives;
+
+/**
+ * A StreamUtils
+ *
+ * Utility methods for reading and writing stuff to and from streams
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class StreamUtils
+{
+ protected static final byte BYTE = 0;
+
+ protected static final byte SHORT = 1;
+
+ protected static final byte INT = 2;
+
+ protected static final byte LONG = 3;
+
+ protected static final byte FLOAT = 4;
+
+ protected static final byte DOUBLE = 5;
+
+ protected static final byte BOOLEAN = 6;
+
+ protected static final byte STRING = 7;
+
+ protected static final byte SERIALIZABLE = 8;
+
+ protected static final byte NULL = 9;
+
+ protected static final byte BYTES = 10;
+
+ protected static final byte MAP = 11;
+
+ protected static final byte LIST = 12;
+
+ protected static final byte NOT_NULL = 13;
+
+ public static Object readObject(ObjectInput in, boolean longStrings)
+ throws IOException, ClassNotFoundException
+ {
+ byte type = in.readByte();
+ Object value = null;
+ switch (type)
+ {
+ case BYTE :
+ value = new Byte(in.readByte());
+ break;
+ case SHORT :
+ value = new Short(in.readShort());
+ break;
+ case INT :
+ value = new Integer(in.readInt());
+ break;
+ case LONG :
+ value = new Long(in.readLong());
+ break;
+ case FLOAT :
+ value = new Float(in.readFloat());
+ break;
+ case DOUBLE :
+ value = new Double(in.readDouble());
+ break;
+ case BOOLEAN :
+ value = Primitives.valueOf(in.readBoolean());
+ break;
+ case STRING :
+ if (longStrings)
+ {
+ //We cope with >= 64K Strings
+ value = SafeUTF.instance.safeReadUTF(in);
+ }
+ else
+ {
+ //Limited to < 64K Strings
+ value = in.readUTF();
+ }
+ break;
+ case BYTES :
+ int size = in.readInt();
+ byte[] bytes = new byte[size];
+ in.readFully(bytes);
+ value = bytes;
+ break;
+ case MAP:
+ {
+ value = readMap(in, false);
+ break;
+ }
+ case LIST:
+ {
+ value = readList(in);
+ break;
+ }
+ case SERIALIZABLE:
+ {
+ value = (Serializable)in.readObject();
+ break;
+ }
+ case NULL:
+ {
+ value = null;
+ break;
+ }
+ default :
+ {
+ throw new IllegalStateException("Unknown type: " + type);
+ }
+ }
+ return value;
+ }
+
+ public static void writeObject(ObjectOutput out, Object object,
+ boolean containerTypes, boolean longStrings) throws IOException
+ {
+ // We cheat with some often used types - more efficient than using object serialization
+ if (object == null)
+ {
+ out.writeByte(NULL);
+ }
+ else if (object instanceof String)
+ {
+ out.writeByte(STRING);
+ if (longStrings)
+ {
+ //We can cope with >=64K Strings
+ SafeUTF.instance.safeWriteUTF(out, (String)object);
+ }
+ else
+ {
+ //Limited to < 64K Strings
+ out.writeUTF((String)object);
+ }
+ }
+ else if (object instanceof Integer)
+ {
+ out.writeByte(INT);
+ out.writeInt(((Integer) object).intValue());
+ }
+ else if (object instanceof Boolean)
+ {
+ out.writeByte(BOOLEAN);
+ out.writeBoolean(((Boolean) object).booleanValue());
+ }
+ else if (object instanceof Byte)
+ {
+ out.writeByte(BYTE);
+ out.writeByte(((Byte) object).byteValue());
+ }
+ else if (object instanceof Short)
+ {
+ out.writeByte(SHORT);
+ out.writeShort(((Short) object).shortValue());
+ }
+ else if (object instanceof Long)
+ {
+ out.writeByte(LONG);
+ out.writeLong(((Long) object).longValue());
+ }
+ else if (object instanceof Float)
+ {
+ out.writeByte(FLOAT);
+ out.writeFloat(((Float) object).floatValue());
+ }
+ else if (object instanceof Double)
+ {
+ out.writeByte(DOUBLE);
+ out.writeDouble(((Double) object).doubleValue());
+ }
+ else if (object instanceof byte[])
+ {
+ out.writeByte(BYTES);
+ byte[] bytes = (byte[])object;
+ out.writeInt(bytes.length);
+ out.write(bytes);
+ }
+ else if (containerTypes && object instanceof ArrayList)
+ {
+ out.write(LIST);
+ writeList(out, (List)object);
+ }
+ else if (containerTypes && object instanceof HashMap)
+ {
+ out.write(MAP);
+ writeMap(out, (Map)object, false);
+ }
+ else if (object instanceof Serializable)
+ {
+ out.writeByte(SERIALIZABLE);
+ out.writeObject(object);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Don't know how to deal with object " + object);
+ }
+ }
+
+ public static void writeList(ObjectOutput out, List list) throws IOException
+ {
+ out.writeInt(list.size());
+ Iterator iter = list.iterator();
+ while (iter.hasNext())
+ {
+ Object value = iter.next();
+ writeObject(out, value, false, false);
+ }
+ }
+
+ public static ArrayList readList(ObjectInput in) throws ClassNotFoundException, IOException
+ {
+ int size = in.readInt();
+ ArrayList list = new ArrayList(size);
+ for (int i = 0; i < size; i++)
+ {
+ Object obj = readObject(in, false);
+ list.add(obj);
+ }
+ return list;
+ }
+
+ public static void writeMap(ObjectOutput out, Map map, boolean stringKeys) throws IOException
+ {
+ Set entrySet = map.entrySet();
+ out.writeInt(entrySet.size());
+ for (Iterator it = entrySet.iterator(); it.hasNext(); )
+ {
+ Map.Entry me = (Map.Entry)it.next();
+
+ //Write the key
+ if (stringKeys)
+ {
+ out.writeUTF((String)me.getKey());
+ }
+ else
+ {
+ if (!(me.getKey() instanceof Serializable))
+ {
+ throw new IOException("Key in map must be Serializable: " + me.getKey());
+ }
+ writeObject(out, (Serializable)me.getKey(), false, false);
+ }
+
+ Object value = me.getValue();
+ if (value != null && !(value instanceof Serializable))
+ {
+ throw new IOException("Value in map must be Serializable: " + value);
+ }
+
+ // write the value
+ writeObject(out, (Serializable)value, false, false);
+ }
+ }
+
+ public static HashMap readMap(ObjectInput in, boolean stringKeys) throws IOException, ClassNotFoundException
+ {
+ int size = in.readInt();
+ HashMap m = new HashMap(size);
+ for (int i = 0; i < size; i++)
+ {
+ Object key;
+ if (stringKeys)
+ {
+ key = in.readUTF();
+ }
+ else
+ {
+ key = readObject(in, false);
+ }
+
+ Object value = readObject(in, false);
+
+ m.put(key, value);
+ }
+ return m;
+ }
+}
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java 2006-09-10 11:53:54 UTC (rev 1273)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java 2006-09-10 17:04:21 UTC (rev 1274)
@@ -73,48 +73,6 @@
super.tearDown();
}
- public final void testClusteredBindSameName() throws Throwable
- {
- ClusteredPostOffice office1 = null;
-
- ClusteredPostOffice office2 = null;
-
- try
- {
- office1 = createClusteredPostOffice("node1", "testgroup");
-
- office2 = createClusteredPostOffice("node2", "testgroup");
-
- Queue queue1 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
-
- Binding binding1 =
- office1.bindQueue("sub1", "topic1", queue1);
-
- try
- {
- office2.bindQueue("sub1", "topic1", queue1);
- fail();
- }
- catch (IllegalArgumentException e)
- {
- //OK - this should fail
- }
- }
- finally
- {
- if (office1 != null)
- {
- office1.stop();
- }
-
- if (office2 != null)
- {
- office2.stop();
- }
- }
- }
-
-
public final void testClusteredBindUnbind() throws Throwable
{
ClusteredPostOffice office1 = null;
More information about the jboss-cvs-commits
mailing list