[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/bisocket ...

Ron Sigal ron_sigal at yahoo.com
Wed Dec 13 20:03:55 EST 2006


  User: rsigal  
  Date: 06/12/13 20:03:55

  Added:       src/main/org/jboss/remoting/transport/bisocket      Tag:
                        remoting_2_x BisocketServerInvoker.java
                        BisocketClientInvoker.java
                        TransportClientFactory.java
                        TransportServerFactory.java Bisocket.java
  Log:
  JBREM-650: Introducing bisocket transport.
  
  Revision  Changes    Path
  No                   revision
  
  
  No                   revision
  
  
  1.1.2.1   +542 -0    JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/Attic/BisocketServerInvoker.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: BisocketServerInvoker.java
  ===================================================================
  RCS file: BisocketServerInvoker.java
  diff -N BisocketServerInvoker.java
  --- /dev/null	1 Jan 1970 00:00:00 -0000
  +++ BisocketServerInvoker.java	14 Dec 2006 01:03:55 -0000	1.1.2.1
  @@ -0,0 +1,542 @@
  +/*
  +* JBoss, Home of Professional Open Source
  +* Copyright 2005, JBoss Inc., and individual contributors as indicated
  +* by the @authors tag. See the copyright.txt in the distribution for a
  +* full listing of individual contributors.
  +*
  +* This is free software; you can redistribute it and/or modify it
  +* under the terms of the GNU Lesser General Public License as
  +* published by the Free Software Foundation; either version 2.1 of
  +* the License, or (at your option) any later version.
  +*
  +* This software is distributed in the hope that it will be useful,
  +* but WITHOUT ANY WARRANTY; without even the implied warranty of
  +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  +* Lesser General Public License for more details.
  +*
  +* You should have received a copy of the GNU Lesser General Public
  +* License along with this software; if not, write to the Free
  +* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  +* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  +*/
  +
  +package org.jboss.remoting.transport.bisocket;
  +
  +import java.io.DataInputStream;
  +import java.io.DataOutputStream;
  +import java.io.IOException;
  +import java.net.InetAddress;
  +import java.net.ServerSocket;
  +import java.net.Socket;
  +import java.util.Collection;
  +import java.util.Collections;
  +import java.util.HashMap;
  +import java.util.HashSet;
  +import java.util.Iterator;
  +import java.util.Map;
  +import java.util.Set;
  +import java.util.TimerTask;
  +
  +import org.apache.commons.logging.Log;
  +import org.jboss.remoting.Client;
  +import org.jboss.remoting.InvocationRequest;
  +import org.jboss.remoting.InvokerLocator;
  +import org.jboss.remoting.ServerInvocationHandler;
  +import org.jboss.remoting.invocation.InternalInvocation;
  +import org.jboss.remoting.transport.PortUtil;
  +import org.jboss.remoting.transport.socket.ServerThread;
  +import org.jboss.remoting.transport.socket.SocketServerInvoker;
  +import org.jboss.remoting.util.TimerUtil;
  +
  +
  +/**
  + *  
  + * @author <a href="ron.sigal at jboss.com">Ron Sigal</a>
  + * @version $Revision: 1.1.2.1 $
  + * <p>
  + * Copyright Nov 23, 2006
  + * </p>
  + */
  +public class BisocketServerInvoker extends SocketServerInvoker
  +{  
  +   private static Map listenerIdToServerInvokerMap = Collections.synchronizedMap(new HashMap());
  +
  +   private Map listenerIdToInvokerLocatorMap = new HashMap();
  +   private ServerSocket secondaryServerSocket;
  +   private InvokerLocator secondaryLocator;
  +   private SecondaryServerSocketThread secondaryServerSocketThread;
  +   private Map controlConnectionThreadMap = new HashMap();
  +   private int pingFrequency = Bisocket.PING_FREQUENCY_DEFAULT;
  +   private int pingWindow = 2 * pingFrequency;
  +   private ControlMonitorTimerTask controlMonitorTimerTask;
  +   
  +   
  +   /**
  +    * Looks up the server invoker registered for a callback listener.
  +    *
  +    * @param listenerId id of the callback listener
  +    * @return the invoker stored under <code>listenerId</code> in the static
  +    *         registry, or <code>null</code> if the registry has no such entry
  +    */
  +   public static BisocketServerInvoker getBisocketServerInvoker(String listenerId)
  +   {
  +      Object registered = listenerIdToServerInvokerMap.get(listenerId);
  +      return (BisocketServerInvoker) registered;
  +   }
  +   
  +   
  +//   public static void addBisocketServerInvoker(String listenerId, BisocketServerInvoker invoker)
  +//   {
  +//      listenerIdToServerInvokerMap.put(listenerId, invoker);
  +//   }
  +   
  +   
  +   /**
  +    * Creates an invoker for the given locator by delegating to the
  +    * SocketServerInvoker constructor.
  +    *
  +    * @param locator locator handed to the superclass
  +    */
  +   public BisocketServerInvoker(InvokerLocator locator)
  +   {
  +      super(locator);
  +   }
  +
  +   
  +   /**
  +    * Creates an invoker for the given locator and configuration by
  +    * delegating to the SocketServerInvoker constructor.
  +    *
  +    * @param locator       locator handed to the superclass
  +    * @param configuration configuration map handed to the superclass
  +    */
  +   public BisocketServerInvoker(InvokerLocator locator, Map configuration)
  +   {
  +      super(locator, configuration);
  +   }
  +   
  +   
  +   /**
  +    * Starts the underlying socket server and, when the configuration entry
  +    * Bisocket.START_SECONDARY_PORT evaluates to true (given either as a
  +    * String or as a Boolean), opens a secondary server socket on a free port
  +    * of the same address and starts a daemon thread accepting on it.
  +    *
  +    * @throws IOException declared by the superclass start and by the
  +    *         creation of the secondary server socket
  +    */
  +   public void start() throws IOException
  +   {
  +      super.start();
  +
  +      Object setting = configuration.get(Bisocket.START_SECONDARY_PORT);
  +      if (setting == null)
  +         return;
  +
  +      boolean startSecondaryPort = false;
  +      if (setting instanceof String)
  +      {
  +         startSecondaryPort = Boolean.valueOf((String) setting).booleanValue();
  +      }
  +      else if (setting instanceof Boolean)
  +      {
  +         startSecondaryPort = ((Boolean) setting).booleanValue();
  +      }
  +      else
  +      {
  +         log.error("unrecognized value for configuration key \"" +
  +                   Bisocket.START_SECONDARY_PORT + "\": " + setting);
  +      }
  +
  +      if (!startSecondaryPort)
  +         return;
  +
  +      // The secondary socket is bound to the same address as the primary one.
  +      InetAddress host = getServerSocket().getInetAddress();
  +      String hostAddress = host.getHostAddress();
  +      int freePort = PortUtil.findFreePort(hostAddress);
  +      secondaryServerSocket = new ServerSocket(freePort, 0, host);
  +      secondaryLocator = new InvokerLocator(null, hostAddress, freePort, null, null);
  +
  +      SecondaryServerSocketThread acceptor = new SecondaryServerSocketThread(secondaryServerSocket);
  +      acceptor.setName("secondaryServerSocketThread");
  +      acceptor.setDaemon(true);
  +      secondaryServerSocketThread = acceptor;
  +      acceptor.start();
  +      log.info("started secondary port: " + host + ":" + freePort);
  +   }
  +   
  +   
  +   /**
  +    * Reports that this transport is bidirectional.
  +    *
  +    * @return always <code>true</code>
  +    */
  +   public boolean isTransportBiDirectional()
  +   {
  +      return true;
  +   }
  +   
  +   
  +   /**
  +    * Opens a control connection to the given locator for a callback
  +    * listener, starts a ControlConnectionThread reading from it, and
  +    * schedules the ControlMonitorTimerTask if it is not scheduled yet.
  +    * <p>
  +    * A non-null <code>locator</code> marks the first connection: the locator
  +    * is remembered under <code>listenerId</code>. A null <code>locator</code>
  +    * marks a restarted connection (this is how ControlMonitorTimerTask calls
  +    * it): the remembered locator is reused.
  +    *
  +    * @param listenerId id of the callback listener the connection belongs to
  +    * @param locator    address to connect to, or <code>null</code> to reuse
  +    *                   the locator stored by an earlier call
  +    * @throws IOException if no locator is known for <code>listenerId</code>,
  +    *         or if connecting or writing the handshake fails
  +    */
  +   public void createControlConnection(String listenerId, InvokerLocator locator)
  +   throws IOException
  +   {
  +      boolean restart = (locator == null);
  +
  +      if (restart)
  +      {
  +         // restarted connection: reuse the locator remembered by the first call
  +         locator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(listenerId);
  +         if (locator == null)
  +            throw new IOException("no locator known for listener: " + listenerId);
  +      }
  +      else
  +      {
  +         // first connection: remember the locator for later restarts
  +         listenerIdToInvokerLocatorMap.put(listenerId, locator);
  +      }
  +
  +      log.info("creating control connection: " + locator);
  +      Socket socket = new Socket(locator.getHost(), locator.getPort());
  +      Thread thread = null;
  +
  +      try
  +      {
  +         DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
  +         // The first connection keeps CREATE_ORDINARY_SOCKET because
  +         // BisocketClientInvoker.createSocket() adopts the first transferred
  +         // socket as its control socket. A restarted connection sends
  +         // CREATE_CONTROL_SOCKET, the action for which
  +         // SecondaryServerSocketThread calls replaceControlSocket(); without
  +         // it the peer keeps pinging on the old socket.
  +         dos.write(restart ? Bisocket.CREATE_CONTROL_SOCKET : Bisocket.CREATE_ORDINARY_SOCKET);
  +         dos.writeUTF(listenerId);
  +         thread = new ControlConnectionThread(socket, listenerId);
  +      }
  +      catch (IOException e)
  +      {
  +         // Do not leak the socket when the handshake fails.
  +         try
  +         {
  +            socket.close();
  +         }
  +         catch (IOException ignored)
  +         {
  +            // nothing more can be done; the original failure is rethrown
  +         }
  +         throw e;
  +      }
  +
  +      thread.setName("control: " + socket.toString());
  +      thread.setDaemon(true);
  +
  +      synchronized (controlConnectionThreadMap)
  +      {
  +         controlConnectionThreadMap.put(listenerId, thread);
  +      }
  +
  +      thread.start();
  +      log.info("created control connection: " + locator);
  +
  +      if (controlMonitorTimerTask == null)
  +      {
  +         controlMonitorTimerTask = new ControlMonitorTimerTask();
  +         TimerUtil.schedule(controlMonitorTimerTask, pingFrequency);
  +         log.info("scheduled ControlMonitorTimerTask: " + controlMonitorTimerTask);
  +      }
  +   }
  +   
  +   
  +   /**
  +    * @return the ping frequency currently stored in this invoker
  +    */
  +   public int getPingFrequency()
  +   {
  +      return this.pingFrequency;
  +   }
  +   
  +   
  +   /**
  +    * Stores a new ping frequency and sets the ping window to twice that
  +    * value.
  +    *
  +    * @param pingFrequency the new ping frequency
  +    */
  +   public void setPingFrequency(int pingFrequency)
  +   {
  +      this.pingFrequency = pingFrequency;
  +      this.pingWindow = pingFrequency * 2;
  +      log.info("set ping frequency: " + pingFrequency);
  +   }
  +   
  +   
  +   /**
  +    * Releases the resources owned by this invoker after the superclass
  +    * cleanup: cancels the control monitor task, shuts down every control
  +    * connection thread, stops the secondary accept thread and closes the
  +    * secondary server socket.
  +    */
  +   protected void cleanup()
  +   {
  +      super.cleanup();
  +
  +      if (controlMonitorTimerTask != null)
  +      {
  +         controlMonitorTimerTask.shutdown();
  +      }
  +
  +      // Snapshot and empty the map while holding the lock the other users of
  +      // controlConnectionThreadMap take, then shut the threads down from the
  +      // snapshot so the map is never iterated while it can be modified.
  +      Collection controlThreads;
  +      synchronized (controlConnectionThreadMap)
  +      {
  +         controlThreads = new HashSet(controlConnectionThreadMap.values());
  +         controlConnectionThreadMap.clear();
  +      }
  +      Iterator it = controlThreads.iterator();
  +      while (it.hasNext())
  +      {
  +         ControlConnectionThread t = (ControlConnectionThread) it.next();
  +         t.shutdown();
  +      }
  +
  +      if (secondaryServerSocketThread != null)
  +      {
  +         secondaryServerSocketThread.shutdown();
  +      }
  +      if (secondaryServerSocket != null)
  +      {
  +         try
  +         {
  +            secondaryServerSocket.close();
  +            log.info("closed secondary port: " +
  +                     secondaryServerSocket.getInetAddress() + ":" +
  +                     secondaryServerSocket.getLocalPort());
  +         }
  +         catch (IOException e)
  +         {
  +            log.warn("Error closing secondary server socket: " + e.getMessage());
  +         }
  +      }
  +   }
  +   
  +   
  +   /**
  +    * @return the locator of the secondary server socket, or
  +    *         <code>null</code> if start() did not open one
  +    */
  +   protected InvokerLocator getSecondaryLocator()
  +   {
  +      return this.secondaryLocator;
  +   }
  +   
  +   /**
  +    * @return the value of the <code>serverSocket</code> field
  +    */
  +   protected ServerSocket getServerSocket()
  +   {
  +      return this.serverSocket;
  +   }
  +   
  +   
  +   /**
  +    * Handles bisocket-specific internal invocations before and after
  +    * delegating to the superclass.
  +    * <p>
  +    * A Bisocket.GET_SECONDARY_INVOKER_LOCATOR request is answered directly
  +    * with the secondary locator. Every other request goes to the superclass;
  +    * afterwards, for InternalInvocation.ADDCLIENTLISTENER, this invoker is
  +    * registered in the static registry under the listener id found in the
  +    * request payload (if any).
  +    *
  +    * @param ii      the internal invocation
  +    * @param ir      the enclosing invocation request
  +    * @param handler handler passed through to the superclass
  +    * @return the secondary locator, or the superclass result
  +    * @throws Throwable whatever the superclass implementation throws
  +    */
  +   protected Object handleInternalInvocation(InternalInvocation ii,
  +                                             InvocationRequest ir,
  +                                             ServerInvocationHandler handler)
  +   throws Throwable
  +   {
  +      if (Bisocket.GET_SECONDARY_INVOKER_LOCATOR.equals(ii.getMethodName()))
  +      {
  +         log.info("returning secondaryLocator: " + secondaryLocator);
  +         return secondaryLocator;
  +      }
  +
  +      Object response = super.handleInternalInvocation(ii, ir, handler);
  +
  +      if (!InternalInvocation.ADDCLIENTLISTENER.equals(ii.getMethodName()))
  +         return response;
  +
  +      Map metadata = ir.getRequestPayload();
  +      if (metadata == null)
  +         return response;
  +
  +      String listenerId = (String) metadata.get(Client.LISTENER_ID_KEY);
  +      if (listenerId == null)
  +         return response;
  +
  +      listenerIdToServerInvokerMap.put(listenerId, this);
  +      log.info("registered " + listenerId + " -> " + this);
  +      return response;
  +   }
  +   
  +   
  +   class ControlConnectionThread extends Thread
  +   {
  +      private Socket controlSocket;
  +      private String listenerId;
  +      private DataInputStream dis;
  +      private boolean running;
  +      private int errorCount;
  +      private long lastPing = System.currentTimeMillis();
  +      
  +      ControlConnectionThread(Socket socket, String listenerId) throws IOException
  +      {
  +         controlSocket = socket;
  +         this.listenerId = listenerId;
  +         dis = new DataInputStream(socket.getInputStream());
  +      }
  +      
  +      void shutdown()
  +      {
  +         running = false;
  +         synchronized (controlConnectionThreadMap)
  +         {
  +            controlConnectionThreadMap.remove(this);
  +         }
  +         try
  +         {
  +            controlSocket.close();
  +         }
  +         catch (IOException e)
  +         {
  +            log.warn("unable to close controlSocket");
  +         }
  +         interrupt();
  +      }
  +      
  +      boolean checkConnection()
  +      {
  +         long currentTime = System.currentTimeMillis();
  +         log.info("elapsed: " + (currentTime - lastPing));
  +         log.info("returning: " + ((currentTime - lastPing > pingWindow) ? false : true)); 
  +         return (currentTime - lastPing <= pingWindow);
  +      }
  +      
  +      String getListenerId()
  +      {
  +         return listenerId;
  +      }
  +      
  +      public void run()
  +      {
  +         log.info("starting ControlConnectionThread");
  +         running = true;
  +         while (running)
  +         {
  +            Socket socket = null;
  +            
  +            try
  +            {
  +               int action = dis.read();
  +               lastPing = System.currentTimeMillis();
  +               
  +               switch (action)
  +               {
  +                  case Bisocket.CREATE_ORDINARY_SOCKET:
  +//                     String listenerId = dis.readUTF();
  +//                     dis.readUTF();
  +                     InvokerLocator locator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(listenerId);
  +                     socket = new Socket(locator.getHost(), locator.getPort());
  +                     DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
  +                     dos.write(Bisocket.CREATE_ORDINARY_SOCKET);
  +                     dos.writeUTF(listenerId);
  +                     break;
  +                     
  +                  case Bisocket.PING:
  +                     log.info("got ping");
  +//                     log.info("lastPing: " + lastPing);
  +                     continue;
  +                     
  +                  case -1:
  +                     shutdown();
  +                     return;
  +                     
  +                  default:
  +                     log.error("unrecognized action: " + action);
  +                     continue;
  +               }
  +            }
  +            catch (IOException e)
  +            {
  +               if (running)
  +               {
  +                  if ("socket closed".equals(e.getMessage()))
  +                  {
  +                     shutdown();
  +                     return;
  +                  }
  +                  log.error("Unable to create new Socket: " + e.getMessage());
  +                  e.printStackTrace();
  +                  if (++errorCount > 5)
  +                  {
  +                     shutdown();
  +                     return;
  +                  }
  +                  continue;
  +               }
  +               
  +               return;
  +            }
  +            
  +            synchronized (clientpool)
  +            {
  +               if(clientpool.size() < maxPoolSize)
  +               {
  +                  Thread thread = null;
  +                  try
  +                  {
  +                     thread = new ServerThread(socket, BisocketServerInvoker.this,
  +                                               clientpool, threadpool,
  +                                               getTimeout(), serverSocketClass);
  +                     thread.start();
  +                     log.info("created: " + thread);
  +                  }
  +                  catch (Exception e)
  +                  {
  +                     log.error("Unable to create new ServerThread: " + e.getMessage());
  +                     e.printStackTrace();
  +                  }
  +                  
  +                  synchronized (threadpool)
  +                  {
  +                     threadpool.add(thread);
  +                  }
  +               }
  +            }
  +         }
  +      }
  +   }
  +   
  +   
  +//   protected ServerThread createServerThread(Socket socket) throws Exception
  +//   {
  +//      return new BisocketServerThread(socket, this, clientpool, threadpool, getTimeout(), serverSocketClass);
  +//   }
  +   
  +   
  +   class SecondaryServerSocketThread extends Thread
  +   {
  +      private ServerSocket secondaryServerSocket;
  +      boolean running;
  +      
  +      SecondaryServerSocketThread(ServerSocket secondaryServerSocket) throws IOException
  +      {
  +         this.secondaryServerSocket = secondaryServerSocket;
  +      }
  +      
  +      void shutdown()
  +      {
  +         running = false;
  +         interrupt();
  +      }
  +      
  +      public void run()
  +      {
  +         running = true;
  +         while (running)
  +         {
  +            try
  +            {
  +               Socket socket = secondaryServerSocket.accept();
  +               DataInputStream dis = new DataInputStream(socket.getInputStream());
  +               int action = dis.read();
  +               String listenerId = dis.readUTF();
  +               
  +               switch (action)
  +               {
  +                  case Bisocket.CREATE_CONTROL_SOCKET:
  +                     BisocketClientInvoker invoker;
  +                     invoker =  BisocketClientInvoker.getBisocketClientInvoker(listenerId);
  +                     invoker.replaceControlSocket(socket);
  +                     log.info("SecondaryServerSocketThread: created secondary socket: " + listenerId);
  +                     break;
  +                     
  +                  case Bisocket.CREATE_ORDINARY_SOCKET: 
  +                     BisocketClientInvoker.transferSocket(listenerId, socket);
  +                     log.info("SecondaryServerSocketThread: transferred socket: " + listenerId);
  +                     break;
  +                     
  +                  default:
  +                     log.error("unrecognized action: " + action);
  +               }
  +            }
  +            catch (IOException e)
  +            {
  +               if (running)
  +                  log.error("Failed to accept socket connection", e);
  +               else
  +                  return;
  +                  
  +            }
  +         }
  +      }
  +      
  +      ServerSocket getServerSocket()
  +      {
  +         return secondaryServerSocket;
  +      }
  +   }
  +   
  +   
  +   /**
  +    * Timer task that checks every control connection thread and, for each
  +    * one whose last ping is older than the ping window, shuts it down and
  +    * asks createControlConnection() for a replacement.
  +    * <p>
  +    * NOTE(review): <code>running</code> is not declared in this class; it
  +    * resolves to a field of the enclosing invoker or one of its
  +    * superclasses, which is not visible here.
  +    */
  +   class ControlMonitorTimerTask extends TimerTask
  +   {
  +      /** Clears the <code>running</code> flag and cancels this task. */
  +      void shutdown()
  +      {
  +         running = false;
  +         cancel();
  +         log.info("shutting down " + this);
  +      }
  +
  +      public void run()
  +      {
  +         // Work on a snapshot so the map lock is not held while connecting.
  +         Collection controlConnectionThreads = null;
  +         synchronized (controlConnectionThreadMap)
  +         {
  +            controlConnectionThreads = new HashSet(controlConnectionThreadMap.values());
  +         }
  +
  +         Iterator it = controlConnectionThreads.iterator();
  +         while (running && it.hasNext())
  +         {
  +            ControlConnectionThread t = (ControlConnectionThread) it.next();
  +            if (!t.checkConnection())
  +            {
  +               log.info("detected failure on control connection: requesting new control connection");
  +               t.shutdown();
  +               it.remove();
  +
  +               if (!running)
  +                  return;
  +
  +               try
  +               {
  +                  // A null locator makes createControlConnection() reuse the
  +                  // locator stored for this listener.
  +                  createControlConnection(t.getListenerId(), null);
  +               }
  +               catch (IOException e)
  +               {
  +                  InvokerLocator locator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(t.getListenerId());
  +                  // The logger already records the stack trace.
  +                  log.error(this + ": " + "Unable to recreate control connection: " + locator, e);
  +               }
  +            }
  +         }
  +      }
  +   }
  +   
  +   
  +//   class InternalInvocationHandler implements ServerInvocationHandler
  +//   {
  +//      InvokerLocator locator;
  +//      
  +//      InternalInvocationHandler(ServerSocket ss)
  +//      {
  +//         String host = ss.getInetAddress().getHostAddress();
  +//         int port = ss.getLocalPort();
  +//         locator = new InvokerLocator(null, host, port, null, null);   
  +//      }
  +//      
  +//      public Object invoke(InvocationRequest invocation) throws Throwable
  +//      {
  +//         return locator;
  +//      }
  +//
  +//      public void setMBeanServer(MBeanServer server) {}
  +//      public void setInvoker(ServerInvoker invoker) {}
  +//      public void addListener(InvokerCallbackHandler callbackHandler) {}
  +//      public void removeListener(InvokerCallbackHandler callbackHandler) {}
  +//   }
  +}
  \ No newline at end of file
  
  
  
  1.1.2.1   +404 -0    JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/Attic/BisocketClientInvoker.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: BisocketClientInvoker.java
  ===================================================================
  RCS file: BisocketClientInvoker.java
  diff -N BisocketClientInvoker.java
  --- /dev/null	1 Jan 1970 00:00:00 -0000
  +++ BisocketClientInvoker.java	14 Dec 2006 01:03:55 -0000	1.1.2.1
  @@ -0,0 +1,404 @@
  +/*
  +* JBoss, Home of Professional Open Source
  +* Copyright 2005, JBoss Inc., and individual contributors as indicated
  +* by the @authors tag. See the copyright.txt in the distribution for a
  +* full listing of individual contributors.
  +*
  +* This is free software; you can redistribute it and/or modify it
  +* under the terms of the GNU Lesser General Public License as
  +* published by the Free Software Foundation; either version 2.1 of
  +* the License, or (at your option) any later version.
  +*
  +* This software is distributed in the hope that it will be useful,
  +* but WITHOUT ANY WARRANTY; without even the implied warranty of
  +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  +* Lesser General Public License for more details.
  +*
  +* You should have received a copy of the GNU Lesser General Public
  +* License along with this software; if not, write to the Free
  +* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  +* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  +*/
  +
  +package org.jboss.remoting.transport.bisocket;
  +
  +import java.io.DataOutputStream;
  +import java.io.IOException;
  +import java.io.OutputStream;
  +import java.net.Socket;
  +import java.util.Collections;
  +import java.util.HashMap;
  +import java.util.HashSet;
  +import java.util.Iterator;
  +import java.util.Map;
  +import java.util.Set;
  +import java.util.TimerTask;
  +
  +import org.jboss.logging.Logger;
  +import org.jboss.remoting.Client;
  +import org.jboss.remoting.ConnectionFailedException;
  +import org.jboss.remoting.InvocationRequest;
  +import org.jboss.remoting.InvokerLocator;
  +import org.jboss.remoting.invocation.InternalInvocation;
  +import org.jboss.remoting.marshal.Marshaller;
  +import org.jboss.remoting.marshal.UnMarshaller;
  +import org.jboss.remoting.transport.BidirectionalClientInvoker;
  +import org.jboss.remoting.transport.socket.SocketClientInvoker;
  +import org.jboss.remoting.util.TimerUtil;
  +
  +/**
  + * BisocketClientInvoker uses Sockets to remotely connect to a remote ServerInvoker,
  + * which must be a BisocketServerInvoker.
  + * 
  + * @author <a href="mailto:ron.sigal at jboss.com">Ron Sigal</a>
  + */
  +public class BisocketClientInvoker
  +extends SocketClientInvoker
  +implements BidirectionalClientInvoker
  +{  
  +   private static final Logger log = Logger.getLogger(BisocketClientInvoker.class);
  +   private static Map listenerIdToClientInvokerMap = Collections.synchronizedMap(new HashMap());
  +   private static Map listenerIdToSocketsMap = new HashMap();
  + 
  +   private int pingFrequency = Bisocket.PING_FREQUENCY_DEFAULT;;
  +   private String listenerId;
  +   private InvokerLocator secondaryLocator;
  +   private Socket controlSocket;
  +   private OutputStream controlOutputStream;
  +   private PingTimerTask pingTimerTask;
  +
  +   
  +   /**
  +    * Looks up the client invoker registered for a callback listener.
  +    *
  +    * @param listenerId id of the callback listener
  +    * @return the invoker stored under <code>listenerId</code> in the static
  +    *         registry, or <code>null</code> if the registry has no such entry
  +    */
  +   static BisocketClientInvoker getBisocketClientInvoker(String listenerId)
  +   {
  +      Object registered = listenerIdToClientInvokerMap.get(listenerId);
  +      return (BisocketClientInvoker) registered;
  +   }
  +   
  +   
  +   /**
  +    * Adds a socket to the set kept for a listener, creating the set on
  +    * first use, and notifies one thread waiting on that set.
  +    *
  +    * @param listenerId id of the callback listener the socket belongs to
  +    * @param socket     the socket to make available
  +    */
  +   static void transferSocket(String listenerId, Socket socket)
  +   {
  +      Set pending;
  +
  +      synchronized (listenerIdToSocketsMap)
  +      {
  +         Object existing = listenerIdToSocketsMap.get(listenerId);
  +         if (existing != null)
  +         {
  +            pending = (Set) existing;
  +         }
  +         else
  +         {
  +            pending = new HashSet();
  +            listenerIdToSocketsMap.put(listenerId, pending);
  +         }
  +      }
  +
  +      // The set's own monitor is what createSocket() waits on.
  +      synchronized (pending)
  +      {
  +         pending.add(socket);
  +         pending.notify();
  +      }
  +   }
  +   
  +   
  +   /**
  +    * Creates an invoker without configuration; equivalent to calling the
  +    * two-argument constructor with a <code>null</code> map.
  +    *
  +    * @param locator locator of the server to connect to
  +    * @throws IOException declared by the two-argument constructor
  +    */
  +   public BisocketClientInvoker(InvokerLocator locator) throws IOException
  +   {
  +      this(locator, null);
  +   }
  +
  +   
  +   /**
  +    * Creates an invoker for the given locator and configuration.
  +    * <p>
  +    * If the configuration holds a listener id under Client.LISTENER_ID_KEY,
  +    * the invoker registers itself in the static registry and makes sure a
  +    * socket set exists for that listener. If it holds a value under
  +    * Bisocket.PING_FREQUENCY, given as a Number or as a numeric string, that
  +    * value becomes the ping frequency.
  +    *
  +    * @param locator locator handed to the superclass
  +    * @param config  configuration map, may be <code>null</code>
  +    * @throws IOException declared for the superclass constructor
  +    */
  +   public BisocketClientInvoker(InvokerLocator locator, Map config) throws IOException
  +   {
  +      super(locator, config);
  +
  +      if (config == null)
  +         return;
  +
  +      listenerId = (String) config.get(Client.LISTENER_ID_KEY);
  +      if (listenerId != null)
  +      {
  +         listenerIdToClientInvokerMap.put(listenerId, this);
  +
  +         synchronized (listenerIdToSocketsMap)
  +         {
  +            if (listenerIdToSocketsMap.get(listenerId) == null)
  +               listenerIdToSocketsMap.put(listenerId, new HashSet());
  +         }
  +
  +         log.info("registered " + listenerId + " -> " + this);
  +      }
  +
  +      // look for ping frequency param
  +      Object val = config.get(Bisocket.PING_FREQUENCY);
  +      if (val instanceof Number)
  +      {
  +         // A numeric value needs no parsing; casting it to String would fail.
  +         pingFrequency = ((Number) val).intValue();
  +         log.debug("Setting ping frequency to: " + pingFrequency);
  +      }
  +      else if (val != null)
  +      {
  +         try
  +         {
  +            pingFrequency = Integer.parseInt(val.toString().trim());
  +            log.debug("Setting ping frequency to: " + pingFrequency);
  +         }
  +         catch (NumberFormatException e)
  +         {
  +            log.warn("Could not convert " + Bisocket.PING_FREQUENCY +
  +                     " value of " + val + " to an int value.");
  +         }
  +      }
  +   }
  +   
  +   
  +   /**
  +    * @return the ping frequency currently stored in this invoker
  +    */
  +   public int getPingFrequency()
  +   {
  +      return this.pingFrequency;
  +   }
  +   
  +   
  +   /**
  +    * Stores a new ping frequency and logs it.
  +    *
  +    * @param pingFrequency the new ping frequency
  +    */
  +   public void setPingFrequency(int pingFrequency)
  +   {
  +      log.info("set ping frequency: " + (this.pingFrequency = pingFrequency));
  +   }
  +   
  +   
  +   /**
  +    * Connects through the superclass and then, for an invoker without a
  +    * listener id, asks the server for the locator of its secondary server
  +    * socket and stores the answer.
  +    *
  +    * @throws ConnectionFailedException if the superclass fails to connect
  +    *         or the secondary locator cannot be retrieved
  +    */
  +   protected void handleConnect() throws ConnectionFailedException
  +   {
  +      super.handleConnect();
  +
  +      // An invoker with a listener id is the callback client on the server
  +      // side; it has no secondary locator to fetch.
  +      if (listenerId != null)
  +         return;
  +
  +      // Client on client side.
  +      try
  +      {
  +         InternalInvocation query = new InternalInvocation(Bisocket.GET_SECONDARY_INVOKER_LOCATOR, null);
  +         Object reply = invoke(new InvocationRequest(null, null, query, null, null, null));
  +         log.info("secondary locator: " + reply);
  +         secondaryLocator = (InvokerLocator) reply;
  +         log.info("got secondary locator: " + secondaryLocator);
  +      }
  +      catch (Throwable e)
  +      {
  +         log.error("Unable to retrieve address/port of secondary server socket", e);
  +         throw new ConnectionFailedException(e.getMessage());
  +      }
  +   }
  +
  +   
  +   /**
  +    * Disconnects through the superclass and, for an invoker with a listener
  +    * id, removes its entries from the static registries and cancels its
  +    * ping task.
  +    */
  +   protected void handleDisconnect()
  +   {
  +      super.handleDisconnect();
  +
  +      if (listenerId == null)
  +         return;
  +
  +      listenerIdToClientInvokerMap.remove(listenerId);
  +
  +      // listenerIdToSocketsMap is a plain HashMap that every other user
  +      // accesses while holding its monitor.
  +      synchronized (listenerIdToSocketsMap)
  +      {
  +         listenerIdToSocketsMap.remove(listenerId);
  +      }
  +
  +      if (pingTimerTask != null)
  +         pingTimerTask.shutDown();
  +   }
  +   
  +   
  +   /**
  +    * Sends an invocation through the superclass. Before doing so, for an
  +    * InternalInvocation.ADDLISTENER request, it asks the
  +    * BisocketServerInvoker registered for the request's listener id to
  +    * create a control connection to the secondary locator.
  +    * <p>
  +    * If the request carries no payload or listener id, or no server invoker
  +    * is registered for the listener, the control connection is skipped with
  +    * a warning and the invocation is still sent.
  +    *
  +    * @return the result of the superclass transport
  +    * @throws IOException if creating the control connection or the
  +    *         superclass transport fails
  +    * @throws ConnectionFailedException declared by the superclass transport
  +    * @throws ClassNotFoundException declared by the superclass transport
  +    */
  +   protected Object transport(String sessionId, Object invocation, Map metadata,
  +                              Marshaller marshaller, UnMarshaller unmarshaller)
  +   throws IOException, ConnectionFailedException, ClassNotFoundException
  +   {
  +      if (invocation instanceof InvocationRequest)
  +      {
  +         InvocationRequest ir = (InvocationRequest) invocation;
  +         Object o = ir.getParameter();
  +         if (o instanceof InternalInvocation)
  +         {
  +            InternalInvocation ii = (InternalInvocation) o;
  +            if (InternalInvocation.ADDLISTENER.equals(ii.getMethodName()))
  +            {
  +               Map requestPayload = ir.getRequestPayload();
  +               Object id = (requestPayload == null) ? null : requestPayload.get(Client.LISTENER_ID_KEY);
  +
  +               if (!(id instanceof String))
  +               {
  +                  log.warn("no listener id in request payload: not creating control connection");
  +               }
  +               else
  +               {
  +                  String callbackListenerId = (String) id;
  +                  BisocketServerInvoker callbackServerInvoker;
  +                  callbackServerInvoker = BisocketServerInvoker.getBisocketServerInvoker(callbackListenerId);
  +
  +                  if (callbackServerInvoker == null)
  +                     log.warn("no BisocketServerInvoker registered for listener " + callbackListenerId +
  +                              ": not creating control connection");
  +                  else
  +                     callbackServerInvoker.createControlConnection(callbackListenerId, secondaryLocator);
  +               }
  +            }
  +         }
  +      }
  +
  +      return super.transport(sessionId, invocation, metadata, marshaller, unmarshaller);
  +   }
  +   
  +   
  +   /**
  +    * Supplies a socket for an invocation.
  +    * <p>
  +    * An invoker without a listener id creates the socket through the
  +    * superclass. An invoker with a listener id never connects by itself: it
  +    * takes sockets from the set that transferSocket() fills for its
  +    * listener. The first socket taken becomes the control socket and starts
  +    * the ping task. When no spare socket is available, the action
  +    * Bisocket.CREATE_ORDINARY_SOCKET is written to the control socket and
  +    * the method waits for a socket to arrive in the set.
  +    *
  +    * @param address passed to the superclass when there is no listener id
  +    * @param port    passed to the superclass when there is no listener id
  +    * @return a connected socket
  +    * @throws IOException if no socket set exists for the listener, if
  +    *         writing the request fails, or if no socket arrives in time
  +    */
  +   protected Socket createSocket(String address, int port) throws IOException
  +   {
  +      if (listenerId == null)
  +         return super.createSocket(address, port);
  +
  +      Set sockets;
  +      synchronized (listenerIdToSocketsMap)
  +      {
  +         sockets = (Set) listenerIdToSocketsMap.get(listenerId);
  +      }
  +
  +      // handleDisconnect() removes the set; fail clearly rather than with a
  +      // NullPointerException on the synchronized statement below.
  +      if (sockets == null)
  +         throw new IOException("no socket set registered for listener: " + listenerId);
  +
  +      synchronized (sockets)
  +      {
  +         if (controlSocket == null)
  +         {
  +            if (sockets.isEmpty())
  +               awaitSocket(sockets, "Timed out trying to create control socket");
  +
  +            adoptControlSocket(takeSocket(sockets));
  +         }
  +
  +         // A socket that is already waiting can be used without a request.
  +         if (!sockets.isEmpty())
  +            return takeSocket(sockets);
  +
  +         log.info("requesting socket");
  +
  +         synchronized (controlOutputStream)
  +         {
  +            controlOutputStream.write(Bisocket.CREATE_ORDINARY_SOCKET);
  +         }
  +
  +         awaitSocket(sockets, "Timed out trying to create socket");
  +         log.info("got socket");
  +         return takeSocket(sockets);
  +      }
  +   }
  +
  +
  +   /** Removes and returns one socket from a non-empty set. */
  +   private Socket takeSocket(Set sockets)
  +   {
  +      Iterator it = sockets.iterator();
  +      Socket socket = (Socket) it.next();
  +      it.remove();
  +      return socket;
  +   }
  +
  +
  +   /**
  +    * Makes the given socket the control socket and schedules the ping task.
  +    * The output stream is obtained first so that a failure leaves the
  +    * control socket fields untouched.
  +    */
  +   private void adoptControlSocket(Socket socket) throws IOException
  +   {
  +      OutputStream out = socket.getOutputStream();
  +      controlSocket = socket;
  +      controlOutputStream = out;
  +      log.info("got control socket");
  +      pingTimerTask = new PingTimerTask();
  +      TimerUtil.schedule(pingTimerTask, pingFrequency);
  +      log.info("scheduled PingTimerTask");
  +   }
  +
  +
  +   /**
  +    * Waits, holding the monitor of <code>sockets</code>, until the set is
  +    * non-empty. A positive getTimeout() bounds the total wait, including
  +    * time spent before an interrupt or spurious wakeup; any other value
  +    * waits without limit. Interrupts are ignored.
  +    *
  +    * @throws IOException with <code>timeoutMessage</code> if the timeout
  +    *         elapses while the set is still empty
  +    */
  +   private void awaitSocket(Set sockets, String timeoutMessage) throws IOException
  +   {
  +      long timeout = getTimeout();
  +      long deadline = System.currentTimeMillis() + timeout;
  +
  +      while (sockets.isEmpty())
  +      {
  +         long remaining = 0;
  +         if (timeout > 0)
  +         {
  +            remaining = deadline - System.currentTimeMillis();
  +            if (remaining <= 0)
  +               throw new IOException(timeoutMessage);
  +         }
  +
  +         try
  +         {
  +            sockets.wait(remaining);
  +         }
  +         catch (InterruptedException ignored)
  +         {
  +            // keep waiting for the remaining time
  +         }
  +      }
  +   }
  +   
  +   
  +   /**
  +    * Installs <code>socket</code> as the control socket, switches the control
  +    * output stream to it, and replaces the current PingTimerTask with a newly
  +    * scheduled one.
  +    * <p>
  +    * The output stream is obtained from the new socket before either field is
  +    * assigned, so if that call throws, the existing control socket and stream
  +    * are left as they were.
  +    *
  +    * @param socket the new control socket
  +    * @throws IOException if the output stream of <code>socket</code> cannot be obtained
  +    */
  +   void replaceControlSocket(Socket socket) throws IOException
  +   {
  +      // controlSocket is reassigned below and may still be null, so locking on the
  +      // field directly could throw a NullPointerException.  Lock on the previous
  +      // socket when there is one, otherwise on the incoming socket.
  +      Socket previous = controlSocket;
  +      Object monitor = previous;
  +      if (monitor == null)
  +         monitor = socket;
  +
  +      synchronized (monitor)
  +      {
  +         // getOutputStream() comes first: if it throws, nothing has been changed.
  +         controlOutputStream = socket.getOutputStream();
  +         controlSocket = socket;
  +      }
  +      // NOTE(review): the previous control socket is not closed here -- confirm
  +      // that whoever detected its failure is responsible for closing it.
  +
  +      // Work on a snapshot of the field: PingTimerTask.run() sets it to null when
  +      // a ping fails, which could otherwise happen between the check and cancel().
  +      PingTimerTask previousTask = pingTimerTask;
  +      if (previousTask != null)
  +         previousTask.cancel();
  +
  +      PingTimerTask replacement = new PingTimerTask();
  +      pingTimerTask = replacement;
  +      TimerUtil.schedule(replacement, pingFrequency);
  +      log.info("replaced PingTimerTask");
  +   }
  +   
  +   
  +   /**
  +    * Verifies that <code>o</code> is an instance of <code>c</code>.
  +    *
  +    * @param o the object to test
  +    * @param c the class <code>o</code> is expected to be an instance of
  +    * @return <code>o</code> itself when the check succeeds
  +    * @throws IOException if <code>o</code> is not an instance of <code>c</code>
  +    *         (this includes a null <code>o</code>)
  +    */
  +   protected Object checkType(Object o, Class c) throws IOException
  +   {
  +      if (c.isInstance(o))
  +         return o;
  +
  +      // The separating space was missing, which fused the object's string form
  +      // with the rest of the message.
  +      throw new IOException(o + " is not an instance of " + c);
  +   }
  +   
  +   /**
  +    * Verifies that <code>o</code> is not null.
  +    *
  +    * @param o     the value to test
  +    * @param field name used in the error message to identify the value
  +    * @return <code>o</code> itself when it is not null
  +    * @throws IOException if <code>o</code> is null
  +    */
  +   protected Object checkNull(Object o, String field) throws IOException
  +   {
  +      if (o != null)
  +      {
  +         return o;
  +      }
  +
  +      String message = field + " must not be null";
  +      throw new IOException(message);
  +   }
  +
  +
  +   /**
  +    * Builds the InvokerLocator of a callback server from the given metadata.
  +    * <p>
  +    * The transport, host and port are read from the entries keyed by
  +    * Client.CALLBACK_SERVER_PROTOCOL, Client.CALLBACK_SERVER_HOST and
  +    * Client.CALLBACK_SERVER_PORT.  When no port entry is present the port is -1.
  +    * The locator path is "callback" and the metadata map itself is passed on to
  +    * the InvokerLocator constructor.
  +    *
  +    * @param metadata map holding the callback server settings
  +    * @return the locator built from those settings
  +    * @throws RuntimeException if the port entry is present but is not a valid
  +    *         integer; the NumberFormatException is attached as the cause
  +    */
  +   public InvokerLocator getCallbackLocator(Map metadata)
  +   {
  +      String transport = (String) metadata.get(Client.CALLBACK_SERVER_PROTOCOL);
  +      String host = (String) metadata.get(Client.CALLBACK_SERVER_HOST);
  +      String sPort = (String) metadata.get(Client.CALLBACK_SERVER_PORT);
  +      int port = -1;
  +      if (sPort != null)
  +      {
  +         try
  +         {
  +            port = Integer.parseInt(sPort);
  +         }
  +         catch (NumberFormatException e)
  +         {
  +            // Keep the original exception as the cause and close the parenthesis
  +            // around the offending value.
  +            throw new RuntimeException("Can not set internal callback server port as configuration value (" + sPort + ") is not a number.", e);
  +         }
  +      }
  +
  +      return new InvokerLocator(transport, host, port, "callback", metadata);
  +   }
  +   
  +   
  +   /**
  +    * Timer task that writes a Bisocket.PING byte to the control output stream
  +    * each time it runs.  If the write fails, the task logs a warning and cancels
  +    * itself.
  +    */
  +   class PingTimerTask extends TimerTask
  +   {
  +      /**
  +       * Cancels this task.
  +       */
  +      public void shutDown()
  +      {
  +         cancel();
  +      }
  +
  +      /**
  +       * Sends one ping while holding the monitor of the control output stream,
  +       * the same monitor used when a socket request is written to that stream.
  +       */
  +      public void run()
  +      {
  +         synchronized (controlOutputStream)
  +         {
  +            try
  +            {
  +               log.info("sending ping");
  +               controlOutputStream.write(Bisocket.PING);
  +            }
  +            catch (IOException e)
  +            {
  +               // Pass the exception along so the reason for the failure is logged.
  +               log.warn("Unable to send ping: shutting down PingTimerTask", e);
  +
  +               // Clear the enclosing instance's reference only if it still points
  +               // at this task; replaceControlSocket() may already have installed
  +               // a successor that must not be discarded.
  +               if (pingTimerTask == this)
  +                  pingTimerTask = null;
  +
  +               cancel();
  +            }
  +         }
  +      }
  +   }
  +}
  \ No newline at end of file
  
  
  
  1.1.2.1   +51 -0     JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/Attic/TransportClientFactory.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: TransportClientFactory.java
  ===================================================================
  RCS file: TransportClientFactory.java
  diff -N TransportClientFactory.java
  --- /dev/null	1 Jan 1970 00:00:00 -0000
  +++ TransportClientFactory.java	14 Dec 2006 01:03:55 -0000	1.1.2.1
  @@ -0,0 +1,51 @@
  +/*
  +* JBoss, Home of Professional Open Source
  +* Copyright 2005, JBoss Inc., and individual contributors as indicated
  +* by the @authors tag. See the copyright.txt in the distribution for a
  +* full listing of individual contributors.
  +*
  +* This is free software; you can redistribute it and/or modify it
  +* under the terms of the GNU Lesser General Public License as
  +* published by the Free Software Foundation; either version 2.1 of
  +* the License, or (at your option) any later version.
  +*
  +* This software is distributed in the hope that it will be useful,
  +* but WITHOUT ANY WARRANTY; without even the implied warranty of
  +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  +* Lesser General Public License for more details.
  +*
  +* You should have received a copy of the GNU Lesser General Public
  +* License along with this software; if not, write to the Free
  +* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  +* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  +*/
  +package org.jboss.remoting.transport.bisocket;
  +
  +import org.jboss.remoting.InvokerLocator;
  +import org.jboss.remoting.transport.ClientFactory;
  +import org.jboss.remoting.transport.ClientInvoker;
  +
  +import java.io.IOException;
  +import java.util.Map;
  +
  +/**
  + * ClientFactory implementation for the bisocket transport: every client
  + * invoker it creates is a BisocketClientInvoker.
  + *
  + * @author <a href="ron.sigal at jboss.com">Ron Sigal</a>
  + * @version $Revision: 1.1.2.1 $
  + * <p>
  + * Copyright Nov 25, 2006
  + * </p>
  + */
  +public class TransportClientFactory implements ClientFactory
  +{
  +   /**
  +    * Creates a new BisocketClientInvoker.
  +    *
  +    * @param locator locator handed to the invoker's constructor
  +    * @param config  configuration map handed to the invoker's constructor
  +    * @return the newly created invoker
  +    * @throws IOException declared by this method's signature
  +    */
  +   public ClientInvoker createClientInvoker(InvokerLocator locator, Map config)
  +         throws IOException
  +   {
  +      ClientInvoker invoker = new BisocketClientInvoker(locator, config);
  +      return invoker;
  +   }
  +
  +   /**
  +    * Reports whether this transport supports SSL.
  +    *
  +    * @return always <code>false</code>
  +    */
  +   public boolean supportsSSL()
  +   {
  +      return false;
  +   }
  +}
  \ No newline at end of file
  
  
  
  1.1.2.1   +28 -0     JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/Attic/TransportServerFactory.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: TransportServerFactory.java
  ===================================================================
  RCS file: TransportServerFactory.java
  diff -N TransportServerFactory.java
  --- /dev/null	1 Jan 1970 00:00:00 -0000
  +++ TransportServerFactory.java	14 Dec 2006 01:03:55 -0000	1.1.2.1
  @@ -0,0 +1,28 @@
  +package org.jboss.remoting.transport.bisocket;
  +
  +import org.jboss.remoting.InvokerLocator;
  +import org.jboss.remoting.ServerInvoker;
  +import org.jboss.remoting.transport.ServerFactory;
  +
  +import java.util.Map;
  +
  +/**
  + * ServerFactory implementation for the bisocket transport: every server
  + * invoker it creates is a BisocketServerInvoker.
  + *
  + * @author <a href="ron.sigal at jboss.com">Ron Sigal</a>
  + * @version $Revision: 1.1.2.1 $
  + * <p>
  + * Copyright Nov 25, 2006
  + * </p>
  + */
  +public class TransportServerFactory implements ServerFactory
  +{
  +   /**
  +    * Creates a new BisocketServerInvoker.
  +    *
  +    * @param locator locator handed to the invoker's constructor
  +    * @param config  configuration map handed to the invoker's constructor
  +    * @return the newly created invoker
  +    */
  +   public ServerInvoker createServerInvoker(InvokerLocator locator, Map config)
  +   {
  +      ServerInvoker invoker = new BisocketServerInvoker(locator, config);
  +      return invoker;
  +   }
  +
  +   /**
  +    * Reports whether this transport supports SSL.
  +    *
  +    * @return always <code>false</code>
  +    */
  +   public boolean supportsSSL()
  +   {
  +      return false;
  +   }
  +}
  
  
  
  1.1.2.1   +45 -0     JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/Attic/Bisocket.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: Bisocket.java
  ===================================================================
  RCS file: Bisocket.java
  diff -N Bisocket.java
  --- /dev/null	1 Jan 1970 00:00:00 -0000
  +++ Bisocket.java	14 Dec 2006 01:03:55 -0000	1.1.2.1
  @@ -0,0 +1,45 @@
  +/*
  +* JBoss, Home of Professional Open Source
  +* Copyright 2005, JBoss Inc., and individual contributors as indicated
  +* by the @authors tag. See the copyright.txt in the distribution for a
  +* full listing of individual contributors.
  +*
  +* This is free software; you can redistribute it and/or modify it
  +* under the terms of the GNU Lesser General Public License as
  +* published by the Free Software Foundation; either version 2.1 of
  +* the License, or (at your option) any later version.
  +*
  +* This software is distributed in the hope that it will be useful,
  +* but WITHOUT ANY WARRANTY; without even the implied warranty of
  +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  +* Lesser General Public License for more details.
  +*
  +* You should have received a copy of the GNU Lesser General Public
  +* License along with this software; if not, write to the Free
  +* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  +* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  +*/
  +package org.jboss.remoting.transport.bisocket;
  +
  +/**
  + * Constants used by the bisocket transport.
  + *
  + * @author <a href="ron.sigal at jboss.com">Ron Sigal</a>
  + * @version $Revision: 1.1.2.1 $
  + * <p>
  + * Copyright Nov 22, 2006
  + * </p>
  + */
  +public class Bisocket
  +{
  +   // Single-byte codes.  PING and CREATE_ORDINARY_SOCKET are written to the
  +   // control output stream by the client invoker.
  +   // NOTE(review): CREATE_CONTROL_SOCKET is presumably sent the same way -- confirm.
  +   public static final byte PING                   = 1;
  +   public static final byte CREATE_CONTROL_SOCKET  = 2;
  +   public static final byte CREATE_ORDINARY_SOCKET = 3;
  +
  +   // Name of the ping frequency setting and its default value.
  +   // NOTE(review): the value is presumably in milliseconds -- confirm against
  +   // the timer the ping task is scheduled with.
  +   public static final String PING_FREQUENCY = "pingFrequency";
  +   public static final int PING_FREQUENCY_DEFAULT = 5000;
  +
  +   // String identifiers.
  +   // NOTE(review): these look like the subsystem name and method names of the
  +   // transport's internal invocations -- confirm against their users.
  +   public static final String BISOCKET_INTERNAL_SUBSYSTEM = "bisocketInternalSystem";
  +   public static final String CREATE_CALLBACK_SOCKET = "createCallbackSocket";
  +   public static final String GET_SECONDARY_INVOKER_LOCATOR = "getSecondaryInvokerLocator";
  +   public static final String START_SECONDARY_PORT = "startSecondaryPort";
  +}
  
  
  



More information about the jboss-cvs-commits mailing list