[Jboss-cvs] JBoss Messaging SVN: r1274 - in trunk: src/main/org/jboss/jms/message src/main/org/jboss/jms/server/remoting src/main/org/jboss/messaging/core/message src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/util tests/src/org/jboss/test/messaging/core/plugin

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Sep 10 13:04:30 EDT 2006


Author: timfox
Date: 2006-09-10 13:04:21 -0400 (Sun, 10 Sep 2006)
New Revision: 1274

Added:
   trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectInputStream.java
   trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectOutputStream.java
   trunk/src/main/org/jboss/jms/server/remoting/MessagingSerializationManager.java
   trunk/src/main/org/jboss/messaging/util/StreamUtils.java
Modified:
   trunk/src/main/org/jboss/jms/message/JBossMapMessage.java
   trunk/src/main/org/jboss/jms/message/JBossMessage.java
   trunk/src/main/org/jboss/jms/message/JBossStreamMessage.java
   trunk/src/main/org/jboss/jms/server/remoting/CountingOutputStream.java
   trunk/src/main/org/jboss/messaging/core/message/MessageSupport.java
   trunk/src/main/org/jboss/messaging/core/message/RoutableSupport.java
   trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java
Log:
More clustering work



Modified: trunk/src/main/org/jboss/jms/message/JBossMapMessage.java
===================================================================
--- trunk/src/main/org/jboss/jms/message/JBossMapMessage.java	2006-09-10 11:53:54 UTC (rev 1273)
+++ trunk/src/main/org/jboss/jms/message/JBossMapMessage.java	2006-09-10 17:04:21 UTC (rev 1274)
@@ -35,6 +35,7 @@
 import javax.jms.MessageFormatException;
 
 import org.jboss.jms.destination.JBossDestination;
+import org.jboss.messaging.util.StreamUtils;
 import org.jboss.util.Primitives;
 
 /**
@@ -504,21 +505,14 @@
 
    protected void writePayloadExternal(ObjectOutput out, Serializable thePayload) throws IOException
    {
-      writeMap(out, ((Map)getPayload()), true);
+      StreamUtils.writeMap(out, ((Map)getPayload()), true);
    }
 
    protected Serializable readPayloadExternal(ObjectInput in, int length)
       throws IOException, ClassNotFoundException
    {
-      Map m = readMap(in, true);
-      if (!(m instanceof HashMap))
-      {
-         return new HashMap(m);
-      }
-      else
-      {
-         return (HashMap)m;
-      }
+      HashMap m = StreamUtils.readMap(in, true);
+      return m;
    }
 
    // Private -------------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/message/JBossMessage.java
===================================================================
--- trunk/src/main/org/jboss/jms/message/JBossMessage.java	2006-09-10 11:53:54 UTC (rev 1273)
+++ trunk/src/main/org/jboss/jms/message/JBossMessage.java	2006-09-10 17:04:21 UTC (rev 1274)
@@ -28,8 +28,8 @@
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
-import java.util.Iterator;
 
 import javax.jms.BytesMessage;
 import javax.jms.DeliveryMode;
@@ -50,6 +50,7 @@
 import org.jboss.jms.destination.JBossTopic;
 import org.jboss.jms.util.MessagingJMSException;
 import org.jboss.messaging.core.message.MessageSupport;
+import org.jboss.messaging.util.StreamUtils;
 import org.jboss.util.Primitives;
 import org.jboss.util.Strings;
 
@@ -74,16 +75,22 @@
    // Constants -----------------------------------------------------
    
    private static final long serialVersionUID = 8341387096828690976L;
-   
+         
    public static final byte TYPE = 0;
    
-   private static final int QUEUE = 1;
+   private static final byte NULL = 0;
    
-   private static final int TOPIC = 2;
+   private static final byte STRING = 1;
    
-   private static final int TEMP_QUEUE = 3;
+   private static final byte BYTES = 2;
    
-   private static final int TEMP_TOPIC = 4;
+   private static final byte QUEUE = 3;
+   
+   private static final byte TOPIC = 4;
+   
+   private static final byte TEMP_QUEUE = 5;
+   
+   private static final byte TEMP_TOPIC = 6;
 
    // Static --------------------------------------------------------
 
@@ -949,7 +956,7 @@
          out.writeUTF(jmsType);
       }
             
-      writeMap(out, properties, true);
+      StreamUtils.writeMap(out, properties, true);
       
       if (correlationID == null && correlationIDBytes == null)
       {
@@ -991,7 +998,7 @@
          jmsType = in.readUTF();
       }
       
-      Map m = readMap(in, true);
+      Map m = StreamUtils.readMap(in, true);
       if (!(m instanceof HashMap))
       {
          properties =  new HashMap(m);

Modified: trunk/src/main/org/jboss/jms/message/JBossStreamMessage.java
===================================================================
--- trunk/src/main/org/jboss/jms/message/JBossStreamMessage.java	2006-09-10 11:53:54 UTC (rev 1273)
+++ trunk/src/main/org/jboss/jms/message/JBossStreamMessage.java	2006-09-10 17:04:21 UTC (rev 1274)
@@ -36,6 +36,7 @@
 import javax.jms.StreamMessage;
 
 import org.jboss.jms.destination.JBossDestination;
+import org.jboss.messaging.util.StreamUtils;
 import org.jboss.util.Primitives;
 
 /**
@@ -695,21 +696,14 @@
 
    protected void writePayloadExternal(ObjectOutput out, Serializable thePayload) throws IOException
    {
-      writeList(out, (List)thePayload);
+      StreamUtils.writeList(out, (List)thePayload);
    }
 
    protected Serializable readPayloadExternal(ObjectInput in, int length)
       throws IOException, ClassNotFoundException
    {
-      List l = readList(in);
-      if (!(l instanceof ArrayList))
-      {
-         return new ArrayList(l);
-      }
-      else
-      {
-         return (ArrayList)l;
-      }
+      ArrayList l = StreamUtils.readList(in);
+      return l;
    }
 
    // Private -------------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/server/remoting/CountingOutputStream.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/CountingOutputStream.java	2006-09-10 11:53:54 UTC (rev 1273)
+++ trunk/src/main/org/jboss/jms/server/remoting/CountingOutputStream.java	2006-09-10 17:04:21 UTC (rev 1274)
@@ -32,7 +32,7 @@
  * 
  * Counts how many bytes written, used as a diagnostic tool
  * during development when we want to see exactly what's being written,
- * typically to sockets OuputStreams.
+ * typically to socket output streams.
  * 
  * @author <a href="tim.fox at jboss.com">Tim Fox</a>
  * @version 1.1

Added: trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectInputStream.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectInputStream.java	2006-09-10 11:53:54 UTC (rev 1273)
+++ trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectInputStream.java	2006-09-10 17:04:21 UTC (rev 1274)
@@ -0,0 +1,54 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.server.remoting;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+
+/**
+ * An ObjectInputStream that keeps a reference to the InputStream it was constructed with and exposes it via getUnderlyingStream().
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class MessagingObjectInputStream extends ObjectInputStream
+{
+   private InputStream in; // the stream handed to the constructor, returned by getUnderlyingStream()
+
+   public MessagingObjectInputStream(InputStream in) throws IOException, SecurityException
+   {
+      super(in);
+      
+      this.in = in;
+      
+   }
+   
+   public InputStream getUnderlyingStream()
+   {
+      return in;
+   }
+
+}

Added: trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectOutputStream.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectOutputStream.java	2006-09-10 11:53:54 UTC (rev 1273)
+++ trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectOutputStream.java	2006-09-10 17:04:21 UTC (rev 1274)
@@ -0,0 +1,52 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.server.remoting;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+
+/**
+ * An ObjectOutputStream that keeps a reference to the OutputStream it was constructed with and exposes it via getUnderlyingStream().
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class MessagingObjectOutputStream extends ObjectOutputStream
+{
+   private OutputStream out; // the stream handed to the constructor, returned by getUnderlyingStream()
+   
+   public MessagingObjectOutputStream(OutputStream out) throws IOException
+   {
+      super(out);
+      this.out = out;
+   }
+   
+   public OutputStream getUnderlyingStream()
+   {
+      return out;
+   }
+
+}

Added: trunk/src/main/org/jboss/jms/server/remoting/MessagingSerializationManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/MessagingSerializationManager.java	2006-09-10 11:53:54 UTC (rev 1273)
+++ trunk/src/main/org/jboss/jms/server/remoting/MessagingSerializationManager.java	2006-09-10 17:04:21 UTC (rev 1274)
@@ -0,0 +1,73 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.server.remoting;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+
+import org.jboss.remoting.serialization.IMarshalledValue;
+import org.jboss.remoting.serialization.SerializationManager;
+
+/**
+ * A SerializationManager whose createInput/createOutput return MessagingObjectInputStream and
+ * MessagingObjectOutputStream; every other operation in this class throws UnsupportedOperationException.
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class MessagingSerializationManager extends SerializationManager
+{
+   public IMarshalledValue createdMarshalledValue(Object arg0) throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public ObjectInputStream createInput(InputStream in, ClassLoader cl) throws IOException
+   {
+      return new MessagingObjectInputStream(in); // NOTE(review): the ClassLoader argument cl is not used here - confirm this is intended
+   }
+   
+   public ObjectOutputStream createOutput(OutputStream out) throws IOException
+   {
+      return new MessagingObjectOutputStream(out);
+   }
+
+   public IMarshalledValue createMarshalledValueForClone(Object arg0) throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+  
+   public Object receiveObject(InputStream arg0, ClassLoader arg1) throws IOException, ClassNotFoundException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public void sendObject(ObjectOutputStream arg0, Object arg1) throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+}

Modified: trunk/src/main/org/jboss/messaging/core/message/MessageSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/MessageSupport.java	2006-09-10 11:53:54 UTC (rev 1273)
+++ trunk/src/main/org/jboss/messaging/core/message/MessageSupport.java	2006-09-10 17:04:21 UTC (rev 1274)
@@ -30,6 +30,7 @@
 import java.util.Map;
 
 import org.jboss.messaging.core.Message;
+import org.jboss.messaging.util.StreamUtils;
 import org.jboss.serial.io.JBossObjectInputStream;
 import org.jboss.serial.io.JBossObjectOutputStream;
 
@@ -338,7 +339,7 @@
     */
    protected void writePayloadExternal(ObjectOutput out, Serializable thePayload) throws IOException
    {
-      internalWriteObject(out, thePayload, true, true);
+      StreamUtils.writeObject(out, thePayload, true, true);
    }
 
    /**
@@ -347,7 +348,7 @@
    protected Serializable readPayloadExternal(ObjectInput in, int length)
       throws IOException, ClassNotFoundException
    {
-      return internalReadObject(in, true);
+      return (Serializable)StreamUtils.readObject(in, true);
    }
 
    /**

Modified: trunk/src/main/org/jboss/messaging/core/message/RoutableSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/RoutableSupport.java	2006-09-10 11:53:54 UTC (rev 1273)
+++ trunk/src/main/org/jboss/messaging/core/message/RoutableSupport.java	2006-09-10 17:04:21 UTC (rev 1274)
@@ -26,17 +26,13 @@
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.Routable;
-import org.jboss.messaging.util.SafeUTF;
-import org.jboss.util.Primitives;
+import org.jboss.messaging.util.StreamUtils;
 
 /**
  * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
@@ -52,293 +48,12 @@
 
    private static final Logger log = Logger.getLogger(RoutableSupport.class);
    
-   /** A byte property */
-   protected static final byte BYTE = 0;
-   /** A short property */
-   protected static final byte SHORT = 1;
-   /** An integer property */
-   protected static final byte INT = 2;
-   /** A long property */
-   protected static final byte LONG = 3;
-   /** A float property */
-   protected static final byte FLOAT = 4;
-   /** A double property */
-   protected static final byte DOUBLE = 5;
-   /** A boolean property */
-   protected static final byte BOOLEAN = 6;
-   /** A string property */
-   protected static final byte STRING = 7;
-   /** An object property */
-   protected static final byte OBJECT = 8;
-   /** A null property */
-   protected static final byte NULL = 9;
-   
-   protected static final byte BYTES = 10;
-   
-   protected static final byte MAP = 11;
-   
-   protected static final byte LIST = 12;
 
+
    // Static --------------------------------------------------------
    
-   public static void writeList(ObjectOutput out, List list) throws IOException
-   {
-      out.writeInt(list.size());
-      Iterator iter = list.iterator();
-      while (iter.hasNext())
-      {
-         Object value = iter.next();
-         if (value != null && !(value instanceof Serializable))
-         {
-            throw new IOException("Object in List must be serializable: " + value);
-         }
-         internalWriteObject(out, (Serializable)value, false, false);
-      }
-   }
    
-   public static List readList(ObjectInput in) throws ClassNotFoundException, IOException
-   {
-      int size = in.readInt();
-      ArrayList list = new ArrayList(size);
-      for (int i = 0; i < size; i++)
-      {
-         Object obj = internalReadObject(in, false);
-         list.add(obj);
-      }
-      return list;
-   }
    
-   protected static Serializable internalReadObject(ObjectInput in, boolean longStrings) throws IOException, ClassNotFoundException
-   {
-      byte type = in.readByte();
-      Serializable value = null;
-      switch (type)
-      {
-         case BYTE :
-            value = new Byte(in.readByte());
-            break;
-         case SHORT :
-            value = new Short(in.readShort());
-            break;
-         case INT :
-            value = new Integer(in.readInt());
-            break;
-         case LONG :
-            value = new Long(in.readLong());
-            break;
-         case FLOAT :
-            value = new Float(in.readFloat());
-            break;
-         case DOUBLE :
-            value = new Double(in.readDouble());
-            break;
-         case BOOLEAN :
-            value = Primitives.valueOf(in.readBoolean());
-            break;
-         case STRING :
-            if (longStrings)
-            {
-               //We cope with >= 64K Strings
-               value = SafeUTF.instance.safeReadUTF(in);
-            }
-            else
-            {
-               //Limited to < 64K Strings
-               value = in.readUTF();
-            }
-            break;
-         case BYTES :
-            int size = in.readInt();
-            byte[] bytes = new byte[size];
-            in.readFully(bytes);
-            return bytes;
-         case MAP:
-         {
-            Map m = readMap(in, false);
-            if (m instanceof HashMap)
-            {
-               return (HashMap)m;
-            }
-            else
-            {
-               return new HashMap(m);
-            }
-         }            
-         case LIST:
-         {
-            List l = readList(in);
-            if (l instanceof ArrayList)
-            {
-               return (ArrayList)l;
-            }
-            else
-            {
-               return new ArrayList(l);
-            }
-         }
-         case NULL:
-            value = null;
-            break;
-         default :
-            value = (Serializable)in.readObject();
-      }
-      return value;
-   }
-   
-   protected static void internalWriteObject(ObjectOutput out, Serializable value,
-                                             boolean containerTypes, boolean longStrings) throws IOException
-   {
-      // We cheat with some often used types - more efficient than using object serialization
-      if (value == null)
-      {
-         out.writeByte(NULL);
-      }
-      else if (value instanceof String)
-      {
-         out.writeByte(STRING);
-         if (longStrings)
-         {
-            //We can cope with >=64K Strings
-            SafeUTF.instance.safeWriteUTF(out, (String)value);
-         }
-         else
-         {
-            //Limited to < 64K Strings
-            out.writeUTF((String)value);
-         }
-      }
-      else if (value instanceof Integer)
-      {
-         out.writeByte(INT);
-         out.writeInt(((Integer) value).intValue());
-      }
-      else if (value instanceof Boolean)
-      {
-         out.writeByte(BOOLEAN);
-         out.writeBoolean(((Boolean) value).booleanValue());
-      }
-      else if (value instanceof Byte)
-      {
-         out.writeByte(BYTE);
-         out.writeByte(((Byte) value).byteValue());
-      }
-      else if (value instanceof Short)
-      {
-         out.writeByte(SHORT);
-         out.writeShort(((Short) value).shortValue());
-      }
-      else if (value instanceof Long)
-      {
-         out.writeByte(LONG);
-         out.writeLong(((Long) value).longValue());
-      }
-      else if (value instanceof Float)
-      {
-         out.writeByte(FLOAT);
-         out.writeFloat(((Float) value).floatValue());
-      }
-      else if (value instanceof Double)
-      {
-         out.writeByte(DOUBLE);
-         out.writeDouble(((Double) value).doubleValue());
-      }
-      else if (value instanceof byte[])
-      {
-         out.writeByte(BYTES);
-         byte[] bytes = (byte[])value;
-         out.writeInt(bytes.length);
-         out.write(bytes);
-      }      
-      else if (containerTypes && value instanceof ArrayList)
-      {
-         out.write(LIST);
-         writeList(out, (List)value);
-      }
-      else if (containerTypes && value instanceof HashMap)
-      {
-         out.write(MAP);
-         writeMap(out, (Map)value, false);
-      }
-      else
-      {
-         // Default to standard serialization
-         out.writeByte(OBJECT);
-         out.writeObject(value);
-      }
-   }
-   
-   public static void writeMap(ObjectOutput out, Map map, boolean stringKeys) throws IOException
-   {      
-      if (map.isEmpty())
-      {
-         out.writeByte(NULL);
-      }
-      else
-      {      
-         out.writeByte(MAP);
-         Set entrySet = map.entrySet();
-         out.writeInt(entrySet.size());
-         for (Iterator it = entrySet.iterator(); it.hasNext(); )
-         {
-            Map.Entry me = (Map.Entry)it.next();
-            
-            //Write the key
-            if (stringKeys)
-            {
-               out.writeUTF((String)me.getKey());
-            }
-            else
-            {
-               if (!(me.getKey() instanceof Serializable))
-               {
-                  throw new IOException("Key in map must be Serializable: " + me.getKey());
-               }
-               internalWriteObject(out, (Serializable)me.getKey(), false, false);
-            }
-
-            Object value = me.getValue();
-            if (value != null && !(value instanceof Serializable))
-            {
-               throw new IOException("Value in map must be Serializable: " + value);
-            }
-            
-            // write the value
-            internalWriteObject(out, (Serializable)value, false, false);
-         }
-      }
-   }
-   
-   public static Map readMap(ObjectInput in, boolean stringKeys) throws IOException, ClassNotFoundException
-   {
-      byte b = in.readByte();
-      if (b == NULL)
-      {
-         return new HashMap();
-      }
-      else
-      {      
-         int size = in.readInt();
-         HashMap m = new HashMap(size);
-         for (int i = 0; i < size; i++)
-         {
-            Object key;
-            if (stringKeys)
-            {
-               key = in.readUTF();
-            }
-            else
-            {
-               key = internalReadObject(in, false);
-            }
-            
-            Object value = internalReadObject(in, false);
-            
-            m.put(key, value);
-         }
-         return m;
-      }
-   }  
-   
    // Attributes ----------------------------------------------------
 
    private boolean trace = log.isTraceEnabled();
@@ -535,7 +250,7 @@
       out.writeBoolean(reliable);
       out.writeLong(expiration);
       out.writeLong(timestamp);
-      writeMap(out, headers, true);
+      StreamUtils.writeMap(out, headers, true);
       out.writeBoolean(redelivered);
       out.writeByte(priority);
       out.writeInt(deliveryCount);
@@ -547,7 +262,7 @@
       reliable = in.readBoolean();
       expiration = in.readLong();
       timestamp = in.readLong();
-      Map m = readMap(in, true);
+      Map m = StreamUtils.readMap(in, true);
       if (!(m instanceof HashMap))
       {
          headers =  new HashMap(m);

Modified: trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2006-09-10 11:53:54 UTC (rev 1273)
+++ trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2006-09-10 17:04:21 UTC (rev 1274)
@@ -54,12 +54,12 @@
 import org.jboss.messaging.core.message.CoreMessage;
 import org.jboss.messaging.core.message.MessageFactory;
 import org.jboss.messaging.core.message.MessageSupport;
-import org.jboss.messaging.core.message.RoutableSupport;
 import org.jboss.messaging.core.plugin.contract.PersistenceManager;
 import org.jboss.messaging.core.tx.Transaction;
 import org.jboss.messaging.core.tx.TxCallback;
 import org.jboss.messaging.core.tx.XidImpl;
 import org.jboss.messaging.util.JDBCUtil;
+import org.jboss.messaging.util.StreamUtils;
 import org.jboss.messaging.util.Util;
 import org.jboss.serial.io.JBossObjectInputStream;
 import org.jboss.serial.io.JBossObjectOutputStream;
@@ -3436,7 +3436,7 @@
          
          oos = new JBossObjectOutputStream(bos);
          
-         RoutableSupport.writeMap(oos, map, true);
+         StreamUtils.writeMap(oos, map, true);
          
          return bos.toByteArray();
       }
@@ -3464,7 +3464,7 @@
          
          ois = new JBossObjectInputStream(bis);
          
-         Map m = RoutableSupport.readMap(ois, true);
+         Map m = StreamUtils.readMap(ois, true);
          HashMap map;
          if (!(m instanceof HashMap))
          {

Added: trunk/src/main/org/jboss/messaging/util/StreamUtils.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/StreamUtils.java	2006-09-10 11:53:54 UTC (rev 1273)
+++ trunk/src/main/org/jboss/messaging/util/StreamUtils.java	2006-09-10 17:04:21 UTC (rev 1274)
@@ -0,0 +1,315 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.util;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.jboss.util.Primitives;
+
+/**
+ * A StreamUtils
+ *
+ * Utility methods for reading and writing stuff to and from streams
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class StreamUtils
+{
+   protected static final byte BYTE = 0;
+
+   protected static final byte SHORT = 1;
+
+   protected static final byte INT = 2;
+
+   protected static final byte LONG = 3;
+
+   protected static final byte FLOAT = 4;
+
+   protected static final byte DOUBLE = 5;
+
+   protected static final byte BOOLEAN = 6;
+
+   protected static final byte STRING = 7;
+
+   protected static final byte SERIALIZABLE = 8;
+
+   protected static final byte NULL = 9;
+   
+   protected static final byte BYTES = 10;
+   
+   protected static final byte MAP = 11;
+   
+   protected static final byte LIST = 12;
+   
+   protected static final byte NOT_NULL = 13;
+         
+   public static Object readObject(ObjectInput in, boolean longStrings)
+      throws IOException, ClassNotFoundException
+   {
+      byte type = in.readByte();
+      Object value = null;
+      switch (type)
+      {
+         case BYTE :
+            value = new Byte(in.readByte());
+            break;
+         case SHORT :
+            value = new Short(in.readShort());
+            break;
+         case INT :
+            value = new Integer(in.readInt());
+            break;
+         case LONG :
+            value = new Long(in.readLong());
+            break;
+         case FLOAT :
+            value = new Float(in.readFloat());
+            break;
+         case DOUBLE :
+            value = new Double(in.readDouble());
+            break;
+         case BOOLEAN :
+            value = Primitives.valueOf(in.readBoolean());
+            break;
+         case STRING :
+            if (longStrings)
+            {
+               //We cope with >= 64K Strings
+               value = SafeUTF.instance.safeReadUTF(in);
+            }
+            else
+            {
+               //Limited to < 64K Strings
+               value = in.readUTF();
+            }
+            break;
+         case BYTES :
+            int size = in.readInt();
+            byte[] bytes = new byte[size];
+            in.readFully(bytes);
+            value = bytes;
+            break;
+         case MAP:
+         {
+            value = readMap(in, false);
+            break;
+         }            
+         case LIST:
+         {
+            value = readList(in);
+            break;
+         }
+         case SERIALIZABLE:
+         {
+            value = (Serializable)in.readObject();
+            break;
+         }                  
+         case NULL:
+         {
+            value = null;
+            break;
+         }
+         default :
+         {
+            throw new IllegalStateException("Unknown type: " + type);
+         }
+      }
+      return value;
+   }
+   
+   /**
+    * Writes a single value to the stream as a one-byte type tag followed by the value's payload.
+    * <p>
+    * String, Integer, Boolean, Byte, Short, Long, Float, Double and byte[] are written with the
+    * matching primitive write calls, which is cheaper than object serialization. Any other
+    * Serializable is written with out.writeObject().
+    *
+    * @param out the stream to write to
+    * @param object the value to write; may be null, in which case only the NULL tag is written
+    * @param containerTypes if true, ArrayList and HashMap instances are written through
+    *        writeList() / writeMap(); if false they fall through to the Serializable branch
+    * @param longStrings if true, Strings are written with SafeUTF so that Strings of 64K or
+    *        more can be handled; if false, writeUTF() is used and Strings are limited to < 64K
+    * @throws IOException if writing to the stream fails
+    * @throws IllegalArgumentException if the object is not one of the supported types and is
+    *         not Serializable
+    */
+   public static void writeObject(ObjectOutput out, Object object,
+                                  boolean containerTypes, boolean longStrings) throws IOException
+   {
+      // We cheat with some often used types - more efficient than using object serialization
+      if (object == null)
+      {
+         out.writeByte(NULL);
+      }
+      else if (object instanceof String)
+      {
+         out.writeByte(STRING);
+         if (longStrings)
+         {
+            //We can cope with >=64K Strings
+            SafeUTF.instance.safeWriteUTF(out, (String)object);
+         }
+         else
+         {
+            //Limited to < 64K Strings
+            out.writeUTF((String)object);
+         }
+      }
+      else if (object instanceof Integer)
+      {
+         out.writeByte(INT);
+         out.writeInt(((Integer) object).intValue());
+      }
+      else if (object instanceof Boolean)
+      {
+         out.writeByte(BOOLEAN);
+         out.writeBoolean(((Boolean) object).booleanValue());
+      }
+      else if (object instanceof Byte)
+      {
+         out.writeByte(BYTE);
+         out.writeByte(((Byte) object).byteValue());
+      }
+      else if (object instanceof Short)
+      {
+         out.writeByte(SHORT);
+         out.writeShort(((Short) object).shortValue());
+      }
+      else if (object instanceof Long)
+      {
+         out.writeByte(LONG);
+         out.writeLong(((Long) object).longValue());
+      }
+      else if (object instanceof Float)
+      {
+         out.writeByte(FLOAT);
+         out.writeFloat(((Float) object).floatValue());
+      }
+      else if (object instanceof Double)
+      {
+         out.writeByte(DOUBLE);
+         out.writeDouble(((Double) object).doubleValue());
+      }
+      else if (object instanceof byte[])
+      {
+         // Length-prefixed so the reader knows how many bytes to consume
+         out.writeByte(BYTES);
+         byte[] bytes = (byte[])object;
+         out.writeInt(bytes.length);
+         out.write(bytes);
+      }
+      else if (containerTypes && object instanceof ArrayList)
+      {
+         // Tag written with writeByte() like every other tag in this method
+         out.writeByte(LIST);
+         writeList(out, (List)object);
+      }
+      else if (containerTypes && object instanceof HashMap)
+      {
+         // Tag written with writeByte() like every other tag in this method
+         out.writeByte(MAP);
+         writeMap(out, (Map)object, false);
+      }
+      else if (object instanceof Serializable)
+      {
+         // Must stay last: all of the types handled above are Serializable too
+         out.writeByte(SERIALIZABLE);
+         out.writeObject(object);
+      }
+      else
+      {
+         throw new IllegalArgumentException("Don't know how to deal with object " + object);
+      }
+   }
+   
+   /**
+    * Writes a list as an int element count followed by each element in iteration order.
+    * <p>
+    * Every element is written through writeObject() with containerTypes and longStrings both
+    * false, so a nested ArrayList or HashMap takes writeObject()'s Serializable branch and
+    * String elements are written with writeUTF().
+    *
+    * @param out the stream to write to
+    * @param list the list to write; must not be null
+    * @throws IOException if writing to the stream fails
+    */
+   public static void writeList(ObjectOutput out, List list) throws IOException
+   {
+      // The count goes first so that the reader knows how many elements follow
+      out.writeInt(list.size());
+      for (Iterator it = list.iterator(); it.hasNext(); )
+      {
+         writeObject(out, it.next(), false, false);
+      }
+   }
+   
+   /**
+    * Reads a list in the layout produced by writeList(): an int element count followed by
+    * that many values, each read with readObject(in, false).
+    *
+    * @param in the stream to read from
+    * @return a new ArrayList holding the elements in the order they were read
+    * @throws ClassNotFoundException if readObject() fails to resolve a class for an element
+    * @throws IOException if reading from the stream fails, or if the element count read from
+    *         the stream is negative
+    */
+   public static ArrayList readList(ObjectInput in) throws ClassNotFoundException, IOException
+   {
+      int size = in.readInt();
+      if (size < 0)
+      {
+         // A negative count can only come from a corrupt or misaligned stream. Report it as an
+         // I/O problem instead of letting new ArrayList(size) throw IllegalArgumentException.
+         throw new IOException("Invalid list size read from stream: " + size);
+      }
+      ArrayList list = new ArrayList(size);
+      for (int i = 0; i < size; i++)
+      {
+         Object obj = readObject(in, false);
+         list.add(obj);
+      }
+      return list;
+   }
+   
+   /**
+    * Writes a map as an int entry count followed by each key/value pair in entry-set order.
+    * <p>
+    * Keys are written with writeUTF() when stringKeys is true (every key is cast to String);
+    * otherwise they must be Serializable and are written through writeObject(). Note that a
+    * null key fails the instanceof check and is therefore rejected in that mode. Values may
+    * be null but must otherwise be Serializable, and are always written through writeObject().
+    *
+    * @param out the stream to write to
+    * @param map the map to write; must not be null
+    * @param stringKeys true if all keys are Strings and should be written with writeUTF()
+    * @throws IOException if writing to the stream fails, or if a key (when stringKeys is false)
+    *         or a non-null value is not Serializable
+    */
+   public static void writeMap(ObjectOutput out, Map map, boolean stringKeys) throws IOException
+   {
+      Set entries = map.entrySet();
+      out.writeInt(entries.size());
+
+      Iterator iter = entries.iterator();
+      while (iter.hasNext())
+      {
+         Map.Entry entry = (Map.Entry)iter.next();
+
+         // Key first
+         Object key = entry.getKey();
+         if (stringKeys)
+         {
+            out.writeUTF((String)key);
+         }
+         else if (key instanceof Serializable)
+         {
+            writeObject(out, key, false, false);
+         }
+         else
+         {
+            throw new IOException("Key in map must be Serializable: " + key);
+         }
+
+         // Then the value; null is allowed, anything else has to be Serializable
+         Object val = entry.getValue();
+         boolean writable = (val == null) || (val instanceof Serializable);
+         if (!writable)
+         {
+            throw new IOException("Value in map must be Serializable: " + val);
+         }
+         writeObject(out, val, false, false);
+      }
+   }
+   
+   /**
+    * Reads a map in the layout produced by writeMap(): an int entry count followed by that
+    * many key/value pairs.
+    * <p>
+    * The stringKeys flag must match the one used when the map was written: keys are read with
+    * readUTF() when it is true and with readObject(in, false) otherwise. Values are always
+    * read with readObject(in, false).
+    *
+    * @param in the stream to read from
+    * @param stringKeys true if the keys were written as UTF Strings
+    * @return a new HashMap holding the entries that were read
+    * @throws IOException if reading from the stream fails, or if the entry count read from
+    *         the stream is negative
+    * @throws ClassNotFoundException if readObject() fails to resolve a class for a key or value
+    */
+   public static HashMap readMap(ObjectInput in, boolean stringKeys) throws IOException, ClassNotFoundException
+   {
+      int size = in.readInt();
+      if (size < 0)
+      {
+         // A negative count can only come from a corrupt or misaligned stream. Report it as an
+         // I/O problem instead of letting new HashMap(size) throw IllegalArgumentException.
+         throw new IOException("Invalid map size read from stream: " + size);
+      }
+      HashMap m = new HashMap(size);
+      for (int i = 0; i < size; i++)
+      {
+         Object key;
+         if (stringKeys)
+         {
+            key = in.readUTF();
+         }
+         else
+         {
+            key = readObject(in, false);
+         }
+
+         Object value = readObject(in, false);
+
+         m.put(key, value);
+      }
+      return m;
+   }
+}

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java	2006-09-10 11:53:54 UTC (rev 1273)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java	2006-09-10 17:04:21 UTC (rev 1274)
@@ -73,48 +73,6 @@
       super.tearDown();
    }
    
-   public final void testClusteredBindSameName() throws Throwable
-   { // Expects a second bindQueue("sub1", ...) issued through another office to be rejected
-      ClusteredPostOffice office1 = null;
-      // Both start as null so the finally block only stops offices that were actually created
-      ClusteredPostOffice office2 = null;
-      
-      try
-      {         
-         office1 = createClusteredPostOffice("node1", "testgroup");
-         // Second office differs in the first argument only - presumably node id / group name
-         office2 = createClusteredPostOffice("node2", "testgroup");
-         
-         Queue queue1 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
-         // First bind of "sub1" goes through office1; the returned Binding is not used afterwards
-         Binding binding1 =
-            office1.bindQueue("sub1", "topic1", queue1);
-         // Binding the same name again through office2 must throw, otherwise the test fails
-         try
-         {
-            office2.bindQueue("sub1", "topic1", queue1);
-            fail();
-         }
-         catch (IllegalArgumentException e)
-         {
-            //OK - this should fail
-         }
-      }
-      finally
-      {
-         if (office1 != null)
-         {
-            office1.stop();
-         }
-         
-         if (office2 != null)
-         {
-            office2.stop();
-         }
-      }
-   }
-   
-   
    public final void testClusteredBindUnbind() throws Throwable
    {
       ClusteredPostOffice office1 = null;




More information about the jboss-cvs-commits mailing list