Can you share ChannelFactory / worker threads in a p2p app?

Trustin Lee (이희승) trustin at gmail.com
Mon Nov 30 17:26:06 EST 2009


Thanks a lot!  Your patch looks fine to me.  I will apply it soon.

— Trustin Lee, http://gleamynode.net/



On Mon, Nov 23, 2009 at 10:38 AM, Thingfish <jannick at ovja.dk> wrote:
>
> There is probably not much of a performance difference when looking at an
> individual app. However, I want to be able to run a large number of instances
> on a single machine (for testing), and the number of threads might become an
> issue in that case.
>
> I've put together a simple (as yet untested) suggestion for how worker
> sharing might be achieved. I noticed that NioServerSocketPipelineSink and
> NioClientSocketPipelineSink implement the same round-robin load balancing
> over a worker array, and that the sinks make no use of workerExecutor and
> workerCount once the worker array has been constructed. I've factored this
> out into a NioWorkerPool class and changed the sinks to receive it instead.
> The channel factories have a new constructor that takes a pre-constructed
> workerPool, making it possible to share the pool between clients and
> servers.
>
> I have included the code below (as a patch against trunk). A few open
> issues: a shared worker needs to be assigned a useful bossID, and NioWorker
> needs to change so that an appropriate threadName is selected. Do you
> foresee any issues with this solution?
>
> Regards
> Jannick Bitsch
>
>
> ### Eclipse Workspace Patch 1.0
> #P Netty
> Index: src/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java
> ===================================================================
> --- src/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java (revision 1904)
> +++ src/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java (working copy)
> @@ -118,23 +118,30 @@
>     public NioClientSocketChannelFactory(
>             Executor bossExecutor, Executor workerExecutor,
>             int workerCount) {
> +     this(bossExecutor, new NioWorkerPool(workerExecutor, workerCount));
> +    }
> +
> +
> +    /**
> +     * Creates a new instance.
> +     *
> +     * @param bossExecutor
> +     *        the {@link Executor} which will execute the boss thread
> +     * @param workerPool
> +     *        the {@link NioWorkerPool} which will supply nio workers
> +     */
> +    public NioClientSocketChannelFactory(
> +            Executor bossExecutor, NioWorkerPool workerPool) {
>         if (bossExecutor == null) {
>             throw new NullPointerException("bossExecutor");
>         }
> -        if (workerExecutor == null) {
> -            throw new NullPointerException("workerExecutor");
> -        }
> -        if (workerCount <= 0) {
> -            throw new IllegalArgumentException(
> -                    "workerCount (" + workerCount + ") " +
> -                    "must be a positive integer.");
> -        }
>
>         this.bossExecutor = bossExecutor;
> -        this.workerExecutor = workerExecutor;
> -        sink = new NioClientSocketPipelineSink(bossExecutor, workerExecutor, workerCount);
> -    }
> +        this.workerExecutor = workerPool.workerExecutor;
> +        sink = new NioClientSocketPipelineSink(bossExecutor, workerPool);
> +    }
>
> +
>     public SocketChannel newChannel(ChannelPipeline pipeline) {
>         return new NioClientSocketChannel(this, pipeline, sink, sink.nextWorker());
>     }
> Index: src/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java
> ===================================================================
> --- src/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java (revision 1904)
> +++ src/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java (working copy)
> @@ -63,16 +63,12 @@
>     final int id = nextId.incrementAndGet();
>     final Executor bossExecutor;
>     private final Boss boss = new Boss();
> -    private final NioWorker[] workers;
> -    private final AtomicInteger workerIndex = new AtomicInteger();
> +    private final NioWorkerPool workerPool;
>
>     NioClientSocketPipelineSink(
> -            Executor bossExecutor, Executor workerExecutor, int workerCount) {
> +            Executor bossExecutor, NioWorkerPool workerPool) {
>         this.bossExecutor = bossExecutor;
> -        workers = new NioWorker[workerCount];
> -        for (int i = 0; i < workers.length; i ++) {
> -            workers[i] = new NioWorker(id, i + 1, workerExecutor);
> -        }
> +        this.workerPool = workerPool;
>     }
>
>     public void eventSunk(
> @@ -160,8 +156,7 @@
>     }
>
>     NioWorker nextWorker() {
> -        return workers[Math.abs(
> -                workerIndex.getAndIncrement() % workers.length)];
> +        return workerPool.nextWorker();
>     }
>
>     private final class Boss implements Runnable {
> Index: src/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java
> ===================================================================
> --- src/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java (revision 1904)
> +++ src/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java (working copy)
> @@ -57,14 +57,10 @@
>     private static final AtomicInteger nextId = new AtomicInteger();
>
>     private final int id = nextId.incrementAndGet();
> -    private final NioWorker[] workers;
> -    private final AtomicInteger workerIndex = new AtomicInteger();
> +    private final NioWorkerPool workerPool;
>
> -    NioServerSocketPipelineSink(Executor workerExecutor, int workerCount) {
> -        workers = new NioWorker[workerCount];
> -        for (int i = 0; i < workers.length; i ++) {
> -            workers[i] = new NioWorker(id, i + 1, workerExecutor);
> -        }
> +    NioServerSocketPipelineSink(NioWorkerPool workerPool) {
> +     this.workerPool = workerPool;
>     }
>
>     public void eventSunk(
> @@ -191,8 +187,7 @@
>     }
>
>     NioWorker nextWorker() {
> -        return workers[Math.abs(
> -                workerIndex.getAndIncrement() % workers.length)];
> +        return workerPool.nextWorker();
>     }
>
>     private final class Boss implements Runnable {
> Index: src/org/jboss/netty/channel/socket/nio/NioWorkerPool.java
> ===================================================================
> --- src/org/jboss/netty/channel/socket/nio/NioWorkerPool.java (revision 0)
> +++ src/org/jboss/netty/channel/socket/nio/NioWorkerPool.java (revision 0)
> @@ -0,0 +1,34 @@
> +package org.jboss.netty.channel.socket.nio;
> +
> +import java.util.concurrent.Executor;
> +import java.util.concurrent.atomic.AtomicInteger;
> +
> +public class NioWorkerPool {
> +
> +    private final NioWorker[] workers;
> +    private final AtomicInteger workerIndex = new AtomicInteger();
> +    final Executor workerExecutor;
> +
> +    public NioWorkerPool(Executor workerExecutor, int workerCount) {
> +        if (workerExecutor == null) {
> +            throw new NullPointerException("workerExecutor");
> +        }
> +        if (workerCount <= 0) {
> +            throw new IllegalArgumentException(
> +                    "workerCount (" + workerCount + ") " +
> +                    "must be a positive integer.");
> +        }
> +
> +        this.workerExecutor = workerExecutor;
> +
> +        workers = new NioWorker[workerCount];
> +        for (int i = 0; i < workers.length; i ++) {
> +            workers[i] = new NioWorker(0xBEEF, i + 1, workerExecutor);
> +        }
> +    }
> +
> +    public NioWorker nextWorker() {
> +        return workers[Math.abs(
> +                workerIndex.getAndIncrement() % workers.length)];
> +    }
> +}
> \ No newline at end of file
> Index: src/org/jboss/netty/channel/socket/nio/NioServerSocketChannelFactory.java
> ===================================================================
> --- src/org/jboss/netty/channel/socket/nio/NioServerSocketChannelFactory.java (revision 1904)
> +++ src/org/jboss/netty/channel/socket/nio/NioServerSocketChannelFactory.java (working copy)
> @@ -122,22 +122,34 @@
>     public NioServerSocketChannelFactory(
>             Executor bossExecutor, Executor workerExecutor,
>             int workerCount) {
> +        this(bossExecutor, new NioWorkerPool(workerExecutor, workerCount));
> +    }
> +
> +    /**
> +     * Creates a new instance.
> +     *
> +     * @param bossExecutor
> +     *        the {@link Executor} which will execute the boss threads
> +     * @param workerPool
> +     *        the {@link NioWorkerPool} which will supply nio workers
> +     */
> +    public NioServerSocketChannelFactory(
> +     Executor bossExecutor, NioWorkerPool workerPool){
> +
>         if (bossExecutor == null) {
>             throw new NullPointerException("bossExecutor");
>         }
> -        if (workerExecutor == null) {
> -            throw new NullPointerException("workerExecutor");
> +
> +        if (workerPool == null) {
> +            throw new NullPointerException("workerPool");
>         }
> -        if (workerCount <= 0) {
> -            throw new IllegalArgumentException(
> -                    "workerCount (" + workerCount + ") " +
> -                    "must be a positive integer.");
> -        }
> -        this.bossExecutor = bossExecutor;
> -        this.workerExecutor = workerExecutor;
> -        sink = new NioServerSocketPipelineSink(workerExecutor, workerCount);
> +
> +     this.bossExecutor = bossExecutor;
> +     this.workerExecutor = workerPool.workerExecutor;
> +        sink = new NioServerSocketPipelineSink(workerPool);
>     }
> -
> +
> +
>     public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
>         return new NioServerSocketChannel(this, pipeline, sink);
>     }
>
>
>
>
> 2009/11/23 Trustin Lee [via Netty Forums and Mailing Lists] <ml-node+4048335-1313223483 at n2.nabble.com>
>
>> When I designed the Netty API, sharing worker threads between the client
>> and server sides seemed to complicate the API too much compared to the
>> performance gained, and therefore the worker threads are not shared by
>> different ChannelFactories.
>>
>> If there is an obvious advantage in performance, I'd like to implement it
>> though.  Please let me know if you find something.
>>
>> Thanks
>>
>> — Trustin Lee, http://gleamynode.net/
>>
>>
>>
>> On Mon, Nov 23, 2009 at 8:42 AM, Thingfish <[hidden email]> wrote:
>>
>> >
>> > Hi
>> >
>> > I'm currently in the process of coding a peer-to-peer app on top of
>> > Netty. By definition each peer acts as both a server and a client, which
>> > means that I currently make use of both the ClientBootstrap and
>> > ServerBootstrap to support "connection setup" in both directions. Once
>> > established, "incoming" and "outgoing" connections are indistinguishable,
>> > and for this reason they share the same pipeline factory.
>> >
>> > My question is whether it's possible to also have them share the same
>> > worker threads and selectors. Currently the use of separate NioClient and
>> > NioServer SocketChannelFactories means that two disjoint sets of worker
>> > threads and I/O resources are created. As both these worker pools deliver
>> > the exact same service, it would be more efficient to avoid the extra
>> > threads.
>> >
>
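
For anyone reading this thread later, the idea behind the patch can be tried
out without Netty on the classpath. What follows is only a minimal sketch and
not the patch itself: Worker and Sink are hypothetical stand-ins for NioWorker
and the two pipeline sinks, and SharedWorkerPoolSketch is a made-up class name.
Only the round-robin nextWorker() logic is taken from the patch quoted above.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SharedWorkerPoolSketch {

    /** Hypothetical stand-in for NioWorker; it only carries an id. */
    static final class Worker {
        final int id;

        Worker(int id) {
            this.id = id;
        }
    }

    /** Round-robin pool using the same selection logic as the patch. */
    static final class WorkerPool {
        private final Worker[] workers;
        private final AtomicInteger workerIndex = new AtomicInteger();

        WorkerPool(int workerCount) {
            if (workerCount <= 0) {
                throw new IllegalArgumentException(
                        "workerCount (" + workerCount + ") must be a positive integer.");
            }
            workers = new Worker[workerCount];
            for (int i = 0; i < workers.length; i++) {
                workers[i] = new Worker(i + 1);
            }
        }

        Worker nextWorker() {
            // Taking abs() of the remainder keeps the index in range even
            // after the counter overflows and becomes negative.
            return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
        }
    }

    /** Hypothetical stand-in for a pipeline sink; it only delegates to the pool. */
    static final class Sink {
        private final WorkerPool pool;

        Sink(WorkerPool pool) {
            if (pool == null) {
                throw new NullPointerException("pool");
            }
            this.pool = pool;
        }

        Worker nextWorker() {
            return pool.nextWorker();
        }
    }

    public static void main(String[] args) {
        // One pool, two sinks: both draw from the same two workers.
        WorkerPool shared = new WorkerPool(2);
        Sink clientSink = new Sink(shared);
        Sink serverSink = new Sink(shared);

        System.out.println(clientSink.nextWorker().id); // prints 1
        System.out.println(serverSink.nextWorker().id); // prints 2
        System.out.println(clientSink.nextWorker().id); // prints 1
    }
}
```

Because both sinks hold the same pool, the round-robin counter is shared as
well, so client and server connections are spread over one set of workers
instead of two. With the patch applied, the equivalent would presumably be to
construct a single NioWorkerPool and pass it to the new constructors of both
NioClientSocketChannelFactory and NioServerSocketChannelFactory.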


