|
一: 前言 Ring buffer是整个trace系统使用缓存管理的一种方式, 由于trace可能在内核运行的任何时候发生, 这种kernel的不确定状态决定了ring buffer的写操作中不能有任何引起睡眠的操作, 而且ring buffer的操作频率极高,所以在ring buffer实现里有很多高效的方式来处理多处理器, 读写同步的机制. 理解ring buffer是我们理解整个kernel trace的基础. 本文分析的源代码版本为linux kernel 2.6.30, 分析的代码基本位于kernel/trace/ring_buffer.c中.另外,为了描述的方便,下文ring buffer用RB来代替. 二: ring buffer的基本数据结构 在深入到代码之前,我们先来看一下RB所用到的几个基本的数据结构,这样我们对RB就会有一个全局性的了解.整个RB的数据结构框架如下所示: 
ring buffer用struct ring_buffer来表示,数据结构定义如下: struct ring_buffer { /*RB中的页面数*/ unsigned pages; /*RB的标志,目前只有RB_FL_OVERWRITE可用*/ unsigned flags; /*ring buffer中包含的cpu个数*/ int cpus; /*整个ring buffer的禁用标志,用原子操作了防止竞争*/ atomic_t record_disabled; /* cpu位图*/ cpumask_var_t cpumask; /*RB访问锁*/ struct mutex mutex; /*CPU的缓存区页面,每个CPU对应一项*/ struct ring_buffer_per_cpu **buffers; #ifdef CONFIG_HOTPLUG_CPU /*多CPU情况下的cpu hotplug 通知链表*/ struct notifier_block cpu_notify; #endif /*RB所用的时间,用来计数时间戳*/ u64 (*clock)(void); } 在RB的操作中,我们可以禁止全局的RB操作,例如,完全禁用掉Trace功能后,整个RB都是不允许再操做的,这时,就可以将原子变量record_disabled 加1.相反的,如果启用的话,将其减1即可.只有当record_disabled的值等于0时,才允许操作RB. 同时,有些时候,要对RB的一些数据进行更新,比如,我要重新设置一下RB的缓存区大小,这都需要串行操作,因此,在ring_buffer结构中有mutex成员,用来避免这些更改RB的操作的竞争. 每个cpu的缓存区结构为: struct ring_buffer_per_cpu { /*该cpu buffer所在的CPU*/ int cpu; /*cpu buffer所属的RB*/ struct ring_buffer *buffer; /*读锁,用了避免读者的操行操作,有时在 *写者切换页面的时候,也需要持有此锁 */ spinlock_t reader_lock; /* serialize readers */ raw_spinlock_t lock; struct lock_class_key lock_key; /*cpu buffer的页面链表*/ struct list_head pages; /*起始读位置*/ struct buffer_page *head_page; /* read from head */ /*写位置*/ struct buffer_page *tail_page; /* write to tail */ /*提交位置,只有当被写的页面提交过后 *才允许被读 */ struct buffer_page *commit_page; /* committed pages */ /*reader页面, 用来交换读页面*/ struct buffer_page *reader_page; unsigned long nmi_dropped; unsigned long commit_overrun; unsigned long overrun; unsigned long read; local_t entries; /*最新的页面commit时间*/ u64 write_stamp; /*最新的页面read时间*/ u64 read_stamp; /*cpu buffer的禁用启用标志*/ atomic_t record_disabled; } 首先,对每一个cpu的操作限制是由ring_buffer_per_cpu->record_disabled来实现的.同ring_buffer一样,禁用加1,启用减1. 从上图的全局结构关联图中,我们也可以看到,每个cpu都有一系列的页面,这样页面都链入在pages中. 该页面的结构如下: struct buffer_page { /*用来形成链表*/ struct list_head list; /* list of buffer pages */ /*写的位置*/ local_t write; /* index for next write */ /*读的位置*/ unsigned read; /* index for next read */ /*页面中有多少项数据*/ local_t entries; /* entries on this page */ struct buffer_data_page *page; /* Actual data page */ }; 具体的缓存区是由struct buffer_data_page指向的,实际上,它是具体页面的管理头部,结构如下: struct buffer_data_page { /*页面第一次被写时的时间戳*/ u64 time_stamp; /* page time stamp */ /*提交的位置*/ local_t commit; /* write committed index */ /*用来存放数据的缓存区*/ unsigned char data[]; /* data of buffer page */ }; 这里就有一个疑问了,为什么提交页面要放到struct buffer_date_page中,而不放到struct buffer_page呢? 三: ring buffer的初始化 Ring buffer的初始化函数为ring_buffer_alloc(). 代码如下: struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags) { struct ring_buffer *buffer; int bsize; int cpu; /* Paranoid! Optimizes out when all is well */ /*如果struct buffer_page的大小超过了struct page的大小,编译时会报错 *因为ring_buffer_page_too_big()其实并不存在. */ if (sizeof(struct buffer_page) > sizeof(struct page)) ring_buffer_page_too_big(); /* keep it in its own cache line */ /*alloc and init struct ring_buffer*/ buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), GFP_KERNEL); if (!buffer) return NULL; /*init cpumask*/ if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) goto fail_free_buffer; /*BUF_PAGE_SIZE means the data size of per page, *size/BUF_PAGE_SIZE can calculate page number of per cpu. */ buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); buffer->flags = flags; /* buffer->clock is the timestap of local cpu*/ buffer->clock = trace_clock_local; /* need at least two pages */ if (buffer->pages == 1) buffer->pages++; /* * In case of non-hotplug cpu, if the ring-buffer is allocated * in early initcall, it will not be notified of secondary cpus. * In that off case, we need to allocate for all possible cpus. */ #ifdef CONFIG_HOTPLUG_CPU get_online_cpus(); cpumask_copy(buffer->cpumask, cpu_online_mask); #else cpumask_copy(buffer->cpumask, cpu_possible_mask); #endif /*number of cpu*/ buffer->cpus = nr_cpu_ids; /* alloc and init buffer for per cpu,Notice:buffer->buffers is a double pointer*/ bsize = sizeof(void *) * nr_cpu_ids; buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), GFP_KERNEL); if (!buffer->buffers) goto fail_free_cpumask; for_each_buffer_cpu(buffer, cpu) { buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, cpu); if (!buffer->buffers[cpu]) goto fail_free_buffers; } #ifdef CONFIG_HOTPLUG_CPU buffer->cpu_notify.notifier_call = rb_cpu_notify; buffer->cpu_notify.priority = 0; register_cpu_notifier(&buffer->cpu_notify); #endif put_online_cpus(); mutex_init(&buffer->mutex); return buffer; fail_free_buffers: for_each_buffer_cpu(buffer, cpu) { if (buffer->buffers[cpu]) rb_free_cpu_buffer(buffer->buffers[cpu]); } kfree(buffer->buffers); fail_free_cpumask: free_cpumask_var(buffer->cpumask); put_online_cpus(); fail_free_buffer: kfree(buffer); return NULL; }
|