# 核心onSend方法 privateFuture<RecordMetadata>doSend(ProducerRecord<K, V> record,Callback callback) { TopicPartition tp =null; try { //首先确保该主题的元数据可用 // first make sure the metadata for the topic is available long waitedOnMetadataMs =waitOnMetadata(record.topic(),this.maxBlockTimeMs); long remainingWaitMs =Math.max(0,this.maxBlockTimeMs- waitedOnMetadataMs); byte[] serializedKey; try { //序列化key serializedKey =keySerializer.serialize(record.topic(),record.key()); } catch (ClassCastException cce) { thrownewSerializationException("Can't convert key of class "+record.key().getClass().getName() +" to class "+producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +" specified in key.serializer"); } byte[] serializedValue; try { //序列化value serializedValue =valueSerializer.serialize(record.topic(),record.value()); } catch (ClassCastException cce) { thrownewSerializationException("Can't convert value of class "+record.value().getClass().getName() +" to class "+producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +" specified in value.serializer"); } //基于key 分区 int partition =partition(record, serializedKey, serializedValue,metadata.fetch()); //序列化后大小 int serializedSize =Records.LOG_OVERHEAD+Record.recordSize(serializedKey, serializedValue); //校验大小是否超过配置,max.request.size和buffer.memory ensureValidRecordSize(serializedSize); //得到TopicPartition tp =newTopicPartition(record.topic(), partition); //如果记录没有时间戳就新生成 long timestamp =record.timestamp() ==null?time.milliseconds() :record.timestamp(); log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback //得到回调方法 Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
//积累到积累类机器中 RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
//如果batchIsFull或者新的batch已经创建,唤醒sender线程 if (result.batchIsFull||result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup(); } //然后结果的future returnresult.future; // handling exceptions and record the errors; // for API exceptions return them in the future, // for other exceptions throw directly } catch (ApiException e) { log.debug("Exception occurred during message send:", e); if (callback !=null) callback.onCompletion(null, e); this.errors.record(); if (this.interceptors!=null) this.interceptors.onSendError(record, tp, e); returnnewFutureFailure(e); } catch (InterruptedException e) { this.errors.record(); if (this.interceptors!=null) this.interceptors.onSendError(record, tp, e); thrownewInterruptException(e); } catch (BufferExhaustedException e) { this.errors.record(); this.metrics.sensor("buffer-exhausted-records").record(); if (this.interceptors!=null) this.interceptors.onSendError(record, tp, e); throw e; } catch (KafkaException e) { this.errors.record(); if (this.interceptors!=null) this.interceptors.onSendError(record, tp, e); throw e; } catch (Exception e) { // we notify interceptor about all exceptions, since onSend is called before anything else in this method if (this.interceptors!=null) this.interceptors.onSendError(record, tp, e); throw e; } }
publicMap<Integer,List<RecordBatch>>drain(Cluster cluster,Set<Node> nodes,int maxSize,long now) { if (nodes.isEmpty()) returnCollections.emptyMap(); //转换后的数据,key nodeid value 发送的数据 Map<Integer,List<RecordBatch>> batches =newHashMap<>(); //遍历就绪节点 for (Node node : nodes) { int size =0; //根据node id得到分区相关信息集合 List<PartitionInfo> parts =cluster.partitionsForNode(node.id()); //就绪发送数据 List<RecordBatch> ready =newArrayList<>(); /* to make starvation less likely this loop doesn't start at 0 *///使饥饿的可能性降低,该循环不会从0开始 //drainIndex 是batches的下标,记录上次发送停止时的位置,下次继续从此位置开始发送 int start = drainIndex = drainIndex %parts.size(); do { PartitionInfo part =parts.get(drainIndex); //创建TopicPartition TopicPartition tp =newTopicPartition(part.topic(),part.partition()); // Only proceed if the partition has no in-flight batches. 只处理这个分区没有在发送中批次 if (!muted.contains(tp)) { //获得发往该分区的RecordBatch Deque<RecordBatch> deque =getDeque(new TopicPartition(part.topic(),part.partition())); if (deque !=null) { synchronized (deque) { //得到第一个RecordBatch RecordBatch first =deque.peekFirst(); if (first !=null) { //是否还需要重试 boolean backoff =first.attempts>0&&first.lastAttemptMs+ retryBackoffMs > now; // Only drain the batch if it is not during backoff period. //如果不需要重试发送 if (!backoff) { //size+bytebuffer的大小如果大于最大大小并且就绪不为空 if (size +first.records.sizeInBytes() > maxSize &&!ready.isEmpty()) { // there is a rare case that a single batch size is larger than the request size due
// to compression; in this case we will still eventually send this batch in a single
// request //数据量已满,结束循环,一般是一个请求的大小 break; } else { RecordBatch batch =deque.pollFirst(); //“关闭Compressor及底层输出流,并将MemoryRecords设置为只读” batch.records.close(); //计算size大小 size +=batch.records.sizeInBytes(); ready.add(batch); batch.drainedMs= now; } } } } } } //更新drainIndex this.drainIndex= (this.drainIndex+1) %parts.size(); //如果start!=drainIndex } while (start != drainIndex); batches.put(node.id(), ready); } return batches; }
/** * The set of requests which have been sent or are being sent but haven't yet received a response * 缓存已发完成发送但是未收到响应的ClientRequest */finalclassInFlightRequests { //最大InFlightRequests连接数量 privatefinalint maxInFlightRequestsPerConnection; //缓存完成发送的ClientRequest NodeId:Deque<ClientRequest> privatefinalMap<String,Deque<ClientRequest>> requests =newHashMap<String,Deque<ClientRequest>>(); publicInFlightRequests(int maxInFlightRequestsPerConnection) { this.maxInFlightRequestsPerConnection= maxInFlightRequestsPerConnection; } /** * 添加至requests中 * Add the given request to the queue for the connection it was directed to */publicvoidadd(ClientRequest request) { Deque<ClientRequest> reqs =this.requests.get(request.request().destination()); if (reqs ==null) { reqs =newArrayDeque<>(); this.requests.put(request.request().destination(), reqs); } //在头部加入 reqs.addFirst(request); } /** * Get the request queue for the given node * 根据给定的node得到请求队列 */privateDeque<ClientRequest> requestQueue(String node) { Deque<ClientRequest> reqs =requests.get(node); if (reqs ==null||reqs.isEmpty()) thrownewIllegalStateException("Response from server for which there are no in-flight requests."); return reqs; } /** * 得到最后的请求 * Get the oldest request (the one that that will be completed next) for the given node */publicClientRequestcompleteNext(String node) { returnrequestQueue(node).pollLast(); } /** * 得到给定节点的最后ClientRequest,不移除 * Get the last request we sent to the given node (but don't remove it from the queue) * @param node The node id */publicClientRequestlastSent(String node) { returnrequestQueue(node).peekFirst(); } /** * Complete the last request that was sent to a particular node. * @param node The node the request was sent to * @return The request */publicClientRequestcompleteLastSent(String node) { returnrequestQueue(node).pollFirst(); } /** * Can we send more requests to this node? * 这个节点是否还能发送 * @param node Node in question * @return true iff we have no requests still being sent to the given node */publicbooleancanSendMore(String node) { Deque<ClientRequest> queue =requests.get(node); return queue ==null||queue.isEmpty() ||(queue.peekFirst().request().completed() &&queue.size() <this.maxInFlightRequestsPerConnection); } /** * Return the number of inflight requests directed at the given node * @param node The node * @return The request count. */publicintinFlightRequestCount(String node) { Deque<ClientRequest> queue =requests.get(node); return queue ==null?0:queue.size(); } /** * Count all in-flight requests for all nodes */publicintinFlightRequestCount() { int total =0; for (Deque<ClientRequest> deque :this.requests.values()) total +=deque.size(); return total; } /** * Clear out all the in-flight requests for the given node and return them * * @param node The node * @return All the in-flight requests for that node that have been removed */publicIterable<ClientRequest> clearAll(String node) { Deque<ClientRequest> reqs =requests.get(node); if (reqs ==null) { returnCollections.emptyList(); } else { returnrequests.remove(node); } } /** * Returns a list of nodes with pending inflight request, that need to be timed out * 返回鸡诶单的集合等待的inflight request, * @param now current time in milliseconds * @param requestTimeout max time to wait for the request to be completed * @return list of nodes */publicList<String> getNodesWithTimedOutRequests(long now,int requestTimeout) { List<String> nodeIds =newLinkedList<String>(); for (String nodeId :requests.keySet()) { //如果该队列还有request if (inFlightRequestCount(nodeId)>0) { ClientRequest request =requests.get(nodeId).peekLast(); long timeSinceSend = now -request.sendTimeMs(); //得到超时请求的节点 if (timeSinceSend > requestTimeout) { nodeIds.add(nodeId); } } } return nodeIds; } }