| 论坛注册| 加入收藏 | 设为首页| RSS
您当前的位置:首页 > Linux频道 > Linux开发区 > 内核研究

Linux内核跟踪之ring buffer的实现

时间:2010-08-16 09:58:39  来源:Linux联盟  作者:
一: 前言

  Ring buffer是整个trace系统使用缓存管理的一种方式, 由于trace可能在内核运行的任何时候发生, 这种kernel的不确定状态决定了ring buffer的写操作中不能有任何引起睡眠的操作, 而且ring buffer的操作频率极高,所以在ring buffer实现里有很多高效的方式来处理多处理器, 读写同步的机制. 理解ring buffer是我们理解整个kernel trace的基础. 本文分析的源代码版本为linux kernel 2.6.30, 分析的代码基本位于kernel/trace/ring_buffer.c中.另外,为了描述的方便,下文ring buffer用RB来代替.

  二: ring buffer的基本数据结构

  在深入到代码之前,我们先来看一下RB所用到的几个基本的数据结构,这样我们对RB就会有一个全局性的了解.整个RB的数据结构框架如下所示:

  ring buffer用struct ring_buffer来表示,数据结构定义如下:

  struct ring_buffer {

  /*RB中的页面数*/

  unsigned            pages;

  /*RB的标志,目前只有RB_FL_OVERWRITE可用*/

  unsigned            flags;

  /*ring buffer中包含的cpu个数*/

  int             cpus;

  /*整个ring buffer的禁用标志,用原子操作了防止竞争*/

  atomic_t            record_disabled;

  /* cpu位图*/

  cpumask_var_t           cpumask;

  /*RB访问锁*/

  struct mutex            mutex;

  /*CPU的缓存区页面,每个CPU对应一项*/

  struct ring_buffer_per_cpu  **buffers;

  #ifdef CONFIG_HOTPLUG_CPU

  /*多CPU情况下的cpu hotplug 通知链表*/

  struct notifier_block       cpu_notify;

  #endif

  /*RB所用的时间,用来计数时间戳*/

  u64             (*clock)(void);

  }

  在RB的操作中,我们可以禁止全局的RB操作,例如,完全禁用掉Trace功能后,整个RB都是不允许再操做的,这时,就可以将原子变量record_disabled 加1.相反的,如果启用的话,将其减1即可.只有当record_disabled的值等于0时,才允许操作RB.

  同时,有些时候,要对RB的一些数据进行更新,比如,我要重新设置一下RB的缓存区大小,这都需要串行操作,因此,在ring_buffer结构中有mutex成员,用来避免这些更改RB的操作的竞争.

  每个cpu的缓存区结构为:

  struct ring_buffer_per_cpu {

  /*该cpu buffer所在的CPU*/

  int             cpu;

  /*cpu buffer所属的RB*/

  struct ring_buffer      *buffer;

  /*读锁,用了避免读者的操行操作,有时在

  *写者切换页面的时候,也需要持有此锁

  */

  spinlock_t          reader_lock; /* serialize readers */

  raw_spinlock_t          lock;

  struct lock_class_key       lock_key;

  /*cpu buffer的页面链表*/

  struct list_head        pages;

  /*起始读位置*/

  struct buffer_page      *head_page; /* read from head */

  /*写位置*/

  struct buffer_page      *tail_page; /* write to tail */

  /*提交位置,只有当被写的页面提交过后

  *才允许被读

  */

  struct buffer_page      *commit_page;   /* committed pages */

  /*reader页面, 用来交换读页面*/

  struct buffer_page      *reader_page;

  unsigned long           nmi_dropped;

  unsigned long           commit_overrun;

  unsigned long           overrun;

  unsigned long           read;

  local_t             entries;

  /*最新的页面commit时间*/

  u64             write_stamp;

  /*最新的页面read时间*/

  u64             read_stamp;

  /*cpu buffer的禁用启用标志*/

  atomic_t            record_disabled;

  }

  首先,对每一个cpu的操作限制是由ring_buffer_per_cpu->record_disabled来实现的.同ring_buffer一样,禁用加1,启用减1.

  从上图的全局结构关联图中,我们也可以看到,每个cpu都有一系列的页面,这样页面都链入在pages中.

  该页面的结构如下:

  struct buffer_page {

  /*用来形成链表*/

  struct list_head list;      /* list of buffer pages */

  /*写的位置*/

  local_t     write;     /* index for next write */

  /*读的位置*/

  unsigned    read;      /* index for next read */

  /*页面中有多少项数据*/

  local_t     entries;   /* entries on this page */

  struct buffer_data_page *page;  /* Actual data page */

  };

  具体的缓存区是由struct buffer_data_page指向的,实际上,它是具体页面的管理头部,结构如下:

  struct buffer_data_page {

  /*页面第一次被写时的时间戳*/

  u64     time_stamp;    /* page time stamp */

  /*提交的位置*/

  local_t     commit;    /* write committed index */

  /*用来存放数据的缓存区*/

  unsigned char   data[];    /* data of buffer page */

  };

  这里就有一个疑问了,为什么提交页面要放到struct buffer_date_page中,而不放到struct buffer_page呢?

  三: ring buffer的初始化

  Ring buffer的初始化函数为ring_buffer_alloc(). 代码如下:

  struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags)

  {

  struct ring_buffer *buffer;

  int bsize;

  int cpu;

  /* Paranoid! Optimizes out when all is well */

  /*如果struct buffer_page的大小超过了struct page的大小,编译时会报错

  *因为ring_buffer_page_too_big()其实并不存在.

  */

  if (sizeof(struct buffer_page) > sizeof(struct page))

  ring_buffer_page_too_big();

  /* keep it in its own cache line */

  /*alloc and init struct ring_buffer*/

  buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),

  GFP_KERNEL);

  if (!buffer)

  return NULL;

  /*init cpumask*/

  if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))

  goto fail_free_buffer;

  /*BUF_PAGE_SIZE means the data size of per page,

  *size/BUF_PAGE_SIZE can calculate page number of per cpu.

  */

  buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);

  buffer->flags = flags;

  /* buffer->clock is the timestap of local cpu*/

  buffer->clock = trace_clock_local;

  /* need at least two pages */

  if (buffer->pages == 1)

  buffer->pages++;

  /*

  * In case of non-hotplug cpu, if the ring-buffer is allocated

  * in early initcall, it will not be notified of secondary cpus.

  * In that off case, we need to allocate for all possible cpus.

  */

  #ifdef CONFIG_HOTPLUG_CPU

  get_online_cpus();

  cpumask_copy(buffer->cpumask, cpu_online_mask);

  #else

  cpumask_copy(buffer->cpumask, cpu_possible_mask);

  #endif

  /*number of cpu*/

  buffer->cpus = nr_cpu_ids;

  /* alloc and init buffer for per cpu,Notice:buffer->buffers is a double pointer*/

  bsize = sizeof(void *) * nr_cpu_ids;

  buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),

  GFP_KERNEL);

  if (!buffer->buffers)

  goto fail_free_cpumask;

  for_each_buffer_cpu(buffer, cpu) {

  buffer->buffers[cpu] =

  rb_allocate_cpu_buffer(buffer, cpu);

  if (!buffer->buffers[cpu])

  goto fail_free_buffers;

  }

  #ifdef CONFIG_HOTPLUG_CPU

  buffer->cpu_notify.notifier_call = rb_cpu_notify;

  buffer->cpu_notify.priority = 0;

  register_cpu_notifier(&buffer->cpu_notify);

  #endif

  put_online_cpus();

  mutex_init(&buffer->mutex);

  return buffer;

  fail_free_buffers:

  for_each_buffer_cpu(buffer, cpu) {

  if (buffer->buffers[cpu])

  rb_free_cpu_buffer(buffer->buffers[cpu]);

  }

  kfree(buffer->buffers);

  fail_free_cpumask:

  free_cpumask_var(buffer->cpumask);

  put_online_cpus();

  fail_free_buffer:

  kfree(buffer);

  return NULL;

  }

 1/7    1 2 3 4 5 6 ›› ›|

来顶一下
返回首页
返回首页
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表
相关文章
栏目更新
栏目热门