在监控应用程序的同时,持续地收集信息是一件非常重要的事情。那些影响网站响应速度以及网站所能服务的页面数量的代码改动、新的广告营销活动或者是刚刚接触系统的新用户,都有可能会彻底地改变网站载入页面的数量,并因此而影响网站的各项性能指标。 但如果我们平时不记录任何指标数据的话,我们就不可能知道指标发生了变化,也就不可能知道网站的性能是在提高还是在下降。 为了收集指标数据并进行监视和分析,我们将构建一个能够持续创建并维护计数器的工具,这个工具创建的每个计数器都有自己的名字(名字里带有网站点击量、销量或者数据库查询字样的计数器都是比较重要的计数器)。这些计数器会以不同的时间精度(如1秒、5秒、1分钟等)存储最新的120个数据样本,用户也可以根据自己的需要,对取样的数量和精度进行修改。 实现计数器首先要考虑的就是如何存储计数器信息,接下来将说明我们是如何将计数器信息存储到Redis里面的。
为了对计数器进行更新,我们需要存储实际的计数器信息。对于每个计数器以及每种精度,如网站点击量计数器和5秒,我们将使用个散列来存储网站在每个5秒时间片(time slice)之内获得的点击量,其中,散列的每个键都是某个时间片的开始时间,而键对应的值则存储了网站在该时间片之内获得的点击量。图5-1展示了一个点击量计数器存储的其中一部分数据,这个计数器以每5秒为一个时间片记录着网站的点击量。 为了能够清理计数器包含的旧数据,我们需要在使用计数器的同时,对被使用的计数器进行记录。为了做到这一点,我们需要一个有序序列(ordered sequence) ,这个序列不能包含任何重复元素,并且能够让我们一个接一个地遍历序列中包含的所有元素。虽然同时使用列表和集合可以实现这种序列,但同时使用两种数据结构需要编写更多代码,并且会增加客户端和Redis之间的通信往返次数。实际上,实现有序序列更好的办法是使用有序集合,有序集合的各个成员分别由计数器的精度以及计数器的名字组成,而所有成员的分值都为0。因为所有成员的分值都被设置成了0,所以Redis在尝试按分值对有序集合进行排序的时候,就会发现这一点,并改为使用成员名进行排序,这使得一组给定的成员总是具有固定的排列顺序,从而可以方便地对这些成员进行顺序性的扫描。图5-2展示了一个有序集合,这个有序集合记录了正在使用的计数器。 既然我们已经知道应该使用什么结构来记录并表示计数器了,现在是时候来考虑一下如何使用和更新这些计数器了。代码清单5-3展示了程序更新计数器的方法:对于每种时间片精度,程序都会将计数器的精度和名字作为引用信息添加到记录已有计数器的有序集合里面,并增加散列计数器在指定时间片内的计数值。
public static final int[] PRECISION = new int[]{1, 5, 60, 300, 3600, 18000, 86400}; /** * 更新计数器 * @param conn redis链接 * @param name 计数器名称 * @param count 新增次数 * @param now 当前时间单位秒 */ public void updateCounter(Jedis conn, String name, int count, long now){ Transaction trans = conn.multi(); for (int prec : PRECISION) { //计算为那个时间段增加数量,比如说name的值为test,count的值为1,prec为3, //now的值分别为9、10、11,那么pnow 的值都为9,都会为count:3:test里面的9所对应的值加上1 long pnow = (now / prec) * prec; String hash = String.valueOf(prec) + ':' + name; trans.zadd("known:", 0, hash); trans.hincrBy("count:" + hash, String.valueOf(pnow), count); } trans.exec(); }在处理(process)和清理(clean up)旧计数器的时候,有几件事情是需要我们格外留心的,其中包括以下几件。
任何时候都可能会有新的计数器被添加进来。同一时间可能会有多个不同的清理操作在执行。对于一个每天只更新一次的计数器来说,以每分钟一次的频率尝试清理这个计数器只会浪费计算资源。如果一个计数器不包含任何数据,那么程序就不应该尝试对它进行清理。 我们接下来要构建一个守护进程函数,这个函数的工作方式和第2章中展示的守护进程函数类似,并且会严格遵守上面列出的各个注意事项。和之前展示的守护进程函数一样,这个守护进程函数会不断地重复循环直到系统终止这个进程为止。为了尽可能地降低清理操作的执行负载,守护进程会以每分钟一次的频率清理那些每分钟更新一次或者每分钟更新多次的计数器,而对于那些更新频率低于每分钟一次的计数器,守护进程则会根据计数器自身的更新频率来决定对它们进行清理的频率。比如说,对于每秒更新一次或者每5秒更新一次的计数器,守护进程将以每分钟一次的频率清理这些计数器;而对于每5分钟更新一次的计数器,守护进程将以每5分钟一次的频率清理这些计数。 清理程序通过对记录已知计数器的有序集合执行ZRANGE命令来一个接一个的遍历所有己知的计数器。在对计数器执行清理操作的时候,程序会取出计数器记录的所有计数样本的开始时间,并移除那些开始时间位于指定截止时间之前的样本,清理之后的计数器最多只会保留最新的120个样本。如果一个计数器在执行清理操作之后不再包含任何样本,那么程序将从记录已知计数器的有序集合里面移除这个计数器的引用信息。以上给出的描述大致地说明了计数器清理函数的运作原理,至于程序的一些边界情况最好还是通过代码来说明,要了解该函数的所有细节。 public class CleanCountersThread extends Thread { private Jedis conn; //保留的样本数量 private int sampleCount = 100; private boolean quit; //清理当前时间之后多久的数据 private long timeOffset; // used to mimic a time in the future. public CleanCountersThread(int sampleCount, long timeOffset){ this.conn = new Jedis("localhost"); this.conn.select(14); this.sampleCount = sampleCount; this.timeOffset = timeOffset; } public void quit(){ quit = true; } public void run(){ //第几次清理(或者说第几分钟清理) int passes = 0; while (!quit){ long start = System.currentTimeMillis() + timeOffset; int index = 0; while (index < conn.zcard("known:")){ Set<String> hashSet = conn.zrange("known:", index, index); index++; if (hashSet.size() == 0) { break; } String hash = hashSet.iterator().next(); int prec = Integer.parseInt(hash.substring(0, hash.indexOf(':'))); //当前记录所属分钟 int bprec = (int)Math.floor(prec / 60); //统计时长不足1分钟按一分钟处理 if (bprec == 0){ bprec = 1; } //比如统计时长为3分钟,那么只有刚开始还有三分钟的倍数才会清理这个记录中的数据,其余时间不清理 if ((passes % bprec) != 0){ continue; } String hkey = "count:" + hash; String cutoff = String.valueOf( ((System.currentTimeMillis() + timeOffset) / 1000) - sampleCount * prec); ArrayList<String> samples = new ArrayList<String>(conn.hkeys(hkey)); Collections.sort(samples); //清理数据时刻的秒数在改记录中的位置,位置之前的都要被删除 int remove = bisectRight(samples, cutoff); if (remove != 0){ conn.hdel(hkey, samples.subList(0, remove).toArray(new String[0])); if (remove == samples.size()){ conn.watch(hkey); //如果该记录中没有了数据,从有序队列中删除改记录 if (conn.hlen(hkey) == 0) { Transaction trans = conn.multi(); trans.zrem("known:", hash); trans.exec(); index--; }else{ conn.unwatch(); } } } } //清理次数(或者分钟数)+1 passes++; //如果清理时长不足一分钟那么剩余时间休息,保证每分钟清理一次 long duration = Math.min( (System.currentTimeMillis() + timeOffset) - start + 1000, 60000); try { sleep(Math.max(60000 - duration, 1000)); }catch(InterruptedException ie){ ie.printStackTrace(); Thread.currentThread().interrupt(); } } } // mimic python's bisect.bisect_right public int bisectRight(List<String> values, String key) { int index = Collections.binarySearch(values, key); return index < 0 ? Math.abs(index) - 1 : index + 1; } }