[jboss-cvs] JBossCache/src/org/jboss/cache/loader/tcp ...

Manik Surtani msurtani at jboss.com
Wed Oct 25 08:16:58 EDT 2006


  User: msurtani
  Date: 06/10/25 08:16:58

  Modified:    src/org/jboss/cache/loader/tcp   Tag:
                        Branch_JBossCache_1_4_0 TcpCacheServer.java
                        TcpDelegatingCacheLoader.java
  Log:
  JBCACHE-690
  JBCACHE-800
  JBCACHE-810
  
  Revision  Changes    Path
  No                   revision
  
  
  No                   revision
  
  
  1.13.4.1  +281 -163  JBossCache/src/org/jboss/cache/loader/tcp/TcpCacheServer.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: TcpCacheServer.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/loader/tcp/TcpCacheServer.java,v
  retrieving revision 1.13
  retrieving revision 1.13.4.1
  diff -u -b -r1.13 -r1.13.4.1
  --- TcpCacheServer.java	15 Mar 2006 16:02:40 -0000	1.13
  +++ TcpCacheServer.java	25 Oct 2006 12:16:58 -0000	1.13.4.1
  @@ -2,193 +2,288 @@
   
   import org.apache.commons.logging.Log;
   import org.apache.commons.logging.LogFactory;
  -import org.jboss.cache.*;
  +import org.jboss.cache.CacheException;
  +import org.jboss.cache.DataNode;
  +import org.jboss.cache.Fqn;
  +import org.jboss.cache.Modification;
  +import org.jboss.cache.PropertyConfigurator;
  +import org.jboss.cache.TreeCache;
  +import org.jboss.cache.TreeCacheMBean;
   import org.jboss.cache.loader.DelegatingCacheLoader;
   import org.jboss.mx.util.MBeanProxyExt;
   import org.jboss.system.ServiceMBeanSupport;
   
   import javax.management.MalformedObjectNameException;
   import javax.management.ObjectName;
  +import java.io.BufferedInputStream;
  +import java.io.BufferedOutputStream;
   import java.io.IOException;
   import java.io.ObjectInputStream;
   import java.io.ObjectOutputStream;
   import java.net.InetAddress;
   import java.net.ServerSocket;
   import java.net.Socket;
  +import java.net.SocketException;
   import java.net.UnknownHostException;
  -import java.util.*;
  +import java.util.ArrayList;
  +import java.util.HashMap;
  +import java.util.Iterator;
  +import java.util.LinkedList;
  +import java.util.List;
  +import java.util.Map;
  +import java.util.Set;
   
   /**
    * TCP-IP based CacheServer, configure TcpDelegatingCacheLoader with host and port of this server
  + *
    * @author Bela Ban
  - * @version $Id: TcpCacheServer.java,v 1.13 2006/03/15 16:02:40 genman Exp $
  + * @version $Id: TcpCacheServer.java,v 1.13.4.1 2006/10/25 12:16:58 msurtani Exp $
    */
  -public class TcpCacheServer extends ServiceMBeanSupport implements TcpCacheServerMBean {
  +public class TcpCacheServer extends ServiceMBeanSupport implements TcpCacheServerMBean
  +{
      ServerSocket    srv_sock;
  -   InetAddress     bind_addr=null;
  -   int             port=7500;
  +   InetAddress bind_addr = null;
  +   int port = 7500;
      TreeCacheMBean  cache;
      ObjectName      cache_name;
      String          config;
  -   boolean         running=true;
  -   List            conns=new LinkedList();
  +   boolean running = true;
  +   final List conns = new LinkedList();
      String          agendId;
  -   static Log      mylog=LogFactory.getLog(TcpCacheServer.class);
  +   Thread serverThread;
  +   /**
  +    * whether or not to start the server thread as a daemon.  Should be false if started from the command line, true if started as an MBean.
  +    */
  +   boolean daemon = true;
  +   static Log mylog = LogFactory.getLog(TcpCacheServer.class);
   
   
  -   public TcpCacheServer() {
  +   public TcpCacheServer()
  +   {
      }
   
  -   public String getBindAddress() {
  -      return bind_addr != null? bind_addr.toString() : "n/a";
  +   public String getBindAddress()
  +   {
  +      return bind_addr != null ? bind_addr.toString() : "n/a";
      }
   
  -   public void setBindAddress(String bind_addr) throws UnknownHostException {
  -      if(bind_addr != null)
  -         this.bind_addr=InetAddress.getByName(bind_addr);
  +   public void setBindAddress(String bind_addr) throws UnknownHostException
  +   {
  +      if (bind_addr != null)
  +         this.bind_addr = InetAddress.getByName(bind_addr);
      }
   
  -   public int getPort() {
  +   public int getPort()
  +   {
         return port;
      }
   
  -   public void setPort(int port) {
  -      this.port=port;
  +   public void setPort(int port)
  +   {
  +      this.port = port;
      }
   
  -   public String getMBeanServerName() {
  +   public String getMBeanServerName()
  +   {
         return agendId;
      }
   
  -   public void setMBeanServerName(String name) {
  -      agendId=name;
  +   public void setMBeanServerName(String name)
  +   {
  +      agendId = name;
      }
   
  -   public String getConfig() {
  +   public String getConfig()
  +   {
         return config;
      }
   
  -   public void setConfig(String config) {
  -      this.config=config;
  +   public void setConfig(String config)
  +   {
  +      this.config = config;
      }
   
  -   public TreeCacheMBean getCache() {
  +   public TreeCacheMBean getCache()
  +   {
         return cache;
      }
   
  -   public void setCache(TreeCacheMBean cache) {
  -      this.cache=cache;
  +   public void setCache(TreeCacheMBean cache)
  +   {
  +      this.cache = cache;
      }
   
  -   public String getCacheName() {
  -      return cache_name != null? cache_name.toString() : "n/a";
  +   public String getCacheName()
  +   {
  +      return cache_name != null ? cache_name.toString() : "n/a";
      }
   
  -   public void setCacheName(String cache_name) throws MalformedObjectNameException {
  -      this.cache_name=new ObjectName(cache_name);
  +   public void setCacheName(String cache_name) throws MalformedObjectNameException
  +   {
  +      this.cache_name = new ObjectName(cache_name);
      }
   
  -   public void createService() throws Exception {
  +   public void createService() throws Exception
  +   {
         super.createService();
      }
   
  -   public void startService() throws Exception {
  -      Socket client_sock;
  -      Connection conn;
  +   public void startService() throws Exception
  +   {
   
  -      if(cache == null) {
  +      if (cache == null)
  +      {
            // 1. check whether we have an object name, pointing to the cache MBean
  -         if(cache_name != null && server != null) {
  -            cache=(TreeCacheMBean)MBeanProxyExt.create(TreeCacheMBean.class, cache_name, server);
  +         if (cache_name != null && server != null)
  +         {
  +            cache = (TreeCacheMBean) MBeanProxyExt.create(TreeCacheMBean.class, cache_name, server);
            }
         }
   
  -      if(cache == null) { // still not set
  -         if(config != null) {
  -            cache=new TreeCache();
  -            PropertyConfigurator cfg=new PropertyConfigurator();
  +      if (cache == null)
  +      { // still not set
  +         if (config != null)
  +         {
  +            cache = new TreeCache();
  +            PropertyConfigurator cfg = new PropertyConfigurator();
               cfg.configure(cache, config);
               cache.createService();
               cache.startService();
            }
         }
   
  -      if(cache == null)
  +      if (cache == null)
            throw new CacheException("cache reference is not set");
   
   
  -      srv_sock=new ServerSocket(port, 10, bind_addr);
  +      srv_sock = new ServerSocket(port, 10, bind_addr);
         System.out.println("TcpCacheServer listening on : " + srv_sock.getInetAddress() + ":" + srv_sock.getLocalPort());
  -      while(running) {
  -         client_sock=srv_sock.accept();
  -         conn=new Connection(client_sock, cache);
  +      mylog.info("TcpCacheServer listening on : " + srv_sock.getInetAddress() + ":" + srv_sock.getLocalPort());
  +
  +      running = true;
  +
  +      serverThread = new Thread("TcpCacheServer")
  +      {
  +         public void run()
  +         {
  +            try
  +            {
  +               while (running)
  +               {
  +                  Socket client_sock = srv_sock.accept();
  +                  Connection conn = new Connection(client_sock, cache);
            conns.add(conn);
            conn.start();
         }
      }
  +            catch (SocketException se)
  +            {
  +               if (!running)
  +               {
  +                  // this is because of the stop() lifecycle method being called.
  +                  // ignore.
  +                  mylog.info("Shutting down TcpCacheServer");
  +               }
  +               else
  +               {
  +                  mylog.error("Caught exception! Shutting down server thread.", se);
  +               }
  +            }
  +            catch (IOException e)
  +            {
  +               mylog.error("Caught exception! Shutting down server thread.", e);
  +            }
  +         }
  +      };
  +      serverThread.setDaemon(daemon);
  +      serverThread.start();
  +
  +   }
   
  -   public void stopService() {
  -      running=false;
  -      for(Iterator it=conns.iterator(); it.hasNext();) {
  -         Connection conn=(Connection)it.next();
  +   public void stopService()
  +   {
  +      running = false;
  +      for (Iterator it = conns.iterator(); it.hasNext();)
  +      {
  +         Connection conn = (Connection) it.next();
            conn.close();
         }
         conns.clear();
  -      if(srv_sock != null) {
  -         try {
  +
  +      if (srv_sock != null)
  +      {
  +         try
  +         {
               srv_sock.close();
  +            srv_sock = null;
            }
  -         catch(IOException e) {
  +         catch (IOException e)
  +         {
            }
         }
      }
   
   
  -   public String getConnections() {
  -      StringBuffer sb=new StringBuffer();
  +   public String getConnections()
  +   {
  +      StringBuffer sb = new StringBuffer();
         sb.append(conns.size() + " connections:\n");
  -      for(Iterator it=conns.iterator(); it.hasNext();) {
  -         Connection c=(Connection)it.next();
  +      for (Iterator it = conns.iterator(); it.hasNext();)
  +      {
  +         Connection c = (Connection) it.next();
            sb.append(c).append("\n");
         }
         return sb.toString();
      }
   
   
  -   public void destroy() {
  +   public void destroy()
  +   {
         super.destroy();
      }
   
   
  -   private static class Connection implements Runnable {
  -      Socket             sock=null;
  -      ObjectInputStream  input=null;
  -      ObjectOutputStream output=null;
  +   private class Connection implements Runnable
  +   {
  +      Socket sock = null;
  +      ObjectInputStream input = null;
  +      ObjectOutputStream output = null;
         TreeCacheMBean     c;
  -      Thread             t=null;
  +      Thread t = null;
  +
  +      public Connection(Socket sock, TreeCacheMBean cache) throws IOException
  +      {
  +         this.sock = sock;
  +
  +         output = new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream()));
  +         output.flush();
  +
  +         input = new ObjectInputStream(new BufferedInputStream(sock.getInputStream()));
   
  -      public Connection(Socket sock, TreeCacheMBean cache) throws IOException {
  -         this.sock=sock;
  -         output=new ObjectOutputStream(sock.getOutputStream());
  -         input=new ObjectInputStream(sock.getInputStream());
  -         c=cache;
  +         c = cache;
         }
   
   
  -      public void start() {
  -         t=new Thread(this, "TcpCacheServer.Connection");
  +      public void start()
  +      {
  +         t = new Thread(this, "TcpCacheServer.Connection");
            t.setDaemon(true);
            t.start();
         }
   
  -      public void close() {
  -         t=null;
  -         try {if(output != null) output.close();} catch(Throwable th) {}
  -         try {if(input != null) input.close();} catch(Throwable th) {}
  -         try {if(sock != null) sock.close();} catch(Throwable th) {}
  +      public void close()
  +      {
  +         t = null;
  +         try {if (output != null) output.close();} catch (Throwable th) {}
  +         try {if (input != null) input.close();} catch (Throwable th) {}
  +         try {if (sock != null) sock.close();} catch (Throwable th) {}
  +
  +         // remove self from connections list
  +         conns.remove(this);
         }
   
  -      public void run() {
  +      public void run()
  +      {
            int op;
            Fqn fqn;
            Object key, val, retval;
  @@ -197,104 +292,113 @@
            boolean flag;
            byte[] state;
   
  -         while(t != null && Thread.currentThread().equals(t)) {
  -            try {
  -               op=input.readInt();
  -            }
  -            catch(IOException e) {
  -               mylog.warn("failed reading data, thread will terminate", e);
  -               try {if(output != null) output.close();} catch(Throwable th) {}
  -               try {if(input != null) input.close();} catch(Throwable th) {}
  -               try {if(sock != null) sock.close();} catch(Throwable th) {}
  +         while (t != null && Thread.currentThread().equals(t))
  +         {
  +            try
  +            {
  +               op = input.readInt();
  +            }
  +            catch (IOException e)
  +            {
  +               mylog.warn("Client closed socket");
  +               close();
                  break;
               }
   
  -            try {
  -               switch(op) {
  +            try
  +            {
  +               output.reset();
  +               switch (op)
  +               {
                     case DelegatingCacheLoader.delegateGetChildrenNames:
  -                     fqn=(Fqn)input.readObject();
  -                     Set children=c.getChildrenNames(fqn);
  +                     fqn = (Fqn) input.readObject();
  +                     Set children = c.getChildrenNames(fqn);
                        output.writeObject(children);  // this may be null - that's okay
                        break;
                     case DelegatingCacheLoader.delegateGetKey:
  -                     fqn=(Fqn)input.readObject();
  -                     key=input.readObject();
  -                     retval=c.get(fqn, key);
  +                     fqn = (Fqn) input.readObject();
  +                     key = input.readObject();
  +                     retval = c.get(fqn, key);
                        output.writeObject(retval);
                        break;
                     case DelegatingCacheLoader.delegateGet:
  -                     fqn=(Fqn)input.readObject();
  -                     n=c.get(fqn);
  -                     if(n == null) { // node doesn't exist - return null
  +                     fqn = (Fqn) input.readObject();
  +                     n = c.get(fqn);
  +                     if (n == null)
  +                     { // node doesn't exist - return null
                           output.writeObject(n);
                           break;
                        }
  -                     map=n.getData();
  -                     if(map == null) map=new HashMap();
  +                     map = n.getData();
  +                     if (map == null) map = new HashMap();
                        output.writeObject(map);
                        break;
                     case DelegatingCacheLoader.delegateExists:
  -                     fqn=(Fqn)input.readObject();
  -                     flag=c.exists(fqn);
  +                     fqn = (Fqn) input.readObject();
  +                     flag = c.exists(fqn);
                        output.writeObject(Boolean.valueOf(flag));
                        break;
                     case DelegatingCacheLoader.delegatePutKeyVal:
  -                     fqn=(Fqn)input.readObject();
  -                     key=input.readObject();
  -                     val=input.readObject();
  -                     retval=c.put(fqn, key, val);
  +                     fqn = (Fqn) input.readObject();
  +                     key = input.readObject();
  +                     val = input.readObject();
  +                     retval = c.put(fqn, key, val);
                        output.writeObject(retval);
                        break;
                     case DelegatingCacheLoader.delegatePut:
  -                     fqn=(Fqn)input.readObject();
  -                     map=(Map)input.readObject();
  +                     fqn = (Fqn) input.readObject();
  +                     map = (Map) input.readObject();
                        c.put(fqn, map);
                        output.writeObject(Boolean.TRUE);
                        break;
   
                     case DelegatingCacheLoader.putList:
  -                     int length=input.readInt();
  -                     retval=Boolean.TRUE;
  -                     if(length > 0) {
  +                     int length = input.readInt();
  +                     retval = Boolean.TRUE;
  +                     if (length > 0)
  +                     {
                           Modification mod;
  -                        List mods=new ArrayList(length);
  -                        for(int i=0; i < length; i++) {
  -                           mod=new Modification();
  +                        List mods = new ArrayList(length);
  +                        for (int i = 0; i < length; i++)
  +                        {
  +                           mod = new Modification();
                              mod.readExternal(input);
                              mods.add(mod);
                           }
  -                        try {
  +                        try
  +                        {
                              handleModifications(mods);
                           }
  -                        catch(Exception ex) {
  -                           retval=ex;
  +                        catch (Exception ex)
  +                        {
  +                           retval = ex;
                           }
                        }
                        output.writeObject(retval);
                        break;
                     case DelegatingCacheLoader.delegateRemoveKey:
  -                     fqn=(Fqn)input.readObject();
  -                     key=input.readObject();
  -                     retval=c.remove(fqn, key);
  +                     fqn = (Fqn) input.readObject();
  +                     key = input.readObject();
  +                     retval = c.remove(fqn, key);
                        output.writeObject(retval);
                        break;
                     case DelegatingCacheLoader.delegateRemove:
  -                     fqn=(Fqn)input.readObject();
  +                     fqn = (Fqn) input.readObject();
                        c.remove(fqn);
                        output.writeObject(Boolean.TRUE);
                        break;
                     case DelegatingCacheLoader.delegateRemoveData:
  -                     fqn=(Fqn)input.readObject();
  +                     fqn = (Fqn) input.readObject();
                        c.removeData(fqn);
                        output.writeObject(Boolean.TRUE);
                        break;
                     case DelegatingCacheLoader.delegateLoadEntireState:
  -                     state=c.getCacheLoader() != null? c.getCacheLoader().loadEntireState() : null;
  +                     state = c.getCacheLoader() != null ? c.getCacheLoader().loadEntireState() : null;
                        output.writeObject(state);
                        break;
                     case DelegatingCacheLoader.delegateStoreEntireState:
  -                     state=(byte[])input.readObject();
  -                     if(c.getCacheLoader() != null)
  +                     state = (byte[]) input.readObject();
  +                     if (c.getCacheLoader() != null)
                           c.getCacheLoader().storeEntireState(state);
                        output.writeObject(Boolean.TRUE);
                        break;
  @@ -304,12 +408,15 @@
                  }
                  output.flush();
               }
  -            catch(Exception e) {
  -               try {
  +            catch (Exception e)
  +            {
  +               try
  +               {
                     output.writeObject(e);
                     output.flush();
                  }
  -               catch(IOException e1) {
  +               catch (IOException e1)
  +               {
                     e1.printStackTrace();
                  }
               }
  @@ -317,18 +424,23 @@
         }
   
   
  -      public String toString() {
  -         StringBuffer sb=new StringBuffer();
  -         if(sock != null) {
  +      public String toString()
  +      {
  +         StringBuffer sb = new StringBuffer();
  +         if (sock != null)
  +         {
               sb.append(sock.getRemoteSocketAddress());
            }
            return sb.toString();
         }
   
  -      protected void handleModifications(List modifications) throws CacheException {
  -         for(Iterator it=modifications.iterator(); it.hasNext();) {
  -            Modification m=(Modification)it.next();
  -            switch(m.getType()) {
  +      protected void handleModifications(List modifications) throws CacheException
  +      {
  +         for (Iterator it = modifications.iterator(); it.hasNext();)
  +         {
  +            Modification m = (Modification) it.next();
  +            switch (m.getType())
  +            {
                  case Modification.PUT_DATA:
                     c.put(m.getFqn(), m.getData());
                     break;
  @@ -358,41 +470,47 @@
      }
   
   
  +   public static void main(String[] args) throws Exception
  +   {
   
  -
  -   public static void main(String[] args) throws Exception {
  -      String bind_addr=null;
  -      int port=7500;
  +      String bind_addr = null;
  +      int port = 7500;
         TcpCacheServer server;
  -      String config=null;
  +      String config = null;
   
  -      for(int i=0; i < args.length; i++) {
  -         if(args[i].equals("-bind_addr")) {
  -            bind_addr=args[++i];
  +      for (int i = 0; i < args.length; i++)
  +      {
  +         if (args[i].equals("-bind_addr"))
  +         {
  +            bind_addr = args[++i];
               continue;
            }
  -         if(args[i].equals("-port")) {
  -            port=Integer.parseInt(args[++i]);
  +         if (args[i].equals("-port"))
  +         {
  +            port = Integer.parseInt(args[++i]);
               continue;
            }
  -         if(args[i].equals("-config")) {
  -            config=args[++i];
  +         if (args[i].equals("-config"))
  +         {
  +            config = args[++i];
               continue;
            }
            help();
            return;
         }
  -      server=new TcpCacheServer();
  +      server = new TcpCacheServer();
  +      server.daemon = false;
         server.setBindAddress(bind_addr);
         server.setPort(port);
         server.setConfig(config);
         server.createService();
         server.startService();
  -   }
   
  +   }
   
   
  -   private static void help() {
  +   private static void help()
  +   {
         System.out.println("TcpCacheServer [-bind_addr <address>] [-port <port>] [-config <config file>] [-help]");
      }
   }
  
  
  
  1.5.4.1   +194 -114  JBossCache/src/org/jboss/cache/loader/tcp/Attic/TcpDelegatingCacheLoader.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: TcpDelegatingCacheLoader.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/loader/tcp/Attic/TcpDelegatingCacheLoader.java,v
  retrieving revision 1.5
  retrieving revision 1.5.4.1
  diff -u -b -r1.5 -r1.5.4.1
  --- TcpDelegatingCacheLoader.java	20 Jan 2006 12:50:46 -0000	1.5
  +++ TcpDelegatingCacheLoader.java	25 Oct 2006 12:16:58 -0000	1.5.4.1
  @@ -7,31 +7,38 @@
   package org.jboss.cache.loader.tcp;
   
   import org.jboss.cache.Fqn;
  -import org.jboss.cache.TreeCache;
   import org.jboss.cache.Modification;
  +import org.jboss.cache.TreeCache;
   import org.jboss.cache.loader.DelegatingCacheLoader;
   
  +import java.io.BufferedInputStream;
  +import java.io.BufferedOutputStream;
   import java.io.IOException;
   import java.io.ObjectInputStream;
   import java.io.ObjectOutputStream;
   import java.net.Socket;
  -import java.util.*;
  +import java.util.Iterator;
  +import java.util.List;
  +import java.util.Map;
  +import java.util.Properties;
  +import java.util.Set;
   
   /**
    * DelegatingCacheLoader implementation which delegates to a remote (not in the same VM)
    * TreeCache using TCP/IP for communication. Example configuration for connecting to a TcpCacheServer
    * running at myHost:12345:<pre>
    * <attribute name="CacheLoaderClass">org.jboss.cache.loader.tcp.TcpDelegatingCacheLoader</attribute>
  -   <attribute name="CacheLoaderConfig">
  -       host=localhost
  -       port=2099
  -   </attribute>
  - </pre>
  + * <attribute name="CacheLoaderConfig">
  + * host=localhost
  + * port=2099
  + * </attribute>
  + * </pre>
    *
    * @author Bela Ban
  - * @version $Id: TcpDelegatingCacheLoader.java,v 1.5 2006/01/20 12:50:46 bela Exp $
  + * @version $Id: TcpDelegatingCacheLoader.java,v 1.5.4.1 2006/10/25 12:16:58 msurtani Exp $
    */
  -public class TcpDelegatingCacheLoader extends DelegatingCacheLoader {
  +public class TcpDelegatingCacheLoader extends DelegatingCacheLoader
  +{
      private Socket     sock;
      private String     host;
      private int        port;
  @@ -42,7 +49,8 @@
      /**
       * Default constructor.
       */
  -   public TcpDelegatingCacheLoader() {
  +   public TcpDelegatingCacheLoader()
  +   {
         // Empty.
      }
   
  @@ -52,7 +60,8 @@
       * @param host The host on which to look up the remote object.
       * @param port The port on which to look up the remote object.
       */
  -   public TcpDelegatingCacheLoader(String host, int port) {
  +   public TcpDelegatingCacheLoader(String host, int port)
  +   {
         this.host = host;
         this.port = port;
      }
  @@ -62,31 +71,42 @@
       * 
       * @see org.jboss.cache.loader.DelegatingCacheLoader#setConfig(java.util.Properties)
       */
  -   public void setConfig(Properties props) {
  +   public void setConfig(Properties props)
  +   {
         this.host = props.getProperty("host");
  -      if(this.host == null || this.host.length() == 0) {
  +      if (this.host == null || this.host.length() == 0)
  +      {
            this.host = "localhost";
         }
         this.port = Integer.parseInt(props.getProperty("port"));
      }
   
  -   public void start() throws Exception {
  +   public void start() throws Exception
  +   {
         init();
      }
   
  -   public void stop() {
  -      try {if(in != null) in.close();} catch(IOException e) {}
  -      try {if(out != null) out.close();} catch(IOException e) {}
  -      try {if(sock != null) sock.close();} catch(IOException e) {}
  +   public void stop()
  +   {
  +      synchronized (out)
  +      {
  +         try {if (in != null) in.close();} catch (IOException e) {}
  +         try {if (out != null) out.close();} catch (IOException e) {}
  +         try {if (sock != null) sock.close();} catch (IOException e) {}
  +      }
      }
   
   
  -   private void init() throws IOException {
  -      if(host == null)
  -         host="localhost";
  -      sock=new Socket(host, port);
  -      out=new ObjectOutputStream(sock.getOutputStream());
  -      in=new ObjectInputStream(sock.getInputStream());
  +   private void init() throws IOException
  +   {
  +      if (host == null)
  +         host = "localhost";
  +      sock = new Socket(host, port);
  +      out = new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream()));
  +      out.flush();
  +//      out=new ObjectOutputStream(sock.getOutputStream());
  +//      in=new ObjectInputStream(sock.getInputStream());
  +      in = new ObjectInputStream(new BufferedInputStream(sock.getInputStream()));
      }
   
      /**
  @@ -94,25 +114,32 @@
       * 
       * @see org.jboss.cache.loader.DelegatingCacheLoader#setCache(org.jboss.cache.TreeCache)
       */
  -   public void setCache(TreeCache cache) {
  +   public void setCache(TreeCache cache)
  +   {
      }
   
      /**
       * @see org.jboss.cache.loader.DelegatingCacheLoader#delegateGetChildrenNames(org.jboss.cache.Fqn)
       */
  -   protected Set delegateGetChildrenNames(Fqn fqn) throws Exception {
  +   protected Set delegateGetChildrenNames(Fqn fqn) throws Exception
  +   {
  +      synchronized (out)
  +      {
  +         out.reset();
         out.writeInt(DelegatingCacheLoader.delegateGetChildrenNames);
         out.writeObject(fqn);
  -      Object retval=in.readObject();
  -      if(retval instanceof Exception)
  -         throw (Exception)retval;
  -      return (Set)retval;
  +         out.flush();
  +         Object retval = in.readObject();
  +         if (retval instanceof Exception)
  +            throw(Exception) retval;
  +         return (Set) retval;
  +      }
      }
   
       // See http://jira.jboss.com/jira/browse/JBCACHE-118 for why this is commented out.
   
       /**
  -    * @see org.jboss.cache.loader.DelegatingCacheLoader#delegateGet(org.jboss.cache.Fqn, Object)
  +    * @see org.jboss.cache.loader.DelegatingCacheLoader#delegateGet(org.jboss.cache.Fqn,Object)
       */
   //   protected Object delegateGet(Fqn name, Object key) throws Exception {
   //      out.writeInt(DelegatingCacheLoader.delegateGetKey);
  @@ -124,111 +151,164 @@
      /**
       * @see org.jboss.cache.loader.DelegatingCacheLoader#delegateGet(org.jboss.cache.Fqn)
       */
  -   protected Map delegateGet(Fqn name) throws Exception {
  +   protected Map delegateGet(Fqn name) throws Exception
  +   {
  +      synchronized (out)
  +      {
  +         out.reset();
         out.writeInt(DelegatingCacheLoader.delegateGet);
         out.writeObject(name);
  -      Object retval=in.readObject();
  -      if(retval instanceof Exception)
  -         throw (Exception)retval;
  -      return (Map)retval;
  +         out.flush();
  +         Object retval = in.readObject();
  +         if (retval instanceof Exception)
  +            throw(Exception) retval;
  +         return (Map) retval;
  +      }
      }
   
      /**
       * @see org.jboss.cache.loader.DelegatingCacheLoader#delegateExists(org.jboss.cache.Fqn)
       */
  -   protected boolean delegateExists(Fqn name) throws Exception {
  +   protected boolean delegateExists(Fqn name) throws Exception
  +   {
  +      synchronized (out)
  +      {
  +         out.reset();
         out.writeInt(DelegatingCacheLoader.delegateExists);
         out.writeObject(name);
  -      Object retval=in.readObject();
  -      if(retval instanceof Exception)
  -         throw (Exception)retval;
  -      return ((Boolean)retval).booleanValue();
  +         out.flush();
  +         Object retval = in.readObject();
  +         if (retval instanceof Exception)
  +            throw(Exception) retval;
  +         return ((Boolean) retval).booleanValue();
  +      }
      }
   
      /**
  -    * @see org.jboss.cache.loader.DelegatingCacheLoader#delegatePut(org.jboss.cache.Fqn, Object, Object)
  +    * @see org.jboss.cache.loader.DelegatingCacheLoader#delegatePut(org.jboss.cache.Fqn,Object,Object)
       */
  -   protected Object delegatePut(Fqn name, Object key, Object value) throws Exception {
  +   protected Object delegatePut(Fqn name, Object key, Object value) throws Exception
  +   {
  +      synchronized (out)
  +      {
  +
  +         out.reset();
         out.writeInt(DelegatingCacheLoader.delegatePutKeyVal);
         out.writeObject(name);
         out.writeObject(key);
         out.writeObject(value);
  -      Object retval=in.readObject();
  -      if(retval instanceof Exception)
  -         throw (Exception)retval;
  +         out.flush();
  +         Object retval = in.readObject();
  +         if (retval instanceof Exception)
  +            throw(Exception) retval;
         return retval;
      }
  +   }
   
      /**
  -    * @see org.jboss.cache.loader.DelegatingCacheLoader#delegatePut(org.jboss.cache.Fqn, java.util.Map)
  +    * @see org.jboss.cache.loader.DelegatingCacheLoader#delegatePut(org.jboss.cache.Fqn,java.util.Map)
       */
  -   protected void delegatePut(Fqn name, Map attributes) throws Exception {
  +   protected void delegatePut(Fqn name, Map attributes) throws Exception
  +   {
  +      synchronized (out)
  +      {
  +
  +         out.reset();
         out.writeInt(DelegatingCacheLoader.delegatePut);
         out.writeObject(name);
         out.writeObject(attributes);
         out.flush();
  -      Object retval=in.readObject();
  -      if(retval instanceof Exception)
  -         throw (Exception)retval;
  +         Object retval = in.readObject();
  +         if (retval instanceof Exception)
  +            throw(Exception) retval;
  +      }
      }
   
  -   protected void delegatePut(List modifications) throws Exception {
  +   protected void delegatePut(List modifications) throws Exception
  +   {
  +      synchronized (out)
  +      {
  +
  +         out.reset();
         out.writeInt(DelegatingCacheLoader.putList);
  -      int length=modifications != null? modifications.size() : 0;
  +         int length = modifications != null ? modifications.size() : 0;
         out.writeInt(length);
  -      if(length > 0) {
  -         for(Iterator it=modifications.iterator(); it.hasNext();) {
  -            Modification m=(Modification)it.next();
  +         if (length > 0)
  +         {
  +            for (Iterator it = modifications.iterator(); it.hasNext();)
  +            {
  +               Modification m = (Modification) it.next();
               m.writeExternal(out);
            }
         }
         out.flush();
  -      Object retval=in.readObject();
  -      if(retval instanceof Exception)
  -         throw (Exception)retval;
  +         Object retval = in.readObject();
  +         if (retval instanceof Exception)
  +            throw(Exception) retval;
  +      }
      }
   
      /**
  -    * @see org.jboss.cache.loader.DelegatingCacheLoader#delegateRemove(org.jboss.cache.Fqn, Object)
  +    * @see org.jboss.cache.loader.DelegatingCacheLoader#delegateRemove(org.jboss.cache.Fqn,Object)
       */
  -   protected Object delegateRemove(Fqn name, Object key) throws Exception {
  +   protected Object delegateRemove(Fqn name, Object key) throws Exception
  +   {
  +      synchronized (out)
  +      {
  +
  +         out.reset();
         out.writeInt(DelegatingCacheLoader.delegateRemoveKey);
         out.writeObject(name);
         out.writeObject(key);
  -      Object retval=in.readObject();
  -      if(retval instanceof Exception)
  -         throw (Exception)retval;
  +         out.flush();
  +         Object retval = in.readObject();
  +         if (retval instanceof Exception)
  +            throw(Exception) retval;
         return retval;
      }
  +   }
   
      /**
       * @see org.jboss.cache.loader.DelegatingCacheLoader#delegateRemove(org.jboss.cache.Fqn)
       */
  -   protected void delegateRemove(Fqn name) throws Exception {
  +   protected void delegateRemove(Fqn name) throws Exception
  +   {
  +      synchronized (out)
  +      {
  +
  +         out.reset();
         out.writeInt(DelegatingCacheLoader.delegateRemove);
         out.writeObject(name);
         out.flush();
  -      Object retval=in.readObject();
  -      if(retval instanceof Exception)
  -         throw (Exception)retval;
  +         Object retval = in.readObject();
  +         if (retval instanceof Exception)
  +            throw(Exception) retval;
  +      }
      }
   
      /**
       * @see org.jboss.cache.loader.DelegatingCacheLoader#delegateRemoveData(org.jboss.cache.Fqn)
       */
  -   protected void delegateRemoveData(Fqn name) throws Exception {
  +   protected void delegateRemoveData(Fqn name) throws Exception
  +   {
  +      synchronized (out)
  +      {
  +
  +         out.reset();
         out.writeInt(DelegatingCacheLoader.delegateRemoveData);
         out.writeObject(name);
         out.flush();
  -      Object retval=in.readObject();
  -      if(retval instanceof Exception)
  -         throw (Exception)retval;
  +         Object retval = in.readObject();
  +         if (retval instanceof Exception)
  +            throw(Exception) retval;
  +      }
      }
   
      /**
       * @see org.jboss.cache.loader.DelegatingCacheLoader#delegateLoadEntireState()
       */
  -   public byte[] delegateLoadEntireState() throws Exception {
  +   public byte[] delegateLoadEntireState() throws Exception
  +   {
         throw new UnsupportedOperationException("operation is not currently supported - need to define semantics first");
   //      out.writeInt(DelegatingCacheLoader.delegateLoadEntireState);
   //      out.flush();
  @@ -241,7 +321,8 @@
      /**
       * @see org.jboss.cache.loader.DelegatingCacheLoader#delegateStoreEntireState(byte[])
       */
  -   public void delegateStoreEntireState(byte[] state) throws Exception {
  +   public void delegateStoreEntireState(byte[] state) throws Exception
  +   {
         throw new UnsupportedOperationException("operation is not currently supported - need to define semantics first");
   //      out.writeInt(DelegatingCacheLoader.delegateStoreEntireState);
   //      out.writeObject(state);
  @@ -252,5 +333,4 @@
      }
   
   
  -
   }
  
  
  



More information about the jboss-cvs-commits mailing list