Flink Yarn Per Job - Starting the TM, Registering with the RM, and Slot Allocation by the RM
2022-08-05 11:05:00 [hyunbar]



Starting the TaskManager
YarnTaskExecutorRunner
public static void main(String[] args) {
EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskExecutor runner", args);
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);
runTaskManagerSecurely(args);
}
private static void runTaskManagerSecurely(String[] args) {
try {
... ...
TaskManagerRunner.runTaskManagerSecurely(configuration);
}
... ...
}
TaskManagerRunner
public static void runTaskManagerSecurely(Configuration configuration) throws Exception {
replaceGracefulExitWithHaltIfConfigured(configuration);
final PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
FileSystem.initialize(configuration, pluginManager);
SecurityUtils.install(new SecurityConfiguration(configuration));
SecurityUtils.getInstalledContext().runSecured(() -> {
runTaskManager(configuration, pluginManager);
return null;
});
}
public static void runTaskManager(Configuration configuration, PluginManager pluginManager) throws Exception {
final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, pluginManager, TaskManagerRunner::createTaskExecutorService);
taskManagerRunner.start();
}
public void start() throws Exception {
taskExecutorService.start();
}
TaskExecutorToServiceAdapter
implements TaskManagerRunner.TaskExecutorService
public void start() {
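// Start the TaskExecutor through the RPC service; what happens next is in its onStart() method
taskExecutor.start();
}
The TaskExecutor is started through the RPC service; the next step to follow is its onStart() method.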
RpcEndpoint
public final void start() {
// The endpoint is actually started by its own gateway (the RpcServer)
rpcServer.start();
}
Starting the endpoint is actually done by its own gateway (the RpcServer).
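To make this delegation concrete, here is a minimal sketch of the "main thread" idea behind an RPC endpoint: start() only triggers the start, and onStart() (plus every runAsync() callback) then runs on a single thread owned by the endpoint. MiniRpcEndpoint, ToyTaskExecutor and notifyLeader are hypothetical names, and a plain single-thread ExecutorService stands in for the RpcServer, so this illustrates the threading model only; it is not Flink's RpcEndpoint.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical, heavily simplified stand-in for an RPC endpoint (not Flink's RpcEndpoint).
abstract class MiniRpcEndpoint {
    // One thread plays the endpoint's "main thread": onStart() and all runAsync() work run on it.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    // The caller only triggers the start; onStart() itself runs on the main thread.
    public final CompletableFuture<Void> start() {
        return CompletableFuture.runAsync(this::onStart, mainThread);
    }

    protected abstract void onStart();

    // Hand work over to the main thread instead of touching state from a foreign thread.
    protected final CompletableFuture<Void> runAsync(Runnable action) {
        return CompletableFuture.runAsync(action, mainThread);
    }

    public final void stop() {
        mainThread.shutdown();
        try {
            mainThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// Toy endpoint that records what ran and on which thread onStart() ran.
class ToyTaskExecutor extends MiniRpcEndpoint {
    final List<String> events = Collections.synchronizedList(new ArrayList<>());
    volatile Thread startThread;

    @Override
    protected void onStart() {
        startThread = Thread.currentThread();
        events.add("started");
    }

    // e.g. a leader-listener callback arriving from some other thread
    CompletableFuture<Void> notifyLeader(String address) {
        return runAsync(() -> events.add("leader " + address));
    }
}
```

Because every state change goes through this one thread, the endpoint's fields need no locks; the listeners later in this walkthrough wrap their work in runAsync(...) for the same reason.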
TaskExecutor
public void onStart() throws Exception {
try {
// start the TaskExecutor services
startTaskExecutorServices();
} catch (Throwable t) {
final TaskManagerException exception = new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), t);
onFatalError(exception);
throw exception;
}
startRegistrationTimeout();
}
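onStart() ends by arming a registration timeout, and establishResourceManagerConnection() further down calls stopRegistrationTimeout() once registration has gone through. A minimal sketch of such a guard, assuming a single-threaded scheduler in place of the endpoint's main thread; RegistrationTimeoutGuard and its methods are hypothetical, and a flag stands in for the fatal error a real TaskExecutor would raise.

```java
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a "registration timeout" guard; not Flink's code.
class RegistrationTimeoutGuard {
    // Single-threaded scheduler stands in for the endpoint's main-thread executor.
    private final ScheduledExecutorService mainThread = Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean fatalErrorRaised = new AtomicBoolean(false);
    // Id of the currently armed timeout; null means "no timeout armed".
    private volatile UUID currentTimeoutId;

    // Arm a timeout: if registration has not finished when it fires, raise a fatal error.
    void startRegistrationTimeout(long timeoutMillis) {
        final UUID timeoutId = UUID.randomUUID();
        currentTimeoutId = timeoutId;
        mainThread.schedule(() -> onTimeout(timeoutId), timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Disarm: a timeout that fires later sees a different id and does nothing.
    void stopRegistrationTimeout() {
        currentTimeoutId = null;
    }

    private void onTimeout(UUID timeoutId) {
        if (timeoutId.equals(currentTimeoutId)) {
            fatalErrorRaised.set(true); // a real TaskExecutor would report a fatal error here
        }
    }

    boolean fatalErrorRaised() {
        return fatalErrorRaised.get();
    }

    // Already-scheduled timeouts still run after shutdown() (default executor policy).
    void shutdown() {
        mainThread.shutdown();
        try {
            mainThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Tagging each timeout with a fresh id means a stale timer can never fire for a newer registration attempt, which matters because the TaskExecutor may re-register several times.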


Registering with the ResourceManager
private void startTaskExecutorServices() throws Exception {
try {
// start by connecting to the ResourceManager
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
// tell the task slot table who's responsible for the task slot actions
taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());
// start the job leader service
jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
} catch (Exception e) {
handleStartTaskExecutorServicesException(e);
}
}
StandaloneLeaderRetrievalService
public void start(LeaderRetrievalListener listener) {
checkNotNull(listener, "Listener must not be null.");
synchronized (startStopLock) {
checkState(!started, "StandaloneLeaderRetrievalService can only be started once.");
started = true;
// directly notify the listener, because we already know the leading JobManager's address
listener.notifyLeaderAddress(leaderAddress, leaderId);
}
}
TaskExecutor
private class ResourceManagerLeaderListener implements LeaderRetrievalListener {
@Override
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
runAsync(
() -> notifyOfNewResourceManagerLeader(
leaderAddress,
ResourceManagerId.fromUuidOrNull(leaderSessionID)));
}
}
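In the standalone (non-HA) case the leader is fixed, so start() can notify the listener immediately. A minimal sketch of that start-once pattern; FixedLeaderRetrieval and LeaderAddressListener are hypothetical names modelled on the excerpt above, not Flink classes.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical listener interface, shaped like the notifyLeaderAddress(...) callback above.
interface LeaderAddressListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
}

// Hypothetical sketch of a "standalone" retrieval service: the leader is known up front.
class FixedLeaderRetrieval {
    private final Object startStopLock = new Object();
    private final String leaderAddress;
    private final UUID leaderId;
    private boolean started;

    FixedLeaderRetrieval(String leaderAddress, UUID leaderId) {
        this.leaderAddress = Objects.requireNonNull(leaderAddress);
        this.leaderId = Objects.requireNonNull(leaderId);
    }

    void start(LeaderAddressListener listener) {
        Objects.requireNonNull(listener, "Listener must not be null.");
        synchronized (startStopLock) {
            if (started) {
                throw new IllegalStateException("The retrieval service can only be started once.");
            }
            started = true;
            // No election to wait for: notify right away, on the caller's thread.
            listener.notifyLeaderAddress(leaderAddress, leaderId);
        }
    }
}
```

Since the notification arrives synchronously inside start(), the listener re-dispatches it with runAsync(...), so the actual reconnect logic still runs on the TaskExecutor's main thread.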
The intermediate call steps are omitted here; we jump straight to the start() method in RegisteredRpcConnection.
RegisteredRpcConnection
public void start() {
checkState(!closed, "The RPC connection is already closed");
checkState(!isConnected() && pendingRegistration == null, "The RPC connection is already started");
// create the registration object
final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();
if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
// start the registration; once it succeeds, onRegistrationSuccess() is called
newRegistration.startRegistration();
} else {
// concurrent start operation
newRegistration.cancel();
}
}
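createNewRegistration() creates the registration object.
startRegistration() starts the registration; once it succeeds, onRegistrationSuccess() is called.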
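The compareAndSet(this, null, newRegistration) call is what makes start() safe against concurrent callers: only one registration can be installed, and a loser cancels its own. A minimal sketch of that guard with AtomicReferenceFieldUpdater; GuardedConnection and PendingRegistration are hypothetical stand-ins for the connection and the retrying registration.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical stand-in for a retrying registration.
class PendingRegistration {
    volatile boolean started;
    volatile boolean cancelled;

    void startRegistration() { started = true; }

    void cancel() { cancelled = true; }
}

// Hypothetical sketch of the start-once guard shown in start() above.
class GuardedConnection {
    // The updater requires a volatile instance field of the declared type.
    private static final AtomicReferenceFieldUpdater<GuardedConnection, PendingRegistration> REGISTRATION_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(GuardedConnection.class, PendingRegistration.class, "pendingRegistration");

    private volatile PendingRegistration pendingRegistration;

    /** Returns the registration this call started, or null if another call got there first. */
    PendingRegistration start() {
        PendingRegistration newRegistration = new PendingRegistration();
        if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
            newRegistration.startRegistration();
            return newRegistration;
        }
        // Lost the race: a registration is already installed, so discard ours.
        newRegistration.cancel();
        return null;
    }

    PendingRegistration current() {
        return pendingRegistration;
    }
}
```

An AtomicReference field would give the same guarantee; the field-updater variant keeps a plain volatile field and avoids allocating a wrapper object per connection.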
ResourceManagerRegistrationListener (in TaskExecutor)
@Override
public void onRegistrationSuccess(TaskExecutorToResourceManagerConnection connection, TaskExecutorRegistrationSuccess success) {
final ResourceID resourceManagerId = success.getResourceManagerId();
final InstanceID taskExecutorRegistrationId = success.getRegistrationId();
final ClusterInformation clusterInformation = success.getClusterInformation();
final ResourceManagerGateway resourceManagerGateway = connection.getTargetGateway();
runAsync(
() -> {
// filter out outdated connections
//noinspection ObjectEquality
if (resourceManagerConnection == connection) {
try {
establishResourceManagerConnection(
resourceManagerGateway,
resourceManagerId,
taskExecutorRegistrationId,
clusterInformation);
} catch (Throwable t) {
log.error("Establishing Resource Manager connection in Task Executor failed", t);
}
}
});
}
private void establishResourceManagerConnection(
ResourceManagerGateway resourceManagerGateway,
ResourceID resourceManagerResourceId,
InstanceID taskExecutorRegistrationId,
ClusterInformation clusterInformation) {
final CompletableFuture<Acknowledge> slotReportResponseFuture = resourceManagerGateway.sendSlotReport(
getResourceID(),
taskExecutorRegistrationId,
taskSlotTable.createSlotReport(getResourceID()),
taskManagerConfiguration.getTimeout());
... ...
stopRegistrationTimeout();
}
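The "filter out outdated connections" check in onRegistrationSuccess() compares references, not addresses: after a reconnect, an old connection may still report success even though it targets the same ResourceManager. A minimal sketch of why identity is the right test; ResourceManagerConnectionHolder and its members are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the "filter out outdated connections" check.
class ResourceManagerConnectionHolder {
    static final class RmConnection {
        final String targetAddress;

        RmConnection(String targetAddress) {
            this.targetAddress = targetAddress;
        }
    }

    // The connection currently in use; replaced on every reconnect.
    private RmConnection resourceManagerConnection;
    final List<String> established = new ArrayList<>();

    RmConnection connectTo(String address) {
        resourceManagerConnection = new RmConnection(address);
        return resourceManagerConnection;
    }

    void onRegistrationSuccess(RmConnection connection) {
        // Reference comparison on purpose: a stale connection can have the same address,
        // so only the exact object still held counts.
        if (resourceManagerConnection == connection) {
            established.add(connection.targetAddress);
        }
    }
}
```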
sendSlotReport: the TaskExecutor reports its slots to the RM
ResourceManager
public CompletableFuture<Acknowledge> sendSlotReport(ResourceID taskManagerResourceId, InstanceID taskManagerRegistrationId, SlotReport slotReport, Time timeout) {
final WorkerRegistration<WorkerType> workerTypeWorkerRegistration = taskExecutors.get(taskManagerResourceId);
if (workerTypeWorkerRegistration.getInstanceID().equals(taskManagerRegistrationId)) {
if (slotManager.registerTaskManager(workerTypeWorkerRegistration, slotReport)) {
onWorkerRegistered(workerTypeWorkerRegistration.getWorker());
}
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
return FutureUtils.completedExceptionally(new ResourceManagerException(String.format("Unknown TaskManager registration id %s.", taskManagerRegistrationId)));
}
}
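The registration id handed out at registration time is what lets the RM reject a report from a stale registration: if the TaskExecutor registered again, it holds a new id, and a report carrying the old one is refused. A minimal sketch with plain strings for ids; SlotReportGate and its methods are hypothetical, and the null check on an unknown resource id is this sketch's own addition.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the registration-id check in sendSlotReport(); ids are plain strings.
class SlotReportGate {
    // resource id of a TaskExecutor -> registration id handed out when it registered
    private final Map<String, String> taskExecutors = new HashMap<>();
    // resource id -> number of slots last reported
    final Map<String, Integer> reportedSlots = new HashMap<>();

    void registerTaskExecutor(String resourceId, String registrationId) {
        taskExecutors.put(resourceId, registrationId);
    }

    CompletableFuture<String> sendSlotReport(String resourceId, String registrationId, int slotCount) {
        String knownRegistrationId = taskExecutors.get(resourceId);
        if (knownRegistrationId != null && knownRegistrationId.equals(registrationId)) {
            reportedSlots.put(resourceId, slotCount);
            return CompletableFuture.completedFuture("ack");
        }
        // Unknown or outdated registration: answer with a failed future, not an exception.
        CompletableFuture<String> rejected = new CompletableFuture<>();
        rejected.completeExceptionally(new IllegalStateException(
                String.format("Unknown TaskManager registration id %s.", registrationId)));
        return rejected;
    }
}
```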
SlotManagerImpl
/**
* Registers a new task manager at the slot manager. This will make the task managers slots
* known and, thus, available for allocation.
*
* @param taskExecutorConnection for the new task manager
* @param initialSlotReport for the new task manager
* @return True if the task manager has not been registered before and is registered successfully; otherwise false
*/
@Override
public boolean registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
checkInit();
LOG.debug("Registering TaskManager {} under {} at the SlotManager.", taskExecutorConnection.getResourceID().getStringWithMetadata(), taskExecutorConnection.getInstanceID());
// we identify task managers by their instance id
if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
return false;
} else {
if (isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
LOG.info("The total number of slots exceeds the max limitation {}, release the excess resource.", maxSlotNum);
resourceActions.releaseResource(taskExecutorConnection.getInstanceID(), new FlinkException("The total number of slots exceeds the max limitation."));
return false;
}
// first register the TaskManager
ArrayList<SlotID> reportedSlots = new ArrayList<>();
for (SlotStatus slotStatus : initialSlotReport) {
reportedSlots.add(slotStatus.getSlotID());
}
TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration(
taskExecutorConnection,
reportedSlots);
taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskManagerRegistration);
// next register the new slots
for (SlotStatus slotStatus : initialSlotReport) {
registerSlot(
slotStatus.getSlotID(),
slotStatus.getAllocationID(),
slotStatus.getJobID(),
slotStatus.getResourceProfile(),
taskExecutorConnection);
}
return true;
}
}
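registerTaskManager() makes three decisions: an already-known instance id is not registered again, a TaskManager that would push the total over maxSlotNum is rejected, and otherwise the TaskManager and then its slots are recorded. A minimal sketch of that flow; MiniSlotManager is hypothetical, ids are strings, and the limit check (current plus reported slots against maxSlotNum) is a simplification of isMaxSlotNumExceededAfterRegistration().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified sketch of registerTaskManager(); ids are plain strings.
class MiniSlotManager {
    private final int maxSlotNum;
    // instance id -> slot ids reported by that TaskManager
    private final Map<String, List<String>> taskManagerRegistrations = new HashMap<>();
    // all slots known to the slot manager
    private final Set<String> slots = new HashSet<>();

    MiniSlotManager(int maxSlotNum) {
        this.maxSlotNum = maxSlotNum;
    }

    boolean registerTaskManager(String instanceId, List<String> reportedSlotIds) {
        if (taskManagerRegistrations.containsKey(instanceId)) {
            // Known TaskManager: the real code treats the report as a status update instead.
            return false;
        }
        if (slots.size() + reportedSlotIds.size() > maxSlotNum) {
            // Over the limit: the real code asks resourceActions to release this TaskManager.
            return false;
        }
        // First register the TaskManager, then its slots.
        taskManagerRegistrations.put(instanceId, new ArrayList<>(reportedSlotIds));
        slots.addAll(reportedSlotIds);
        return true;
    }

    int registeredSlotCount() {
        return slots.size();
    }
}
```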


ResourceManager allocates slots
SlotManagerImpl
/**
* Registers a slot for the given task manager at the slot manager. The slot is identified by
* the given slot id. The given resource profile defines the available resources for the slot.
* The task manager connection can be used to communicate with the task manager.
*
* @param slotId identifying the slot on the task manager
* @param allocationId which is currently deployed in the slot
* @param resourceProfile of the slot
* @param taskManagerConnection to communicate with the remote task manager
*/
private void registerSlot(
SlotID slotId,
AllocationID allocationId,
JobID jobId,
ResourceProfile resourceProfile,
TaskExecutorConnection taskManagerConnection) {
if (slots.containsKey(slotId)) {
// remove the old slot first
removeSlot(
slotId,
new SlotManagerException(
String.format(
"Re-registration of slot %s. This indicates that the TaskExecutor has re-connected.",
slotId)));
}
// create and register the new slot
final TaskManagerSlot slot = createAndRegisterTaskManagerSlot(slotId, resourceProfile, taskManagerConnection);
final PendingTaskManagerSlot pendingTaskManagerSlot;
if (allocationId == null) {
pendingTaskManagerSlot = findExactlyMatchingPendingTaskManagerSlot(resourceProfile);
} else {
pendingTaskManagerSlot = null;
}
if (pendingTaskManagerSlot == null) {
updateSlot(slotId, allocationId, jobId);
} else {
pendingSlots.remove(pendingTaskManagerSlot.getTaskManagerSlotId());
final PendingSlotRequest assignedPendingSlotRequest = pendingTaskManagerSlot.getAssignedPendingSlotRequest();
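// allocate the slot
if (assignedPendingSlotRequest == null) {
// every pending request has already been fulfilled, so this slot simply becomes free for now
handleFreeSlot(slot);
} else {
// this slot is meant for a pending request, so allocate it to that request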
assignedPendingSlotRequest.unassignPendingTaskManagerSlot();
allocateSlot(slot, assignedPendingSlotRequest);
}
}
}
private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pendingSlotRequest) {
Preconditions.checkState(taskManagerSlot.getState() == SlotState.FREE);
... ...
// RPC call to the task manager
// after allocating, tell the TM to offer the slot to the JM
CompletableFuture<Acknowledge> requestFuture = gateway.requestSlot(
slotId,
pendingSlotRequest.getJobId(),
allocationId,
pendingSlotRequest.getResourceProfile(),
pendingSlotRequest.getTargetAddress(),
resourceManagerId,
taskManagerRequestTimeout);
... ...
}
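removeSlot() removes the old slot.
createAndRegisterTaskManagerSlot() creates and registers the new slot.
The slot is then either kept free (handleFreeSlot()) or allocated to the waiting request (allocateSlot()).
After allocating, the RM tells the TaskManager (via requestSlot()) to offer the slot to the JobMaster.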
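The branch at the end of registerSlot() can be read as a small decision table: a slot that already carries an allocation just updates state, an unallocated slot that exactly matches a pending slot consumes it, and that pending slot either has a waiting request (allocate) or not (keep free). A minimal sketch of that decision; Profile, PendingSlot and SlotRegistrar are hypothetical, the profile is reduced to CPU cores plus memory in MB, and the returned strings merely label which branch was taken.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified resource profile: CPU cores plus memory in MB.
final class Profile {
    final double cpuCores;
    final int memoryMb;

    Profile(double cpuCores, int memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Profile)) return false;
        Profile other = (Profile) o;
        return Double.compare(cpuCores, other.cpuCores) == 0 && memoryMb == other.memoryMb;
    }

    @Override
    public int hashCode() {
        return Objects.hash(cpuCores, memoryMb);
    }
}

// A slot requested before its TaskManager existed; assignedRequest may be null.
final class PendingSlot {
    final Profile profile;
    final String assignedRequest;

    PendingSlot(Profile profile, String assignedRequest) {
        this.profile = profile;
        this.assignedRequest = assignedRequest;
    }
}

// Hypothetical sketch of the decision made at the end of registerSlot().
class SlotRegistrar {
    final List<PendingSlot> pendingSlots = new ArrayList<>();
    final List<String> freeSlots = new ArrayList<>();
    final Map<String, String> allocations = new HashMap<>(); // slot id -> request

    String registerSlot(String slotId, String allocationId, Profile profile) {
        // Only a slot without an existing allocation may satisfy a pending slot.
        PendingSlot pending = allocationId == null ? findExactlyMatching(profile) : null;
        if (pending == null) {
            return "updated"; // the real code calls updateSlot(slotId, allocationId, jobId)
        }
        pendingSlots.remove(pending);
        if (pending.assignedRequest == null) {
            freeSlots.add(slotId); // nobody is waiting any more: keep it as a free slot
            return "free";
        }
        allocations.put(slotId, pending.assignedRequest); // hand it to the waiting request
        return "allocated";
    }

    private PendingSlot findExactlyMatching(Profile profile) {
        for (PendingSlot candidate : pendingSlots) {
            if (candidate.profile.equals(profile)) {
                return candidate;
            }
        }
        return null;
    }
}
```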


TaskManager offers slots
TaskExecutor
public CompletableFuture<Acknowledge> requestSlot(
... ...
try {
// allocate the slot on this TaskManager as instructed by the RM
allocateSlot(
slotId,
jobId,
allocationId,
resourceProfile);
} catch (SlotAllocationException sae) {
return FutureUtils.completedExceptionally(sae);
}
... ...
if (job.isConnected()) {
// the job is connected, so offer the slots to the JobManager
offerSlotsToJobManager(jobId);
}
return CompletableFuture.completedFuture(Acknowledge.get());
}
private void offerSlotsToJobManager(final JobID jobId) {
jobTable
.getConnection(jobId)
.ifPresent(this::internalOfferSlotsToJobManager);
}
private void internalOfferSlotsToJobManager(JobTable.Connection jobManagerConnection) {
... ...
CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
getResourceID(),
reservedSlots,
taskManagerConfiguration.getTimeout());
... ...
}
Allocate the slot on the TaskManager as instructed by the RM.
Once the job is connected, offer the slots to the JobManager.
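requestSlot() has two halves: reserve the slot locally (failing if it is out of range or already reserved for another allocation), then offer it to the JobMaster if the job is already connected. A minimal sketch of that flow; SlotOfferingTaskExecutor is hypothetical, and onJobManagerConnected() is a hypothetical hook for the assumption that, when the job is not connected yet, the offer is made later once the JobMaster connection exists.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of requestSlot(): reserve locally, then offer if the job is connected.
class SlotOfferingTaskExecutor {
    private static final class Reservation {
        final String jobId;
        final String allocationId;

        Reservation(String jobId, String allocationId) {
            this.jobId = jobId;
            this.allocationId = allocationId;
        }
    }

    private final int numberOfSlots;
    private final Map<Integer, Reservation> reserved = new HashMap<>(); // slot index -> reservation
    private final Set<String> connectedJobs = new HashSet<>();
    final List<String> offered = new ArrayList<>(); // allocation ids offered to a JobMaster

    SlotOfferingTaskExecutor(int numberOfSlots) {
        this.numberOfSlots = numberOfSlots;
    }

    CompletableFuture<String> requestSlot(int slotIndex, String jobId, String allocationId) {
        Reservation existing = reserved.get(slotIndex);
        if (slotIndex < 0 || slotIndex >= numberOfSlots
                || (existing != null && !existing.allocationId.equals(allocationId))) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("Could not allocate slot " + slotIndex));
            return failed;
        }
        reserved.put(slotIndex, new Reservation(jobId, allocationId));
        if (connectedJobs.contains(jobId)) {
            offerSlotsToJobManager(jobId); // job already connected: offer right away
        }
        return CompletableFuture.completedFuture("ack");
    }

    // Hypothetical hook: called once a connection to the job's JobMaster is established.
    void onJobManagerConnected(String jobId) {
        connectedJobs.add(jobId);
        offerSlotsToJobManager(jobId);
    }

    private void offerSlotsToJobManager(String jobId) {
        for (Reservation r : reserved.values()) {
            if (r.jobId.equals(jobId) && !offered.contains(r.allocationId)) {
                offered.add(r.allocationId);
            }
        }
    }
}
```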
JobMaster
public CompletableFuture<Collection<SlotOffer>> offerSlots(
... ...
return CompletableFuture.completedFuture(
slotPool.offerSlots(
taskManagerLocation,
rpcTaskManagerGateway,
slots));
}
SlotPoolImpl
public Collection<SlotOffer> offerSlots(
TaskManagerLocation taskManagerLocation,
TaskManagerGateway taskManagerGateway,
Collection<SlotOffer> offers) {
ArrayList<SlotOffer> result = new ArrayList<>(offers.size());
for (SlotOffer offer : offers) {
if (offerSlot(
taskManagerLocation,
taskManagerGateway,
offer)) {
result.add(offer);
}
}
return result;
}
/**
* Slot offering by TaskExecutor with AllocationID. The AllocationID is originally generated by this pool and
* transfer through the ResourceManager to TaskManager. We use it to distinguish the different allocation
* we issued. Slot offering may be rejected if we find something mismatching or there is actually no pending
* request waiting for this slot (maybe fulfilled by some other returned slot).
*
* @param taskManagerLocation location from where the offer comes from
* @param taskManagerGateway TaskManager gateway
* @param slotOffer the offered slot
* @return True if we accept the offering
*/
boolean offerSlot(
final TaskManagerLocation taskManagerLocation,
final TaskManagerGateway taskManagerGateway,
final SlotOffer slotOffer) {
componentMainThreadExecutor.assertRunningInMainThread();
// check if this TaskManager is valid
final ResourceID resourceID = taskManagerLocation.getResourceID();
final AllocationID allocationID = slotOffer.getAllocationId();
if (!registeredTaskManagers.contains(resourceID)) {
log.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}",
slotOffer.getAllocationId(), taskManagerLocation);
return false;
}
// check whether we have already using this slot
AllocatedSlot existingSlot;
if ((existingSlot = allocatedSlots.get(allocationID)) != null ||
(existingSlot = availableSlots.get(allocationID)) != null) {
// we need to figure out if this is a repeated offer for the exact same slot,
// or another offer that comes from a different TaskManager after the ResourceManager
// re-tried the request
// we write this in terms of comparing slot IDs, because the Slot IDs are the identifiers of
// the actual slots on the TaskManagers
// Note: The slotOffer should have the SlotID
final SlotID existingSlotId = existingSlot.getSlotId();
final SlotID newSlotId = new SlotID(taskManagerLocation.getResourceID(), slotOffer.getSlotIndex());
if (existingSlotId.equals(newSlotId)) {
log.info("Received repeated offer for slot [{}]. Ignoring.", allocationID);
// return true here so that the sender will get a positive acknowledgement to the retry
// and mark the offering as a success
return true;
} else {
// the allocation has been fulfilled by another slot, reject the offer so the task executor
// will offer the slot to the resource manager
return false;
}
}
final AllocatedSlot allocatedSlot = new AllocatedSlot(
allocationID,
taskManagerLocation,
slotOffer.getSlotIndex(),
slotOffer.getResourceProfile(),
taskManagerGateway);
// use the slot to fulfill pending request, in requested order
tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
// we accepted the request in any case. slot will be released after it idled for
// too long and timed out
return true;
}
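The accept/reject rules in offerSlot() reduce to four cases: an unregistered TaskManager is rejected, a repeated offer of the same physical slot is acknowledged, the same allocation id arriving from a different slot is rejected, and anything else is accepted. A minimal sketch of those rules; MiniSlotPool is hypothetical, ids are strings, and a slot id is modelled as "resourceId/slotIndex".

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the accept/reject rules in offerSlot(); ids are plain strings.
class MiniSlotPool {
    final Set<String> registeredTaskManagers = new HashSet<>();
    // allocation id -> slot id ("resourceId/slotIndex") of every slot the pool already holds
    private final Map<String, String> heldSlots = new HashMap<>();

    boolean offerSlot(String resourceId, int slotIndex, String allocationId) {
        if (!registeredTaskManagers.contains(resourceId)) {
            return false; // outdated offer from an unregistered TaskManager
        }
        String newSlotId = resourceId + "/" + slotIndex;
        String existingSlotId = heldSlots.get(allocationId);
        if (existingSlotId != null) {
            // Same slot again: a retried offer, acknowledge it.
            // Different slot: the allocation was already fulfilled elsewhere, reject it.
            return existingSlotId.equals(newSlotId);
        }
        heldSlots.put(allocationId, newSlotId);
        return true; // accepted; a slot that stays unused is released after an idle timeout
    }
}
```

Acknowledging a repeated offer of the same slot keeps retries idempotent: the TaskExecutor gets a positive answer again instead of concluding its slot was refused.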
