[jbosscache-commits] JBoss Cache SVN: r5294 - in core/trunk/src: main/java/org/jboss/cache/marshall/io and 1 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Tue Feb 5 13:06:42 EST 2008


Author: manik.surtani at jboss.com
Date: 2008-02-05 13:06:41 -0500 (Tue, 05 Feb 2008)
New Revision: 5294

Modified:
   core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
   core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
   core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java
   core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java
   core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectOutputStream.java
   core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java
Log:
Added object stream pooling for input streams as well

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java	2008-02-05 15:45:44 UTC (rev 5293)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java	2008-02-05 18:06:41 UTC (rev 5294)
@@ -18,10 +18,7 @@
 import org.jboss.cache.factories.annotations.Start;
 import org.jboss.cache.transaction.GlobalTransaction;
 
-import java.io.ByteArrayOutputStream;
 import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -75,30 +72,33 @@
    // implement the basic contract set in RPcDispatcher.AbstractMarshaller
    public byte[] objectToByteBuffer(Object obj) throws Exception
    {
-      ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      ObjectOutputStream out = ObjectSerializationFactory.createObjectOutputStream(baos);
-      objectToObjectStream(obj, out);
-      out.close();
-      return baos.toByteArray();
+      throw new RuntimeException("Needs to be overridden!");
+//      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+//      ObjectOutputStream out = ObjectSerializationFactory.createObjectOutputStream(baos);
+//      objectToObjectStream(obj, out);
+//      out.close();
+//      return baos.toByteArray();
    }
 
    public Object objectFromByteBuffer(byte[] bytes) throws Exception
    {
-      ObjectInputStream in = ObjectSerializationFactory.createObjectInputStream(bytes);
-      return objectFromObjectStream(in);
+      throw new RuntimeException("Needs to be overridden!");
+      //ObjectInputStream in = ObjectSerializationFactory.createObjectInputStream(bytes);
+      //return objectFromObjectStream(in);
    }
 
    public Object objectFromStream(InputStream in) throws Exception
    {
+      throw new RuntimeException("Needs to be overridden!");
       // by default just create an OIS from this IS and pass in to the relevant method
-      if (in instanceof ObjectInputStream)
-      {
-         return objectFromObjectStream((ObjectInputStream) in);
-      }
-      else
-      {
-         return objectFromObjectStream(ObjectSerializationFactory.createObjectInputStream(in));
-      }
+//      if (in instanceof ObjectInputStream)
+//      {
+//         return objectFromObjectStream((ObjectInputStream) in);
+//      }
+//      else
+//      {
+//         return objectFromObjectStream(ObjectSerializationFactory.createObjectInputStream(in));
+//      }
    }
 
    /**

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java	2008-02-05 15:45:44 UTC (rev 5293)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java	2008-02-05 18:06:41 UTC (rev 5294)
@@ -160,11 +160,6 @@
 
    public byte[] objectToByteBuffer(Object obj) throws Exception
    {
-//      ByteArrayOutputStream bos = new ByteArrayOutputStream();
-//      ObjectOutputStream out;
-
-      // based on the default marshaller, construct an object output stream based on what's compatible.
-//      out = ObjectSerializationFactory.createObjectOutputStream(bos);
       ReusableObjectOutputStream out = pool.getOutputStream();
       try
       {
@@ -173,16 +168,12 @@
 
          //now marshall the contents of the object
          defaultMarshaller.objectToObjectStream(obj, out);
-//         out.flush();
 
          // and return bytes.
          return out.getBytes();
       }
       finally
       {
-         // return to queue
-//         out.reset();
-//         queue.put(out);
          pool.returnStreamToPool(out);
       }
    }
@@ -191,44 +182,38 @@
    {
       Marshaller marshaller;
       int versionId;
-      ObjectInputStream in;
+      ObjectInputStream in = pool.getInputStream(buf);
+      ;
+
       try
       {
-         in = ObjectSerializationFactory.createObjectInputStream(buf);
-         versionId = in.readShort();
-         if (trace) log.trace("Read version " + versionId);
+         try
+         {
+            versionId = in.readShort();
+            if (trace) log.trace("Read version " + versionId);
+         }
+         catch (Exception e)
+         {
+            log.error("Unable to read version id from first two bytes of stream, barfing.");
+            throw e;
+         }
+
+         marshaller = getMarshaller(versionId);
+
+         return marshaller.objectFromObjectStream(in);
       }
-      catch (Exception e)
+      finally
       {
-         log.error("Unable to read version id from first two bytes of stream, barfing.");
-         throw e;
+         pool.returnStreamToPool(in);
       }
-
-      marshaller = getMarshaller(versionId);
-
-      return marshaller.objectFromObjectStream(in);
    }
 
    public Object objectFromStream(InputStream is) throws Exception
    {
-      Marshaller marshaller;
-      int versionId;
-      ObjectInputStream in;
-      try
-      {
-         in = ObjectSerializationFactory.createObjectInputStream(is);
-         versionId = in.readShort();
-         if (trace) log.trace("Read version " + versionId);
-      }
-      catch (Exception e)
-      {
-         log.error("Unable to read version id from first two bytes of stream, barfing.");
-         throw e;
-      }
-
-      marshaller = getMarshaller(versionId);
-
-      return marshaller.objectFromObjectStream(in);
+      int avbl = is.available();
+      byte[] bytes = new byte[avbl];
+      is.read(bytes, 0, avbl);
+      return objectFromByteBuffer(bytes);
    }
 
    public void objectToObjectStream(Object obj, ObjectOutputStream out, Fqn region) throws Exception

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java	2008-02-05 15:45:44 UTC (rev 5293)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java	2008-02-05 18:06:41 UTC (rev 5294)
@@ -1,8 +1,5 @@
 package org.jboss.cache.marshall.io;
 
-import org.jboss.util.stream.MarshalledValueInputStream;
-
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.concurrent.BlockingQueue;
@@ -17,10 +14,8 @@
 public class ObjectStreamPool
 {
    BlockingQueue<ReusableObjectOutputStream> outputStreams;
+   BlockingQueue<ReusableObjectInputStream> inputStreams;
 
-   // TODO: Implement for input streams as well - much harder.
-//   BlockingQueue<ReusableObjectInputStream> inputStreams;
-
    /**
     * Constructs the pool with a fixed number of object output and input streams
     *
@@ -32,10 +27,10 @@
    public ObjectStreamPool(int numOutputStreams, int numInputStreams) throws InterruptedException, IOException
    {
       outputStreams = new LinkedBlockingQueue<ReusableObjectOutputStream>();
-//      inputStreams = new LinkedBlockingQueue<ReusableObjectInputStream>();
+      inputStreams = new LinkedBlockingQueue<ReusableObjectInputStream>();
 
       for (int i = 0; i < numOutputStreams; i++) outputStreams.put(new ReusableObjectOutputStream());
-//      for (int i=0; i<numInputStreams; i++) inputStreams.put(new ReusableObjectInputStream());
+      for (int i = 0; i < numInputStreams; i++) inputStreams.put(new ReusableObjectInputStream());
    }
 
    /**
@@ -44,15 +39,17 @@
     */
    public ReusableObjectOutputStream getOutputStream() throws InterruptedException
    {
-      return outputStreams.take();
+      ReusableObjectOutputStream roos = outputStreams.take();
+      roos.init();
+      return roos;
    }
 
    public ObjectInputStream getInputStream(byte[] buf) throws IOException, InterruptedException
    {
-      return new MarshalledValueInputStream(new ByteArrayInputStream(buf));
-//      ReusableObjectInputStream rois = inputStreams.take();
-//      rois.init(buf);
-//      return rois;
+//      return new MarshalledValueInputStream(new ByteArrayInputStream(buf));
+      ReusableObjectInputStream rois = inputStreams.take();
+      rois.init(buf);
+      return rois;
    }
 
    public void returnStreamToPool(ReusableObjectOutputStream stream) throws IOException, InterruptedException
@@ -63,7 +60,7 @@
 
    public void returnStreamToPool(ObjectInputStream stream) throws InterruptedException, IOException
    {
-//      stream.reset();
-//      inputStreams.put((ReusableObjectInputStream) stream);
+      stream.reset();
+      inputStreams.put((ReusableObjectInputStream) stream);
    }
 }

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java	2008-02-05 15:45:44 UTC (rev 5293)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java	2008-02-05 18:06:41 UTC (rev 5294)
@@ -3,9 +3,7 @@
 import org.jboss.util.stream.MarshalledValueInputStream;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.ObjectStreamConstants;
 
 /**
  * A reusable ObjectInputStream that uses an internal byte array.  The byte array can be set using {@link #init(byte[])},
@@ -53,7 +51,6 @@
    public void init(byte[] buf) throws IOException
    {
       bytes.init(buf);
-//      readStreamHeader();
    }
 
    /**
@@ -61,26 +58,18 @@
     */
    static class ReusableByteArrayInputStream extends ByteArrayInputStream
    {
-      static byte[] initBytes;
+      private final static byte[] INIT_BYTES = {(byte) ((STREAM_MAGIC >>> 8) & 0xFF),
+            (byte) ((STREAM_MAGIC) & 0xFF),
+            (byte) ((STREAM_VERSION >>> 8)),
+            (byte) ((STREAM_VERSION) & 0xFF),
+            TC_BLOCKDATA, 0};
 
-      static
-      {
-         ByteArrayOutputStream bytes = new ByteArrayOutputStream();
-         bytes.write((byte) ((ObjectStreamConstants.STREAM_MAGIC >>> 8) & 0xFF));
-         bytes.write((byte) ((ObjectStreamConstants.STREAM_MAGIC) & 0xFF));
-         bytes.write((byte) ((ObjectStreamConstants.STREAM_VERSION >>> 8)));
-         bytes.write((byte) ((ObjectStreamConstants.STREAM_VERSION) & 0xFF));
-         bytes.write(TC_BLOCKDATA);
-         bytes.write(0);
-         initBytes = bytes.toByteArray();
-      }
-
       /**
        * Creates a new instance with a null byte buffer.  Use {@link #init(byte[])} to set the byte buffer to use.
        */
       ReusableByteArrayInputStream()
       {
-         super(initBytes);
+         super(INIT_BYTES);
       }
 
       /**
@@ -99,7 +88,8 @@
       public void init(byte[] b)
       {
          buf = b;
-         count = 0;
+         count = buf.length;
+         pos = 0;
       }
    }
 

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectOutputStream.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectOutputStream.java	2008-02-05 15:45:44 UTC (rev 5293)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectOutputStream.java	2008-02-05 18:06:41 UTC (rev 5294)
@@ -18,6 +18,7 @@
 public class ReusableObjectOutputStream extends ObjectOutputStream
 {
    ResettableByteArrayOutputStream baos;
+   boolean initialised = false;
 
    public ReusableObjectOutputStream() throws IOException, SecurityException
    {
@@ -28,10 +29,20 @@
    {
       super(baos);
       this.baos = baos;
-      reset();
    }
 
    /**
+    * Initialises a pooled stream for use.
+    */
+   public void init()
+   {
+      baos.init();
+      initialised = true;
+      // write a TC_RESET to any input streams know to clear internal data structures before attempting to read
+      baos.write(TC_RESET);
+   }
+
+   /**
     * Unlike {@link java.io.ObjectOutputStream#reset()}, this method will flush and wipe the underlying byte stream,
     * and ensure ObjectOutputStream headers are rewritten, awaiting proper reuse.
     *
@@ -40,19 +51,15 @@
    @Override
    public void reset() throws IOException
    {
-      super.reset();
-      super.flush();
-      baos.reset();
-      writeStreamHeader();
-      super.flush();
-
-      // make sure there are 6 bytes in the byte stream - 2 block data markers (which need to be removed),
-      // 2 magic number bytes, and 2 version bytes.
-      baos.assertCount(6);
-
-      // remove the 2 leading block data marker bytes, so that we now have 4 bytes which is a header that an
-      // ObjectInputStream knows how to deal with.
-      baos.removeBytesFromHead(2);
+      if (initialised)
+      {
+         super.reset();
+         super.flush();
+         baos.reset();
+         super.flush();
+         baos.trim();
+         initialised = false;
+      }
    }
 
    /**
@@ -69,21 +76,21 @@
     */
    static class ResettableByteArrayOutputStream extends ByteArrayOutputStream
    {
-      public void assertCount(int expectedCount) throws IOException
+      int getBufSize()
       {
-         if (count != expectedCount)
-            throw new IOException("Expected a count of " + expectedCount + " but was " + count);
+         return buf.length;
       }
 
-      public void removeBytesFromHead(int numBytes)
+      public void trim()
       {
-         if (count < numBytes)
-            throw new ArrayIndexOutOfBoundsException("Can't remove " + numBytes + " from a byte array of just " + count + " bytes!");
-         int newCount = count - numBytes;
-         byte[] newByteArray = new byte[16];
-         System.arraycopy(buf, numBytes, newByteArray, 0, newCount);
-         buf = newByteArray;
-         count = newCount;
+         // make sure we trim the buffer array to it's original size, to save on memory
+         buf = null;
       }
+
+      public void init()
+      {
+         buf = new byte[32];
+         count = 0;
+      }
    }
 }

Modified: core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java	2008-02-05 15:45:44 UTC (rev 5293)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java	2008-02-05 18:06:41 UTC (rev 5294)
@@ -6,6 +6,7 @@
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.io.Serializable;
 import java.util.Date;
 import java.util.Random;
 
@@ -14,24 +15,24 @@
  * @since 2.1.0
  */
 @Test(groups = "functional")
-public class ObjectStreamPoolTest
+public class ObjectStreamPoolTest implements Serializable
 {
-   ObjectStreamPool pool;
-   ReusableObjectOutputStream oos;
-   ObjectInputStream ois;
+   transient ObjectStreamPool pool;
+   transient ReusableObjectOutputStream oos;
+   transient ObjectInputStream ois;
 
    @BeforeTest
    public void setUp() throws IOException, InterruptedException
    {
-      pool = new ObjectStreamPool(2, 2);
+      pool = new ObjectStreamPool(1, 1);
    }
 
    @AfterMethod
    public void afterMethod() throws IOException, InterruptedException
    {
       // ensure we return stuff to the pool
-      pool.returnStreamToPool(oos);
-      pool.returnStreamToPool(ois);
+      if (oos != null) pool.returnStreamToPool(oos);
+      if (ois != null) pool.returnStreamToPool(ois);
    }
 
    public void testStreamReset() throws IOException, ClassNotFoundException, InterruptedException
@@ -68,6 +69,18 @@
 
       ois = pool.getInputStream(asBytes);
       assert ois.readObject().equals(toWrite);
+
+      // return to pool
+      pool.returnStreamToPool(oos);
+      pool.returnStreamToPool(ois);
+
+      oos = pool.getOutputStream();
+      toWrite = new MyClass(876, "Hello");
+      oos.writeObject(toWrite);
+      asBytes = oos.getBytes();
+
+      ois = pool.getInputStream(asBytes);
+      assert ois.readObject().equals(toWrite);
    }
 
    /**
@@ -105,6 +118,39 @@
 
       ois = pool.getInputStream(asBytes);
       for (int i = 0; i < 0xFFFF; i++) assert ois.readObject().equals(toWrite);
+
+      // return to pool
+      pool.returnStreamToPool(oos);
+      pool.returnStreamToPool(ois);
+
+      oos = pool.getOutputStream();
+      toWrite = new MyClass(1, "x");
+      for (int i = 0; i < 0xFFFF; i++) oos.writeObject(toWrite);
+      asBytes = oos.getBytes();
+
+      ois = pool.getInputStream(asBytes);
+      for (int i = 0; i < 0xFFFF; i++) assert ois.readObject().equals(toWrite);
+
+      // return to pool
+      pool.returnStreamToPool(oos);
+      pool.returnStreamToPool(ois);
+
+      oos = pool.getOutputStream();
+      int arraySize = 0xFFFF;
+      Object[] toWriteArray = new Object[arraySize];
+      for (int i = 0; i < arraySize; i++)
+      {
+         toWriteArray[i] = new MyClass(i, "x");
+         oos.writeObject(toWriteArray[i]);
+      }
+      asBytes = oos.getBytes();
+
+      ois = pool.getInputStream(asBytes);
+      for (int i = 0; i < arraySize; i++)
+      {
+         Object read = ois.readObject();
+         assert read.equals(toWriteArray[i]) : "Loop " + i + ": Expected " + toWriteArray[i] + " but was " + read;
+      }
    }
 
    public void testStreamResetWithFlush() throws Exception
@@ -155,4 +201,96 @@
       assert ois.readObject().equals(toWrite);
       assert ois.readObject().equals(toWrite2);
    }
+
+   public void testCustomTypes() throws Exception
+   {
+      oos = pool.getOutputStream();
+
+      MyClass toWrite = new MyClass(123, "hello");
+      MyClass toWrite2 = new MyClass(456, "world");
+      oos.writeObject(toWrite);
+      oos.flush();
+      oos.writeObject(toWrite2);
+      byte[] asBytes = oos.getBytes();
+
+      ois = pool.getInputStream(asBytes);
+      assert ois.readObject().equals(toWrite);
+      assert ois.readObject().equals(toWrite2);
+   }
+
+   public void testMixedTypes() throws Exception
+   {
+      oos = pool.getOutputStream();
+
+      MyClass toWrite = new MyClass(123, "hello");
+      MyClass toWrite2 = new MyClass(456, "world");
+      oos.writeObject(toWrite);
+      oos.flush();
+      oos.writeBoolean(false);
+      oos.flush();
+      oos.writeObject(toWrite2);
+      oos.flush();
+      oos.writeShort(6);
+      byte[] asBytes = oos.getBytes();
+
+      ois = pool.getInputStream(asBytes);
+      assert ois.readObject().equals(toWrite);
+      assert ois.readBoolean() == false;
+      assert ois.readObject().equals(toWrite2);
+      assert ois.readShort() == 6;
+   }
+
+   public void testReusableOutputStreamArraySize() throws InterruptedException, IOException
+   {
+      oos = pool.getOutputStream();
+      int originalLength = oos.baos.getBufSize();
+      for (int i = 0; i < 0xFF; i++) oos.write(7);
+      oos.flush();
+      assert oos.baos.getBufSize() > 0xFF;
+
+      oos.reset();
+      oos.init();
+      assert oos.baos.getBufSize() == originalLength;
+   }
+
+   public static class MyClass implements Serializable
+   {
+      int number;
+      String name;
+
+      public MyClass(int number, String name)
+      {
+         this.number = number;
+         this.name = name;
+      }
+
+      public boolean equals(Object o)
+      {
+         if (this == o) return true;
+         if (o == null || getClass() != o.getClass()) return false;
+
+         MyClass myClass = (MyClass) o;
+
+         if (number != myClass.number) return false;
+         if (name != null ? !name.equals(myClass.name) : myClass.name != null) return false;
+
+         return true;
+      }
+
+      public int hashCode()
+      {
+         int result;
+         result = number;
+         result = 31 * result + (name != null ? name.hashCode() : 0);
+         return result;
+      }
+
+      public String toString()
+      {
+         return "MyClass{" +
+               "number=" + number +
+               ", name='" + name + '\'' +
+               '}';
+      }
+   }
 }




More information about the jbosscache-commits mailing list