poll有三个重载方法,最终都会调用poll(long timeout, long now, boolean executeDelayedTasks)
timeout:执行poll方法最长阻塞时间
now:当前时间戳;
executeDelayedTasks:是否执行delayedTasks队列中的定时任务。
privatevoidpoll(long timeout,long now,boolean executeDelayedTasks) { // send all the requests we can send now 在now发送全部请求,主要回去循环处理unsent中缓存的请求 trySend(now); // ensure we don't poll any longer than the deadline for // the next scheduled task //计算超时时间,此超时时间由timeout与delayedTasks队列中最近要执行的定时任务的时间共同决定 timeout =Math.min(timeout,delayedTasks.nextTimeout(now)); //调用NetworkClient的poll方法,超时时间传递计算的出来的超时时间 clientPoll(timeout, now); //拿到发送完请求的后的当前时间戳 now =time.milliseconds(); // handle any disconnects by failing the active requests. note that disconnects must // be checked immediately following poll since any subsequent call to client.ready() // will reset the disconnect status //检查是否断开连接 checkDisconnects(now); // execute scheduled tasks 如果executeDelayedTasks为true指定定时任务 if (executeDelayedTasks) delayedTasks.poll(now); // try again to send requests since buffer space may have been // cleared or a connect finished in the poll trySend(now); // fail requests that couldn't be sent if they have expired //失败的请求如果它们已经过期不能再发送 failExpiredRequests(now); }
首先执行trySend方法,会尝试将unsent中缓存的请求全部发送
privatebooleantrySend(long now) { // send any requests that can be sent now boolean requestsSent =false; //遍历缓存中的全部请求 for (Map.Entry<Node,List<ClientRequest>> requestEntry :unsent.entrySet()) { //拿到对应的node节点 Node node =requestEntry.getKey(); Iterator<ClientRequest> iterator =requestEntry.getValue().iterator(); //遍历发送ClientRequest请求 while (iterator.hasNext()) { ClientRequest request =iterator.next(); //如果node就绪 if (client.ready(node, now)) { //发送请求,将客户端请求放入InFlightRequests队列等待响应,也放入KafkaChannel的send字段中等待发送 client.send(request, now); //从unsent缓存中删除 iterator.remove(); //请求发送设置为true requestsSent =true; } } } return requestsSent; }
privatevoidcheckDisconnects(long now) { // any disconnects affecting requests that have already been transmitted will be handled // by NetworkClient, so we just need to check whether connections for any of the unsent // requests have been disconnected; if they have, then we complete the corresponding future // and set the disconnect flag in the ClientResponse //判断usent缓存中的每个node节点的连接状态 Iterator<Map.Entry<Node,List<ClientRequest>>> iterator =unsent.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry<Node,List<ClientRequest>> requestEntry =iterator.next(); Node node =requestEntry.getKey(); //如果连接失败移除该数据,并且将对应的请求通过请求完成处理器传递给客户端 if (client.connectionFailed(node)) { // Remove entry before invoking request callback to avoid callbacks handling // coordinator failures traversing the unsent list again. iterator.remove(); for (ClientRequest request :requestEntry.getValue()) { RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback(); handler.onComplete(newClientResponse(request, now,true,null)); } } } }
根据executeDelayedTasks决定是否处理delayTasks
// execute scheduled tasks 如果executeDelayedTasks为true指定定时任务 if (executeDelayedTasks) delayedTasks.poll(now); ## delayedTask包括AutoCommitOffsetTask和HeartBeartTask
privatevoidfailExpiredRequests(long now) { // clear all expired unsent requests and fail their corresponding futures //遍历unsent缓存 Iterator<Map.Entry<Node,List<ClientRequest>>> iterator =unsent.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry<Node,List<ClientRequest>> requestEntry =iterator.next(); Iterator<ClientRequest> requestIterator =requestEntry.getValue().iterator(); while (requestIterator.hasNext()) { ClientRequest request =requestIterator.next(); //如果请求已经超时 移除并且抛出移除 if (request.createdTimeMs() < now - unsentExpiryMs) { //将异常放入请求完成处理器中并且移除请求 RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback(); handler.raise(newTimeoutException("Failed to send request after "+ unsentExpiryMs +" ms.")); requestIterator.remove(); } elsebreak; } //如果请求集合为空,那么将其从unsent移除 if (requestEntry.getValue().isEmpty()) iterator.remove(); } }
pollNoWakeup方法
将待发送的请求封装成ClientRequest,然后保存到unsent集合中等待发送。
/** * 调用IO请求并且立即返回,这将不能触发中断,也不会执行任何延迟任务 * Poll for network IO and return immediately. This will not trigger wakeups, * nor will it execute any delayed tasks. */publicvoidpollNoWakeup() { //关闭中断处理请求,添加不可中断方法 disableWakeups(); try { //立即处理数据,并且不允许阻塞 poll(0,time.milliseconds(),false); } finally { enableWakeups(); } } publicvoidenableWakeups() { if (wakeupDisabledCount <=0) thrownewIllegalStateException("Cannot enable wakeups since they were never disabled"); wakeupDisabledCount--; // re-wakeup the client if the flag was set since previous wake-up call // could be cleared by poll(0) while wakeups were disabled //中断方法,唤醒当前阻塞IO if (wakeupDisabledCount ==0&&wakeup.get()) this.client.wakeup(); }