[jbosscache-commits] JBoss Cache SVN: r6486 - in core/trunk/src: main/java/org/jboss/cache/io and 4 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Mon Aug 4 06:47:51 EDT 2008


Author: manik.surtani at jboss.com
Date: 2008-08-04 06:47:51 -0400 (Mon, 04 Aug 2008)
New Revision: 6486

Added:
   core/trunk/src/main/java/org/jboss/cache/io/
   core/trunk/src/main/java/org/jboss/cache/io/ByteBuffer.java
   core/trunk/src/main/java/org/jboss/cache/io/ExposedByteArrayOutputStream.java
Removed:
   core/trunk/src/main/java/org/jboss/cache/util/ExposedByteArrayOutputStream.java
Modified:
   core/trunk/src/main/java/org/jboss/cache/loader/AdjListJDBCCacheLoader.java
   core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
   core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
   core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java
   core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
   core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java
Log:
Updated to use the new JGroups Marshaller2 interface and Buffer class instead of byte arrays.  Also created a new subclass of Buffer - ByteBuffer - that exposes a stream on top of the byte buffer.  Finally, moved EBAOS (ExposedByteArrayOutputStream) from o.j.c.util to o.j.c.io.

Added: core/trunk/src/main/java/org/jboss/cache/io/ByteBuffer.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/io/ByteBuffer.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/io/ByteBuffer.java	2008-08-04 10:47:51 UTC (rev 6486)
@@ -0,0 +1,25 @@
+package org.jboss.cache.io;
+
+import org.jgroups.util.Buffer;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+
+/**
+ * A subclass of a JGroups Buffer
+ */
+public class ByteBuffer extends Buffer
+{
+   public ByteBuffer(byte[] bytes, int offset, int length)
+   {
+      super(bytes, offset, length);
+   }
+
+   /**
+    * @return an input stream for the bytes in the buffer
+    */
+   public InputStream getStream()
+   {
+      return new ByteArrayInputStream(getBuf(), getOffset(), getLength());
+   }
+}

Copied: core/trunk/src/main/java/org/jboss/cache/io/ExposedByteArrayOutputStream.java (from rev 6485, core/trunk/src/main/java/org/jboss/cache/util/ExposedByteArrayOutputStream.java)
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/io/ExposedByteArrayOutputStream.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/io/ExposedByteArrayOutputStream.java	2008-08-04 10:47:51 UTC (rev 6486)
@@ -0,0 +1,145 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.cache.io;
+
+import net.jcip.annotations.NotThreadSafe;
+
+import java.io.ByteArrayOutputStream;
+
+/**
+ * Extends ByteArrayOutputStream, but exposes the internal buffer.
+ * Using this, callers don't need to call toByteArray() which copies the
+ * internal buffer.
+ * <p>
+ * Also overrides the superclass' behavior of always doubling the size of the
+ * internal buffer any time more capacity is needed.  This class doubles the
+ * size until the internal buffer reaches a configurable max size (default is
+ * 4MB), after which it begins growing the buffer in 25% increments.  This is
+ * intended to help prevent an OutOfMemoryError during a resize of a large
+ * buffer.
+ * </p>
+ * <p>
+ * A version of this class was originally created by Bela Ban as part of the
+ * JGroups library.
+ * </p>
+ * This class is not threadsafe as it will not support concurrent readers and writers.
+ * <p/>
+ *
+ * @author <a href="mailto://brian.stansberry@jboss.com">Brian Stansberry</a>
+ * @version $Id$
+ */
+@NotThreadSafe
+public class ExposedByteArrayOutputStream extends ByteArrayOutputStream
+{
+   /**
+    * Default buffer size after which if more buffer capacity
+    * is needed the buffer will grow by 25% rather than 100%
+    */
+   public static final int DEFAULT_DOUBLING_SIZE = 4 * 1024 * 1024; // 4MB
+
+   private int maxDoublingSize = DEFAULT_DOUBLING_SIZE;
+
+   public ExposedByteArrayOutputStream()
+   {
+      super();
+   }
+
+   public ExposedByteArrayOutputStream(int size)
+   {
+      super(size);
+   }
+
+   /**
+    * Creates a new byte array output stream, with a buffer capacity of
+    * the specified size, in bytes.
+    *
+    * @param size            the initial size.
+    * @param maxDoublingSize the buffer size, after which if more capacity
+    *                        is needed the buffer will grow by 25%
+    *                        rather than 100%
+    * @throws IllegalArgumentException if size is negative.
+    */
+   public ExposedByteArrayOutputStream(int size, int maxDoublingSize)
+   {
+      super(size);
+      this.maxDoublingSize = maxDoublingSize;
+   }
+
+   /**
+    * Gets the internal buffer array. Note that the length of this array
+    * will almost certainly be longer than the data written to it; call
+    * <code>size()</code> to get the number of bytes of actual data.
+    */
+   public byte[] getRawBuffer()
+   {
+      return buf;
+   }
+
+   @Override
+   public void write(byte[] b, int off, int len)
+   {
+      if ((off < 0) || (off > b.length) || (len < 0) ||
+            ((off + len) > b.length) || ((off + len) < 0))
+      {
+         throw new IndexOutOfBoundsException();
+      }
+      else if (len == 0)
+      {
+         return;
+      }
+
+      int newcount = count + len;
+      if (newcount > buf.length)
+      {
+         byte newbuf[] = new byte[getNewBufferSize(buf.length, newcount)];
+         System.arraycopy(buf, 0, newbuf, 0, count);
+         buf = newbuf;
+      }
+
+      System.arraycopy(b, off, buf, count, len);
+      count = newcount;
+   }
+
+   @Override
+   public void write(int b)
+   {
+      int newcount = count + 1;
+      if (newcount > buf.length)
+      {
+         byte newbuf[] = new byte[getNewBufferSize(buf.length, newcount)];
+         System.arraycopy(buf, 0, newbuf, 0, count);
+         buf = newbuf;
+      }
+      buf[count] = (byte) b;
+      count = newcount;
+   }
+
+   /**
+    * Gets the highest internal buffer size after which if more capacity
+    * is needed the buffer will grow in 25% increments rather than 100%.
+    */
+   public int getMaxDoublingSize()
+   {
+      return maxDoublingSize;
+   }
+
+   /**
+    * Gets the number of bytes to which the internal buffer should be resized.
+    *
+    * @param curSize    the current number of bytes
+    * @param minNewSize the minimum number of bytes required
+    * @return the size to which the internal buffer should be resized
+    */
+   public int getNewBufferSize(int curSize, int minNewSize)
+   {
+      if (curSize <= maxDoublingSize)
+         return Math.max(curSize << 1, minNewSize);
+      else
+         return Math.max(curSize + (curSize >> 2), minNewSize);
+   }
+}


Property changes on: core/trunk/src/main/java/org/jboss/cache/io/ExposedByteArrayOutputStream.java
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: core/trunk/src/main/java/org/jboss/cache/loader/AdjListJDBCCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/AdjListJDBCCacheLoader.java	2008-08-02 15:33:59 UTC (rev 6485)
+++ core/trunk/src/main/java/org/jboss/cache/loader/AdjListJDBCCacheLoader.java	2008-08-04 10:47:51 UTC (rev 6486)
@@ -5,10 +5,10 @@
 import org.jboss.cache.Fqn;
 import org.jboss.cache.Modification;
 import org.jboss.cache.config.CacheLoaderConfig;
+import org.jboss.cache.io.ByteBuffer;
 import org.jboss.cache.lock.StripedLock;
 import org.jboss.cache.util.Util;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.sql.Connection;
@@ -424,13 +424,8 @@
 
          if (node != null)
          {
-            //            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            //            ObjectOutputStream oos = new ObjectOutputStream(baos);
-            //            oos.writeObject(node);
-
-            byte[] byteStream = marshall(node);
-            ByteArrayInputStream bais = new ByteArrayInputStream(byteStream);
-            ps.setBinaryStream(2, bais, byteStream.length);
+            ByteBuffer byteBuffer = marshall(node);
+            ps.setBinaryStream(2, byteBuffer.getStream(), byteBuffer.getLength());
          }
          else
          {
@@ -499,30 +494,15 @@
          con = cf.getConnection();
          ps = con.prepareStatement(config.getUpdateNodeSql());
 
-         if (node == null)
-         {
-            //ps.setNull(1, Types.BLOB);
-            //            ps.setNull(1, Types.LONGVARBINARY);
-            // don't set it to null - simply use an empty hash map.
-            node = new HashMap<Object, Object>(0);
-         }
+         if (node == null) node = Collections.emptyMap();
 
-         //         ByteArrayOutputStream baos = new ByteArrayOutputStream();
-         //         ObjectOutputStream oos = new ObjectOutputStream(baos);
-         //         oos.writeObject(node);
+         ByteBuffer byteBuffer = marshall(node);
+         ps.setBinaryStream(1, byteBuffer.getStream(), byteBuffer.getLength());
 
-         byte[] byteStream = marshall(node);
-         ByteArrayInputStream bais = new ByteArrayInputStream(byteStream);
-         ps.setBinaryStream(1, bais, byteStream.length);
-
          ps.setString(2, name.toString());
 
          /*int rows = */
          ps.executeUpdate();
-         //         if (rows != 1)
-         //         {
-         //            throw new IllegalStateException("Expected one updated row but got " + rows);
-         //         }
       }
       catch (Exception e)
       {
@@ -685,9 +665,9 @@
       return getMarshaller().objectFromStream(from);
    }
 
-   protected byte[] marshall(Object obj) throws Exception
+   protected ByteBuffer marshall(Object obj) throws Exception
    {
-      return getMarshaller().objectToByteBuffer(obj);
+      return getMarshaller().objectToBuffer(obj);
    }
 
    private static String toUpperCase(String s)

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java	2008-08-02 15:33:59 UTC (rev 6485)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java	2008-08-04 10:47:51 UTC (rev 6486)
@@ -35,6 +35,7 @@
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.factories.annotations.Inject;
 import org.jboss.cache.factories.annotations.Start;
+import org.jboss.cache.io.ByteBuffer;
 import org.jboss.cache.transaction.GlobalTransaction;
 import org.jgroups.util.Buffer;
 
@@ -90,10 +91,13 @@
    // implement the basic contract set in RPCDispatcher.AbstractMarshaller
    public byte[] objectToByteBuffer(Object obj) throws Exception
    {
-      throw new RuntimeException("Needs to be overridden!");
+      Buffer b = objectToBuffer(obj);
+      byte[] bytes = new byte[b.getLength()];
+      System.arraycopy(b.getBuf(), b.getOffset(), bytes, 0, b.getLength());
+      return bytes;
    }
 
-   public Buffer objectToBuffer(Object o) throws Exception
+   public ByteBuffer objectToBuffer(Object o) throws Exception
    {
       throw new RuntimeException("Needs to be overridden!");
    }

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java	2008-08-02 15:33:59 UTC (rev 6485)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java	2008-08-04 10:47:51 UTC (rev 6486)
@@ -70,7 +70,7 @@
     * is aware of {@link org.jboss.cache.commands.ReplicableCommand} objects.
     */
    public RspList invokeRemoteCommands(Vector<Address> dests, ReplicableCommand command, int mode, long timeout,
-                                       boolean use_anycasting, boolean oob, RspFilter filter) throws NotSerializableException
+                                       boolean anycasting, boolean oob, RspFilter filter) throws NotSerializableException
    {
       if (dests != null && dests.isEmpty())
       {
@@ -96,7 +96,7 @@
       Message msg = new Message();
       msg.setBuffer(buf);
       if (oob) msg.setFlag(Message.OOB);
-      RspList retval = super.castMessage(dests, msg, mode, timeout, use_anycasting, filter);
+      RspList retval = super.castMessage(dests, msg, mode, timeout, anycasting, filter);
       if (trace) log.trace("responses: " + retval);
 
       // a null response is 99% likely to be due to a marshalling problem - we throw a NSE, this needs to be changed when

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java	2008-08-02 15:33:59 UTC (rev 6485)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java	2008-08-04 10:47:51 UTC (rev 6486)
@@ -1,6 +1,7 @@
 package org.jboss.cache.marshall;
 
 import org.jboss.cache.Fqn;
+import org.jboss.cache.io.ByteBuffer;
 import org.jgroups.blocks.RpcDispatcher;
 
 import java.io.InputStream;
@@ -90,4 +91,14 @@
     * @since 2.1.1
     */
    RegionalizedMethodCall regionalizedMethodCallFromObjectStream(ObjectInputStream in) throws Exception;
+
+   /**
+    * A specialized form of {@link org.jgroups.blocks.RpcDispatcher.Marshaller2#objectToBuffer(Object)} that returns an instance
+    * of {@link ByteBuffer} instead of {@link org.jgroups.util.Buffer}.
+    *
+    * @param o object to marshall
+    * @return a ByteBuffer
+    * @throws Exception
+    */
+   ByteBuffer objectToBuffer(Object o) throws Exception;
 }

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java	2008-08-02 15:33:59 UTC (rev 6485)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java	2008-08-04 10:47:51 UTC (rev 6486)
@@ -12,10 +12,10 @@
 import org.jboss.cache.factories.ComponentRegistry;
 import org.jboss.cache.factories.annotations.Inject;
 import org.jboss.cache.factories.annotations.Start;
-import org.jboss.cache.util.ExposedByteArrayOutputStream;
+import org.jboss.cache.io.ByteBuffer;
+import org.jboss.cache.io.ExposedByteArrayOutputStream;
 import org.jboss.cache.util.Util;
 import org.jboss.util.stream.MarshalledValueInputStream;
-import org.jgroups.util.Buffer;
 
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
@@ -152,7 +152,7 @@
    }
 
    @Override
-   public Buffer objectToBuffer(Object obj) throws Exception
+   public ByteBuffer objectToBuffer(Object obj) throws Exception
    {
       ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream();
       ObjectOutputStream out = new ObjectOutputStream(baos);
@@ -165,7 +165,7 @@
       out.close();
 
       // and return bytes.
-      return new Buffer(baos.toByteArray(), 0, baos.size());
+      return new ByteBuffer(baos.toByteArray(), 0, baos.size());
    }
 
    @Override

Deleted: core/trunk/src/main/java/org/jboss/cache/util/ExposedByteArrayOutputStream.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/ExposedByteArrayOutputStream.java	2008-08-02 15:33:59 UTC (rev 6485)
+++ core/trunk/src/main/java/org/jboss/cache/util/ExposedByteArrayOutputStream.java	2008-08-04 10:47:51 UTC (rev 6486)
@@ -1,140 +0,0 @@
-/*
- * JBoss, the OpenSource J2EE webOS
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-
-package org.jboss.cache.util;
-
-import java.io.ByteArrayOutputStream;
-
-/**
- * Extends ByteArrayOutputStream, but exposes the internal buffer.
- * Using this, callers don't need to call toByteArray() which copies the
- * internal buffer.
- * <p>
- * Also overrides the superclass' behavior of always doubling the size of the
- * internal buffer any time more capacity is needed.  This class doubles the
- * size until the internal buffer reaches a configurable max size (default is
- * 4MB), after which it begins growing the buffer in 25% increments.  This is
- * intended to help prevent an OutOfMemoryError during a resize of a large
- * buffer.
- * </p>
- * <p>
- * A version of this class was originally created by Bela Ban as part of the
- * JGroups library.
- * </p>
- *
- * @author <a href="mailto://brian.stansberry@jboss.com">Brian Stansberry</a>
- * @version $Id$
- */
-public class ExposedByteArrayOutputStream extends ByteArrayOutputStream
-{
-   /**
-    * Default buffer size after which if more buffer capacity
-    * is needed the buffer will grow by 25% rather than 100%
-    */
-   public static final int DEFAULT_DOUBLING_SIZE = 4 * 1024 * 1024; // 4MB
-
-   private int maxDoublingSize = DEFAULT_DOUBLING_SIZE;
-
-   public ExposedByteArrayOutputStream()
-   {
-      super();
-   }
-
-   public ExposedByteArrayOutputStream(int size)
-   {
-      super(size);
-   }
-
-   /**
-    * Creates a new byte array output stream, with a buffer capacity of
-    * the specified size, in bytes.
-    *
-    * @param size            the initial size.
-    * @param maxDoublingSize the buffer size, after which if more capacity
-    *                        is needed the buffer will grow by 25%
-    *                        rather than 100%
-    * @throws IllegalArgumentException if size is negative.
-    */
-   public ExposedByteArrayOutputStream(int size, int maxDoublingSize)
-   {
-      super(size);
-      this.maxDoublingSize = maxDoublingSize;
-   }
-
-   /**
-    * Gets the internal buffer array. Note that the length of this array
-    * will almost certainly be longer than the data written to it; call
-    * <code>size()</code> to get the number of bytes of actual data.
-    */
-   public byte[] getRawBuffer()
-   {
-      return buf;
-   }
-
-   @Override
-   public void write(byte[] b, int off, int len)
-   {
-      if ((off < 0) || (off > b.length) || (len < 0) ||
-            ((off + len) > b.length) || ((off + len) < 0))
-      {
-         throw new IndexOutOfBoundsException();
-      }
-      else if (len == 0)
-      {
-         return;
-      }
-
-      int newcount = count + len;
-      if (newcount > buf.length)
-      {
-         byte newbuf[] = new byte[getNewBufferSize(buf.length, newcount)];
-         System.arraycopy(buf, 0, newbuf, 0, count);
-         buf = newbuf;
-      }
-
-      System.arraycopy(b, off, buf, count, len);
-      count = newcount;
-   }
-
-   @Override
-   public void write(int b)
-   {
-      int newcount = count + 1;
-      if (newcount > buf.length)
-      {
-         byte newbuf[] = new byte[getNewBufferSize(buf.length, newcount)];
-         System.arraycopy(buf, 0, newbuf, 0, count);
-         buf = newbuf;
-      }
-      buf[count] = (byte) b;
-      count = newcount;
-   }
-
-   /**
-    * Gets the highest internal buffer size after which if more capacity
-    * is needed the buffer will grow in 25% increments rather than 100%.
-    */
-   public int getMaxDoublingSize()
-   {
-      return maxDoublingSize;
-   }
-
-   /**
-    * Gets the number of bytes to which the internal buffer should be resized.
-    *
-    * @param curSize    the current number of bytes
-    * @param minNewSize the minimum number of bytes required
-    * @return the size to which the internal buffer should be resized
-    */
-   public int getNewBufferSize(int curSize, int minNewSize)
-   {
-      if (curSize <= maxDoublingSize)
-         return Math.max(curSize << 1, minNewSize);
-      else
-         return Math.max(curSize + (curSize >> 2), minNewSize);
-   }
-}

Modified: core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java	2008-08-02 15:33:59 UTC (rev 6485)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java	2008-08-04 10:47:51 UTC (rev 6486)
@@ -8,13 +8,14 @@
 import org.jboss.cache.commands.tx.PrepareCommand;
 import org.jboss.cache.factories.ComponentRegistry;
 import org.jboss.cache.invocation.InvocationContext;
+import org.jboss.cache.io.ByteBuffer;
+import org.jboss.cache.marshall.AbstractMarshaller;
 import org.jboss.cache.marshall.CommandAwareRpcDispatcher;
 import org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher;
 import org.jboss.cache.marshall.Marshaller;
 import org.jboss.cache.marshall.RegionalizedMethodCall;
 import org.jboss.cache.util.TestingUtil;
 import org.jgroups.blocks.RpcDispatcher;
-import org.jgroups.util.Buffer;
 
 import java.io.InputStream;
 import java.io.ObjectInputStream;
@@ -149,7 +150,7 @@
    /**
     * Needed for region based marshalling.
     */
-   private class RegionMarshallerDelegate implements Marshaller
+   private class RegionMarshallerDelegate extends AbstractMarshaller
    {
       private Marshaller realOne;
 
@@ -194,17 +195,12 @@
          return realOne.regionalizedMethodCallFromObjectStream(in);
       }
 
-      public byte[] objectToByteBuffer(Object o) throws Exception
-      {
-         return realOne.objectToByteBuffer(o);
-      }
-
       public Object objectFromByteBuffer(byte[] bytes) throws Exception
       {
          return realOne.objectFromByteBuffer(bytes);
       }
 
-      public Buffer objectToBuffer(Object o) throws Exception
+      public ByteBuffer objectToBuffer(Object o) throws Exception
       {
          return realOne.objectToBuffer(o);
       }




More information about the jbosscache-commits mailing list