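This article walks through the source code behind Flink's Kafka data source, FlinkKafkaConsumer: when open() is called, when the RuntimeContext is assigned, how run() fetches and emits records, and how the data is then written out to downstream tasks.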
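1. When is open() called?
FlinkKafkaConsumer is a RichFunction and therefore has the lifecycle method open(). So when does Flink call FlinkKafkaConsumer's open() method?
Before it runs the operators, StreamTask executes beforeInvoke(), which initializes every operator in the operator chain and calls its open() method: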
operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
initializeStateAndOpenOperators() loops over the operators and initializes each one:
protected void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
    for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
        StreamOperator<?> operator = operatorWrapper.getStreamOperator();
        operator.initializeState(streamTaskStateInitializer);
        operator.open();
    }
}
The operator for the Kafka source is StreamSource. Its open() method is:
public void open() throws Exception {
    super.open();
    FunctionUtils.openFunction(userFunction, new Configuration());
}
FunctionUtils.openFunction() calls open() on the user function, provided that function implements RichFunction:
public static void openFunction(Function function, Configuration parameters) throws Exception {
    if (function instanceof RichFunction) {
        RichFunction richFunction = (RichFunction) function;
        richFunction.open(parameters);
    }
}
2. When is the RuntimeContext assigned?
The call path is StreamTask.beforeInvoke() -> new OperatorChain() -> StreamOperatorFactoryUtil.createOperator(). The OperatorChain constructor creates each StreamOperator through its StreamOperatorFactory. For the Kafka source the factory is SimpleOperatorFactory, whose createStreamOperator() calls the operator's setup() method:
public <T extends StreamOperator<OUT>> T createStreamOperator(StreamOperatorParameters<OUT> parameters) {
    if (operator instanceof AbstractStreamOperator) {
        ((AbstractStreamOperator) operator).setProcessingTimeService(processingTimeService);
    }
    if (operator instanceof SetupableStreamOperator) {
        ((SetupableStreamOperator) operator).setup(
            parameters.getContainingTask(),
            parameters.getStreamConfig(),
            parameters.getOutput());
    }
    return (T) operator;
}
The StreamOperator for the Kafka source is StreamSource, which implements the SetupableStreamOperator interface. Its setup() method is defined in the parent class AbstractUdfStreamOperator:
public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
    super.setup(containingTask, config, output);
    FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());
}
FunctionUtils.setFunctionRuntimeContext() hands the RuntimeContext to the user function. The RuntimeContext itself is created in AbstractStreamOperator.setup() and is a StreamingRuntimeContext:
this.runtimeContext = new StreamingRuntimeContext(
    environment,
    environment.getAccumulatorRegistry().getUserMap(),
    getMetricGroup(),
    getOperatorID(),
    getProcessingTimeService(),
    null,
    environment.getExternalResourceInfoProvider());
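The ordering described in sections 1 and 2 (setup() assigns the runtime context while the operator chain is built, then state is initialized, then open() reaches the user function) can be sketched in plain Java. This is only an illustration: UserFunction, RichUserFunction, and Operator are hypothetical stand-ins, not Flink's interfaces, and the runtime context is reduced to a String.

```java
import java.util.ArrayList;
import java.util.List;

class LifecycleSketch {
    // Hypothetical stand-ins for Flink's Function and RichFunction.
    interface UserFunction {}

    interface RichUserFunction extends UserFunction {
        void setRuntimeContext(String context);
        void open();
    }

    // Hypothetical stand-in for an operator that wraps a user function.
    static class Operator {
        private final UserFunction userFunction;
        private final List<String> log;

        Operator(UserFunction userFunction, List<String> log) {
            this.userFunction = userFunction;
            this.log = log;
        }

        // Like setup(): the runtime context is handed to the function first.
        void setup(String runtimeContext) {
            log.add("setup");
            if (userFunction instanceof RichUserFunction) {
                ((RichUserFunction) userFunction).setRuntimeContext(runtimeContext);
            }
        }

        void initializeState() {
            log.add("initializeState");
        }

        // Like open(): only rich functions have an open() hook to call.
        void open() {
            log.add("open");
            if (userFunction instanceof RichUserFunction) {
                ((RichUserFunction) userFunction).open();
            }
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        RichUserFunction source = new RichUserFunction() {
            private String context;

            public void setRuntimeContext(String context) {
                this.context = context;
            }

            public void open() {
                log.add("function.open with " + context);
            }
        };
        Operator operator = new Operator(source, log);
        operator.setup("runtime-context"); // while the operator chain is built
        operator.initializeState();        // then state is initialized
        operator.open();                   // finally open() reaches the function
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because setup() runs before open(), the function can already use its runtime context inside open(), which is what FlinkKafkaConsumer relies on.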
3. The run() method of FlinkKafkaConsumer
Flink calls FlinkKafkaConsumer's run() method to produce data. run() works as follows:
① Create a KafkaFetcher to pull the data:
this.kafkaFetcher = createFetcher(
    sourceContext,
    subscribedPartitionsToStartOffsets,
    watermarkStrategy,
    (StreamingRuntimeContext) getRuntimeContext(),
    offsetCommitMode,
    getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
    useMetrics);
② KafkaFetcher.runFetchLoop() starts a KafkaConsumerThread that polls Kafka in a loop. KafkaConsumerThread pulls records through a KafkaConsumer and passes them to the Handover:
if (records == null) {
    try {
        records = consumer.poll(pollTimeout);
    }
    catch (WakeupException we) {
        continue;
    }
}
try {
    handover.produce(records);
    records = null;
}
catch (Handover.WakeupException e) {
    // fall through the loop
}
KafkaFetcher takes the fetched records from the Handover:
while (running) {
    // this blocks until we get the next records
    // it automatically re-throws exceptions encountered in the consumer thread
    final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
    // get the records for each topic partition
    for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {
        List<ConsumerRecord<byte[], byte[]>> partitionRecords =
            records.records(partition.getKafkaPartitionHandle());
        partitionConsumerRecordsHandler(partitionRecords, partition);
    }
}
③ Records are emitted through the Output held by the SourceContext:
public void collect(T element) {
    synchronized (lock) {
        output.collect(reuse.replace(element));
    }
}
The SourceContext is created in StreamSource.run() through StreamSourceContexts.getSourceContext(). The Output is built from the operator's non-chained output edges:
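for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
    @SuppressWarnings("unchecked")
    RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
    allOutputs.add(new Tuple2<>(output, outputEdge));
}
With a single output and no output selectors, the Output is a RecordWriterOutput. With several outputs it is a BroadcastingOutputCollector (or CopyingBroadcastingOutputCollector), and when output selectors are configured it is a DirectedOutput or CopyingDirectedOutput.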
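Looking back at steps ② and ③, the hand-off between the polling thread and the fetch loop can be illustrated with a capacity-one blocking queue. This is a rough sketch under stated assumptions, not Flink's Handover class: the END marker, the fake batches, and the use of ArrayBlockingQueue are all choices made here for illustration, and wakeup and error propagation are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class HandoverSketch {
    // Marker batch telling the fetch loop to stop (an assumption of this sketch).
    private static final List<String> END = new ArrayList<>();

    static List<String> run() {
        // Capacity 1: the polling thread blocks until the previous batch was taken.
        BlockingQueue<List<String>> handover = new ArrayBlockingQueue<>(1);
        List<List<String>> fakeKafka = List.of(List.of("a", "b"), List.of("c"));

        // Plays the role of KafkaConsumerThread: poll a batch, then hand it over.
        Thread consumerThread = new Thread(() -> {
            try {
                for (List<String> batch : fakeKafka) {
                    handover.put(batch);
                }
                handover.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumerThread.start();

        // Plays the role of the fetch loop: take a batch, emit records under a lock.
        Object checkpointLock = new Object();
        List<String> emitted = new ArrayList<>();
        try {
            while (true) {
                List<String> batch = handover.take();
                if (batch == END) {
                    break;
                }
                for (String record : batch) {
                    synchronized (checkpointLock) {
                        emitted.add(record);
                    }
                }
            }
            consumerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while fetching", e);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Keeping the exchange at one batch means the polling thread can never run far ahead of the thread that emits records, which is the back-pressure effect the Handover provides.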
④ With a single output, RecordWriterOutput emits through its RecordWriter member. The RecordWriter is created in StreamTask.createRecordWriterDelegate(); RecordWriterDelegate is a delegate class for RecordWriter that holds the RecordWriter instance(s) internally:
public static <OUT> RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> createRecordWriterDelegate(
        StreamConfig configuration,
        Environment environment) {
    List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWrites = createRecordWriters(
        configuration,
        environment);
    if (recordWrites.size() == 1) {
        return new SingleRecordWriter<>(recordWrites.get(0));
    } else if (recordWrites.size() == 0) {
        return new NonRecordWriter<>();
    } else {
        return new MultipleRecordWriters<>(recordWrites);
    }
}

private static <OUT> List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(
        StreamConfig configuration,
        Environment environment) {
    List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters = new ArrayList<>();
    List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(environment.getUserClassLoader());

    for (int i = 0; i < outEdgesInOrder.size(); i++) {
        StreamEdge edge = outEdgesInOrder.get(i);
        recordWriters.add(
            createRecordWriter(
                edge,
                i,
                environment,
                environment.getTaskInfo().getTaskName(),
                edge.getBufferTimeout()));
    }
    return recordWriters;
}
outEdgesInOrder comes from the list of outgoing StreamEdges held by the StreamNode in the StreamGraph.
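The shape of createRecordWriterDelegate() (one writer per outgoing edge, then a three-way choice of delegate) can be reduced to a few lines. In this sketch writers are plain labels and the delegate is returned as a class name; both simplifications are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class WriterDelegateSketch {
    // One hypothetical "writer" (here just a label) per outgoing edge, in edge order.
    static List<String> createWriters(List<String> outEdgesInOrder) {
        List<String> writers = new ArrayList<>();
        for (int i = 0; i < outEdgesInOrder.size(); i++) {
            writers.add("writer-" + i + "->" + outEdgesInOrder.get(i));
        }
        return writers;
    }

    // Same three-way branch as in the excerpt: one, none, or many writers.
    static String chooseDelegate(List<String> writers) {
        if (writers.size() == 1) {
            return "SingleRecordWriter";
        } else if (writers.isEmpty()) {
            return "NonRecordWriter";
        } else {
            return "MultipleRecordWriters";
        }
    }

    public static void main(String[] args) {
        System.out.println(chooseDelegate(createWriters(List.of("map"))));
        System.out.println(chooseDelegate(createWriters(List.of())));
        System.out.println(chooseDelegate(createWriters(List.of("map", "sink"))));
    }
}
```

A source chained directly to all of its consumers has no outgoing edges, which is the case the NonRecordWriter branch covers.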
When a RecordWriter is created, the isBroadcast() method of the StreamEdge's StreamPartitioner<?> outputPartitioner decides whether it is a BroadcastRecordWriter or a ChannelSelectorRecordWriter:
public RecordWriter<T> build(ResultPartitionWriter writer) {
    if (selector.isBroadcast()) {
        return new BroadcastRecordWriter<>(writer, timeout, taskName);
    } else {
        return new ChannelSelectorRecordWriter<>(writer, selector, timeout, taskName);
    }
}
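The difference between the two writer kinds is easiest to see on a toy model: a broadcast writer copies every record to all channels, while a channel-selector writer asks a selector for exactly one target channel. The Writer interface, the list-backed channels, and the length-based selector below are hypothetical and only illustrate the branch in build().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToIntFunction;

class RecordWriterSketch {
    // Hypothetical minimal writer interface.
    interface Writer {
        void emit(String record);
    }

    // Mirrors the branch in build(): broadcast to all channels, or select one.
    static Writer build(List<List<String>> channels, boolean broadcast, ToIntFunction<String> selector) {
        if (broadcast) {
            return record -> channels.forEach(channel -> channel.add(record));
        }
        return record -> channels.get(selector.applyAsInt(record)).add(record);
    }

    // Creates n empty in-memory channels.
    static List<List<String>> newChannels(int n) {
        List<List<String>> channels = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            channels.add(new ArrayList<>());
        }
        return channels;
    }

    public static void main(String[] args) {
        List<List<String>> broadcastChannels = newChannels(2);
        build(broadcastChannels, true, record -> 0).emit("x");
        System.out.println(broadcastChannels);

        List<List<String>> selectedChannels = newChannels(2);
        Writer writer = build(selectedChannels, false, record -> record.length() % 2);
        writer.emit("ab");
        writer.emit("abc");
        System.out.println(selectedChannels);
    }
}
```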
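When no partitioner has been set explicitly, outputPartitioner is chosen by whether the upstream and downstream nodes have the same parallelism:
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
    partitioner = new ForwardPartitioner<Object>();
}
Both BroadcastRecordWriter and ChannelSelectorRecordWriter end up calling flush() on their ResultPartitionWriter member, targetPartition, to output the data. The ResultPartitionWriter is produced by ConsumableNotifyingResultPartitionWriterDecorator.decorate(). Depending on the corresponding ResultPartitionDeploymentDescriptor, it is either a ConsumableNotifyingResultPartitionWriterDecorator or the partition writer that was passed in unchanged. ConsumableNotifyingResultPartitionWriterDecorator makes the data available to the next node directly and notifies it through a ResultPartitionConsumableNotifier: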
public static ResultPartitionWriter[] decorate(
        Collection<ResultPartitionDeploymentDescriptor> descs,
        ResultPartitionWriter[] partitionWriters,
        TaskActions taskActions,
        JobID jobId,
        ResultPartitionConsumableNotifier notifier) {
    ResultPartitionWriter[] consumableNotifyingPartitionWriters = new ResultPartitionWriter[partitionWriters.length];
    int counter = 0;
    for (ResultPartitionDeploymentDescriptor desc : descs) {
        if (desc.sendScheduleOrUpdateConsumersMessage() && desc.getPartitionType().isPipelined()) {
            consumableNotifyingPartitionWriters[counter] = new ConsumableNotifyingResultPartitionWriterDecorator(
                taskActions,
                jobId,
                partitionWriters[counter],
                notifier);
        } else {
            consumableNotifyingPartitionWriters[counter] = partitionWriters[counter];
        }
        counter++;
    }
    return consumableNotifyingPartitionWriters;
}
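The decorate() logic is a conditional decorator: pipelined partitions get a wrapper that notifies consumers, everything else keeps the original writer. The sketch below assumes that the wrapper notifies once, when the first buffer is added; that detail, the PartitionWriter interface, and the Runnable notifier are assumptions of this illustration rather than something the excerpt shows.

```java
import java.util.ArrayList;
import java.util.List;

class NotifyingDecoratorSketch {
    // Hypothetical minimal partition writer.
    interface PartitionWriter {
        void add(String buffer);
    }

    // Wraps the writer only for pipelined partitions, as in decorate().
    static PartitionWriter decorate(PartitionWriter writer, boolean pipelined, Runnable notifier) {
        if (!pipelined) {
            return writer;
        }
        return new PartitionWriter() {
            private boolean notified;

            public void add(String buffer) {
                writer.add(buffer);
                // Assumption: consumers are notified once, on the first buffer.
                if (!notified) {
                    notified = true;
                    notifier.run();
                }
            }
        };
    }

    public static void main(String[] args) {
        List<String> written = new ArrayList<>();
        int[] notifications = {0};
        PartitionWriter writer = decorate(written::add, true, () -> notifications[0]++);
        writer.add("b1");
        writer.add("b2");
        System.out.println(written + " notifications=" + notifications[0]);
    }
}
```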
partitionWriters are created through NettyShuffleEnvironment.createResultPartitionWriters() -> ResultPartitionFactory.create(). A ResultPartition outputs data through its member ResultSubpartition[] subpartitions, which is populated in ResultPartitionFactory.createSubpartitions():
private void createSubpartitions(
        ResultPartition partition,
        ResultPartitionType type,
        BoundedBlockingSubpartitionType blockingSubpartitionType,
        ResultSubpartition[] subpartitions) {
    // Create the subpartitions.
    if (type.isBlocking()) {
        initializeBoundedBlockingPartitions(
            subpartitions,
            partition,
            blockingSubpartitionType,
            networkBufferSize,
            channelManager);
    } else {
        for (int i = 0; i < subpartitions.length; i++) {
            subpartitions[i] = new PipelinedSubpartition(i, partition);
        }
    }
}
For streaming jobs, the ResultSubpartition is a PipelinedSubpartition.
4. Writing the data out
4.1 Notification through ResultPartitionConsumableNotifier
ResultPartitionConsumableNotifier is created in TaskExecutor.associateWithJobManager():
private JobTable.Connection associateWithJobManager(
        JobTable.Job job,
        ResourceID resourceID,
        JobMasterGateway jobMasterGateway) {
    ......
    ......
    ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier(
        jobMasterGateway,
        getRpcService().getExecutor(),
        taskManagerConfiguration.getTimeout());
    ......
    ......
}
RpcResultPartitionConsumableNotifier makes a remote call to JobMaster's scheduleOrUpdateConsumers() method, passing the ResultPartitionID partitionId.
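4.1.1 JobMaster's scheduleOrUpdateConsumers()
JobMaster notifies the downstream consuming operators through ExecutionGraph.scheduleOrUpdateConsumers().
Two pieces of code matter here:
① Look up the IntermediateResultPartition in the resultPartitions map of this operator's ExecutionVertex: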
void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {
    .......
    final IntermediateResultPartition partition = resultPartitions.get(partitionId.getPartitionId());
    .......
    if (partition.getIntermediateResult().getResultType().isPipelined()) {
        // Schedule or update receivers of this partition
        execution.scheduleOrUpdateConsumers(partition.getConsumers());
    }
    else {
        throw new IllegalArgumentException("ScheduleOrUpdateConsumers msg is only valid for" +
            "pipelined partitions.");
    }
}
The consumers, List<List<ExecutionEdge>> allConsumers, are taken from the IntermediateResultPartition;
the task to run is then taken from the Execution currentExecution of each ExecutionEdge's target ExecutionVertex.
② Execution.sendUpdatePartitionInfoRpcCall() calls TaskExecutor.updatePartitions() over RPC to drive the downstream consumer operator:
private void sendUpdatePartitionInfoRpcCall(
        final Iterable<PartitionInfo> partitionInfos) {
    final LogicalSlot slot = assignedResource;
    if (slot != null) {
        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
        final TaskManagerLocation taskManagerLocation = slot.getTaskManagerLocation();
        CompletableFuture<Acknowledge> updatePartitionsResultFuture = taskManagerGateway.updatePartitions(attemptId, partitionInfos, rpcTimeout);
        updatePartitionsResultFuture.whenCompleteAsync(
            (ack, failure) -> {
                // fail if there was a failure
                if (failure != null) {
                    fail(new IllegalStateException("Update to task [" + getVertexWithAttempt() +
                        "] on TaskManager " + taskManagerLocation + " failed", failure));
                }
            }, getVertex().getExecutionGraph().getJobMasterMainThreadExecutor());
    }
}
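The error handling above relies on CompletableFuture.whenCompleteAsync(), which runs a callback on a chosen executor once the RPC result arrives. Here is a minimal sketch of that pattern, assuming a direct executor in place of the JobMaster main-thread executor and a String in place of the Acknowledge type; it only handles futures that are already complete when passed in.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

class UpdatePartitionsSketch {
    // Returns the failure recorded for the task, or null when the RPC was acknowledged.
    static String sendUpdate(CompletableFuture<String> rpcResult, String taskName) {
        AtomicReference<String> taskFailure = new AtomicReference<>();
        // Direct executor standing in for the JobMaster main-thread executor (assumption).
        Executor mainThreadExecutor = Runnable::run;
        rpcResult.whenCompleteAsync(
            (ack, failure) -> {
                // Only a failed RPC marks the task as failed; a plain ack needs no action.
                if (failure != null) {
                    taskFailure.set("Update to task [" + taskName + "] failed");
                }
            },
            mainThreadExecutor);
        return taskFailure.get();
    }

    public static void main(String[] args) {
        CompletableFuture<String> acknowledged = CompletableFuture.completedFuture("ack");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("timeout"));
        System.out.println(sendUpdate(acknowledged, "map-1"));
        System.out.println(sendUpdate(failed, "map-1"));
    }
}
```

Running the callback on a single designated executor, as the excerpt does with the JobMaster main-thread executor, keeps the failure handling on one thread.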
4.1.2 TaskExecutor's updatePartitions()
TaskExecutor.updatePartitions() updates the partition information. If an InputChannel was previously unknown, it is updated in SingleInputGate.updateInputChannel():
public void updateInputChannel(
        ResourceID localLocation,
        NettyShuffleDescriptor shuffleDescriptor) throws IOException, InterruptedException {
    synchronized (requestLock) {
        if (closeFuture.isDone()) {
            // There was a race with a task failure/cancel
            return;
        }
        IntermediateResultPartitionID partitionId = shuffleDescriptor.getResultPartitionID().getPartitionId();
        InputChannel current = inputChannels.get(partitionId);
        if (current instanceof UnknownInputChannel) {
            UnknownInputChannel unknownChannel = (UnknownInputChannel) current;
            boolean isLocal = shuffleDescriptor.isLocalTo(localLocation);
            InputChannel newChannel;
            if (isLocal) {
                newChannel = unknownChannel.toLocalInputChannel();
            } else {
                RemoteInputChannel remoteInputChannel =
                    unknownChannel.toRemoteInputChannel(shuffleDescriptor.getConnectionId());
                remoteInputChannel.assignExclusiveSegments();
                newChannel = remoteInputChannel;
            }
            LOG.debug("{}: Updated unknown input channel to {}.", owningTaskName, newChannel);
            inputChannels.put(partitionId, newChannel);
            channels[current.getChannelIndex()] = newChannel;
            if (requestedPartitionsFlag) {
                newChannel.requestSubpartition(consumedSubpartitionIndex);
            }
            for (TaskEvent event : pendingEvents) {
                newChannel.sendTaskEvent(event);
            }
            if (--numberOfUninitializedChannels == 0) {
                pendingEvents.clear();
            }
        }
    }
}
4.2 Writing through PipelinedSubpartition

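Records are first written to an in-memory buffer queue, an ArrayDeque, and the registered BufferAvailabilityListener is then notified that data is available.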
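A subpartition of this kind is essentially a queue plus a listener callback. The sketch below is a simplified model, not PipelinedSubpartition itself: the AvailabilityListener interface is hypothetical, buffers are Strings, and notifying only when the queue goes from empty to non-empty is a policy assumed here for illustration.

```java
import java.util.ArrayDeque;

class SubpartitionSketch {
    // Hypothetical stand-in for BufferAvailabilityListener.
    interface AvailabilityListener {
        void notifyDataAvailable();
    }

    static class Subpartition {
        private final ArrayDeque<String> buffers = new ArrayDeque<>();
        private AvailabilityListener listener;

        synchronized void setListener(AvailabilityListener listener) {
            this.listener = listener;
        }

        void add(String buffer) {
            boolean becameNonEmpty;
            AvailabilityListener current;
            synchronized (this) {
                buffers.add(buffer);
                // Assumption: notify only when the queue was empty before this add.
                becameNonEmpty = buffers.size() == 1;
                current = listener;
            }
            // Call the listener outside the lock so it cannot block writers.
            if (becameNonEmpty && current != null) {
                current.notifyDataAvailable();
            }
        }

        synchronized String poll() {
            return buffers.poll();
        }
    }

    public static void main(String[] args) {
        Subpartition subpartition = new Subpartition();
        int[] notifications = {0};
        subpartition.setListener(() -> notifications[0]++);
        subpartition.add("a");
        subpartition.add("b");
        System.out.println(subpartition.poll() + " " + subpartition.poll()
            + " notifications=" + notifications[0]);
    }
}
```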
4.2.1 When is the BufferAvailabilityListener created?
① When TaskManagerServices creates the ShuffleEnvironment, the call chain NettyShuffleServiceFactory.createNettyShuffleEnvironment() -> new NettyConnectionManager() -> new NettyServer() -> ServerChannelInitializer.initChannel() -> NettyProtocol.getServerChannelHandlers() obtains the Netty server-side handler PartitionRequestServerHandler:
public ChannelHandler[] getServerChannelHandlers() {
    PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue();
    PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
        partitionProvider,
        taskEventPublisher,
        queueOfPartitionQueues);
    return new ChannelHandler[] {
        messageEncoder,
        new NettyMessage.NettyMessageDecoder(),
        serverHandler,
        queueOfPartitionQueues
    };
}
② When PartitionRequestServerHandler receives a PartitionRequest message from a client, it creates a CreditBasedSequenceNumberingViewReader and, through requestSubpartitionView() -> ResultPartitionManager.createSubpartitionView() -> ResultPartition.createSubpartitionView(), registers that CreditBasedSequenceNumberingViewReader as the BufferAvailabilityListener.
③ CreditBasedSequenceNumberingViewReader.notifyDataAvailable() calls PartitionRequestQueue.notifyReaderNonEmpty() to notify the downstream operator:
void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {
    // The notification might come from the same thread. For the initial writes this
    // might happen before the reader has set its reference to the view, because
    // creating the queue and the initial notification happen in the same method call.
    // This can be resolved by separating the creation of the view and allowing
    // notifications.
    // TODO This could potentially have a bad performance impact as in the
    // worst case (network consumes faster than the producer) each buffer
    // will trigger a separate event loop task being scheduled.
    ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));
}
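notifyReaderNonEmpty() does not handle the reader on the calling thread; it schedules the work on the channel's event loop. The same hand-off can be sketched with a single-threaded ExecutorService standing in for the Netty event loop. The reader names and the handled list are hypothetical, and no Netty API is used here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class EventLoopNotifySketch {
    static List<String> run() {
        // Single thread standing in for the Netty event loop (assumption).
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        // Only the event-loop thread touches this list until the loop has terminated.
        List<String> handled = new ArrayList<>();

        // The notifying thread only enqueues a task; the event loop does the work.
        for (String reader : List.of("reader-1", "reader-2")) {
            eventLoop.execute(() -> handled.add(reader));
        }

        eventLoop.shutdown();
        try {
            if (!eventLoop.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("event loop did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

As the TODO in the excerpt points out, the cost of this design is one scheduled task per notification, which can add up when the network drains buffers faster than the producer fills them.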