如何使用ftrace实时获取系统中的spinlock快照

tech2022-09-24  73

接上文: https://blog.csdn.net/dog250/article/details/108349046 在这篇文章中,我给出了一个拯救panic的方法,其目的更多的是恶作剧性质。但仍然有不足,请看下面代码段中的注释:

void stub_panic(const char *fmt, ...) { ... local_irq_enable(); // 这个时候如果current持有自旋锁,那可怎么办??? printk("rq:%d %d\n", preempt_count(), irqs_disabled()); __set_current_state(TASK_UNINTERRUPTIBLE); schedule();

如果持有自旋锁的task被schedule出去了,由于该task不会再回来了,那么只要另一个task抢锁,系统立马就会死锁。

需求自然就出来了,我能不能在调用schedule之前遍历系统当前所有自旋锁的持锁情况,将自己持有的自旋锁给unlock了呢?

有需求就有方案。当然可以。

大秀hook手艺的时候来了。虽然作为手艺人用手工的方式拼接指令可以实现任何功能,但现在的目标已经不仅仅是秀手艺了,而是实现上述的需求,所以我尽量先使用stap/kprobe,ftrace这些场面还不是太宏大的工具。

需求图示如下:

很显然,直接的思路就是使用kretprobe了,于是我写出了如下的代码:

#include <linux/module.h> #include <linux/kallsyms.h> #include <linux/kprobes.h> struct lock_owner { struct raw_spinlock *lock; struct task_struct *owner; struct task_struct *lock_owner; int idx; }; struct lock_owner plane[256]; unsigned long bitmap[4]; int lock_handler(struct kretprobe_instance *ri, struct pt_regs *regs) { struct lock_owner *owner, **pdata; struct raw_spinlock *lock = (struct raw_spinlock *)regs->di; int i = -1, retry = 10; pdata = (struct lock_owner **)&ri->data; *pdata = NULL; again: i = find_first_zero_bit(&bitmap[0], 256); if (i > 256 || test_and_set_bit(i, bitmap)) { if (retry --) goto again; goto end; } owner->idx = i; owner = &plane[i]; owner->lock = lock; owner->owner = current; owner->lock_owner = NULL; *pdata = owner; end: return 0; } int lock_ret_handler(struct kretprobe_instance *ri, struct pt_regs *regs) { struct lock_owner *owner = (struct lock_owner *)&ri->data[0]; if (owner == NULL) return 0; owner->lock_owner = current; return 0; } static struct kretprobe lock_kretprobe = { .handler = lock_ret_handler, .entry_handler = lock_handler, .data_size = sizeof(struct lock_owner *), .maxactive = 20, }; int unlock_handler(struct kretprobe_instance *ri, struct pt_regs *regs) { struct lock_owner *owner, **pdata; struct raw_spinlock *lock = (struct raw_spinlock *)regs->di; int i = -1; pdata = (struct lock_owner **)&ri->data; *pdata = NULL; for (i = 0; i < 256; i++) { owner = &plane[i]; if (test_bit(owner->idx, &bitmap[0]) && owner->owner == current && owner->lock_owner == current && owner->lock == lock && owner->idx == i) { *pdata = owner; break; } } return 0; } int unlock_ret_handler(struct kretprobe_instance *ri, struct pt_regs *regs) { struct lock_owner *owner = (struct lock_owner *)&ri->data[0]; if (owner == NULL) return 0; owner->lock = NULL; owner->owner = NULL; owner->lock_owner = NULL; owner->idx = -1; clear_bit(owner->idx, &bitmap[0]); return 0; } static struct kretprobe unlock_kretprobe = { .handler = unlock_ret_handler, .entry_handler = unlock_handler, .data_size = sizeof(struct lock_owner *), .maxactive = 20, }; static int __init lock_detect_init(void) { memset((char *)&plane[0], 0, sizeof(plane)); memset((char *)&bitmap[0], 0, sizeof(plane)); lock_kretprobe.kp.symbol_name = "_raw_spin_lock"; register_kretprobe(&lock_kretprobe); unlock_kretprobe.kp.symbol_name = "_raw_spin_unlock"; register_kretprobe(&unlock_kretprobe); return 0; } static void __exit lock_detect_exit(void) { unregister_kretprobe(&lock_kretprobe); unregister_kretprobe(&unlock_kretprobe); } module_init(lock_detect_init); module_exit(lock_detect_exit);

请忽略具体的spinlock统计逻辑,现在仅仅关注框架,我敢说,这个玩法对于一般的wrap函数,简直就是模版:

这个框架可以实现几乎一切wrap函数。

所谓的wrap函数其实很简单:

my_function(void *param) { pre_process(param); orig_function(param); post_process(param); }

kretprobe的跳转逻辑如下:

然而,它偏偏不能用于_raw_spin_lock/_raw_spin_unlock的wrap!因为会死锁!

kprobe/kretprobe框架内部使用spinlock来进行同步,在ret_handler执行的时候,它已经持有了该spinlock,而属于kretprobe的ret_handler本身同样也需要该spinlock,因此会死锁。

啦啦啦,这就是为什么我喜欢纯手工活儿的原因了,因为它可控啊!

来来来,试试ftrace,相比于kretprobe而言,它更简单,我觉得它应该没问题,如果再不行,就只能上纯手工艺了。

先来试试框架,下面是一个什么都不做的框架代码:

#include <linux/module.h> #include <linux/kallsyms.h> #include <linux/ftrace.h> struct wrap_hook { char *name; struct ftrace_ops ops; unsigned long wrap_func; unsigned long entry; }; void (*real_raw_spin_lock)(spinlock_t *lock); // _raw_spin_lock的wrap函数! void my_raw_spin_lock(spinlock_t *lock) { real_raw_spin_lock(lock); } void (*real_raw_spin_unlock)(spinlock_t *lock); // _raw_spin_unlock的wrap函数! void my_raw_spin_unlock(spinlock_t *lock) { real_raw_spin_unlock(lock); } struct wrap_hook spinlock_hooks[2] = { { .name = "_raw_spin_lock", .wrap_func = (unsigned long)my_raw_spin_lock, }, { .name = "_raw_spin_unlock", .wrap_func = (unsigned long)my_raw_spin_unlock, } }; // 该函数是一个汇聚器,起到将逻辑return到new function的目的。 // 采用汇聚器可以避免为两个函数编写两个独立的hook函数,这是设计模式应用的例子。 void stub_func(unsigned long ip, unsigned long parent_ip, struct ftrace_ops *ops, struct pt_regs *regs) { struct wrap_hook *hook = container_of(ops, struct wrap_hook, ops); // 替换返回地址 regs->ip = hook->wrap_func; } static int wrap_spinlock_init(void) { int i; for (i = 0; i < 2; i++) { spinlock_hooks[i].ops.func = stub_func; spinlock_hooks[i].ops.flags = FTRACE_OPS_FL_SAVE_REGS|FTRACE_OPS_FL_RECURSION_SAFE; spinlock_hooks[i].entry = kallsyms_lookup_name(spinlock_hooks[i].name); if (i == 0) real_raw_spin_lock = (void *)(spinlock_hooks[i].entry + MCOUNT_INSN_SIZE); else real_raw_spin_unlock = (void *)(spinlock_hooks[i].entry + MCOUNT_INSN_SIZE); ftrace_set_filter_ip(&spinlock_hooks[i].ops, spinlock_hooks[i].entry, 0, 0); register_ftrace_function(&spinlock_hooks[i].ops); } return 0; } static void wrap_spinlock_exit(void) { int i; for (i = 0; i < 2; i++) { unregister_ftrace_function(&spinlock_hooks[i].ops); ftrace_set_filter_ip(&spinlock_hooks[i].ops, spinlock_hooks[i].entry, 1, 0); } } module_init(wrap_spinlock_init); module_exit(wrap_spinlock_exit); MODULE_LICENSE("GPL");

当我加载这个模块的时候,系统像什么都没有发生一样平静,而我用crash看_raw_spin_lock/_raw_spin_unlock的时候,显然它们已经被hook了,这意味着,wrap起作用了。

下图展示了ftrace实现wrap的原理: 这个已经和手工做法几乎无异了。可以拿这个ftrace和上面的kretprobe对比一下,感受一下雷同和差异:

kretprobe一般也是用来probe函数而不是指令,这一点和ftrace一致。ftrace机制上更加直接,而kretprobe则更加trick一点。如果要完成一个业务需求,我选择用ftrace,如果要表演,我会照抄一套kretprobe的机制,如果纯自己玩,我选择纯手工艺。

好了,现在把spinlock统计逻辑放进去,代码简单,自己感受:

#include <linux/module.h> #include <linux/kallsyms.h> #include <linux/ftrace.h> struct wrap_hook { char *name; struct ftrace_ops ops; unsigned long wrap_func; unsigned long entry; }; struct lock_owner { int idx; spinlock_t *lock; struct task_struct *owner; struct task_struct *lock_owner; }; struct lock_owner plane[32768]; unsigned long bitmap[512]; void (*real_raw_spin_lock)(spinlock_t *lock); void (*real_raw_spin_unlock)(spinlock_t *lock); #pragma GCC optimize ("O0") void my_raw_spin_lock(spinlock_t *lock) { struct lock_owner *owner = NULL; int i = -1, retry = 0; // 通篇的header和tailer不要有spinlock,因此即便是printk也不能使用,否则会造成嵌套死锁(毕竟把spin_lock/unlock给hook了)。 // 因此,下面实现了一个简单的lock机制,使用原子的test_and_set操作。 again: i = find_first_zero_bit(bitmap, 32768); if (test_and_set_bit(i, bitmap)) { if (retry ++ > 10) goto real; goto again; } owner = &plane[i]; owner->idx = i; owner->lock = lock; owner->owner = current; owner->lock_owner = NULL; set_bit(i, bitmap); real: barrier(); real_raw_spin_lock(lock); barrier(); if (owner) owner->lock_owner = current; } #if 0 void test() { struct lock_owner *owner = NULL; int i = -1; int cnt = 4; again: spin_lock(&maplock); i = find_first_zero_bit(bitmap, 32768); set_bit(i, bitmap); spin_unlock(&maplock); owner = &plane[i]; printk("i is:%d at :%p\n", i, owner); if (cnt --) goto again; } #endif #pragma GCC optimize ("O0") void my_raw_spin_unlock(spinlock_t *lock) { struct lock_owner *owner = NULL; int i = -1; real_raw_spin_unlock(lock); barrier(); for (i = 0; i < 32768; i++) { // 注意,这里没有原子性保证,因此我的代码无法cover 100%要求原子的场景,可能会统计不准。 if (test_bit(i, bitmap) && plane[i].owner == current && plane[i].lock_owner == current && plane[i].lock == lock) { owner = &plane[i]; owner->lock = NULL; owner->owner = NULL; owner->lock_owner = NULL; owner->idx = -1; barrier(); clear_bit(i, bitmap); break; } } } #pragma GCC optimize ("O0") struct wrap_hook spinlock_hooks[2] = { { .name = "_raw_spin_lock", .wrap_func = (unsigned long)my_raw_spin_lock, }, { .name = "_raw_spin_unlock", .wrap_func = (unsigned long)my_raw_spin_unlock, } }; void stub_func(unsigned long ip, unsigned long parent_ip, struct ftrace_ops *ops, struct pt_regs *regs) { struct wrap_hook *hook = container_of(ops, struct wrap_hook, ops); regs->ip = hook->wrap_func; } #pragma GCC optimize ("O0") static int wrap_spinlock_init(void) { int i; memset((char *)plane, 0, sizeof(plane)); memset((char *)bitmap, 0, sizeof(bitmap)); // 打印出位图的地址,以便panic_resched模块使用! printk("plane:%p bitmap:%p\n", &plane[0], &bitmap[0]); #if 0 test(); return 0; #endif for (i = 0; i < 2; i++) { spinlock_hooks[i].ops.func = stub_func; spinlock_hooks[i].ops.flags = FTRACE_OPS_FL_SAVE_REGS|FTRACE_OPS_FL_RECURSION_SAFE; spinlock_hooks[i].entry = kallsyms_lookup_name(spinlock_hooks[i].name); if (i == 0) real_raw_spin_lock = (void *)(spinlock_hooks[i].entry + 5); else real_raw_spin_unlock = (void *)(spinlock_hooks[i].entry + 5); ftrace_set_filter_ip(&spinlock_hooks[i].ops, spinlock_hooks[i].entry, 0, 0); register_ftrace_function(&spinlock_hooks[i].ops); } return 0; } static void wrap_spinlock_exit(void) { int i; struct lock_owner *owner; #if 0 for (i = 0; i < 32768; i++) { owner = &plane[i]; if (test_bit(i, bitmap)) { if (owner->lock == NULL || owner->idx != i) continue; printk("[%d] lock:%p owner:%s[%d] lock_owner:%s[%d]\n", owner->idx, owner->lock, owner->owner?owner->owner->comm:"noone", owner->owner?owner->owner->pid:-2, owner->lock_owner?owner->lock_owner->comm:"null", owner->lock_owner?owner->lock_owner->pid:-1); } } #endif for (i = 0; i < 2; i++) { unregister_ftrace_function(&spinlock_hooks[i].ops); ftrace_set_filter_ip(&spinlock_hooks[i].ops, spinlock_hooks[i].entry, 1, 0); } } module_init(wrap_spinlock_init); module_exit(wrap_spinlock_exit); MODULE_LICENSE("GPL");

值得注意的是,wrap函数中不能再调用任何会使用spinlock的函数(避免循环嵌套),因此一开始我决定自己用原子原语实现一个自己的自旋锁:

void lock(unsigned long *lock) { while (test_and_set_bit(0, lock)); } void unlock(unsigned long *lock) { *lock = 0; // 比clear_bit更帅 }

然而系统很快就卡死了:

把系统所有的spinlock操作全部在此处唯一的自旋锁上串行化,不卡死才怪!

于是,只能退而求其次,采用宽松的约束了:

实在没有slot了,就不统计该次记录了。

所以,我这个spinlock快照记录机制, 它是不准的。

以上就是一个简单的spinlock快照机制的代码和说明,它可以展示:

当前系统中都有哪些task在争抢哪一把spinlock。当前系统中某个spinlock被哪一个task所持有。

这个机制有什么用呢?

回到本文的开头,如果想让panic被schedule出去而不是宕机,我在担心current持有锁怎么办,我希望有一个办法让我知道current是否持有spinlock以及持有了哪些spinlock,然后将它们unlock。

现在有办法了:

// 通过lock_detect模块init函数中printk出来的bitmap地址来设置。 unsigned long pbitmap; module_param(pbitmap, ulong, 0644); // 通过lock_detect模块init函数中printk出来的plane地址来设置。 unsigned long pplane; module_param(pplane, ulong, 0644); void stub_panic(const char *fmt, ...) { int i; unsigned long *bitmap = (unsigned long *)pbitmap; struct lock_owner *plane = (struct lock_owner *)pplane; // 循环遍历所有当前系统当事的spinlock,解锁current所持有的spinlock。 for (i = 0; i < 32768; i++) { if (test_bit(i, bitmap) && plane[i].owner == current && plane[i].lock_owner == current && plane[i].lock && plane[i].idx == i) { printk("lock hold:%p %s %d\n", plane[i].lock, plane[i].lock_owner?plane[i].lock_owner->comm:"aabb", plane[i].lock_owner?plane[i].lock_owner->pid:-1); spin_unlock(plane[i].lock); } } if (preempt_count()) return; local_irq_enable(); // 安全地退场!! __set_current_state(TASK_UNINTERRUPTIBLE); schedule(); } ...

强调一点,本文介绍的把戏无法揪出所有的spinlock状态,因为Linux内核spinlock的lock/unlock操作并非_raw_spin_lock/_raw_spin_unlock入口的,比如spin_lock_irqsave/spin_unlock_irqrestore就不使用_raw_spin_lock/_raw_spin_unlock入口,它们有自己的入口。因此你需要把所有这些入口都给ftrace hook了,才能全咯。

不过,这仅仅是一个把戏,何必去留这么多TODO,权当玩吧。


浙江温州皮鞋湿,下雨进水不会胖。

最新回复(0)