[jboss-cvs] JBossCache/src/org/jboss/cache/loader/tcp ...
Manik Surtani
msurtani at jboss.com
Wed Oct 25 08:16:58 EDT 2006
User: msurtani
Date: 06/10/25 08:16:58
Modified: src/org/jboss/cache/loader/tcp Tag:
Branch_JBossCache_1_4_0 TcpCacheServer.java
TcpDelegatingCacheLoader.java
Log:
JBCACHE-690
JBCACHE-800
JBCACHE-810
Revision Changes Path
No revision
No revision
1.13.4.1 +281 -163 JBossCache/src/org/jboss/cache/loader/tcp/TcpCacheServer.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: TcpCacheServer.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/loader/tcp/TcpCacheServer.java,v
retrieving revision 1.13
retrieving revision 1.13.4.1
diff -u -b -r1.13 -r1.13.4.1
--- TcpCacheServer.java 15 Mar 2006 16:02:40 -0000 1.13
+++ TcpCacheServer.java 25 Oct 2006 12:16:58 -0000 1.13.4.1
@@ -2,193 +2,288 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.*;
+import org.jboss.cache.CacheException;
+import org.jboss.cache.DataNode;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.Modification;
+import org.jboss.cache.PropertyConfigurator;
+import org.jboss.cache.TreeCache;
+import org.jboss.cache.TreeCacheMBean;
import org.jboss.cache.loader.DelegatingCacheLoader;
import org.jboss.mx.util.MBeanProxyExt;
import org.jboss.system.ServiceMBeanSupport;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
+import java.net.SocketException;
import java.net.UnknownHostException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
/**
* TCP/IP-based CacheServer. Configure TcpDelegatingCacheLoader with the host and port of this server.
+ *
* @author Bela Ban
- * @version $Id: TcpCacheServer.java,v 1.13 2006/03/15 16:02:40 genman Exp $
+ * @version $Id: TcpCacheServer.java,v 1.13.4.1 2006/10/25 12:16:58 msurtani Exp $
*/
-public class TcpCacheServer extends ServiceMBeanSupport implements TcpCacheServerMBean {
+public class TcpCacheServer extends ServiceMBeanSupport implements TcpCacheServerMBean
+{
ServerSocket srv_sock;
- InetAddress bind_addr=null;
- int port=7500;
+ InetAddress bind_addr = null;
+ int port = 7500;
TreeCacheMBean cache;
ObjectName cache_name;
String config;
- boolean running=true;
- List conns=new LinkedList();
+ boolean running = true;
+ final List conns = new LinkedList();
String agendId;
- static Log mylog=LogFactory.getLog(TcpCacheServer.class);
+ Thread serverThread;
+ /**
+ * Whether to start the server thread as a daemon. Should be false when started from the command line and true when started as an MBean.
+ */
+ boolean daemon = true;
+ static Log mylog = LogFactory.getLog(TcpCacheServer.class);
- public TcpCacheServer() {
+ public TcpCacheServer()
+ {
}
- public String getBindAddress() {
- return bind_addr != null? bind_addr.toString() : "n/a";
+ public String getBindAddress()
+ {
+ return bind_addr != null ? bind_addr.toString() : "n/a";
}
- public void setBindAddress(String bind_addr) throws UnknownHostException {
- if(bind_addr != null)
- this.bind_addr=InetAddress.getByName(bind_addr);
+ public void setBindAddress(String bind_addr) throws UnknownHostException
+ {
+ if (bind_addr != null)
+ this.bind_addr = InetAddress.getByName(bind_addr);
}
- public int getPort() {
+ public int getPort()
+ {
return port;
}
- public void setPort(int port) {
- this.port=port;
+ public void setPort(int port)
+ {
+ this.port = port;
}
- public String getMBeanServerName() {
+ public String getMBeanServerName()
+ {
return agendId;
}
- public void setMBeanServerName(String name) {
- agendId=name;
+ public void setMBeanServerName(String name)
+ {
+ agendId = name;
}
- public String getConfig() {
+ public String getConfig()
+ {
return config;
}
- public void setConfig(String config) {
- this.config=config;
+ public void setConfig(String config)
+ {
+ this.config = config;
}
- public TreeCacheMBean getCache() {
+ public TreeCacheMBean getCache()
+ {
return cache;
}
- public void setCache(TreeCacheMBean cache) {
- this.cache=cache;
+ public void setCache(TreeCacheMBean cache)
+ {
+ this.cache = cache;
}
- public String getCacheName() {
- return cache_name != null? cache_name.toString() : "n/a";
+ public String getCacheName()
+ {
+ return cache_name != null ? cache_name.toString() : "n/a";
}
- public void setCacheName(String cache_name) throws MalformedObjectNameException {
- this.cache_name=new ObjectName(cache_name);
+ public void setCacheName(String cache_name) throws MalformedObjectNameException
+ {
+ this.cache_name = new ObjectName(cache_name);
}
- public void createService() throws Exception {
+ public void createService() throws Exception
+ {
super.createService();
}
- public void startService() throws Exception {
- Socket client_sock;
- Connection conn;
+ public void startService() throws Exception
+ {
- if(cache == null) {
+ if (cache == null)
+ {
// 1. check whether we have an object name, pointing to the cache MBean
- if(cache_name != null && server != null) {
- cache=(TreeCacheMBean)MBeanProxyExt.create(TreeCacheMBean.class, cache_name, server);
+ if (cache_name != null && server != null)
+ {
+ cache = (TreeCacheMBean) MBeanProxyExt.create(TreeCacheMBean.class, cache_name, server);
}
}
- if(cache == null) { // still not set
- if(config != null) {
- cache=new TreeCache();
- PropertyConfigurator cfg=new PropertyConfigurator();
+ if (cache == null)
+ { // still not set
+ if (config != null)
+ {
+ cache = new TreeCache();
+ PropertyConfigurator cfg = new PropertyConfigurator();
cfg.configure(cache, config);
cache.createService();
cache.startService();
}
}
- if(cache == null)
+ if (cache == null)
throw new CacheException("cache reference is not set");
- srv_sock=new ServerSocket(port, 10, bind_addr);
+ srv_sock = new ServerSocket(port, 10, bind_addr);
System.out.println("TcpCacheServer listening on : " + srv_sock.getInetAddress() + ":" + srv_sock.getLocalPort());
- while(running) {
- client_sock=srv_sock.accept();
- conn=new Connection(client_sock, cache);
+ mylog.info("TcpCacheServer listening on : " + srv_sock.getInetAddress() + ":" + srv_sock.getLocalPort());
+
+ running = true;
+
+ serverThread = new Thread("TcpCacheServer")
+ {
+ public void run()
+ {
+ try
+ {
+ while (running)
+ {
+ Socket client_sock = srv_sock.accept();
+ Connection conn = new Connection(client_sock, cache);
conns.add(conn);
conn.start();
}
}
+ catch (SocketException se)
+ {
+ if (!running)
+ {
+ // this is because of the stop() lifecycle method being called.
+ // ignore.
+ mylog.info("Shutting down TcpCacheServer");
+ }
+ else
+ {
+ mylog.error("Caught exception! Shutting down server thread.", se);
+ }
+ }
+ catch (IOException e)
+ {
+ mylog.error("Caught exception! Shutting down server thread.", e);
+ }
+ }
+ };
+ serverThread.setDaemon(daemon);
+ serverThread.start();
+
+ }
- public void stopService() {
- running=false;
- for(Iterator it=conns.iterator(); it.hasNext();) {
- Connection conn=(Connection)it.next();
+ public void stopService()
+ {
+ running = false;
+ for (Iterator it = conns.iterator(); it.hasNext();)
+ {
+ Connection conn = (Connection) it.next();
conn.close();
}
conns.clear();
- if(srv_sock != null) {
- try {
+
+ if (srv_sock != null)
+ {
+ try
+ {
srv_sock.close();
+ srv_sock = null;
}
- catch(IOException e) {
+ catch (IOException e)
+ {
}
}
}
- public String getConnections() {
- StringBuffer sb=new StringBuffer();
+ public String getConnections()
+ {
+ StringBuffer sb = new StringBuffer();
sb.append(conns.size() + " connections:\n");
- for(Iterator it=conns.iterator(); it.hasNext();) {
- Connection c=(Connection)it.next();
+ for (Iterator it = conns.iterator(); it.hasNext();)
+ {
+ Connection c = (Connection) it.next();
sb.append(c).append("\n");
}
return sb.toString();
}
- public void destroy() {
+ public void destroy()
+ {
super.destroy();
}
- private static class Connection implements Runnable {
- Socket sock=null;
- ObjectInputStream input=null;
- ObjectOutputStream output=null;
+ private class Connection implements Runnable
+ {
+ Socket sock = null;
+ ObjectInputStream input = null;
+ ObjectOutputStream output = null;
TreeCacheMBean c;
- Thread t=null;
+ Thread t = null;
+
+ public Connection(Socket sock, TreeCacheMBean cache) throws IOException
+ {
+ this.sock = sock;
+
+ output = new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream()));
+ output.flush();
+
+ input = new ObjectInputStream(new BufferedInputStream(sock.getInputStream()));
- public Connection(Socket sock, TreeCacheMBean cache) throws IOException {
- this.sock=sock;
- output=new ObjectOutputStream(sock.getOutputStream());
- input=new ObjectInputStream(sock.getInputStream());
- c=cache;
+ c = cache;
}
- public void start() {
- t=new Thread(this, "TcpCacheServer.Connection");
+ public void start()
+ {
+ t = new Thread(this, "TcpCacheServer.Connection");
t.setDaemon(true);
t.start();
}
- public void close() {
- t=null;
- try {if(output != null) output.close();} catch(Throwable th) {}
- try {if(input != null) input.close();} catch(Throwable th) {}
- try {if(sock != null) sock.close();} catch(Throwable th) {}
+ public void close()
+ {
+ t = null;
+ try {if (output != null) output.close();} catch (Throwable th) {}
+ try {if (input != null) input.close();} catch (Throwable th) {}
+ try {if (sock != null) sock.close();} catch (Throwable th) {}
+
+ // remove self from connections list
+ conns.remove(this);
}
- public void run() {
+ public void run()
+ {
int op;
Fqn fqn;
Object key, val, retval;
@@ -197,104 +292,113 @@
boolean flag;
byte[] state;
- while(t != null && Thread.currentThread().equals(t)) {
- try {
- op=input.readInt();
- }
- catch(IOException e) {
- mylog.warn("failed reading data, thread will terminate", e);
- try {if(output != null) output.close();} catch(Throwable th) {}
- try {if(input != null) input.close();} catch(Throwable th) {}
- try {if(sock != null) sock.close();} catch(Throwable th) {}
+ while (t != null && Thread.currentThread().equals(t))
+ {
+ try
+ {
+ op = input.readInt();
+ }
+ catch (IOException e)
+ {
+ mylog.warn("Client closed socket");
+ close();
break;
}
- try {
- switch(op) {
+ try
+ {
+ output.reset();
+ switch (op)
+ {
case DelegatingCacheLoader.delegateGetChildrenNames:
- fqn=(Fqn)input.readObject();
- Set children=c.getChildrenNames(fqn);
+ fqn = (Fqn) input.readObject();
+ Set children = c.getChildrenNames(fqn);
output.writeObject(children); // this may be null - that's okay
break;
case DelegatingCacheLoader.delegateGetKey:
- fqn=(Fqn)input.readObject();
- key=input.readObject();
- retval=c.get(fqn, key);
+ fqn = (Fqn) input.readObject();
+ key = input.readObject();
+ retval = c.get(fqn, key);
output.writeObject(retval);
break;
case DelegatingCacheLoader.delegateGet:
- fqn=(Fqn)input.readObject();
- n=c.get(fqn);
- if(n == null) { // node doesn't exist - return null
+ fqn = (Fqn) input.readObject();
+ n = c.get(fqn);
+ if (n == null)
+ { // node doesn't exist - return null
output.writeObject(n);
break;
}
- map=n.getData();
- if(map == null) map=new HashMap();
+ map = n.getData();
+ if (map == null) map = new HashMap();
output.writeObject(map);
break;
case DelegatingCacheLoader.delegateExists:
- fqn=(Fqn)input.readObject();
- flag=c.exists(fqn);
+ fqn = (Fqn) input.readObject();
+ flag = c.exists(fqn);
output.writeObject(Boolean.valueOf(flag));
break;
case DelegatingCacheLoader.delegatePutKeyVal:
- fqn=(Fqn)input.readObject();
- key=input.readObject();
- val=input.readObject();
- retval=c.put(fqn, key, val);
+ fqn = (Fqn) input.readObject();
+ key = input.readObject();
+ val = input.readObject();
+ retval = c.put(fqn, key, val);
output.writeObject(retval);
break;
case DelegatingCacheLoader.delegatePut:
- fqn=(Fqn)input.readObject();
- map=(Map)input.readObject();
+ fqn = (Fqn) input.readObject();
+ map = (Map) input.readObject();
c.put(fqn, map);
output.writeObject(Boolean.TRUE);
break;
case DelegatingCacheLoader.putList:
- int length=input.readInt();
- retval=Boolean.TRUE;
- if(length > 0) {
+ int length = input.readInt();
+ retval = Boolean.TRUE;
+ if (length > 0)
+ {
Modification mod;
- List mods=new ArrayList(length);
- for(int i=0; i < length; i++) {
- mod=new Modification();
+ List mods = new ArrayList(length);
+ for (int i = 0; i < length; i++)
+ {
+ mod = new Modification();
mod.readExternal(input);
mods.add(mod);
}
- try {
+ try
+ {
handleModifications(mods);
}
- catch(Exception ex) {
- retval=ex;
+ catch (Exception ex)
+ {
+ retval = ex;
}
}
output.writeObject(retval);
break;
case DelegatingCacheLoader.delegateRemoveKey:
- fqn=(Fqn)input.readObject();
- key=input.readObject();
- retval=c.remove(fqn, key);
+ fqn = (Fqn) input.readObject();
+ key = input.readObject();
+ retval = c.remove(fqn, key);
output.writeObject(retval);
break;
case DelegatingCacheLoader.delegateRemove:
- fqn=(Fqn)input.readObject();
+ fqn = (Fqn) input.readObject();
c.remove(fqn);
output.writeObject(Boolean.TRUE);
break;
case DelegatingCacheLoader.delegateRemoveData:
- fqn=(Fqn)input.readObject();
+ fqn = (Fqn) input.readObject();
c.removeData(fqn);
output.writeObject(Boolean.TRUE);
break;
case DelegatingCacheLoader.delegateLoadEntireState:
- state=c.getCacheLoader() != null? c.getCacheLoader().loadEntireState() : null;
+ state = c.getCacheLoader() != null ? c.getCacheLoader().loadEntireState() : null;
output.writeObject(state);
break;
case DelegatingCacheLoader.delegateStoreEntireState:
- state=(byte[])input.readObject();
- if(c.getCacheLoader() != null)
+ state = (byte[]) input.readObject();
+ if (c.getCacheLoader() != null)
c.getCacheLoader().storeEntireState(state);
output.writeObject(Boolean.TRUE);
break;
@@ -304,12 +408,15 @@
}
output.flush();
}
- catch(Exception e) {
- try {
+ catch (Exception e)
+ {
+ try
+ {
output.writeObject(e);
output.flush();
}
- catch(IOException e1) {
+ catch (IOException e1)
+ {
e1.printStackTrace();
}
}
@@ -317,18 +424,23 @@
}
- public String toString() {
- StringBuffer sb=new StringBuffer();
- if(sock != null) {
+ public String toString()
+ {
+ StringBuffer sb = new StringBuffer();
+ if (sock != null)
+ {
sb.append(sock.getRemoteSocketAddress());
}
return sb.toString();
}
- protected void handleModifications(List modifications) throws CacheException {
- for(Iterator it=modifications.iterator(); it.hasNext();) {
- Modification m=(Modification)it.next();
- switch(m.getType()) {
+ protected void handleModifications(List modifications) throws CacheException
+ {
+ for (Iterator it = modifications.iterator(); it.hasNext();)
+ {
+ Modification m = (Modification) it.next();
+ switch (m.getType())
+ {
case Modification.PUT_DATA:
c.put(m.getFqn(), m.getData());
break;
@@ -358,41 +470,47 @@
}
+ public static void main(String[] args) throws Exception
+ {
-
- public static void main(String[] args) throws Exception {
- String bind_addr=null;
- int port=7500;
+ String bind_addr = null;
+ int port = 7500;
TcpCacheServer server;
- String config=null;
+ String config = null;
- for(int i=0; i < args.length; i++) {
- if(args[i].equals("-bind_addr")) {
- bind_addr=args[++i];
+ for (int i = 0; i < args.length; i++)
+ {
+ if (args[i].equals("-bind_addr"))
+ {
+ bind_addr = args[++i];
continue;
}
- if(args[i].equals("-port")) {
- port=Integer.parseInt(args[++i]);
+ if (args[i].equals("-port"))
+ {
+ port = Integer.parseInt(args[++i]);
continue;
}
- if(args[i].equals("-config")) {
- config=args[++i];
+ if (args[i].equals("-config"))
+ {
+ config = args[++i];
continue;
}
help();
return;
}
- server=new TcpCacheServer();
+ server = new TcpCacheServer();
+ server.daemon = false;
server.setBindAddress(bind_addr);
server.setPort(port);
server.setConfig(config);
server.createService();
server.startService();
- }
+ }
- private static void help() {
+ private static void help()
+ {
System.out.println("TcpCacheServer [-bind_addr <address>] [-port <port>] [-config <config file>] [-help]");
}
}
1.5.4.1 +194 -114 JBossCache/src/org/jboss/cache/loader/tcp/Attic/TcpDelegatingCacheLoader.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: TcpDelegatingCacheLoader.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/loader/tcp/Attic/TcpDelegatingCacheLoader.java,v
retrieving revision 1.5
retrieving revision 1.5.4.1
diff -u -b -r1.5 -r1.5.4.1
--- TcpDelegatingCacheLoader.java 20 Jan 2006 12:50:46 -0000 1.5
+++ TcpDelegatingCacheLoader.java 25 Oct 2006 12:16:58 -0000 1.5.4.1
@@ -7,31 +7,38 @@
package org.jboss.cache.loader.tcp;
import org.jboss.cache.Fqn;
-import org.jboss.cache.TreeCache;
import org.jboss.cache.Modification;
+import org.jboss.cache.TreeCache;
import org.jboss.cache.loader.DelegatingCacheLoader;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
-import java.util.*;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
/**
* DelegatingCacheLoader implementation which delegates to a remote (not in the same VM)
* TreeCache using TCP/IP for communication. Example configuration for connecting to a TcpCacheServer
* running at localhost:2099:<pre>
* <attribute name="CacheLoaderClass">org.jboss.cache.loader.tcp.TcpDelegatingCacheLoader</attribute>
- <attribute name="CacheLoaderConfig">
- host=localhost
- port=2099
- </attribute>
- </pre>
+ * <attribute name="CacheLoaderConfig">
+ * host=localhost
+ * port=2099
+ * </attribute>
+ * </pre>
*
* @author Bela Ban
- * @version $Id: TcpDelegatingCacheLoader.java,v 1.5 2006/01/20 12:50:46 bela Exp $
+ * @version $Id: TcpDelegatingCacheLoader.java,v 1.5.4.1 2006/10/25 12:16:58 msurtani Exp $
*/
-public class TcpDelegatingCacheLoader extends DelegatingCacheLoader {
+public class TcpDelegatingCacheLoader extends DelegatingCacheLoader
+{
private Socket sock;
private String host;
private int port;
@@ -42,7 +49,8 @@
/**
* Default constructor.
*/
- public TcpDelegatingCacheLoader() {
+ public TcpDelegatingCacheLoader()
+ {
// Empty.
}
@@ -52,7 +60,8 @@
* @param host The host on which to look up the remote object.
* @param port The port on which to look up the remote object.
*/
- public TcpDelegatingCacheLoader(String host, int port) {
+ public TcpDelegatingCacheLoader(String host, int port)
+ {
this.host = host;
this.port = port;
}
@@ -62,31 +71,42 @@
*
* @see org.jboss.cache.loader.DelegatingCacheLoader#setConfig(java.util.Properties)
*/
- public void setConfig(Properties props) {
+ public void setConfig(Properties props)
+ {
this.host = props.getProperty("host");
- if(this.host == null || this.host.length() == 0) {
+ if (this.host == null || this.host.length() == 0)
+ {
this.host = "localhost";
}
this.port = Integer.parseInt(props.getProperty("port"));
}
- public void start() throws Exception {
+ public void start() throws Exception
+ {
init();
}
- public void stop() {
- try {if(in != null) in.close();} catch(IOException e) {}
- try {if(out != null) out.close();} catch(IOException e) {}
- try {if(sock != null) sock.close();} catch(IOException e) {}
+ public void stop()
+ {
+ synchronized (out)
+ {
+ try {if (in != null) in.close();} catch (IOException e) {}
+ try {if (out != null) out.close();} catch (IOException e) {}
+ try {if (sock != null) sock.close();} catch (IOException e) {}
+ }
}
- private void init() throws IOException {
- if(host == null)
- host="localhost";
- sock=new Socket(host, port);
- out=new ObjectOutputStream(sock.getOutputStream());
- in=new ObjectInputStream(sock.getInputStream());
+ private void init() throws IOException
+ {
+ if (host == null)
+ host = "localhost";
+ sock = new Socket(host, port);
+ out = new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream()));
+ out.flush();
+// out=new ObjectOutputStream(sock.getOutputStream());
+// in=new ObjectInputStream(sock.getInputStream());
+ in = new ObjectInputStream(new BufferedInputStream(sock.getInputStream()));
}
/**
@@ -94,25 +114,32 @@
*
* @see org.jboss.cache.loader.DelegatingCacheLoader#setCache(org.jboss.cache.TreeCache)
*/
- public void setCache(TreeCache cache) {
+ public void setCache(TreeCache cache)
+ {
}
/**
* @see org.jboss.cache.loader.DelegatingCacheLoader#delegateGetChildrenNames(org.jboss.cache.Fqn)
*/
- protected Set delegateGetChildrenNames(Fqn fqn) throws Exception {
+ protected Set delegateGetChildrenNames(Fqn fqn) throws Exception
+ {
+ synchronized (out)
+ {
+ out.reset();
out.writeInt(DelegatingCacheLoader.delegateGetChildrenNames);
out.writeObject(fqn);
- Object retval=in.readObject();
- if(retval instanceof Exception)
- throw (Exception)retval;
- return (Set)retval;
+ out.flush();
+ Object retval = in.readObject();
+ if (retval instanceof Exception)
+ throw(Exception) retval;
+ return (Set) retval;
+ }
}
// See http://jira.jboss.com/jira/browse/JBCACHE-118 for why this is commented out.
/**
- * @see org.jboss.cache.loader.DelegatingCacheLoader#delegateGet(org.jboss.cache.Fqn, Object)
+ * @see org.jboss.cache.loader.DelegatingCacheLoader#delegateGet(org.jboss.cache.Fqn,Object)
*/
// protected Object delegateGet(Fqn name, Object key) throws Exception {
// out.writeInt(DelegatingCacheLoader.delegateGetKey);
@@ -124,111 +151,164 @@
/**
* @see org.jboss.cache.loader.DelegatingCacheLoader#delegateGet(org.jboss.cache.Fqn)
*/
- protected Map delegateGet(Fqn name) throws Exception {
+ protected Map delegateGet(Fqn name) throws Exception
+ {
+ synchronized (out)
+ {
+ out.reset();
out.writeInt(DelegatingCacheLoader.delegateGet);
out.writeObject(name);
- Object retval=in.readObject();
- if(retval instanceof Exception)
- throw (Exception)retval;
- return (Map)retval;
+ out.flush();
+ Object retval = in.readObject();
+ if (retval instanceof Exception)
+ throw(Exception) retval;
+ return (Map) retval;
+ }
}
/**
* @see org.jboss.cache.loader.DelegatingCacheLoader#delegateExists(org.jboss.cache.Fqn)
*/
- protected boolean delegateExists(Fqn name) throws Exception {
+ protected boolean delegateExists(Fqn name) throws Exception
+ {
+ synchronized (out)
+ {
+ out.reset();
out.writeInt(DelegatingCacheLoader.delegateExists);
out.writeObject(name);
- Object retval=in.readObject();
- if(retval instanceof Exception)
- throw (Exception)retval;
- return ((Boolean)retval).booleanValue();
+ out.flush();
+ Object retval = in.readObject();
+ if (retval instanceof Exception)
+ throw(Exception) retval;
+ return ((Boolean) retval).booleanValue();
+ }
}
/**
- * @see org.jboss.cache.loader.DelegatingCacheLoader#delegatePut(org.jboss.cache.Fqn, Object, Object)
+ * @see org.jboss.cache.loader.DelegatingCacheLoader#delegatePut(org.jboss.cache.Fqn,Object,Object)
*/
- protected Object delegatePut(Fqn name, Object key, Object value) throws Exception {
+ protected Object delegatePut(Fqn name, Object key, Object value) throws Exception
+ {
+ synchronized (out)
+ {
+
+ out.reset();
out.writeInt(DelegatingCacheLoader.delegatePutKeyVal);
out.writeObject(name);
out.writeObject(key);
out.writeObject(value);
- Object retval=in.readObject();
- if(retval instanceof Exception)
- throw (Exception)retval;
+ out.flush();
+ Object retval = in.readObject();
+ if (retval instanceof Exception)
+ throw(Exception) retval;
return retval;
}
+ }
/**
- * @see org.jboss.cache.loader.DelegatingCacheLoader#delegatePut(org.jboss.cache.Fqn, java.util.Map)
+ * @see org.jboss.cache.loader.DelegatingCacheLoader#delegatePut(org.jboss.cache.Fqn,java.util.Map)
*/
- protected void delegatePut(Fqn name, Map attributes) throws Exception {
+ protected void delegatePut(Fqn name, Map attributes) throws Exception
+ {
+ synchronized (out)
+ {
+
+ out.reset();
out.writeInt(DelegatingCacheLoader.delegatePut);
out.writeObject(name);
out.writeObject(attributes);
out.flush();
- Object retval=in.readObject();
- if(retval instanceof Exception)
- throw (Exception)retval;
+ Object retval = in.readObject();
+ if (retval instanceof Exception)
+ throw(Exception) retval;
+ }
}
- protected void delegatePut(List modifications) throws Exception {
+ protected void delegatePut(List modifications) throws Exception
+ {
+ synchronized (out)
+ {
+
+ out.reset();
out.writeInt(DelegatingCacheLoader.putList);
- int length=modifications != null? modifications.size() : 0;
+ int length = modifications != null ? modifications.size() : 0;
out.writeInt(length);
- if(length > 0) {
- for(Iterator it=modifications.iterator(); it.hasNext();) {
- Modification m=(Modification)it.next();
+ if (length > 0)
+ {
+ for (Iterator it = modifications.iterator(); it.hasNext();)
+ {
+ Modification m = (Modification) it.next();
m.writeExternal(out);
}
}
out.flush();
- Object retval=in.readObject();
- if(retval instanceof Exception)
- throw (Exception)retval;
+ Object retval = in.readObject();
+ if (retval instanceof Exception)
+ throw(Exception) retval;
+ }
}
/**
- * @see org.jboss.cache.loader.DelegatingCacheLoader#delegateRemove(org.jboss.cache.Fqn, Object)
+ * @see org.jboss.cache.loader.DelegatingCacheLoader#delegateRemove(org.jboss.cache.Fqn,Object)
*/
- protected Object delegateRemove(Fqn name, Object key) throws Exception {
+ protected Object delegateRemove(Fqn name, Object key) throws Exception
+ {
+ synchronized (out)
+ {
+
+ out.reset();
out.writeInt(DelegatingCacheLoader.delegateRemoveKey);
out.writeObject(name);
out.writeObject(key);
- Object retval=in.readObject();
- if(retval instanceof Exception)
- throw (Exception)retval;
+ out.flush();
+ Object retval = in.readObject();
+ if (retval instanceof Exception)
+ throw(Exception) retval;
return retval;
}
+ }
/**
* @see org.jboss.cache.loader.DelegatingCacheLoader#delegateRemove(org.jboss.cache.Fqn)
*/
- protected void delegateRemove(Fqn name) throws Exception {
+ protected void delegateRemove(Fqn name) throws Exception
+ {
+ synchronized (out)
+ {
+
+ out.reset();
out.writeInt(DelegatingCacheLoader.delegateRemove);
out.writeObject(name);
out.flush();
- Object retval=in.readObject();
- if(retval instanceof Exception)
- throw (Exception)retval;
+ Object retval = in.readObject();
+ if (retval instanceof Exception)
+ throw(Exception) retval;
+ }
}
/**
* @see org.jboss.cache.loader.DelegatingCacheLoader#delegateRemoveData(org.jboss.cache.Fqn)
*/
- protected void delegateRemoveData(Fqn name) throws Exception {
+ protected void delegateRemoveData(Fqn name) throws Exception
+ {
+ synchronized (out)
+ {
+
+ out.reset();
out.writeInt(DelegatingCacheLoader.delegateRemoveData);
out.writeObject(name);
out.flush();
- Object retval=in.readObject();
- if(retval instanceof Exception)
- throw (Exception)retval;
+ Object retval = in.readObject();
+ if (retval instanceof Exception)
+ throw(Exception) retval;
+ }
}
/**
* @see org.jboss.cache.loader.DelegatingCacheLoader#delegateLoadEntireState()
*/
- public byte[] delegateLoadEntireState() throws Exception {
+ public byte[] delegateLoadEntireState() throws Exception
+ {
throw new UnsupportedOperationException("operation is not currently supported - need to define semantics first");
// out.writeInt(DelegatingCacheLoader.delegateLoadEntireState);
// out.flush();
@@ -241,7 +321,8 @@
/**
* @see org.jboss.cache.loader.DelegatingCacheLoader#delegateStoreEntireState(byte[])
*/
- public void delegateStoreEntireState(byte[] state) throws Exception {
+ public void delegateStoreEntireState(byte[] state) throws Exception
+ {
throw new UnsupportedOperationException("operation is not currently supported - need to define semantics first");
// out.writeInt(DelegatingCacheLoader.delegateStoreEntireState);
// out.writeObject(state);
@@ -252,5 +333,4 @@
}
-
}
More information about the jboss-cvs-commits
mailing list