Can you share ChannelFactory / worker threads in a p2p app?

Thingfish jannick at ovja.dk
Sun Nov 22 20:38:28 EST 2009


There is probably not much of a performance difference for any individual app. I want to be able to run a large number of instances on a
single machine (for testing), and the number of threads might become an issue
in that case.

I've put together a simple (as yet untested) suggestion for how
worker sharing might be achieved. I noticed that NioServerSocketPipelineSink
and NioClientSocketPipelineSink implement the same round-robin load
balancing over a worker array, and that the sinks make no use of
workerExecutor and workerCount once the worker array has been constructed.
I've factored this out into a NioWorkerPool class and changed the sinks to
receive it instead. The channel factories have each gained a new constructor
that takes a pre-constructed worker pool, making it possible
to share the pool between clients and servers.
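To illustrate the intent, here is a small self-contained sketch. The classes below are stand-ins of my own invention, not Netty's real NioWorkerPool and channel factories (those need the full patch to compile); the pool mirrors the round-robin selection that the two sinks currently duplicate, and both "factories" simply hold a reference to the same pool:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for the proposed NioWorkerPool: round-robin over a fixed worker array.
class WorkerPool {
    private final String[] workers;
    private final AtomicInteger workerIndex = new AtomicInteger();

    WorkerPool(int workerCount) {
        workers = new String[workerCount];
        for (int i = 0; i < workerCount; i++) {
            workers[i] = "worker-" + (i + 1);
        }
    }

    String nextWorker() {
        // Same selection scheme as the existing sinks.
        return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
    }
}

// Stand-ins for the client and server channel factories: each simply holds a
// reference to the (possibly shared) pool, as the patched factories would.
class ClientFactory {
    final WorkerPool pool;
    ClientFactory(WorkerPool pool) { this.pool = pool; }
}

class ServerFactory {
    final WorkerPool pool;
    ServerFactory(WorkerPool pool) { this.pool = pool; }
}

public class SharedPoolSketch {
    public static void main(String[] args) {
        WorkerPool shared = new WorkerPool(2);
        ClientFactory client = new ClientFactory(shared);
        ServerFactory server = new ServerFactory(shared);

        // Both sides draw from the same round-robin sequence, so inbound and
        // outbound channels interleave over the same two workers.
        System.out.println(client.pool.nextWorker()); // worker-1
        System.out.println(server.pool.nextWorker()); // worker-2
        System.out.println(client.pool.nextWorker()); // worker-1
    }
}
```

The point is only that worker selection becomes independent of which factory asks for a worker; everything else stays as it is today.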

I've included the code below (as a patch against trunk). A few open issues:
a useful bossID needs to be assigned to a shared worker, and NioWorker needs
to be changed so that an appropriate threadName is selected. Do you foresee
any issues with this solution?

Regards
Jannick Bitsch


### Eclipse Workspace Patch 1.0
#P Netty
Index: src/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java
===================================================================
--- src/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java (revision 1904)
+++ src/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java (working copy)
@@ -118,23 +118,30 @@
     public NioClientSocketChannelFactory(
             Executor bossExecutor, Executor workerExecutor,
             int workerCount) {
+        this(bossExecutor, new NioWorkerPool(workerExecutor, workerCount));
+    }
+
+
+    /**
+     * Creates a new instance.
+     *
+     * @param bossExecutor
+     *        the {@link Executor} which will execute the boss thread
+     * @param workerPool
+     *        the {@link NioWorkerPool} which will supply nio workers
+     */
+    public NioClientSocketChannelFactory(
+            Executor bossExecutor, NioWorkerPool workerPool) {
         if (bossExecutor == null) {
             throw new NullPointerException("bossExecutor");
         }
-        if (workerExecutor == null) {
-            throw new NullPointerException("workerExecutor");
-        }
-        if (workerCount <= 0) {
-            throw new IllegalArgumentException(
-                    "workerCount (" + workerCount + ") " +
-                    "must be a positive integer.");
-        }

         this.bossExecutor = bossExecutor;
-        this.workerExecutor = workerExecutor;
-        sink = new NioClientSocketPipelineSink(bossExecutor, workerExecutor, workerCount);
-    }
+        this.workerExecutor = workerPool.workerExecutor;
+        sink = new NioClientSocketPipelineSink(bossExecutor, workerPool);
+    }

+
     public SocketChannel newChannel(ChannelPipeline pipeline) {
         return new NioClientSocketChannel(this, pipeline, sink, sink.nextWorker());
     }
Index: src/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java
===================================================================
--- src/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java (revision 1904)
+++ src/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java (working copy)
@@ -63,16 +63,12 @@
     final int id = nextId.incrementAndGet();
     final Executor bossExecutor;
     private final Boss boss = new Boss();
-    private final NioWorker[] workers;
-    private final AtomicInteger workerIndex = new AtomicInteger();
+    private final NioWorkerPool workerPool;

     NioClientSocketPipelineSink(
-            Executor bossExecutor, Executor workerExecutor, int workerCount) {
+            Executor bossExecutor, NioWorkerPool workerPool) {
         this.bossExecutor = bossExecutor;
-        workers = new NioWorker[workerCount];
-        for (int i = 0; i < workers.length; i ++) {
-            workers[i] = new NioWorker(id, i + 1, workerExecutor);
-        }
+        this.workerPool = workerPool;
     }

     public void eventSunk(
@@ -160,8 +156,7 @@
     }

     NioWorker nextWorker() {
-        return workers[Math.abs(
-                workerIndex.getAndIncrement() % workers.length)];
+        return workerPool.nextWorker();
     }

     private final class Boss implements Runnable {
Index: src/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java
===================================================================
--- src/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java (revision 1904)
+++ src/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java (working copy)
@@ -57,14 +57,10 @@
     private static final AtomicInteger nextId = new AtomicInteger();

     private final int id = nextId.incrementAndGet();
-    private final NioWorker[] workers;
-    private final AtomicInteger workerIndex = new AtomicInteger();
+    private final NioWorkerPool workerPool;

-    NioServerSocketPipelineSink(Executor workerExecutor, int workerCount) {
-        workers = new NioWorker[workerCount];
-        for (int i = 0; i < workers.length; i ++) {
-            workers[i] = new NioWorker(id, i + 1, workerExecutor);
-        }
+    NioServerSocketPipelineSink(NioWorkerPool workerPool) {
+        this.workerPool = workerPool;
     }

     public void eventSunk(
@@ -191,8 +187,7 @@
     }

     NioWorker nextWorker() {
-        return workers[Math.abs(
-                workerIndex.getAndIncrement() % workers.length)];
+        return workerPool.nextWorker();
     }

     private final class Boss implements Runnable {
Index: src/org/jboss/netty/channel/socket/nio/NioWorkerPool.java
===================================================================
--- src/org/jboss/netty/channel/socket/nio/NioWorkerPool.java (revision 0)
+++ src/org/jboss/netty/channel/socket/nio/NioWorkerPool.java (revision 0)
@@ -0,0 +1,34 @@
+package org.jboss.netty.channel.socket.nio;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class NioWorkerPool {
+
+    private final NioWorker[] workers;
+    private final AtomicInteger workerIndex = new AtomicInteger();
+    final Executor workerExecutor;
+
+    public NioWorkerPool(Executor workerExecutor, int workerCount) {
+        if (workerExecutor == null) {
+            throw new NullPointerException("workerExecutor");
+        }
+        if (workerCount <= 0) {
+            throw new IllegalArgumentException(
+                    "workerCount (" + workerCount + ") " +
+                    "must be a positive integer.");
+        }
+
+        this.workerExecutor = workerExecutor;
+
+        workers = new NioWorker[workerCount];
+        for (int i = 0; i < workers.length; i ++) {
+            workers[i] = new NioWorker(0xBEEF, i + 1, workerExecutor);
+        }
+    }
+
+    public NioWorker nextWorker() {
+        return workers[Math.abs(
+                workerIndex.getAndIncrement() % workers.length)];
+    }
+}
\ No newline at end of file
Index: src/org/jboss/netty/channel/socket/nio/NioServerSocketChannelFactory.java
===================================================================
--- src/org/jboss/netty/channel/socket/nio/NioServerSocketChannelFactory.java (revision 1904)
+++ src/org/jboss/netty/channel/socket/nio/NioServerSocketChannelFactory.java (working copy)
@@ -122,22 +122,34 @@
     public NioServerSocketChannelFactory(
             Executor bossExecutor, Executor workerExecutor,
             int workerCount) {
+        this(bossExecutor, new NioWorkerPool(workerExecutor, workerCount));
+    }
+
+    /**
+     * Creates a new instance.
+     *
+     * @param bossExecutor
+     *        the {@link Executor} which will execute the boss threads
+     * @param workerPool
+     *        the {@link NioWorkerPool} which will supply nio workers
+     */
+    public NioServerSocketChannelFactory(
+            Executor bossExecutor, NioWorkerPool workerPool) {
+
         if (bossExecutor == null) {
             throw new NullPointerException("bossExecutor");
         }
-        if (workerExecutor == null) {
-            throw new NullPointerException("workerExecutor");
+
+        if (workerPool == null) {
+            throw new NullPointerException("workerPool");
         }
-        if (workerCount <= 0) {
-            throw new IllegalArgumentException(
-                    "workerCount (" + workerCount + ") " +
-                    "must be a positive integer.");
-        }
-        this.bossExecutor = bossExecutor;
-        this.workerExecutor = workerExecutor;
-        sink = new NioServerSocketPipelineSink(workerExecutor, workerCount);
+
+        this.bossExecutor = bossExecutor;
+        this.workerExecutor = workerPool.workerExecutor;
+        sink = new NioServerSocketPipelineSink(workerPool);
     }
-
+
+
     public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
         return new NioServerSocketChannel(this, pipeline, sink);
     }




2009/11/23 Trustin Lee [via Netty Forums and Mailing Lists]:

> When I designed the Netty API, sharing worker threads between the client
> and server side seemed to complicate the API too much compared to the
> performance gained, and therefore the worker threads are not shared by
> different ChannelFactories.
>
> If there is an obvious performance advantage, though, I'd like to
> implement it.  Please let me know if you find something.
>
> Thanks
>
> — Trustin Lee, http://gleamynode.net/
>
>
>
> On Mon, Nov 23, 2009 at 8:42 AM, Thingfish <[hidden email]> wrote:
>
> >
> > Hi
> >
> > I'm currently in the process of coding a peer-to-peer app on top of
> > Netty. By definition each peer acts as both a server and a client, which
> > means that I currently make use of both the ClientBootstrap and
> > ServerBootstrap to support "connection setup" in both directions. Once
> > established, "incoming" and "outgoing" connections are indistinguishable,
> > and for this reason they share the same pipeline factory.
> >
> > My question is whether it's possible to also have them share the same
> > worker threads and selectors? Currently the use of separate NioClient and
> > NioServer SocketChannelFactories means that two disjoint sets of worker
> > threads and I/O resources are created. As both these worker pools deliver
> > the exact same service, it would be more efficient to avoid the extra
> > threads.
> >
> > --
> > View this message in context:
> > http://n2.nabble.com/Can-you-share-ChannelFactory-worker-threads-in-a-p2p-app-tp4048241p4048241.html
> > Sent from the Netty User Group mailing list archive at Nabble.com.
> > _______________________________________________
> > netty-users mailing list
> > [hidden email]
> > https://lists.jboss.org/mailman/listinfo/netty-users
> >
>

-- 
View this message in context: http://n2.nabble.com/Can-you-share-ChannelFactory-worker-threads-in-a-p2p-app-tp4048241p4048535.html
Sent from the Netty User Group mailing list archive at Nabble.com.



More information about the netty-users mailing list