当前位置:网站首页>分析 Flink 任务如何超过 YARN 容器内存限制

分析 Flink 任务如何超过 YARN 容器内存限制

2022-08-11 10:41:00 InfoQ

本文作者为中国移动云能力中心大数据团队软件开发工程师谢磊,文章针对Flink任务在YARN集群运行间隔一段时间就会重启的问题,进行逐步分析和模拟复现,并给出内存泄露修复方案,供大家参考。

问题背景

业务的 Flink 任务在 YARN 集群运行间隔一段时间就会自动重启,对于这类问题一般来讲,已经轻车熟路,有部分可以尝试的思路:
1.排查内存是否溢出(堆内/堆外)2.程序中偶发的 bug 导致3.YARN 集群节点上下线导致4.FLINK 程序中使用了大窗口,例如小时级别的窗口,数据超过内存5....
同时在我们的 YARN 集群开启了物理内存检测选项,当进程使用物理内存超过申请内存时,YARN 集群会主动 kill 掉任务的进程,来保证集群的稳定性。
<property>
<name>yarn.nodemanager.pmem-check-enabled</name><value>true</value>
</property>

异常信息

我们先从异常信息看起,通过排查 JobManager 的运行日志,我们会发现如下异常堆栈信息。
2020-04-15 01:59:33,000 INFO org.apache.flink.yarn.YarnResourceManager - Closing TaskExecutor connection container_e05_1585737758019_0901_01_000003 because: Container [pid=3156625,containerID=container_e05_1585737758019_0901_01_000003] is running beyond physical memory limits. Current usage: 6.1 GB of 6 GB physical memory used; 14.5 GB of 28 GB virtual memory used. Killing container.Dump of the process-tree for container_e05_1585737758019_0901_01_000003 :|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE|- 3156625 3156621 3156625 3156625 (bash) 0 0 15441920 698 /bin/bash -c /usr/java/default/bin/java -Xms4148m -Xmx4148m -XX:MaxDirectMemorySize=1996m -javaagent:lib/aspectjweaver-1.9.1.jar -Dlog.file=/data_sdh/nodemanager/log/application_1585737758019_0901/container_e05_1585737758019_0901_01_000003/taskmanager.log -Dlogback.configurationFile=file:./logback.xml -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> /data_sdh/nodemanager/log/application_1585737758019_0901/container_e05_1585737758019_0901_01_000003/taskmanager.out 2> /data_sdh/nodemanager/log/application_1585737758019_0901/container_e05_1585737758019_0901_01_000003/taskmanager.err|- 3156696 3156625 3156625 3156625 (java) 12263 1319 15553892352 2119601 /usr/java/default/bin/java -Xms4148m -Xmx4148m -XX:MaxDirectMemorySize=1996m -javaagent:lib/aspectjweaver-1.9.1.jar -Dlog.file=/data_sdh/nodemanager/log/application_1585737758019_0901/container_e05_1585737758019_0901_01_000003/taskmanager.log -Dlogback.configurationFile=file:./logback.xml -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner --configDir .
Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
从异常信息中,关键信息 is running beyond physical memory limits. Current usage: 6.1 GB of 6 GB physical memory used; 14.5 GB of 28 GB virtual memory used. Killing container,显示物理内存超过 6GB 内存,被 YARN 的检测机制给 KILL 掉了。
既然是内存超了,先看看 Flink 任务执行的大概执行情况,简单从 UI 分析,如此简单的程序!
null

几点疑问待解决

1.如此简单的程序,为啥会导致进程超过 6GB(RSS),不可思议
2.YARN 的内存检测机制是是什么?如何获取进程内存信息?
3.JVM 参数 -Xms4148m -Xmx4148m -XX:MaxDirectMemorySize=1996m 堆内存 + 堆外内存 = 6GB,为什么在日志中并没有看到 OutofMemory 相关的异常信息?
4.RSS 怎么会超过 6GB 呢?

分析过程

YARN内存检测机制
通过Hadoop源码分析可以知道,YARN 是解析 /proc/<pid>/stat文件,获取 RSS 的值和 container 初始化时申请的内存作对比,如果 RSS 的值超过申请值,则 KILL 进程,并打印出信息。
JVM 相关常规检测
GC
检测仅通过 GC 日志状态,很正常的状态,没问题。
[[email protected] ~]$ jstat -gcutil 12984 1000S0 S1 E O M CCS YGC YGCT FGC FGCT GCT
99.96 0.00 79.06 4.78 94.92 89.38 2 0.164 0 0.000 0.16499.96 0.00 86.77 4.78 94.92 89.38 2 0.164 0 0.000 0.16499.96 0.00 94.48 4.78 94.92 89.38 2 0.164 0 0.000 0.1640.00 99.98 1.95 10.24 94.93 89.38 3 0.255 0 0.000 0.2550.00 99.98 9.77 10.24 94.93 89.38 3 0.255 0 0.000 0.2550.00 99.98 17.58 10.24 94.93 89.38 3 0.255 0 0.000 0.2550.00 99.98 25.40 10.24 94.93 89.38 3 0.255 0 0.000 0.2550.00 99.98 35.16 10.24 94.93 89.38 3 0.255 0 0.000 0.2550.00 99.98 41.02 10.24 94.93 89.38 3 0.255 0 0.000 0.255
Dump 内存
象征性 dump 内存看看,不能使用 -dump:live 会做一次 FullGC,导致结果不一定反应正确内存。
jmap -dump:format=b,file=heap1.bin 12984
先不通过 Eclipse-MAT 打开,直接看下文件大小。
[[email protected] ~]$ ll -lh heap1.bin
-rw------- 1 dcadmin datacentergroup 1016M Apr 15 09:15 heap1.bin
同时看下 linux,此程序真实用的物理内存是多少 top -p 12984
Tasks: 1 total, 0 running, 1 sleeping, 0 stopped, 0 zombie%Cpu(s): 0.6 us, 0.1 sy, 0.0 ni, 99.3 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 stKiB Mem : 16394040 total, 5676652 free, 8114832 used, 2602556 buff/cacheKiB Swap: 0 total, 0 free, 0 used. 7523032 avail Mem

PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
12984 dcadmin 20 0 16.258g 7.053g 15104 S 13.3 45.1 1:27.92 java
划重点!
•JVM 的堆内存 1GB 以内,RSS 用掉了 7.053GB,粗浅的理解,那就是堆外内存用掉大概 6GB 左右,看上去没毛病,但别忘了 JVM 启动时,包含参数-XX:MaxDirectMemorySize=1996m 呃,没报 OutofMemoryError。
•RSS 内存仍然在缓慢增长中,没有下降趋势!

重点分析 RSS 内存的问题

•分析 JVM 堆外内存详细分配情况
•分析 RSS 中 7GB 内存中具体是哪些东西
分析堆外内存分配在
JVM 启动参数添加 -XX:NativeMemoryTracking=summary 分析堆外内存泄漏问题。其实从下面的根本分析不出什么东西,因为没有报 OutofMemoryError,所以需要从 RSS 内存入手。
[[email protected] ~]$ jcmd 21567 VM.native_memory summary
21567:
Native Memory Tracking:
Total: reserved=5815901KB, committed=4521641KB
- Java Heap (reserved=4247552KB, committed=4247552KB)
(mmap: reserved=4247552KB, committed=4247552KB) 


- Class (reserved=1076408KB, committed=26936KB)
 (classes #1206)
 (malloc=19640KB #792) 
 (mmap: reserved=1056768KB, committed=7296KB) 

- Thread (reserved=42193KB, committed=42193KB)
 (thread #42)
 (stack: reserved=42016KB, committed=42016KB)
 (malloc=129KB #225) 
 (arena=48KB #82)

- Code (reserved=250040KB, committed=5252KB)
 (malloc=440KB #1026) 
 (mmap: reserved=249600KB, committed=4812KB)

- GC (reserved=177105KB, committed=177105KB)
 (malloc=21913KB #164) 
 (mmap: reserved=155192KB, committed=155192KB) 

- Compiler (reserved=150KB, committed=150KB)
 (malloc=19KB #61) 
 (arena=131KB #3)

- Internal (reserved=19864KB, committed=19864KB)
 (malloc=19832KB #2577) 
 (mmap: reserved=32KB, committed=32KB)

- Symbol (reserved=2285KB, committed=2285KB)
 (malloc=1254KB #255) 
 (arena=1031KB #1)

- Native Memory Tracking (reserved=88KB, committed=88KB)
 (malloc=6KB #64) 
 (tracking overhead=83KB)

- Arena Chunk (reserved=215KB, committed=215KB)
 (malloc=215KB)
分析 RSS 中 7GB 内存中具体是哪些东西
分析 linux 内存的瑞士军刀,gdb 工具。
yum install -y gdb
通过 pmap 命令查看并排序,下面展示了内存地址空间,RSS 占用等信息。
[[email protected] ~]$ pmap -x 21567 | sort -n -k3 | more
---------------- ------- ------- ------- 
0000000000400000 0 0 0 r-x-- java
0000000000600000 0 0 0 rw--- java
0000000000643000 0 0 0 rw--- [ anon ]
00000006bcc00000 0 0 0 rw--- [ anon ]
00000007c00e0000 0 0 0 ----- [ anon ]
...
...
00007fb2ec000000 65508 36336 36336 rw--- [ anon ]
00007fb3c4000000 65536 41140 41140 rw--- [ anon ]
00007fb2d8000000 65508 46692 46692 rw--- [ anon ]
00007fb2e4000000 65508 47640 47640 rw--- [ anon ]
00007fb2e0000000 65508 48596 48596 rw--- [ anon ]
00007fb2dc000000 65512 49088 49088 rw--- [ anon ]
00007fb2cc000000 65508 50380 50380 rw--- [ anon ]
00007fb2d4000000 65508 53476 53476 rw--- [ anon ]
00007fb238000000 131056 59668 59668 rw--- [ anon ]
00000006bcc00000 4248448 1866536 1866536 rw--- [ anon ]
通过上面的信息,只有内存起始地址,并没有终止地址,需要到 maps 文件中。
[[email protected] ~]$ cat /proc/21567/maps | grep 7fb2dc
7fb2dbff9000-7fb2dc000000 ---p 00000000 00:00 0
7fb2dc000000-7fb2dfffa000 rw-p 00000000 00:00 0
通过 gdb 命令 attach 进程,并 dump 内存。
gdb attach 21567
dump memory mem.bin 0x7fb2dc000000 0x7fb2dfffa000
通过 strings 命令查看 mem.bin。
strings mem.bin | more
满屏幕都在刷类似配置文件的内容。
...
xcx.userprofile.kafkasource.bootstrap.servers=xxx-01:9096,xxx-02:9096,xxx-03:9096
xcx.userprofile.kafkasource.topic=CENTER_search_trajectory_xcx
xcx.userprofile.kafkasource.group=search_flink_xcx_userprofile_online
app.userprofile.kafkasource.bootstrap.servers=xxx-01:9096,xxx-02:9096,xxx-03:9096
app.userprofile.kafkasource.topic=CENTER_search_trajectory_app
app.userprofile.kafkasource.group=search_flink_app_userprofile_online
xcx.history.kafkasource.bootstrap.servers=xxx-01:9096,xxx-02:9096,xxx-03:9096
xcx.history.kafkasource.topic=CENTER_search_trajectory_xcx
...

分析/简化业务代码
从上述分析的结果看,应该是代码中有地方再不停地加载配置文件到内存中,简化下业务代码。迎来新的疑问,虽然代码中不应该每次都去加载配置文件,但不至于把物理内存消耗到 6GB 吧,所以这里并不存在逃逸分析,理论会主动释放,为什么没有呢?
public class StreamingJob {
 public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 DataStream<String> text = env.socketTextStream(&quot;10.101.52.18&quot;, 9909);

 text.map(new MapFunction<String, String>() {
 @Override
 public String map(String value) throws Exception {
 TestFun testFun = new TestFun();
 testFun.update();
 return null;
 }
 });

 env.execute();
 }

 static class TestFun {
 Properties properties;
 public TestFun() throws IOException {
 properties = new Properties();
 properties.load(TestFun.class.getClassLoader().getResourceAsStream(&quot;application.properties&quot;));
 }

 public void update() {}
 }
}

模拟复现

不管怎样,虽然怀疑,试试吧,看上去没有问题。
Java 程序模拟
public class TestJar {

 public static void main(String[] args) throws Exception {

 while (true) {

 Properties properties = new Properties();

 properties.load(TestJar.class.getClassLoader().getResourceAsStream(&quot;application.properties&quot;));

 }

 }

}
启动测试程序
java -Xmx512m -Xms512m -XX:MaxDirectMemorySize=1996m -cp test.jar com.ly.search.job.TestJob
观察程序的 RSS 内存消耗可以看到 RSS 内存逐步增加,一点不下降,所以这就是问题所在了,泄漏了。

内存泄漏修复及扩展

修复方案
•及时关闭 stream
public class TestJar {
 public static void main(String[] args) throws Exception {
 while (true) {
 Properties properties = new Properties();
 InputStream inStream = TestJar.class.getClassLoader().getResourceAsStream(&quot;application.properties&quot;)
 properties.load(inStream);
 inStream.close();
 }
 }
}
•通过 System.gc() 也有效果,及时清理 Finalizer(这种是不可取,只作为讨论方案), 参考《JVM 源码分析之 FinalReference 完全解读》

public class TestJar {
 public static void main(String[] args) throws Exception {
 while (true) {
 Properties properties = new Properties();
 InputStream inStream = TestJar.class.getClassLoader().getResourceAsStream(&quot;application.properties&quot;)
 properties.load(inStream);
 System.gc();
 }
 }
}

null
扩展研究
•XXX.class.getClassLoader().getResourceAsStream 底层是 URLClassLoader + JarURLConnection,即

// 等价于,内存溢出
ClassLoader classLoader = TestJar.class.getClassLoader();
URL resource = classLoader.getResource(&quot;application.properties&quot;);
URLConnection urlConnection = resource.openConnection();
urlConnection.getInputStream();

// 等价于,内存溢出
URL url = new URL(&quot;jar:file:/home/dcadmin/test.jar!/com/ly/search/job/StreamingJob.class&quot;);
JarURLConnection conn = (JarURLConnection) url.openConnection();
conn.getInputStream();

// 不等价于,内存不溢出
URL url = new URL(&quot;jar:file:/home/dcadmin/test.jar!/com/ly/search/job/StreamingJob.class&quot;);
JarURLConnection conn = (JarURLConnection) url.openConnection();
conn.setDefaultUseCaches(false);
conn.getInputStream();

// 不等价于,内存不溢出
URL fileURL = new File(&quot;test.jar&quot;).toURI().toURL();

FileURLConnection fileUrlConn = (FileURLConnection) fileURL.openConnection();
fileUrlConn.connect();
fileUrlConn.getInputStream();
•JarURLConnection 和 FileURLConnection 的区别在于 JarURLConnection 底层需要调用 JarFile 打开 ZipFile 的 inputstream,涉及底层系统的调用,所以消耗了物理内存,导致 RSS 增大
•堆外内存一般排查 DirectByteBuffer但此情况中,堆外确实并没有溢出,只是操作系统崩了
•堆 / 堆外均不超内存,所以触发不了 GC,泄漏慢慢就扩展开了,直至超出机器内存
原网站

版权声明
本文为[InfoQ]所创,转载请带上原文链接,感谢
https://xie.infoq.cn/article/8e2706226acfeed21612f1ab2