[jbosscache-commits] JBoss Cache SVN: r5292 - in core/trunk/src: main/java/org/jboss/cache/marshall/io and 2 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Tue Feb 5 10:42:05 EST 2008


Author: manik.surtani at jboss.com
Date: 2008-02-05 10:42:05 -0500 (Tue, 05 Feb 2008)
New Revision: 5292

Added:
   core/trunk/src/main/java/org/jboss/cache/marshall/io/
   core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java
   core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java
   core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectOutputStream.java
   core/trunk/src/test/java/org/jboss/cache/marshall/io/
   core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java
Modified:
   core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
Log:
Added object stream pooling (initial version for testing)

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java	2008-02-05 15:41:33 UTC (rev 5291)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java	2008-02-05 15:42:05 UTC (rev 5292)
@@ -8,13 +8,15 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.CacheException;
 import org.jboss.cache.Fqn;
 import org.jboss.cache.factories.ComponentRegistry;
 import org.jboss.cache.factories.annotations.Inject;
 import org.jboss.cache.factories.annotations.Start;
+import org.jboss.cache.marshall.io.ObjectStreamPool;
+import org.jboss.cache.marshall.io.ReusableObjectOutputStream;
 import org.jboss.cache.util.Util;
 
-import java.io.ByteArrayOutputStream;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -43,6 +45,7 @@
    Marshaller defaultMarshaller;
    Map<Integer, Marshaller> marshallers = new HashMap<Integer, Marshaller>();
    private int versionInt;
+   ObjectStreamPool pool;
 
    @Inject
    void injectComponentRegistry(ComponentRegistry componentRegistry)
@@ -96,6 +99,15 @@
          log.debug("Started with version " + replVersionString + " and versionInt " + versionInt);
          log.debug("Using default marshaller class " + this.defaultMarshaller.getClass());
       }
+
+      try
+      {
+         pool = new ObjectStreamPool(25, 25);
+      }
+      catch (Exception e)
+      {
+         throw new CacheException(e);
+      }
    }
 
    protected int getCustomMarshallerVersionInt()
@@ -148,20 +160,31 @@
 
    public byte[] objectToByteBuffer(Object obj) throws Exception
    {
-      ByteArrayOutputStream bos = new ByteArrayOutputStream();
-      ObjectOutputStream out;
+//      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+//      ObjectOutputStream out;
 
       // based on the default marshaller, construct an object output stream based on what's compatible.
-      out = ObjectSerializationFactory.createObjectOutputStream(bos);
-      out.writeShort(versionInt);
-      if (trace) log.trace("Wrote version " + versionInt);
+//      out = ObjectSerializationFactory.createObjectOutputStream(bos);
+      ReusableObjectOutputStream out = pool.getOutputStream();
+      try
+      {
+         out.writeShort(versionInt);
+         if (trace) log.trace("Wrote version " + versionInt);
 
-      //now marshall the contents of the object
-      defaultMarshaller.objectToObjectStream(obj, out);
-      out.close();
+         //now marshall the contents of the object
+         defaultMarshaller.objectToObjectStream(obj, out);
+//         out.flush();
 
-      // and return bytes.
-      return bos.toByteArray();
+         // and return bytes.
+         return out.getBytes();
+      }
+      finally
+      {
+         // return to queue
+//         out.reset();
+//         queue.put(out);
+         pool.returnStreamToPool(out);
+      }
    }
 
    public Object objectFromByteBuffer(byte[] buf) throws Exception

Added: core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java	2008-02-05 15:42:05 UTC (rev 5292)
@@ -0,0 +1,69 @@
+package org.jboss.cache.marshall.io;
+
+import org.jboss.util.stream.MarshalledValueInputStream;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * A stream pool
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 2.1.0
+ */
+public class ObjectStreamPool
+{
+   BlockingQueue<ReusableObjectOutputStream> outputStreams;
+
+   // TODO: Implement for input streams as well - much harder.
+//   BlockingQueue<ReusableObjectInputStream> inputStreams;
+
+   /**
+    * Constructs the pool with a fixed number of object output and input streams
+    *
+    * @param numOutputStreams number of object output streams
+    * @param numInputStreams  number of object input streams
+    * @throws InterruptedException
+    * @throws IOException
+    */
+   public ObjectStreamPool(int numOutputStreams, int numInputStreams) throws InterruptedException, IOException
+   {
+      outputStreams = new LinkedBlockingQueue<ReusableObjectOutputStream>();
+//      inputStreams = new LinkedBlockingQueue<ReusableObjectInputStream>();
+
+      for (int i = 0; i < numOutputStreams; i++) outputStreams.put(new ReusableObjectOutputStream());
+//      for (int i=0; i<numInputStreams; i++) inputStreams.put(new ReusableObjectInputStream());
+   }
+
+   /**
+    * @return the next available ObjectOutputStream
+    * @throws InterruptedException
+    */
+   public ReusableObjectOutputStream getOutputStream() throws InterruptedException
+   {
+      return outputStreams.take();
+   }
+
+   public ObjectInputStream getInputStream(byte[] buf) throws IOException, InterruptedException
+   {
+      return new MarshalledValueInputStream(new ByteArrayInputStream(buf));
+//      ReusableObjectInputStream rois = inputStreams.take();
+//      rois.init(buf);
+//      return rois;
+   }
+
+   public void returnStreamToPool(ReusableObjectOutputStream stream) throws IOException, InterruptedException
+   {
+      stream.reset();
+      outputStreams.put(stream);
+   }
+
+   public void returnStreamToPool(ObjectInputStream stream) throws InterruptedException, IOException
+   {
+//      stream.reset();
+//      inputStreams.put((ReusableObjectInputStream) stream);
+   }
+}

Added: core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java	2008-02-05 15:42:05 UTC (rev 5292)
@@ -0,0 +1,106 @@
+package org.jboss.cache.marshall.io;
+
+import org.jboss.util.stream.MarshalledValueInputStream;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectStreamConstants;
+
+/**
+ * A reusable ObjectInputStream that uses an internal byte array.  The byte array can be set using {@link #init(byte[])},
+ * objects read from the byte array, and the stream reset using {@link #reset()}.
+ * <p/>
+ * These features allow the stream to be reused and pooled.
+ * <p/>
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 2.1.0
+ */
+public class ReusableObjectInputStream extends MarshalledValueInputStream
+{
+   ReusableByteArrayInputStream bytes;
+
+   public ReusableObjectInputStream() throws IOException, SecurityException
+   {
+      this(new ReusableByteArrayInputStream());
+   }
+
+   protected ReusableObjectInputStream(ReusableByteArrayInputStream in) throws IOException
+   {
+      super(in);
+      bytes = in;
+   }
+
+   /**
+    * Resets the ObjectInputStream so it can be reused, by resetting and clearing any internal caches and by clearing the
+    * underlying byte stream to free up memory.
+    *
+    * @throws IOException
+    */
+   @Override
+   public void reset() throws IOException
+   {
+      bytes.reset();
+      bytes.clear();
+   }
+
+   /**
+    * Initialises the stream to read from the byte buffer passed in
+    *
+    * @param buf byte buffer to read from
+    */
+   public void init(byte[] buf) throws IOException
+   {
+      bytes.init(buf);
+//      readStreamHeader();
+   }
+
+   /**
+    * A byte array input stream that can be reused (i.e., have it's byte array re-initialised).
+    */
+   static class ReusableByteArrayInputStream extends ByteArrayInputStream
+   {
+      static byte[] initBytes;
+
+      static
+      {
+         ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+         bytes.write((byte) ((ObjectStreamConstants.STREAM_MAGIC >>> 8) & 0xFF));
+         bytes.write((byte) ((ObjectStreamConstants.STREAM_MAGIC) & 0xFF));
+         bytes.write((byte) ((ObjectStreamConstants.STREAM_VERSION >>> 8)));
+         bytes.write((byte) ((ObjectStreamConstants.STREAM_VERSION) & 0xFF));
+         bytes.write(TC_BLOCKDATA);
+         bytes.write(0);
+         initBytes = bytes.toByteArray();
+      }
+
+      /**
+       * Creates a new instance with a null byte buffer.  Use {@link #init(byte[])} to set the byte buffer to use.
+       */
+      ReusableByteArrayInputStream()
+      {
+         super(initBytes);
+      }
+
+      /**
+       * Sets the underlying byte buffer to null, to free up memory after use.
+       */
+      public void clear()
+      {
+         buf = null;
+      }
+
+      /**
+       * Initialises the byte stream with the byte array passed in.
+       *
+       * @param b byte array to use
+       */
+      public void init(byte[] b)
+      {
+         buf = b;
+         count = 0;
+      }
+   }
+
+}

Added: core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectOutputStream.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectOutputStream.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectOutputStream.java	2008-02-05 15:42:05 UTC (rev 5292)
@@ -0,0 +1,89 @@
+package org.jboss.cache.marshall.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+
+/**
+ * An ObjectOutputStream that can safely be reset and reused.  Allows for easy pooling of streams for performance.
+ * <p/>
+ * This is an ObjectOutputStream with an internal underlying byte stream, which can be queried with {@link #getBytes()}.
+ * This stream is reusable - calling {@link #reset()} will ensure the underlying byte stream is reset and the ObjectOutputStream
+ * caches are flushed, and headers are written.
+ * <p/>
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 2.1.0
+ */
+public class ReusableObjectOutputStream extends ObjectOutputStream
+{
+   ResettableByteArrayOutputStream baos;
+
+   public ReusableObjectOutputStream() throws IOException, SecurityException
+   {
+      this(new ResettableByteArrayOutputStream());
+   }
+
+   protected ReusableObjectOutputStream(ResettableByteArrayOutputStream baos) throws IOException, SecurityException
+   {
+      super(baos);
+      this.baos = baos;
+      reset();
+   }
+
+   /**
+    * Unlike {@link java.io.ObjectOutputStream#reset()}, this method will flush and wipe the underlying byte stream,
+    * and ensure ObjectOutputStream headers are rewritten, awaiting proper reuse.
+    *
+    * @throws IOException
+    */
+   @Override
+   public void reset() throws IOException
+   {
+      super.reset();
+      super.flush();
+      baos.reset();
+      writeStreamHeader();
+      super.flush();
+
+      // make sure there are 6 bytes in the byte stream - 2 block data markers (which need to be removed),
+      // 2 magic number bytes, and 2 version bytes.
+      baos.assertCount(6);
+
+      // remove the 2 leading block data marker bytes, so that we now have 4 bytes which is a header that an
+      // ObjectInputStream knows how to deal with.
+      baos.removeBytesFromHead(2);
+   }
+
+   /**
+    * @return bytes in the underlying byte stream.  Will internally do a flush first.
+    */
+   public byte[] getBytes() throws IOException
+   {
+      flush();
+      return baos.toByteArray();
+   }
+
+   /**
+    * A byte array stream that allows for more direct manipulation of the byte array.
+    */
+   static class ResettableByteArrayOutputStream extends ByteArrayOutputStream
+   {
+      public void assertCount(int expectedCount) throws IOException
+      {
+         if (count != expectedCount)
+            throw new IOException("Expected a count of " + expectedCount + " but was " + count);
+      }
+
+      public void removeBytesFromHead(int numBytes)
+      {
+         if (count < numBytes)
+            throw new ArrayIndexOutOfBoundsException("Can't remove " + numBytes + " from a byte array of just " + count + " bytes!");
+         int newCount = count - numBytes;
+         byte[] newByteArray = new byte[16];
+         System.arraycopy(buf, numBytes, newByteArray, 0, newCount);
+         buf = newByteArray;
+         count = newCount;
+      }
+   }
+}

Added: core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java	                        (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java	2008-02-05 15:42:05 UTC (rev 5292)
@@ -0,0 +1,158 @@
+package org.jboss.cache.marshall.io;
+
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Date;
+import java.util.Random;
+
+/**
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 2.1.0
+ */
+ at Test(groups = "functional")
+public class ObjectStreamPoolTest
+{
+   ObjectStreamPool pool;
+   ReusableObjectOutputStream oos;
+   ObjectInputStream ois;
+
+   @BeforeTest
+   public void setUp() throws IOException, InterruptedException
+   {
+      pool = new ObjectStreamPool(2, 2);
+   }
+
+   @AfterMethod
+   public void afterMethod() throws IOException, InterruptedException
+   {
+      // ensure we return stuff to the pool
+      pool.returnStreamToPool(oos);
+      pool.returnStreamToPool(ois);
+   }
+
+   public void testStreamReset() throws IOException, ClassNotFoundException, InterruptedException
+   {
+      oos = pool.getOutputStream();
+
+      Object toWrite = "BlahBlah";
+      oos.writeObject(toWrite);
+      byte[] asBytes = oos.getBytes();
+
+      ois = pool.getInputStream(asBytes);
+      assert ois.readObject().equals(toWrite);
+
+      // return to pool
+      pool.returnStreamToPool(oos);
+      pool.returnStreamToPool(ois);
+
+      oos = pool.getOutputStream();
+      toWrite = new Integer(9);
+      oos.writeObject(toWrite);
+      asBytes = oos.getBytes();
+
+      ois = pool.getInputStream(asBytes);
+      assert ois.readObject().equals(toWrite);
+
+      // return to pool
+      pool.returnStreamToPool(oos);
+      pool.returnStreamToPool(ois);
+
+      oos = pool.getOutputStream();
+      toWrite = new Date();
+      oos.writeObject(toWrite);
+      asBytes = oos.getBytes();
+
+      ois = pool.getInputStream(asBytes);
+      assert ois.readObject().equals(toWrite);
+   }
+
+   /**
+    * To test block data header sizes
+    *
+    * @throws IOException
+    * @throws ClassNotFoundException
+    * @throws InterruptedException
+    */
+   public void testLargeStreams() throws IOException, ClassNotFoundException, InterruptedException
+   {
+      oos = pool.getOutputStream();
+
+      StringBuilder sb = new StringBuilder();
+      Random r = new Random();
+      // 1K string
+      for (int i = 0; i < 0xFFFF; i++) sb.append((char) (r.nextInt(26) + 65));
+
+      Object toWrite = sb.toString();
+      System.out.println("Writing string (length = " + ((String) toWrite).length() + ") " + toWrite);
+      oos.writeObject(toWrite);
+      byte[] asBytes = oos.getBytes();
+
+      ois = pool.getInputStream(asBytes);
+      assert ois.readObject().equals(toWrite);
+
+      // return to pool
+      pool.returnStreamToPool(oos);
+      pool.returnStreamToPool(ois);
+
+      oos = pool.getOutputStream();
+      toWrite = new Integer(9);
+      for (int i = 0; i < 0xFFFF; i++) oos.writeObject(toWrite);
+      asBytes = oos.getBytes();
+
+      ois = pool.getInputStream(asBytes);
+      for (int i = 0; i < 0xFFFF; i++) assert ois.readObject().equals(toWrite);
+   }
+
+   public void testStreamResetWithFlush() throws Exception
+   {
+      oos = pool.getOutputStream();
+
+      Object toWrite = "BlahBlah", toWrite2 = "Hello world";
+      oos.writeObject(toWrite);
+      oos.flush();
+      oos.writeObject(toWrite2);
+      byte[] asBytes = oos.getBytes();
+
+      ois = pool.getInputStream(asBytes);
+      assert ois.readObject().equals(toWrite);
+      assert ois.readObject().equals(toWrite2);
+
+      // return to pool
+      pool.returnStreamToPool(oos);
+      pool.returnStreamToPool(ois);
+
+      oos = pool.getOutputStream();
+
+      toWrite = new Integer(9);
+      toWrite2 = new Long(99);
+      oos.writeObject(toWrite);
+      oos.flush();
+      oos.writeObject(toWrite2);
+      asBytes = oos.getBytes();
+
+      ois = pool.getInputStream(asBytes);
+      assert ois.readObject().equals(toWrite);
+      assert ois.readObject().equals(toWrite2);
+
+      // return to pool
+      pool.returnStreamToPool(oos);
+      pool.returnStreamToPool(ois);
+
+      oos = pool.getOutputStream();
+
+      toWrite = new Date();
+      toWrite2 = Boolean.FALSE;
+      oos.writeObject(toWrite);
+      oos.flush();
+      oos.writeObject(toWrite2);
+      asBytes = oos.getBytes();
+
+      ois = pool.getInputStream(asBytes);
+      assert ois.readObject().equals(toWrite);
+      assert ois.readObject().equals(toWrite2);
+   }
+}




More information about the jbosscache-commits mailing list