点赞模块中合并数据部分的实现

Posted by AlstonWilliams on February 17, 2019

昨天我们谈了一下如何设计点赞模块,最后给出了优化这个模块的方案.

我们提到,可以通过合并收到的数据,来进行优化.而如何合并数据,就成了一个尤为重要的问题.

针对这个问题,我们也给出了几种解决方案,一种是重写Tomcat,通过Tomcat进行拦截并定时处理,但是这种方案,实现起来难度有些大,第二种是通过服务器在处理请求时,是为其新建一个线程来处理的原理,来实现,这种方案相对较简单一些,第三种是通过消息队列来实现,这种方案挺复杂,但是难度不高.

今天我们就采用第二种方案来实现.实现起来也是一波三折.且听我慢慢道来.

我们打算这样实现:

  • 创建一个缓冲区,让所有线程共享,然后将一分钟内收到的数据,全部保存到这个变量中,并给前台返回一个成功的标记.
  • 过了一分钟后,当收到请求时,先将缓冲区中的数据,全部保存到数据库中,然后将这个缓冲区清空,并再次写入下个一分钟之内收到的数据.

过程就是这么简单.但是我们需要注意一些问题:

  • 如果在我们将数据保存到数据库之后,清空缓冲区之前的这个时间段中,还有线程在向这个缓冲区写入数据,那么这些新写入的数据没有被保存就会被清除.所以我们需要先等待将之前收到的请求写入到缓冲区结束后,阻塞后来收到的请求,再将数据保存到数据库中,并清空缓冲区.

  • 如果有多个线程执行将数据保存到数据库,并清空缓冲区的操作怎么办?

我们先来看第一版的源代码:

在第一版中,我们使用ReentrantLock来实现线程之间的同步.

我们先判断此请求到来的时间与上次保存数据到数据库的时间差,是否大于一分钟.也就是判断是否过去了一分钟.如果是这样,就获取到ReentrantLock,并在有锁的期间,将数据保存到数据库中,然后重置保存数据到数据库的时间点,并清空保存一分钟之内收到的全部数据的缓冲区.lastAggegate这个变量是AtomicLong类型的,这是一个线程安全的Long类型.likeData这个缓冲区,是ConcurrentHashMap类型的,这是一个线程安全的HashMap.

我们在做完上面的工作之后,释放掉ReentrantLock.

下面的那块代码中,我们判断是否已经有线程占有ReentrantLock了,如果是,我们就一直循环,等待它释放.实际上就是起到了阻塞一分钟之后收到的请求写数据的作用.然后执行后面的步骤.在后面,我们将收到的数据,保存或者更新到likeData这个缓冲区中.

这里我们拿两个同时到来的请求A和B,来分析一下上面的代码:

第一种情况,先假设A和B都是在一分钟之内到来的,则会直接执行下面的代码块.因为此时ReentrantLock并没有被任意一个线程占有,所以这两个线程A和B,会并发更新likeData这个缓存区.这个没有什么问题.

第二种情况,假设A和B都是一分钟之后到来的,先假设A先判断是否过去一分钟,并判断为true,然后在下面获得锁,重置时间点,这时,B在判断是否过去了一分钟,判断为false,然后B执行下面的代码块,而因为ReentrantLock已经被A获取,所以它只能在while循环中一直等待.当A清空likeData这个缓冲区并释放ReentrantLock之后,B才得以和A同时并行的将数据写入到likeData这个缓冲区中.如果单看这两个线程,这个过程也没有什么问题.结果也是正确的.可是,在A清空likeData这个缓冲区的时候,在高并发的情况下,很可能有一分钟之内到来的请求C,正在向缓冲区中写入数据!!这样,请求C的数据,就被悄无声息的删除了.

第三种情况,假设A和B都是一分钟之后到来的,它俩又同时判断是否过去了一分钟,并同时判断为true,然后它们同时请求锁,A或B其中一个获得了ReentrantLock,然后执行后面的流程.这个没有什么好解释的.同样,它也有第二种情况中我们说的那种特点,数据很可能被悄无声息的删除了.同时,它还有一种特点,就是在高并发情况下,假设有一百个线程是串行着请求锁,即第一个线程释放了锁,第二个线程才请求锁,等第二个线程释放了锁之后,第三个线程才请求锁,依次类推.我们可以看到,这样不仅会使意外清除数据的情况更加严重,还会有性能问题.我们需要执行获得一百次锁,执行一百次锁内需要执行的操作.

第四种情况,假设A是一分钟之内到来的,B是一分钟之后到来的,就可能出现我们上面第二种情况中,所说的那种意外.

这几种情况里面,意外清空数据的情况最难处理,因为我们无法做到让锁内需要执行的一系列操作,让其等到一分钟之内到来的请求都把数据写入到likeData这个缓冲区之内,再执行.

而多个线程同时判断是否过去一分钟,并判断为true这种情况,我们可以通过将判断及执行所内的操作放到**synchronized **块中,来解决.

其实意外清空数据的这种情况,我们可以通过ReadWriteLock或者StampedLock来解决.我们之前介绍过这两种锁.

这两种锁为何能够解决意外清空数据的情况呢?

各位应该都知道,ReadWriteLock的规则,即:

  • 如果没有线程持有写锁,那么可以有任意多个线程同时持有读锁,来读数据,因为读操作肯定是线程安全的.

  • 写锁最多只能被一个线程同时占有.

  • 不能同时占有读锁和写锁.如果线程A占有读锁,而线程B请求写锁,那么B必须等待A先释放读锁,才能占有写锁.同样,如果线程A占有写锁,而线程B请求读锁,那么B必须等待A先释放写锁,才能占有读锁.

这个规则是否跟我们这里的需求很相似呢?

于是,我们写出了第二版的代码:

这里我们将判断是否已经过去了一分钟,以及需要执行的对应的操作,都放到了synchronized同步代码块中,这样,当执行这个代码块的时候,因为是串行操作,所以同一时间只能有一个线程能够获得写锁,并执行相应的操作.

而可以并行执行的将数据写入缓冲区的操作,我们给其加一个读锁,让其并行执行.

这里我们可以看到,当一个线程A判断已经过去一分钟,并要将数据写入到数据库时,需要先获取写锁,而要占有写锁,必须没有线程持有读锁.即之前的所有请求已经将数据都写入了likeData这个缓冲区之后,读锁都释放了之后,线程A才能占有写锁并执行相应的操作.这就解决了数据被意外清除的问题.

同样,因为要获得读锁来将数据写入到缓冲区时,必须先等待写锁的释放,也就相当于阻塞了之后到来的请求的写数据操作,防止在获得写锁并执行操作的这段时间中,到来的请求意外的向缓冲区中写入数据并最终被清空.

用一百个线程,各发送了九次请求,没有发现问题.请求中的全部数据,可以被正确的保存到数据库中.

在上面我们使用循环来获取读锁和写锁,其实还有更好的写法,就是使用上面提到过的StampedLock.因为ReadWriteLocktryLock()方法是立即返回的,所以我们需要通过while循环,不断地测试是否能够获得锁.即使可以为其设置超时时间,也是极不方便的.如果设置的过小,我们无法保证在这个超时时间之内,会获得锁,如果设置的过大,又浪费时间,降低了效率.而StampedLock中的获得锁的方法,是会阻塞当前线程的.也就是说,如果获取不到锁,就会阻塞当前线程,一直到获取到.这种方式其实更好一些,减少了上面因为while循环中CPU空转造成的资源浪费.

还有一种更简单的方案,就是将全部的操作,都放在synchronized代码块中.这个应该也很好理解.这里不再详细叙述这种方案.

但是,使用这种方案,有一个致命缺陷,就是性能的问题.我们向缓冲区写数据的操作是可以并行的,如果全都放在synchronized里面,就只能是串行的,那全部的请求都得一个个的串行处理,对性能是极大的消耗.

而我们使用读写锁的方案,只是将判断以及写数据库的操作放入到synchronized块中,虽然是串行,但是相当轻量级.大多数情况下,实际上只有判断这条语句是需要并行执行的,汇编指令也就是三条.

上面的读写锁的代码中,synchronizedexpression中,我们用的是一个用final修饰的,Integer类型的变量.它是不可变的.synchronized会取得** expression中的对象的monitor,将其当做互斥条件,一个对象只有一个monitor与之对应,如果我们拿一个不是final的可变的对象来做expression,那么很可能并没有被正确的同步,得到的结果也是不正确的.当我用likeData这个ConcurrentHashMap对象时,以及lastAggegate**这个变量时,会出现错误的结果.

上面我们的缓冲区用的是ConcurrentHashMap这个容器,这个容器在读多写少时,性能很好,而我们现在是写多读少,跟它相反,虽然还没有遇到什么性能问题,但是这里应该选择一个合适的适合写多读少的线程安全的Map.不知道有没有.