JDK HttpClient客户端的构建和启动-编程思维

HttpClient客户端的构建和启动

1. 简述

上篇简单测试过,Http Client 的性能相对高效。那么,这样一个客户端,又是怎样构建的呢?短短的构建代码,背后又发生了什么呢?

简而言之,HttpClient的构建使用了建造者模式,在构建时同时产生了HttpClient的两个实现类对应的对象:外观代理对象HttpclientFacade和真正的实现对象HttpclientImpl,两者互相引用彼此,一切对客户端的行为都首先作用在外观对象上,再交由实现对象来处理。在构建过程中,HttpclientImpl会创建一个作为NIO中的选择器管理者的守护线程,负责对各类I/O事件进行轮询,并分发相关事件进行处理。

理解本文,需要JAVA NIO相关知识。

本文所指的[HTTPClient] 都是指 JDK11 开始内置的HTTPClient及相关类。源代码分析基于JDK17。

2. HttpClient客户端构建:建造者模式、代理模式

先回顾下HttpClient客户端的构建代码:

HttpClient client = HttpClient.newBuilder()
        .executor(executor) //对建造者进行参数化
        .build();

可以看到,这里使用了典型的建造者模式(Builder Pattern)。使用建造者模式的好处是:Httpclient相对复杂的构建过程被隐藏了起来,使用者无需知道具体的构建细节。

在newBuilder()被调用后,产生一个HttpclientBuilderImpl的实例,之后可以链式地对该建造者参数化,最后通过build()方法,构建出需要的HttpClient实现类。

那么,build()执行时,发生了什么呢?我们走进build()方法的源码:

	@Override
    public HttpClient build() {
        return HttpClientImpl.create(this);
    }

可以看到,建造者将自身作为HttpClientImpl的静态方法create()的入参,将最后的构建过程交给了HttpClientImpl,我们跟随进入。

/**
 * Client implementation. Contains all configuration information and also
 * the selector manager thread which allows async events to be registered
 * and delivered when they occur. See AsyncEvent.
 */
final class HttpClientImpl extends HttpClient implements Trackable {
    
    static final AtomicLong CLIENT_IDS = new AtomicLong();

    //此处列出大部分成员变量
    
    private final CookieHandler cookieHandler;
    private final Duration connectTimeout;
    private final Redirect followRedirects;
    private final ProxySelector userProxySelector;
    private final ProxySelector proxySelector;
    private final Authenticator authenticator;
    private final Version version;
    private final ConnectionPool connections;
    private final DelegatingExecutor delegatingExecutor;
    private final boolean isDefaultExecutor;
    // Security parameters
    private final SSLContext sslContext;
    private final SSLParameters sslParams;
    private final SelectorManager selmgr;
    private final FilterFactory filters;
    //HttpClient2客户端
    private final Http2ClientImpl client2;
    private final long id;
    private final String dbgTag;
    
    private final SSLDirectBufferSupplier sslBufferSupplier
            = new SSLDirectBufferSupplier(this);
    //对HttpClient外观实现类的弱引用
    private final WeakReference<HttpClientFacade> facadeRef;
	
    //对待处理的请求的计数
    private final AtomicLong pendingOperationCount = new AtomicLong();
    private final AtomicLong pendingWebSocketCount = new AtomicLong();
    private final AtomicLong pendingHttpRequestCount = new AtomicLong();
    private final AtomicLong pendingHttp2StreamCount = new AtomicLong();

    /** 过期时间 */
    private final TreeSet<TimeoutEvent> timeouts;
/**
     * This is a bit tricky:
     * 1. an HttpClientFacade has a final HttpClientImpl field.
     * 2. an HttpClientImpl has a final WeakReference<HttpClientFacade> field,
     *    where the referent is the facade created for that instance.
     * 3. We cannot just create the HttpClientFacade in the HttpClientImpl
     *    constructor, because it would be only weakly referenced and could
     *    be GC'ed before we can return it.
     * The solution is to use an instance of SingleFacadeFactory which will
     * allow the caller of new HttpClientImpl(...) to retrieve the facade
     * after the HttpClientImpl has been created.
     */
    //外观工厂
    private static final class SingleFacadeFactory {
        HttpClientFacade facade;
        HttpClientFacade createFacade(HttpClientImpl impl) {
            assert facade == null;
            return (facade = new HttpClientFacade(impl));
        }
    }

  //我们要分析的方法
    static HttpClientFacade create(HttpClientBuilderImpl builder) {
        //这是个Factory?这是在做什么呢?
        //其实,这里是HttpClient的外观代理实现类HttpClientFacade的构建工厂。
        SingleFacadeFactory facadeFactory = new SingleFacadeFactory();
        //构建方法的重点:实例化HttpClientImpl。稍后详细分析
        HttpClientImpl impl = new HttpClientImpl(builder, facadeFactory);
        //启动NIO选择子管理者守护线程,接收并响应I/O事件
        impl.start();
        assert facadeFactory.facade != null;
        assert impl.facadeRef.get() == facadeFactory.facade;
        //返回外观实现类
        return facadeFactory.facade;
    }
    
    //HttpClientImpl的初始化构造方法
    private HttpClientImpl(HttpClientBuilderImpl builder,
                           SingleFacadeFactory facadeFactory) {
    	//此处暂时省略,稍后分析
    }
    
}

可以看到,调用build()方法返回的是一个HttpClientFacade的外观实现类。看了上面的几行代码,语言的力量已经显得不足。HttpClientFacade是什么?它和HttpClientImpl的关系是什么?SingleFacadeFactory又是什么用途呢?因此,我们通过UML类图来说明。

分析HttpClient几个类的关系,我们可以看到,HttpClient抽象类有两个实现类:

  • 外观实现类HttpClientFacade(简称外观类)
  • 真正的实现类HttpClientImpl(简称实现类)。

由此我们可以看到HttpClientFacade存在的一个目的:作为一个“中介”,提供一个简单的封装,它强引用了HttpClientImpl,一切作用在HttpClient上的调用都会被交由HttpClientImpl处理。尽管其名字中带有Facade(名为外观)字样,但实际上承担的是代理类的角色:这是一个典型的静态代理模式的运用。

/**
 * An HttpClientFacade is a simple class that wraps an HttpClient implementation
 * and delegates everything to its implementation delegate.
 */
final class HttpClientFacade extends HttpClient implements Trackable {

    final HttpClientImpl impl;

    /**
     * Creates an HttpClientFacade.
     */
    HttpClientFacade(HttpClientImpl impl) {
        this.impl = impl;
    }

    @Override // for tests
    public Tracker getOperationsTracker() {
        return impl.getOperationsTracker();
    }

 //此处省略大批getter代码。可以看到,HttpClientFacade类中的方法都是对实际的实现类HttpClientImpl的简单调用

    @Override
    public Optional<Executor> executor() {
        return impl.executor();
    }

    @Override
    public <T> HttpResponse<T>
    send(HttpRequest req, HttpResponse.BodyHandler<T> responseBodyHandler)
        throws IOException, InterruptedException
    {
        try {
            return impl.send(req, responseBodyHandler);
        } finally {
            Reference.reachabilityFence(this);
        }
    }

    @Override
    public <T> CompletableFuture<HttpResponse<T>>
    sendAsync(HttpRequest req, HttpResponse.BodyHandler<T> responseBodyHandler) {
        try {
            return impl.sendAsync(req, responseBodyHandler);
        } finally {
            Reference.reachabilityFence(this);
        }
    }
//省略一些方法

    @Override
    public WebSocket.Builder newWebSocketBuilder() {
        try {
            return impl.newWebSocketBuilder();
        } finally {
            Reference.reachabilityFence(this);
        }
    }

}

那么,实现类为什么要弱引用外观类呢?我们稍后再看。

我们关注为什么创建外观对象需要一个外观工厂(SingleFacadeFactory):正如其英文注释写到的这样:外观对象维持对实现类对象的强引用,而实现类对象则只维持了对外观对象的弱引用;如果直接在实现类的构造函数(稍后分析)中创建外观对象,可能外观对象会被JVM垃圾收集器直接回收。因此,需要一个工厂对象,维持对外观对象的引用,直到实现类被完全初始化完成,这样可以保证始终能返回给调用者可用的外观对象。

接下来,我们关注实现类HttpClientImpl的构造方法,看看初始化过程中究竟做了什么:

//HttpClientImpl的私有初始化构造方法
private HttpClientImpl(HttpClientBuilderImpl builder,
                           SingleFacadeFactory facadeFactory) {
    id = CLIENT_IDS.incrementAndGet();
    dbgTag = "HttpClientImpl(" + id +")";
    if (builder.sslContext == null) {
        try {
            //初始化默认ssl环境
            sslContext = SSLContext.getDefault();
        } catch (NoSuchAlgorithmException ex) {
            throw new UncheckedIOException(new IOException(ex));
        }
    } else {
        sslContext = builder.sslContext;
    }
    Executor ex = builder.executor;
    if (ex == null) {
        //若没有自定义线程池,则使用不限大小的线程池
        ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id));
        isDefaultExecutor = true;
    } else {
        isDefaultExecutor = false;
    }
    delegatingExecutor = new DelegatingExecutor(this::isSelectorThread, ex);
    //外观HttpClient类弱引用初始化
    facadeRef = new WeakReference<>(facadeFactory.createFacade(this));
    //初始化Http2专属的client
    client2 = new Http2ClientImpl(this);
    //cookie处理器、连接超时事件和重定向策略初始化和默认值设置
    cookieHandler = builder.cookieHandler;
    connectTimeout = builder.connectTimeout;
    //默认不跟随服务器的重定向请求
    followRedirects = builder.followRedirects == null ?
        Redirect.NEVER : builder.followRedirects;
    this.userProxySelector = builder.proxy;
    //若没有设置代理,使用默认的代理选择器
    this.proxySelector = Optional.ofNullable(userProxySelector)
        .orElseGet(HttpClientImpl::getDefaultProxySelector);
    if (debug.on())
        debug.log("proxySelector is %s (user-supplied=%s)",
                  this.proxySelector, userProxySelector != null);
    authenticator = builder.authenticator;
    //先初始化为Http2版本的客户端,之后针对请求会有降级的操作
    if (builder.version == null) {
        version = HttpClient.Version.HTTP_2;
    } else {
        version = builder.version;
    }
    //设置默认的ssl参数
    if (builder.sslParams == null) {
        sslParams = getDefaultParams(sslContext);
    } else {
        sslParams = builder.sslParams;
    }
    //连接池的初始化
    connections = new ConnectionPool(id);
    connections.start();
    //超时时间treeSet的初始化
    timeouts = new TreeSet<>();
    try {
        /*
            重点!此处初始化了一个SelectorManager的守护线程。
            该线程负责向操作系统轮询各类事件,并在事件发生时派发事件
            */
        selmgr = new SelectorManager(this);
    } catch (IOException e) {
        // unlikely
        throw new UncheckedIOException(e);
    }
    selmgr.setDaemon(true);
    //初始化请求头过滤器,包括重定向、认证和cookie管理器(若有)
    filters = new FilterFactory();
    initFilters();
    assert facadeRef.get() != null;
}

//此方法在上面分析过的create()静态方法中调用,
//启动选择子管理者守护线程
private void start() {
    selmgr.start();
}

//此处省略大量代码

//过滤器初始化,此处可以看到只添加了实现类到过滤器链表中,这是处于懒加载的考虑
//这里有个添加顺序的问题,后篇会稍作分析
private void initFilters() {
    addFilter(AuthenticationFilter.class);
    addFilter(RedirectFilter.class);
    if (this.cookieHandler != null) {
        addFilter(CookieFilter.class);
    }
}

可以看到,HttpClientImpl对象的构建过程相对复杂:

  • 进行了客户端策略的设置和默认值填充
  • 初始化了线程池(ex)和连接池(connections)
  • 初始化了请求头过滤器(懒加载)
  • 同时,初始化了一个SelectorManager的守护线程,并直接以新的线程方式启动。

那么,这个SelectorManager是何方神圣,扮演了怎样的作用?从名字中,你可能已经猜出,它是NIO编程中的选择器的管理者。

3. 选择器线程的运行

选择器管理者的主要行为可以如下概括:向系统轮询发生的各类I/O事件,并调用事件自身的方法进行分发处理

3.1 源码分析

我们进入SelectorManager的源码,重点关注主方法run():

//SelectorManager,HttpClientImpl的内部类,可直接作为线程启动,负责向系统轮询并派发事件	
// Main loop for this client's selector
    private final static class SelectorManager extends Thread {

        // 控制选择器在没有事件发生时的唤醒时间相关变量
        private static final int MIN_NODEADLINE = 1000; // ms
        private static final int MAX_NODEADLINE = 1000 * 1200; // ms
        private static final int DEF_NODEADLINE = 3000; // ms
        private static final long NODEADLINE; // default is DEF_NODEADLINE ms
        static {
            // ensure NODEADLINE is initialized with some valid value.
            long deadline =  Utils.getIntegerProperty(
                "jdk.internal.httpclient.selectorTimeout",
                DEF_NODEADLINE); // millis
            if (deadline <= 0) deadline = DEF_NODEADLINE;
            deadline = Math.max(deadline, MIN_NODEADLINE);
            NODEADLINE = Math.min(deadline, MAX_NODEADLINE);
        }

        //JAVA NIO中的选择器,负责向操作系统轮询事件
        private final Selector selector;
        private volatile boolean closed;
        //注册和解挂相关的事件
        private final List<AsyncEvent> registrations;
        private final List<AsyncTriggerEvent> deregistrations;
        private final Logger debug;
        private final Logger debugtimeout;
        HttpClientImpl owner;
        ConnectionPool pool;

        //构造方法
        SelectorManager(HttpClientImpl ref) throws IOException {
            super(null, null,
                  "HttpClient-" + ref.id + "-SelectorManager",
                  0, false);
            owner = ref;
            debug = ref.debug;
            debugtimeout = ref.debugtimeout;
            pool = ref.connectionPool();
            registrations = new ArrayList<>();
            deregistrations = new ArrayList<>();
            selector = Selector.open();
        }

        void eventUpdated(AsyncEvent e) throws ClosedChannelException {
          	//添加事件到注册事件列表,并唤醒选择器,省略
        }
        
        // This returns immediately. So caller not allowed to send/receive
        // on connection.
        //注册事件方法,将时间添加到注册时间列表等待选择器处理,并唤醒选择器
        //客户端会间接调用此方法添加事件
        synchronized void register(AsyncEvent e) {
            registrations.add(e);
            selector.wakeup();
        }

        synchronized void cancel(SocketChannel e) {
            //该方法没有被调用?忽略
        }

        //唤醒选择器
        void wakeupSelector() {
            selector.wakeup();
        }

        synchronized void shutdown() {
            //关闭选择器,连接和线程池,暂时省略
        }

        /*
       	SelectorManager线程的主要运行方法
        */
        @Override
        public void run() {
            List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>();
            //初始化就绪事件和重置事件列表
            List<AsyncEvent> readyList = new ArrayList<>();
            List<Runnable> resetList = new ArrayList<>();
            try {
                if (Log.channel()) Log.logChannel(getName() + ": starting");
                //开启无限循环
                while (!Thread.currentThread().isInterrupted()) {
                    synchronized (this) {
                        assert errorList.isEmpty();
                        assert readyList.isEmpty();
                        assert resetList.isEmpty();
                        //首先处理要注销的事件,然后清空注销事件列表
                        for (AsyncTriggerEvent event : deregistrations) {
                            event.handle();
                        }
                        deregistrations.clear();
                        //处理注册事件列表中的事件
                        for (AsyncEvent event : registrations) {
                            //AsyncTriggerEvent无需注册到通道上,直接加入待处理列表
                            if (event instanceof AsyncTriggerEvent) {
                                readyList.add(event);
                                continue;
                            }
                            //从事件中获取事件维护的NIO channel通道
                            SelectableChannel chan = event.channel();
                            SelectionKey key = null;
                            try {
                                //获取通道对应的选择键(是连接建立时,通道绑定到选择器上分配的)
                                key = chan.keyFor(selector);
                                SelectorAttachment sa;
                                if (key == null || !key.isValid()) {
                                    if (key != null) {
                                        // key is canceled.
                                        // invoke selectNow() to purge it
                                        // before registering the new event.
                                        selector.selectNow();
                                    }
                                    sa = new SelectorAttachment(chan, selector);
                                } else {
                                    //获取绑定到选择器上的附件(也是通道注册到选择器上时附带的)
                                    //稍后将看到,该附件维护了通道、选择器、通道上的待处理事件列表的关系,
                                    //扮演着异步事件中转站的角色
                                    sa = (SelectorAttachment) key.attachment();
                                }
                                //添加事件到附件中的待处理列表(pending属性),并将通道重新向选择器注册
                                // may throw IOE if channel closed: that's OK
                                sa.register(event);
                                if (!chan.isOpen()) {
                                    throw new IOException("Channel closed");
                                }
                            } catch (IOException e) {
                                Log.logTrace("{0}: {1}", getName(), e);
                                if (debug.on())
                                    debug.log("Got " + e.getClass().getName()
                                              + " while handling registration events");
                                chan.close();
                                //发生I/O错误的情况下,将事件加入错误列表,并取消选择键
                                // let the event abort deal with it
                                errorList.add(new Pair<>(event, e));
                                if (key != null) {
                                    key.cancel();
                                    selector.selectNow();
                                }
                            }
                        }
                        registrations.clear();
                        selector.selectedKeys().clear();
                    }
                    // 处理 加入到列表的AsyncTriggerEvent
                    for (AsyncEvent event : readyList) {
                        assert event instanceof AsyncTriggerEvent;
                        event.handle();
                    }
                    readyList.clear();

                    for (Pair<AsyncEvent,IOException> error : errorList) {
                        // an IOException was raised and the channel closed.
                        handleEvent(error.first, error.second);
                    }
                    errorList.clear();

                    //当客户端不再被引用时,结束选择器的运行
                    // Check whether client is still alive, and if not,
                    // gracefully stop this thread
                    if (!owner.isReferenced()) {
                        Log.logTrace("{0}: {1}",
                                getName(),
                                "HttpClient no longer referenced. Exiting...");
                        return;
                    }

                    //下面是一些对选择器select方法的阻塞时长的计算
                    long nextTimeout = owner.purgeTimeoutsAndReturnNextDeadline();
                    if (debugtimeout.on())
                        debugtimeout.log("next timeout: %d", nextTimeout);

                    long nextExpiry = pool.purgeExpiredConnectionsAndReturnNextDeadline();
                    if (debugtimeout.on())
                        debugtimeout.log("next expired: %d", nextExpiry);
                    assert nextTimeout >= 0;
                    assert nextExpiry >= 0;
                    if (nextTimeout <= 0) nextTimeout = NODEADLINE;
                    if (nextExpiry <= 0) nextExpiry = NODEADLINE;
                    else nextExpiry = Math.min(NODEADLINE, nextExpiry);
                    long millis = Math.min(nextExpiry, nextTimeout);
                    if (debugtimeout.on())
                        debugtimeout.log("Next deadline is %d",
                                         (millis == 0 ? NODEADLINE : millis));
                    /*selector的select方法:负责向操作系统轮询阻塞的事件。若在millis毫秒后,
                    没有任何通道有就绪事件发生,该方法会在millis毫秒后返回0;否则阻塞过程中,
                    当有至少1个通道有事件发生时返回,返回有事件发生的通道的数量。*/
                    int n = selector.select(millis == 0 ? NODEADLINE : millis);
                    if (n == 0) {
                        //如果没有事件发生,看看外观客户端是否还被引用,否则退出方法
                        // Check whether client is still alive, and if not,
                        // gracefully stop this thread
                        if (!owner.isReferenced()) {
                            Log.logTrace("{0}: {1}",
                                    getName(),
                                    "HttpClient no longer referenced. Exiting...");
                            return;
                        }
                        //清楚连接池中的超时连接
                        owner.purgeTimeoutsAndReturnNextDeadline();
                        continue;
                    }

                    //返回有就绪的事件的通道的选择键
                    Set<SelectionKey> keys = selector.selectedKeys();
                    assert errorList.isEmpty();
                    
                    /*
                    这一步是关键:
                    遍历有事件I/O事件发生的选择键,取出对应的“附件”,
                    匹配筛选附件上事件列表中 感兴趣的事件类型和发生的I/O事件类型相符合的 事件,
                    异步处理这些事件,并将它们从选择键中的待办事件列表中删除
                    */
                    for (SelectionKey key : keys) {
                        SelectorAttachment sa = (SelectorAttachment) key.attachment();
                        //处理选择键失效的情况(键被取消,通道关闭或选择器关闭)的情况
                        if (!key.isValid()) {
                            IOException ex = sa.chan.isOpen()
                                    ? new IOException("Invalid key")
                                    : new ClosedChannelException();
                            sa.pending.forEach(e -> errorList.add(new Pair<>(e,ex)));
                            sa.pending.clear();
                            continue;
                        }

                        int eventsOccurred;
                        try {
                            eventsOccurred = key.readyOps();
                        } catch (CancelledKeyException ex) {
                            //处理选择键被取消的情况
                            IOException io = Utils.getIOException(ex);
                            sa.pending.forEach(e -> errorList.add(new Pair<>(e,io)));
                            sa.pending.clear();
                            continue;
                        }
                        //从附件的等待列表中去除待处理的事件(上一个大的for循环中加入的),加入待处理列表
                        sa.events(eventsOccurred).forEach(readyList::add);
                        //将被保存在该“附件”的等待事件列表内的、操作类型和给定的感兴趣操作相符合的 
                        //待处理事件从该附件的等待列表中移除
                        resetList.add(() -> sa.resetInterestOps(eventsOccurred));
                    }

                    selector.selectNow(); // complete cancellation
                    selector.selectedKeys().clear();

                    // handle selected events  处理待处理事件
                    readyList.forEach((e) -> handleEvent(e, null));
                    readyList.clear();

                    // handle errors (closed channels etc...) 处理错误
                    errorList.forEach((p) -> handleEvent(p.first, p.second));
                    errorList.clear();

                    // reset interest ops for selected channels 
                    resetList.forEach(r -> r.run());
                    resetList.clear();

                }
            } catch (Throwable e) {
                if (!closed) {
                    // This terminates thread. So, better just print stack trace
                    String err = Utils.stackTrace(e);
                    Log.logError("{0}: {1}: {2}", getName(),
                            "HttpClientImpl shutting down due to fatal error", err);
                }
                if (debug.on()) debug.log("shutting down", e);
                if (Utils.ASSERTIONSENABLED && !debug.on()) {
                    e.printStackTrace(System.err); // always print the stack
                }
            } finally {
                if (Log.channel()) Log.logChannel(getName() + ": stopping");
                shutdown();
            }
        }

下面是SelectorManager类的源码,同样是HttpClientImpl的内部类。该类在通道注册到选择器上时作为“附件”附着到选择键上,在事件发生时可以取出来。该类的作用是管理多个针对同一个选择键的注册行为。其相当于一个”中转站“的作用。

/**
 * Tracks multiple user level registrations associated with one NIO
 * registration (SelectionKey). In this implementation, registrations
 * are one-off and when an event is posted the registration is cancelled
 * until explicitly registered again.
 *
 * <p> No external synchronization required as this class is only used
 * by the SelectorManager thread. One of these objects required per
 * connection.
 */
private static class SelectorAttachment {
    private final SelectableChannel chan;
    private final Selector selector;
    private final Set<AsyncEvent> pending;
    private final static Logger debug =
            Utils.getDebugLogger("SelectorAttachment"::toString, Utils.DEBUG);
    private int interestOps;

    SelectorAttachment(SelectableChannel chan, Selector selector) {
        this.pending = new HashSet<>();
        this.chan = chan;
        this.selector = selector;
    }

    void register(AsyncEvent e) throws ClosedChannelException {
        int newOps = e.interestOps();
        //判断是否重新注册
        boolean reRegister = (interestOps & newOps) != newOps;
        interestOps |= newOps;
        pending.add(e);
        if (debug.on())
            debug.log("Registering %s for %d (%s)", e, newOps, reRegister);
        if (reRegister) {
            // first time registration happens here also
            try {
                //将通道注册到选择器上,会更新感兴趣的事件,并将自身“附着”到选择器上
                chan.register(selector, interestOps, this);
            } catch (Throwable x) {
                abortPending(x);
            }
        } else if (!chan.isOpen()) {
            abortPending(new ClosedChannelException());
        }
    }

    /**筛选出操作类型与该通道感兴趣的操作重合的待处理事件
     * Returns a Stream<AsyncEvents> containing only events that are
     * registered with the given {@code interestOps}.
     */
    Stream<AsyncEvent> events(int interestOps) {
        return pending.stream()
                .filter(ev -> (ev.interestOps() & interestOps) != 0);
    }

    /**
        将被保存在该“附件”的等待事件列表内的、操作类型和给定的感兴趣操作相符合的 待处理事件
        从该附件的等待列表中移除
     * Removes any events with the given {@code interestOps}, and if no
     * events remaining, cancels the associated SelectionKey.
     */
    void resetInterestOps(int interestOps) {
        int newOps = 0;

        Iterator<AsyncEvent> itr = pending.iterator();
        while (itr.hasNext()) {
            AsyncEvent event = itr.next();
            int evops = event.interestOps();
            if (event.repeating()) {
                newOps |= evops;
                continue;
            }
            if ((evops & interestOps) != 0) {
                itr.remove();
            } else {
                newOps |= evops;
            }
        }

        this.interestOps = newOps;
        SelectionKey key = chan.keyFor(selector);
        if (newOps == 0 && key != null && pending.isEmpty()) {
            key.cancel();
        } else {
            try {
                if (key == null || !key.isValid()) {
                    throw new CancelledKeyException();
                }
                key.interestOps(newOps);
                // double check after
                if (!chan.isOpen()) {
                    abortPending(new ClosedChannelException());
                    return;
                }
                assert key.interestOps() == newOps;
            } catch (CancelledKeyException x) {
                // channel may have been closed
                if (debug.on()) debug.log("key cancelled for " + chan);
                abortPending(x);
            }
        }
    }

    void abortPending(Throwable x) {
        if (!pending.isEmpty()) {
            AsyncEvent[] evts = pending.toArray(new AsyncEvent[0]);
            pending.clear();
            IOException io = Utils.getIOException(x);
            for (AsyncEvent event : evts) {
                event.abort(io);
            }
        }
    }
}

3.2 基本流程

在线程初始化后,选择器线程会固定间隔地执行如下流程:

  1. 遍历初始时为空的等待事件列表。
  2. 阻塞地向操作系统轮询各I/O通道的事件,每隔一段时间返回就绪事件对应的选择键。
  3. 在没有连接、请求等操作下,没有其余操作

而当存在请求时,后续将看到,客户端会将异步事件放入选择器线程的等待事件列表中。此时,选择器线程流程如下:

  1. 遍历等待事件列表,若有相关事件,则将他们加入到对应的选择键上附着的SelectorAttachment对象的等待事件队列。
  2. 阻塞地向操作系统轮询各I/O通道的事件,当有事件发生时,遍历对应选择键,筛选出对应的“附件”中与发生的I/O事件相符合的待办事件列表
  3. 异步处理待办事件列表,并清除对应附件上已处理的事件

需要注意的是,上述表述中“待办事件”或”等待事件“等表述的是具体的AsyncEvent对象,而通道或选择器上的”事件“则表示I/O事件,两者有联系也有区别。

3.3 外观客户端的意义

同时,我们注意到,代码中,当选择器轮询I/O事件一无所获时,会检查外观类(HttpClientFacade)是否还被实现类(HttpClientImpl)引用,如果没有,便结束线程。

int n = selector.select(millis == 0 ? NODEADLINE : millis);
if (n == 0) {
    //如果没有事件发生,看看外观客户端是否还被引用,否则退出方法
    // Check whether client is still alive, and if not,
    // gracefully stop this thread
    if (!owner.isReferenced()) {
        Log.logTrace("{0}: {1}",
                     getName(),
                     "HttpClient no longer referenced. Exiting...");
        return;
    }
    owner.purgeTimeoutsAndReturnNextDeadline();
    continue;
}

结合代码中的注释,我们可以看出外观类HttpClientFacade的意义:它实质上并非只是对真正实现类HttpClientImpl的代理。HttpClientImpl需要知道调用者(应用程序)什么时候不再持有了对HttpClient(Facade)外观对象的引用,以便及时中止守护线程。而弱引用,正好满足了这样的要求:当应用程序不再持有Facade对象时,外观对象会在垃圾回收时被回收。HttpClientImpl通过检查对外观对象的弱引用是否为空,即可知道是否要停止处理I/O事件的守护线程。

4. 小结

HttpClient的初始化并非如我们想象的那般简单。由于拥抱了NIO,在客户端构建时,后台线程便已开始运行,处理未来的I/O事件。

理解客户端的构建和启动流程,有助于我们更深入地理解NIO,体会其哲学和魅力。

下篇,我们将看到,在HttpClient接受用户请求的调用后,会面临生成多个请求的问题,那么,它又是怎样处理的呢?

版权声明:本文版权归作者所有,遵循 CC 4.0 BY-SA 许可协议, 转载请注明原文链接
https://www.cnblogs.com/cavern-builder-zyx/p/15754686.html

JDK HttpClient 多重请求-响应的处理-编程思维

HttpClient 多重请求-响应的处理 目录HttpClient 多重请求-响应的处理1. 简述2. 请求响应流程图3. 用户请求的复制4. 多重请求处理概览5. 请求、响应过滤的执行:cookie,认证和重定向6. 小结 1. 简述 上篇介绍了JDK HttpClient客户端的构建和启动。在客户端构建完成时,后

C#网络编程入门之UDP-编程思维

一、概述 UDP和TCP是网络通讯常用的两个传输协议,C#一般可以通过Socket来实现UDP和TCP通讯,由于.NET框架通过UdpClient、TcpListener 、TcpClient这几个类对Socket进行了封装,使其使用更加方便, 本文就通过这几个封装过的类讲解一下相关应用。   二、UDP基本应用 与T

JDK Httpclient 使用和性能测试-编程思维

Httpclient 使用和性能测试 上篇,通过简介和架构图,我们对HttpClient有了初步的了解。 本篇我们展示HttpClient的简单使用,同时为了说明httpclient的使用性能,我们将Httpclient的同步和异步模式与apache的Httpclient4作比较。。 1. HttpClient示例代码