Java中,获得ResultSet的总行数与总列数

Posted on

Java中,获得ResultSet的总行数与总列数

Java中,获得ResultSet的总行数与总列数

在Java中,获得ResultSet的总行数的方法有以下几种。

第一种:利用ResultSet的getRow方法来获得ResultSet的总行数

Statement stmt = con.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE,ResultSet.CONCUR_UPDATABLE); ResultSet rset = stmt.executeQuery("select /* from yourTableName"); rset.last(); int rowCount = rset.getRow(); //获得ResultSet的总行数

第二种:利用循环ResultSet的元素来获得ResultSet的总行数

ResultSet rset = stmt.executeQuery("select /* from yourTableName"); int rowCount = 0; while(rset.next()) { rowCount++; }

rowCount就是ResultSet的总行数。

第三种:利用sql语句中的count函数获得ResultSet的总行数

ResultSet rset = stmt.executeQuery("select count(/*) totalCount from yourTableName"); int rowCount = 0; if(rset.next()) { rowCount=rset .getInt("totalCount "); }

rowCount就是ResultSet的总行数。

  • /////////////////////////////////////////////////////////////////////////////////////*
  • Java中获得ResultSet的总列数是非常简单事情,因为Java中ResultSet提供了ResultSetMetaData工具类,ResultSetMetaData 是ResultSet的元数据的集合说明。

java获得ResultSet总列数的代码如下:

Statement stmt = con.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE,ResultSet.CONCUR_UPDATABLE); ResultSet rset = stmt.executeQuery("select /* from yourtable"); ResultSetMetaData rsmd = rset.getMetaData() ; int columnCount = rsmd.getColumnCount();

columnCount 就是ResultSet的总列数。


例子: Statement stmt = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY); ResultSet rs = stmt.executeQuery(sql); rs.last(); int length = rs.getRow();

如上,length的值,就是行数了。如果在获取了行数后,还需要继续使用当前数据集rs,则需要rs.beforeFirst();一次,将游标回到初始位置。 ResultSet.TYPE_SCROLL_INSENSITIVE 结果集的游标可以上下移动,当数据库变化时,当前结果集不变。 ResultSet.CONCUR_READ_ONLY 不能用结果集更新数据库中的表。

此外,给出Statement创建时的其他说明: 通用格式为:Statement stmt=con.createStatement(int type,int concurrency);我们在访问数据库的时候,在读取返回结果的时候,可能要前后移动指针,比如我们先计算有多少条信息,这是我们就需要把指针移到最后来计算,然后再把指针移到最前面,逐条读取,有时我们只需要逐条读取就可以了。还有就是有只我们只需要读取数据,为了不破坏数据,我们可采用只读模式,有时我们需要望数据库里添加记录,这是我们就要采用可更新数据库的模式。 下面是所有参数的说明: 参数 int type ResultSet.TYPE_FORWORD_ONLY 结果集的游标只能向下滚动。 ResultSet.TYPE_SCROLL_INSENSITIVE 结果集的游标可以上下移动,当数据库变化时,当前结果集不变。 ResultSet.TYPE_SCROLL_SENSITIVE 返回可滚动的结果集,当数据库变化时,当前结果集同步改变。

参数 int concurrency

ResultSet.CONCUR_READ_ONLY 不能用结果集更新数据库中的表。 ResultSet.CONCUR_UPDATETABLE 能用结果集更新数据库中的表。

此外,当我们使用ResultSet re=stmt.executeQuery(SQL语句)查询后,我们可以使用下列方法获得信息:

public boolean previous() 将游标向上移动,该方法返回boolean型数据,当移到结果集第一行之前时,返回false。 public void beforeFirst 将游标移动到结果集的初始位置,即在第一行之前。 public void afterLast() 将游标移到结果集最后一行之后。 public void first() 将游标移到结果集的第一行。 public void last() 将游标移到结果集的最后一行。 public boolean isAfterLast() 判断游标是否在最后一行之后。 public boolean isBeforeFirst() 判断游标是否在第一行之前。 public boolean ifFirst() 判断游标是否指向结果集的第一行。 public boolean isLast() 判断游标是否指向结果集的最后一行。 public int getRow() 得到当前游标所指向行的行号,行号从1开始,如果结果集没有行,返回0。 public boolean absolute(int row) 将游标移到参数row指定的行号。如果row取负值,就是倒数的行数,absolute(-1)表示移到最后一行,absolute(-2)表示移到倒数第2行。当移动到第一行前面或最后一行的后面时,该方法返回false。

ResultSetMetaData rsmd = this.rs.getMetaData(); this.columnCount = rsmd.getColumnCount();

聊聊并发(一)——深入分析Volatile的实现原理 (2)

Posted on

聊聊并发(一)——深入分析Volatile的实现原理 (2)

分享到

百度分享

聊聊并发(一)——深入分析Volatile的实现原理

作者 方腾飞 发布于 二月 21, 2012 | 27 评论

在多线程并发编程中synchronized和Volatile都扮演着重要的角色,Volatile是轻量级的synchronized,它在多处理器开发中保证了共享变量的“可见性”。可见性的意思是当一个线程修改一个共享变量时,另外一个线程能读到这个修改的值。

它在某些情况下比synchronized的开销更小,本文将深入分析在硬件层面上Inter处理器是如何实现Volatile的,通过深入分析能帮助我们正确的使用Volatile变量。

术语定义

术语

英文单词

描述 共享变量

在多个线程之间能够被共享的变量被称为共享变量。共享变量包括所有的实例变量,静态变量和数组元素。他们都被存放在堆内存中,Volatile只作用于共享变量。 内存屏障

Memory Barriers

是一组处理器指令,用于实现对内存操作的顺序限制。 缓冲行

Cache line

缓存中可以分配的最小存储单位。处理器填写缓存线时会加载整个缓存线,需要使用多个主内存读周期。 原子操作

Atomic operations

不可中断的一个或一系列操作。 缓存行填充

cache line fill

当处理器识别到从内存中读取操作数是可缓存的,处理器读取整个缓存行到适当的缓存(L1,L2,L3的或所有) 缓存命中

cache hit

如果进行高速缓存行填充操作的内存位置仍然是下次处理器访问的地址时,处理器从缓存中读取操作数,而不是从内存。 写命中

write hit

当处理器将操作数写回到一个内存缓存的区域时,它首先会检查这个缓存的内存地址是否在缓存行中,如果存在一个有效的缓存行,则处理器将这个操作数写回到缓存,而不是写回到内存,这个操作被称为写命中。 写缺失

write misses the cache

一个有效的缓存行被写入到不存在的内存区域。

Volatile的官方定义

Java语言规范第三版中对volatile的定义如下: java编程语言允许线程访问共享变量,为了确保共享变量能被准确和一致的更新,线程应该确保通过排他锁单独获得这个变量。Java语言提供了volatile,在某些情况下比锁更加方便。如果一个字段被声明成volatile,java线程内存模型确保所有线程看到这个变量的值是一致的。

为什么要使用Volatile

Volatile变量修饰符如果使用恰当的话,它比synchronized的使用和执行成本会更低,因为它不会引起线程上下文的切换和调度。

Volatile的实现原理

那么Volatile是如何来保证可见性的呢?在x86处理器下通过工具获取JIT编译器生成的汇编指令来看看对Volatile进行写操作CPU会做什么事情。 Java代码:

instance = new Singleton();//instance是volatile变量 汇编代码:

0x01a3de1d: movb $0x0,0x1104800(%esi);

0x01a3de24: lock addl $0x0,(%esp);

有volatile变量修饰的共享变量进行写操作的时候会多第二行汇编代码,通过查IA-32架构软件开发者手册可知,lock前缀的指令在多核处理器下会引发了两件事情。

  • 将当前处理器缓存行的数据会写回到系统内存。
  • 这个写回内存的操作会引起在其他CPU里缓存了该内存地址的数据无效。

处理器为了提高处理速度,不直接和内存进行通讯,而是先将系统内存的数据读到内部缓存(L1,L2或其他)后再进行操作,但操作完之后不知道何时会写到内存,如果对声明了Volatile变量进行写操作,JVM就会向处理器发送一条Lock前缀的指令,将这个变量所在缓存行的数据写回到系统内存。但是就算写回到内存,如果其他处理器缓存的值还是旧的,再执行计算操作就会有问题,所以在多处理器下,为了保证各个处理器的缓存是一致的,就会实现缓存一致性协议,每个处理器通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期了,当处理器发现自己缓存行对应的内存地址被修改,就会将当前处理器的缓存行设置成无效状态,当处理器要对这个数据进行修改操作的时候,会强制重新从系统内存里把数据读到处理器缓存里。

这两件事情在IA-32软件开发者架构手册的第三册的多处理器管理章节(第八章)中有详细阐述。

Lock前缀指令会引起处理器缓存回写到内存。Lock前缀指令导致在执行指令期间,声言处理器的 LOCK/# 信号。在多处理器环境中,LOCK/# 信号确保在声言该信号期间,处理器可以独占使用任何共享内存。(因为它会锁住总线,导致其他CPU不能访问总线,不能访问总线就意味着不能访问系统内存),但是在最近的处理器里,LOCK#信号一般不锁总线,而是锁缓存,毕竟锁总线开销比较大。在8.1.4章节有详细说明锁定操作对处理器缓存的影响,对于Intel486和Pentium处理器,在锁操作时,总是在总线上声言LOCK/#信号。但在P6和最近的处理器中,如果访问的内存区域已经缓存在处理器内部,则不会声言LOCK/#信号。相反地,它会锁定这块内存区域的缓存并回写到内存,并使用缓存一致性机制来确保修改的原子性,此操作被称为“缓存锁定”,缓存一致性机制会阻止同时修改被两个以上处理器缓存的内存区域数据

一个处理器的缓存回写到内存会导致其他处理器的缓存无效。IA-32处理器和Intel 64处理器使用MESI(修改,独占,共享,无效)控制协议去维护内部缓存和其他处理器缓存的一致性。在多核处理器系统中进行操作的时候,IA-32 和Intel 64处理器能嗅探其他处理器访问系统内存和它们的内部缓存。它们使用嗅探技术保证它的内部缓存,系统内存和其他处理器的缓存的数据在总线上保持一致。例如在Pentium和P6 family处理器中,如果通过嗅探一个处理器来检测其他处理器打算写内存地址,而这个地址当前处理共享状态,那么正在嗅探的处理器将无效它的缓存行,在下次访问相同内存地址时,强制执行缓存行填充。

Volatile的使用优化

著名的Java并发编程大师Doug lea在JDK7的并发包里新增一个队列集合类LinkedTransferQueue,他在使用Volatile变量时,用一种追加字节的方式来优化队列出队和入队的性能。

追加字节能优化性能?这种方式看起来很神奇,但如果深入理解处理器架构就能理解其中的奥秘。让我们先来看看LinkedTransferQueue这个类,它使用一个内部类类型来定义队列的头队列(Head)和尾节点(tail),而这个内部类PaddedAtomicReference相对于父类AtomicReference只做了一件事情,就将共享变量追加到64字节。我们可以来计算下,一个对象的引用占4个字节,它追加了15个变量共占60个字节,再加上父类的Value变量,一共64个字节。 /// head of the queue // private transient final PaddedAtomicReference < QNode > head; /// tail of the queue // private transient final PaddedAtomicReference < QNode > tail; static final class PaddedAtomicReference < T > extends AtomicReference < T > { // enough padding for 64bytes with 4byte refs Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe; PaddedAtomicReference(T r) { super(r); } } public class AtomicReference < V > implements java.io.Serializable { private volatile V value; //省略其他代码 }

为什么追加64字节能够提高并发编程的效率呢? 因为对于英特尔酷睿i7,酷睿, Atom和NetBurst, Core Solo和Pentium M处理器的L1,L2或L3缓存的高速缓存行是64个字节宽,不支持部分填充缓存行,这意味着如果队列的头节点和尾节点都不足64字节的话,处理器会将它们都读到同一个高速缓存行中,在多处理器下每个处理器都会缓存同样的头尾节点,当一个处理器试图修改头接点时会将整个缓存行锁定,那么在缓存一致性机制的作用下,会导致其他处理器不能访问自己高速缓存中的尾节点,而队列的入队和出队操作是需要不停修改头接点和尾节点,所以在多处理器的情况下将会严重影响到队列的入队和出队效率。Doug lea使用追加到64字节的方式来填满高速缓冲区的缓存行,避免头接点和尾节点加载到同一个缓存行,使得头尾节点在修改时不会互相锁定。

那么是不是在使用Volatile变量时都应该追加到64字节呢?不是的。在两种场景下不应该使用这种方式。第一:缓存行非64字节宽的处理器,如P6系列和奔腾处理器,它们的L1和L2高速缓存行是32个字节宽。第二:共享变量不会被频繁的写。因为使用追加字节的方式需要处理器读取更多的字节到高速缓冲区,这本身就会带来一定的性能消耗,共享变量如果不被频繁写的话,锁的几率也非常小,就没必要通过追加字节的方式来避免相互锁定。

参考资料

关于作者

方腾飞,阿里巴巴资深软件开发工程师,致力于高性能网络编程,目前在公司从事询盘管理和长连接服务器OpenComet的开发工作。博客地址:http://ifeve.com

感谢郑柯对本文的审校。

给InfoQ中文站投稿或者参与内容翻译工作,请邮件至editors@cn.infoq.com。也欢迎大家通过新浪微博(@InfoQ)或者腾讯微博(@InfoQ)关注我们,并与我们的编辑和其他读者朋友交流。

相关内容

您好,陌生人!

您需要 注册一个InfoQ账号 或者 登录 才能进行评论。在您完成注册后还需要进行一些设置。

获得来自InfoQ的更多体验。

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我

社区评论 Watch Thread

fantastic by chuanmin zuo Posted 22/02/2012 01:19 Re: fantastic by chuanmin zuo Posted 22/02/2012 01:21

Re: fantastic by 方 腾飞 Posted 22/02/2012 01:55 I have a question to ask you by chuanmin zuo Posted 22/02/2012 07:23

Re: I have a question to ask you by chuanmin zuo Posted 22/02/2012 07:24 疑问 by Wang Frank Posted 24/02/2012 01:12

Re: 疑问 by 方 腾飞 Posted 26/02/2012 10:35 Re: 两个不同的线程看到某个成员变量的值是不相同的,怎么写个例子 by zhang qinghua Posted 08/05/2013 11:43

Re: 疑问 by 史 墨轩 Posted 19/12/2012 10:06 Re: 疑问 by 方 腾飞 Posted 27/12/2012 01:07

Re: 疑问 by 方 腾飞 Posted 31/12/2012 10:33 so wonderful by hu depin Posted 24/02/2012 02:56

Re: so wonderful by 方 腾飞 Posted 26/02/2012 10:41 汇编代码的角度去考虑 by Liu Xia Posted 28/02/2012 00:09

如何在实际的项目中应用 by www www Posted 28/02/2012 09:18 Re: 如何在实际的项目中应用 by 方 腾飞 Posted 28/02/2012 07:08

为什么volatile 变量不能保证写操作的原子性 by Haixia Chen Posted 11/08/2012 10:09 Re: 为什么volatile 变量不能保证写操作的原子性 by Haixia Chen Posted 11/08/2012 11:01

Re: 为什么volatile 变量不能保证写操作的原子性 by 方 腾飞 Posted 02/09/2012 07:36 好啊 by s zh Posted 31/12/2012 05:08

疑问 by zhao chiva Posted 17/02/2013 03:34 改字 by 吴 杰峰 Posted 07/03/2013 08:07

一个有效的缓存行被写入到不存在的内存区域 by 高 海军 Posted 02/04/2013 10:19 java中对象或者数组用volatile修饰有什么用? by sc lv Posted 03/04/2013 03:00

请教一个悖论问题 by sun shanghai Posted 26/04/2013 10:43 Re: 请教一个悖论问题 by zhang qinghua Posted 08/05/2013 11:44

文章导出 by tony zarric Posted 12/05/2013 11:05

fantastic 22/02/2012 01:19 by chuanmin zuo

Thanks for your sharing,I think it's pretty wonderful technical article. It's very useful for java developers to understand volatile. But I also want to know the performance difference between volatile and synchronized,don't misunderstand,I mean precise,or to prove performance difference with some data.If you can add this,that's perfect,coz we pay close attention to that.Maybe that is the key point of your article. thanks again.

Re: fantastic 22/02/2012 01:21 by chuanmin zuo

Sorry,Maybe that is the key point of your article.-->Maybe that is not the key point of your article

Re: fantastic 22/02/2012 01:55 by 方 腾飞

Thank you for your attention! i will add it to another article.

I have a question to ask you 22/02/2012 07:23 by chuanmin zuo

After reading your article,I made a test like this: I downloaded java language specification,and test a example from 8.3.1.4 volatile field in this document. But I found even if a field(i and j in that example) is declared volatile,the value of j is still greater than that for i.Actually I know why,coz volatile just ensure other thread can see the latest value ,not ensure synchronization.But that document(I mean java langguage specification) tells me:"Therefore,the shared value for j is never greater than that of j",I can not understand,Have you tested that example. I look forward to your reply.

Re: I have a question to ask you 22/02/2012 07:24 by chuanmin zuo

"Therefore,the shared value for j is never greater than that of i",sorry,my mistake.

疑问 24/02/2012 01:12 by Wang Frank

当处理器将操作数写回到一个内存缓存的区域时,它首先会检查这个缓存的内存地址是否在缓存行中,如果不存在一个有效的缓存行,则处理器将这个操作数写回到缓存,而不是写回到内存,这个操作被称为写命中。 “如果不存在一个有效的缓存行” 这里是否应该是“如果存在一个有效的缓存行”?

so wonderful 24/02/2012 02:56 by hu depin

受益匪浅,感谢分享

Re: 疑问 26/02/2012 10:35 by 方 腾飞

是的,感谢纠正。

Re: so wonderful 26/02/2012 10:41 by 方 腾飞

谢谢关注。

汇编代码的角度去考虑 28/02/2012 00:09 by Liu Xia

Java 语言实际上是有自己的内存模型的。这种内存模型实际上就是希望隐藏各种不同的处理器对于缓存一致性的处理。volatile 在 Java 的内存模型还真是没有太关注,在 CLR 的内存模型上是 volatile 的读具有 load acuqire 语义(这和一般的读不同),从而防止了一定程度上的代码调整,而保证在 load 完成后的一致性。 但是从汇编上那就复杂了,例如 IA-32 或者 Intel 64 架构的处理器上使用了 lock 或者,但是在 IA-64 的处理器上就需要一个 load acquire 的 inter-lock 操作(IA-64 上的这种操作指令还挺丰富)。

如何在实际的项目中应用 28/02/2012 09:18 by www www

在单位的电脑上跑了一下 确实快很多。也确实证明了他的可用性。但问题是我们如何在实际的项目中应用这个东西。 或者说如何在现行的项目中确定false sharing产生的瓶颈,然后用这种方法来提高性能?

Re: 如何在实际的项目中应用 28/02/2012 07:08 by 方 腾飞

文中有提到,应用场景是首先要确认该共享变量是否会被频繁的写?比如队列的入队和出队,需要不停的修改头节点和尾节点,所以这里建议使用Padding的方式进行优化,当然即使是频繁的写也不一定会成为性能的瓶颈。

为什么volatile 变量不能保证写操作的原子性 11/08/2012 10:09 by Haixia Chen

大师,文章中提到volatile变量的写操作,处理器写完会更新到内存,其他处理器缓存会失效,既然这样,为什么多线程i++,100次的时候,最后i的值可能<100呢,i++的分析见:wk.baidu.com/view/bc890df5f61fb7360b4c654b&... Pad输入不容易,盼大师解惑

Re: 为什么volatile 变量不能保证写操作的原子性 11/08/2012 11:01 by Haixia Chen

明白了,那篇文章已经讲的很清楚了

Re: 为什么volatile 变量不能保证写操作的原子性 02/09/2012 07:36 by 方 腾飞

:)

Re: 疑问 19/12/2012 10:06 by 史 墨轩

我刚看到的时候也有这个疑问,看了回复果然是翻译问题,希望作者做一下修改吧

Re: 疑问 27/12/2012 01:07 by 方 腾飞

好的

Re: 疑问 31/12/2012 10:33 by 方 腾飞

已经修改!

好啊 31/12/2012 05:08 by s zh

感谢你的分享

疑问 17/02/2013 03:34 by zhao chiva

JKD7里面LinkedTransferQueue的实现跟你说的不一样,head跟tail的内部实现类是Node.也不是采用移位实现的。 /// head of the queue; null until first enqueue // transient volatile Node head; /// tail of the queue; null until first append // private transient volatile Node tail;

改字 07/03/2013 08:07 by 吴 杰峰

例如在Pentium和P6 family处理器中,如果通过嗅探一个处理器来检测其他处理器打算写内存地址,而这个地址当前处理共享状态 "而这个地址当前处理共享状态":应该是“而这个地址当前处于共享状态”

一个有效的缓存行被写入到不存在的内存区域 02/04/2013 10:19 by 高 海军

write misses the cache 表示"一个有效的缓存行被写入到不存在的内存区域。" 这段话是是不是有点问题? 应该是写缓存未命中巴? 可以再解释一下吗?

java中对象或者数组用volatile修饰有什么用? 03/04/2013 03:00 by sc lv

上文说到共享变量包括所有的实例变量,静态变量和数组元素,有看到其它资料说对于数组,volatile修饰的只是数组的引用,例如,java.io.BufferedInputStream类中protected volatile byte buf[]; 数组buf用volatile修饰; java.io.FilterInputStream类中protected volatile InputStream in; in用volatile修饰。 jdk中这样修饰有什么好处呢?这两个地方如果不用volatile修饰会有什么影响?

请教一个悖论问题 26/04/2013 10:43 by sun shanghai

import java.lang./; /// / 我觉得如果不加volatile修饰符,则第1个线程是永远不会停止的, / 但是实际上线程2把flag设置为true后,线程1就停止了 / 这个是什么原因呢?不明白。 /*/ public class Counter { // private static Boolean flag = new Boolean(false); // private volatile static Boolean flag = new Boolean(false); public static void main(String[] args) { // 线程1 new Thread() { int i = 0; public void run() { while (!flag.booleanValue()) { System.out.println(i++); } } }.start(); // 线程2 new Thread() { public void run() { try { Thread.sleep(1000); flag = new Boolean(true); } catch (InterruptedException e) { e.printStackTrace(); } } }.start(); } }

Re: 两个不同的线程看到某个成员变量的值是不相同的,怎么写个例子 08/05/2013 11:43 by zhang qinghua

private int i = 0; //a线程调用 public void foo1(){ try { while (true) { Thread.sleep(10); i++; } } catch (InterruptedException e) { //not to do; } } //b线程调用 public void foo2(){ try { while(true){ Thread.sleep(1000); System.out.println("第二个:"+i); } } catch (InterruptedException e) { //not to do; } } 为什么foo2打印的i的值会随着foo1修改的值变化。。。。。。 请见凉,自从看了这篇文章,感觉以前写的代码都有线程缓存共享变量的问题,但实际没有出现过 两个不同的线程看到某个成员变量的值是不相同的。 请lz,给个例子, 两个不同的线程(通过线程缓存共享变量)看到某个成员变量的值是不相同的,怎么写个例子? 请楼主回答一下。

Re: 请教一个悖论问题 08/05/2013 11:44 by zhang qinghua

你的困惑解决没,我的困惑和你一样。望回答一下,在你的楼下。

文章导出 12/05/2013 11:05 by tony zarric

页面内容能提供导出功能,如: 聊聊并发(一)——深入分析Volatile的实现原理 能导出 保存为pdf或word文件

关闭

**by

发布于

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p 当有人回复此评论时请E-mail通知我

关闭 主题 您的回复

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p 当有人回复此评论时请E-mail通知我 关闭

深度内容

Juergen Fesslmeier谈端到端的JavaScript开发

Juergen Fesslmeier 七月 02, 2013

设计模式自动化

Gael Fraiteur and Yan Cui 七月 01, 2013

设计指尖上的世界:移动用户界面一瞥

Forrest Shull 六月 28, 2013

成功的根本—集成的ALM工具

Dave West 六月 28, 2013

书评:验收测试驱动开发实践指南

Manuel Pais 六月 26, 2013

跨终端的web

舒文亮 六月 26, 2013

语言 & 开发

Juergen Fesslmeier谈端到端的JavaScript开发

MobileCloud for TFS支持测试Windows Phone,Android,iOS及BlackBerry应用

百度技术沙龙第39期回顾:前端快速开发实践(含资料下载)

架构 & 设计

内存与本机代码的性能

设计模式自动化

连接设备编程 过程 & 实践

成功的根本—集成的ALM工具

ThoughtWorks全球CEO郭晓谈软件人才的招聘与培养

书评:验收测试驱动开发实践指南

运维 & 基础架构

在传统企业中引入DevOps

安全性——“DevOpS”中的S

书评:验收测试驱动开发实践指南 企业架构

设计指尖上的世界:移动用户界面一瞥

Stratos 2.0已发布,支持所有运行时环境和30个IaaS

让1.5亿移动端用户第一时间获取消息

语言 & 开发

Juergen Fesslmeier谈端到端的JavaScript开发

MobileCloud for TFS支持测试Windows Phone,Android,iOS及BlackBerry应用

百度技术沙龙第39期回顾:前端快速开发实践(含资料下载) 架构 & 设计

内存与本机代码的性能

设计模式自动化

连接设备编程

过程 & 实践

成功的根本—集成的ALM工具

ThoughtWorks全球CEO郭晓谈软件人才的招聘与培养

书评:验收测试驱动开发实践指南 运维 & 基础架构

在传统企业中引入DevOps

安全性——“DevOpS”中的S

书评:验收测试驱动开发实践指南

企业架构

设计指尖上的世界:移动用户界面一瞥

Stratos 2.0已发布,支持所有运行时环境和30个IaaS

让1.5亿移动端用户第一时间获取消息 Close E-mail 密码

使用Google账号登录 使用Microsoft账号登录 忘记密码? InfoQ账号使用的E-mail 发送邮件

重新登录 重新发送激活信息 重新发送

重新登录 没有用户名?

点击注册 ")")")")")")")")

深入浅出 Java Concurrency (4)

Posted on

深入浅出 Java Concurrency (4): 并发容器

从这一节开始正式进入并发容器的部分,来看看JDK 6带来了哪些并发容器。

在JDK 1.4以下只有Vector和Hashtable是线程安全的集合(也称并发容器,Collections.synchronized/*系列也可以看作是线程安全的实现)。从JDK 5开始增加了线程安全的Map接口ConcurrentMap和线程安全的队列BlockingQueue(尽管Queue也是同时期引入的新的集合,但是规范并没有规定一定是线程安全的,事实上一些实现也不是线程安全的,比如PriorityQueue、ArrayDeque、LinkedList等,在Queue章节中会具体讨论这些队列的结构图和实现)。

在介绍ConcurrencyMap之前先来回顾下Map的体系结构。下图描述了Map的体系结构,其中蓝色字体的是JDK 5以后新增的并发容器。

image

针对上图有以下几点说明:

  1. Hashtable是JDK 5之前Map唯一线程安全的内置实现(Collections.synchronizedMap不算)。特别说明的是Hashtable的t是小写的(不知道为啥),Hashtable继承的是Dictionary(Hashtable是其唯一公开的子类),并不继承AbstractMap或者HashMap。尽管Hashtable和HashMap的结构非常类似,但是他们之间并没有多大联系。
  2. ConcurrentHashMap是HashMap的线程安全版本,ConcurrentSkipListMap是TreeMap的线程安全版本。
  3. 最终可用的线程安全版本Map实现是ConcurrentHashMap/ConcurrentSkipListMap/Hashtable/Properties四个,但是Hashtable是过时的类库,因此如果可以的应该尽可能的使用ConcurrentHashMap和ConcurrentSkipListMap。

回到正题来,这个小节主要介绍ConcurrentHashMap的API以及应用,下一节才开始将原理和分析。

ConcurrentMap API

除了实现Map接口里面对象的方法外,ConcurrentHashMap还实现了ConcurrentMap里面的四个方法。

V putIfAbsent(K key,V value)

如果不存在key对应的值,则将value以key加入Map,否则返回key对应的旧值。这个等价于清单1 的操作:

清单1 putIfAbsent的等价操作 if (!map.containsKey(key)) return map.put(key, value); else return map.get(key);

在前面的章节中提到过,连续两个或多个原子操作的序列并不一定是原子操作。比如上面的操作即使在Hashtable中也不是原子操作。而putIfAbsent就是一个线程安全版本的操作的。

有些人喜欢用这种功能来实现单例模式,例如清单2。

清单2 一种单例模式的实现 package xylz.study.concurrency;

import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap;

public class ConcurrentDemo1 {

private static final ConcurrentMap<String, ConcurrentDemo1> map = new ConcurrentHashMap<String, ConcurrentDemo1>();
private static ConcurrentDemo1 instance;
public static ConcurrentDemo1 getInstance() {
    if (instance == null) {

        map.putIfAbsent("INSTANCE", new ConcurrentDemo1());

        instance = map.get("INSTANCE");
    }
    return instance;
}

private ConcurrentDemo1() {
}

}

当然这里只是一个操作的例子,实际上在单例模式文章中有很多的实现和比较。清单2 在存在大量单例的情况下可能有用,实际情况下很少用于单例模式。但是这个方法避免了向Map中的同一个Key提交多个结果的可能,有时候在去掉重复记录上很有用(如果记录的格式比较固定的话)。

boolean remove(Object key,Object value)

只有目前将键的条目映射到给定值时,才移除该键的条目。这等价于清单3 的操作。

清单3 remove(Object,Object)的等价操作 if (map.containsKey(key) && map.get(key).equals(value)) { map.remove(key); return true; } return false;

由于集合类通常比较的hashCode和equals方法,而这两个方法是在Object对象里面,因此两个对象如果hashCode一致,并且覆盖了equals方法后也一致,那么这两个对象在集合类里面就是“相同”的,不管是否是同一个对象或者同一类型的对象。也就是说只要key1.hashCode()==key2.hashCode() && key1.equals(key2),那么key1和key2在集合类里面就认为是一致,哪怕他们的Class类型不一致也没关系,所以在很多集合类里面允许通过Object来类型来比较(或者定位)。比如说Map尽管添加的时候只能通过制定的类型,但是删除的时候却允许通过一个Object来操作,而不必是K类型。

既然Map里面有一个remove(Object)方法,为什么ConcurrentMap还需要remove(Object,Object)方法呢?这是因为尽管Map里面的key没有变化,但是value可能已经被其他线程修改了,如果修改后的值是我们期望的,那么我们就不能拿一个key来删除此值,尽管我们的期望值是删除此key对于的旧值。

这种特性在原子操作章节的AtomicMarkableReferenceAtomicStampedReference里面介绍过。

boolean replace(K key,V oldValue,V newValue)

只有目前将键的条目映射到给定值时,才替换该键的条目。这等价于清单4 的操作。

清单4 replace(K,V,V)的等价操作 if (map.containsKey(key) && map.get(key).equals(oldValue)) { map.put(key, newValue); return true; } return false;

V replace(K key,V value)

只有当前键存在的时候更新此键对于的值。这等价于清单5 的操作。

清单5 replace(K,V)的等价操作 if (map.containsKey(key)) { return map.put(key, value); } return null;

replace(K,V,V)相比replace(K,V)而言,就是增加了匹配oldValue的操作。

其实这4个扩展方法,是ConcurrentMap附送的四个操作,其实我们更关心的是Map本身的操作。当然如果没有这4个方法,要完成类似的功能我们可能需要额外的锁,所以有总比没有要好。比如清单6,如果没有putIfAbsent内置的方法,我们如果要完成此操作就需要完全锁住整个Map,这样就大大降低了ConcurrentMap的并发性。这在下一节中有详细的分析和讨论。

清单6 putIfAbsent的外部实现 public V putIfAbsent(K key, V value) { synchronized (map) { if (!map.containsKey(key)) return map.put(key, value); return map.get(key); } }

参考资料:

来源: [http://www.blogjava.net/xylz/archive/2010/07/19/326527.html](http://www.blogjava.net/xylz/archive/2010/07/19/326527.html)

本来想比较全面和深入的谈谈ConcurrentHashMap的,发现网上有很多对HashMap和ConcurrentHashMap分析的文章,因此本小节尽可能的分析其中的细节,少一点理论的东西,多谈谈内部设计的原理和思想。

要谈ConcurrentHashMap的构造,就不得不谈HashMap的构造,因此先从HashMap开始简单介绍。


HashMap原理

我们从头开始设想。要将对象存放在一起,如何设计这个容器。目前只有两条路可以走,一种是采用分格技术,每一个对象存放于一个格子中,这样通过对格子的编号就能取到或者遍历对象;另一种技术就是采用串联的方式,将各个对象串联起来,这需要各个对象至少带有下一个对象的索引(或者指针)。显然第一种就是数组的概念,第二种就是链表的概念。所有的容器的实现其实都是基于这两种方式的,不管是数组还是链表,或者二者俱有。HashMap采用的就是数组的方式。

有了存取对象的容器后还需要以下两个条件才能完成Map所需要的条件。

  • 能够快速定位元素:Map的需求就是能够根据一个查询条件快速得到需要的结果,所以这个过程需要的就是尽可能的快。
  • 能够自动扩充容量:显然对于容器而然,不需要人工的去控制容器的容量是最好的,这样对于外部使用者来说越少知道底部细节越好,不仅使用方便,也越安全。

首先条件1,快速定位元素。快速定位元素属于算法和数据结构的范畴,通常情况下哈希(Hash)算法是一种简单可行的算法。所谓哈希算法,是将任意长度的二进制值映射为固定长度的较小二进制值。常见的MD2,MD4,MD5,SHA-1等都属于Hash算法的范畴。具体的算法原理和介绍可以参考相应的算法和数据结构的书籍,但是这里特别提醒一句,由于将一个较大的集合映射到一个较小的集合上,所以必然就存在多个元素映射到同一个元素上的结果,这个叫“碰撞”,后面会用到此知识,暂且不表。

条件2,如果满足了条件1,一个元素映射到了某个位置,现在一旦扩充了容量,也就意味着元素映射的位置需要变化。因为对于Hash算法来说,调整了映射的小集合,那么原来映射的路径肯定就不复存在,那么就需要对现有重新计算映射路径,也就是所谓的rehash过程。

好了有了上面的理论知识后来看HashMap是如何实现的。

在HashMap中首先由一个对象数组table是不可避免的,修饰符transient只是表示序列号的时候不被存储而已。size描述的是Map中元素的大小,threshold描述的是达到指定元素个数后需要扩容,loadFactor是扩容因子(loadFactor>0),也就是计算threshold的。那么元素的容量就是table.length,也就是数组的大小。换句话说,如果存取的元素大小达到了整个容量(table.length)的loadFactor倍(也就是table.length/*loadFactor个),那么就需要扩充容量了。在HashMap中每次扩容就是将扩大数组的一倍,使数组大小为原来的两倍。

HashMap数据结构

然后接下来看如何将一个元素映射到数组table中。显然要映射的key是一个无尽的超大集合,而table是一个较小的有限集合,那么一种方式就是将key编码后的hashCode值取模映射到table上,这样看起来不错。但是在Java中采用了一种更高效的办法。由于与(&)是比取模(%)更高效的操作,因此Java中采用hash值与数组大小-1后取与来确定数组索引的。为什么这样做是更有效的?参考资料7对这一块进行非常详细的分析,这篇文章的作者非常认真,也非常仔细的分析了里面包含的思想。

清单1 indexFor片段 static int indexFor(int h, int length) { return h & (length-1); }

前面说明,既然是大集合映射到小集合上,那么就必然存在“碰撞”,也就是不同的key映射到了相同的元素上。那么HashMap是怎么解决这个问题的?

在HashMap中采用了下面方式,解决了此问题。

  1. 同一个索引的数组元素组成一个链表,查找允许时循环链表找到需要的元素。
  2. 尽可能的将元素均匀的分布在数组上。

Map.Entry结构对于问题1,HashMap采用了上图的一种数据结构。table中每一个元素是一个Map.Entry,其中Entry包含了四个数据,key,value,hash,next。key和value是存储的数据;hash是元素key的Hash后的表现形式(最终要映射到数组上),这里链表上所有元素的hash经过清单1 的indexFor后将得到相同的数组索引;next是指向下一个元素的索引,同一个链表上的元素就是通过next串联起来的。

再来看问题2 尽可能的将元素均匀的分布在数组上这个问题是怎么解决的。首先清单2 是将key的hashCode经过一系列的变换,使之更符合小数据集合的散列模型。

清单2 hashCode的二次散列 static int hash(int h) { // This function ensures that hashCodes that differ only by // constant multiples at each bit position have a bounded // number of collisions (approximately 8 at default load factor). h ^= (h >>> 20) ^ (h >>> 12); return h ^ (h >>> 7) ^ (h >>> 4); }

至于清单2 为什么这样散列我没有找到依据,也没有什么好的参考资料。参考资料1 分析了此过程,认为是一种比较有效的方式,有兴趣的可以研究下。

第二点就是在清单1 的描述中,尽可能的与数组的长度减1的数与操作,使之分布均匀。这在参考资料7 中有介绍。

第三点就是构造数组时数组的长度是2的倍数。清单3 反映了这个过程。为什么要是2的倍数?在参考资料7 中分析说是使元素尽可能的分布均匀。

清单3 HashMap 构造数组 // Find a power of 2 >= initialCapacity int capacity = 1; while (capacity < initialCapacity) capacity <<= 1;

this.loadFactor = loadFactor; threshold = (int)(capacity /* loadFactor); table = new Entry[capacity];

另外loadFactor的默认值0.75和capacity的默认值16是经过大量的统计分析得出的,很久以前我见过相关的数据分析,现在找不到了,有兴趣的可以查询相关资料。这里不再叙述了。

有了上述原理后再来分析HashMap的各种方法就不是什么问题的。

清单4 HashMap的get操作 public V get(Object key) { if (key == null) return getForNullKey(); int hash = hash(key.hashCode()); for (Entry e = table[indexFor(hash, table.length)]; e != null; e = e.next) { Object k; if (e.hash == hash && ((k = e.key) == key || key.equals(k))) return e.value; } return null; }

清单4 描述的是HashMap的get操作,在这个操作中首先判断key是否为空,因为为空的话总是映射到table的第0个元素上(可以看上面的清单2和清单1)。然后就需要查找table的索引。一旦找到对应的Map.Entry元素后就开始遍历此链表。由于不同的hash可能映射到同一个table[index]上,而相同的key却同时映射到相同的hash上,所以一个key和Entry对应的条件就是hash(key)==e.hash 并且key.equals(e.key)。从这里我们看到,Object.hashCode()只是为了将相同的元素映射到相同的链表上(Map.Entry),而Object.equals()才是比较两个元素是否相同的关键!这就是为什么总是成对覆盖hashCode()和equals()的原因。

清单5 HashMap的put操作 public V put(K key, V value) { if (key == null) return putForNullKey(value); int hash = hash(key.hashCode()); int i = indexFor(hash, table.length); for (Entry e = table[i]; e != null; e = e.next) { Object k; if (e.hash == hash && ((k = e.key) == key || key.equals(k))) { V oldValue = e.value; e.value = value; e.recordAccess(this); return oldValue; } }

modCount++;
addEntry(hash, key, value, i);
return null;

} void addEntry(int hash, K key, V value, int bucketIndex) { Entry e = table[bucketIndex]; table[bucketIndex] = new Entry(hash, key, value, e); if (size++ >= threshold) resize(2 /* table.length); }

清单5 描述的是HashMap的put操作。对比get操作,可以发现,put实际上是先查找,一旦找到key对应的Entry就直接修改Entry的value值,否则就增加一个元素。增加的元素是在链表的头部,也就是占据table中的元素,如果table中对应索引原来有元素的话就将整个链表添加到新增加的元素的后面。也就是说新增加的元素再次查找的话是优于在它之前添加的同一个链表上的元素。这里涉及到就是扩容,也就是一旦元素的个数达到了扩容因子规定的数量(threhold=table.length/*loadFactor),就将数组扩大一倍。

清单6 HashMap扩容过程 void resize(int newCapacity) { Entry[] oldTable = table; int oldCapacity = oldTable.length; if (oldCapacity == MAXIMUM_CAPACITY) { threshold = Integer.MAX_VALUE; return; }

Entry[] newTable = new Entry[newCapacity];
transfer(newTable);
table = newTable;
threshold = (int)(newCapacity /* loadFactor);

}

void transfer(Entry[] newTable) { Entry[] src = table; int newCapacity = newTable.length; for (int j = 0; j < src.length; j++) { Entry e = src[j]; if (e != null) { src[j] = null; do { Entry next = e.next; int i = indexFor(e.hash, newCapacity); e.next = newTable[i]; newTable[i] = e; e = next; } while (e != null); } } }

清单6 描述的是HashMap扩容的过程。可以看到扩充过程会导致元素数据的所有元素进行重新hash计算,这个过程也叫rehash。显然这是一个非常耗时的过程,否则扩容都会导致所有元素重新计算hash。因此尽可能的选择合适的初始化大小是有效提高HashMap效率的关键。太大了会导致过多的浪费空间,太小了就可能会导致繁重的rehash过程。在这个过程中loadFactor也可以考虑。

举个例子来说,如果要存储1000个元素,采用默认扩容因子0.75,那么1024显然是不够的,因为1000>0.75/*1024了,所以选择2048是必须的,显然浪费了1048个空间。如果确定最多只有1000个元素,那么扩容因子为1,那么1024是不错的选择。另外需要强调的一点是扩容因此越大,从统计学角度讲意味着链表的长度就也大,也就是在查找元素的时候就需要更多次的循环。所以凡事必然是一个平衡的过程。

这里可能有人要问题,一旦我将Map的容量扩大后(也就是数组的大小),这个容量还能减小么?比如说刚开始Map中可能有10000个元素,运行一旦时间以后Map的大小永远不会超过10个,那么Map的容量能减小到10个或者16个么?答案就是不能,这个capacity一旦扩大后就不能减小了,只能通过构造一个新的Map来控制capacity了。

HashMap的几个内部迭代器也是非常重要的,这里限于篇幅就不再展开了,有兴趣的可以自己研究下。

Hashtable的原理和HashMap的原理几乎一样,所以就不讨论了。另外LinkedHashMap是在Map.Entry的基础上增加了before/after两个双向索引,用来将所有Map.Entry串联起来,这样就可以遍历或者做LRU Cache等。这里也不再展开讨论了。

memcached 内部数据结构就是采用了HashMap类似的思想来实现的,有兴趣的可以参考资料8,9,10。

为了不使这篇文章过长,因此将ConcurrentHashMap的原理放到下篇讲。需要说明的是,尽管ConcurrentHashMap与HashMap的名称有些渊源,而且实现原理有些相似,但是为了更好的支持并发,ConcurrentHashMap在内部也有一些比较大的调整,这个在下篇会具体介绍。

参考资料:

  1. HashMap hash方法分析
  2. 通过分析 JDK 源代码研究 Hash 存储机制
  3. Java 理论与实践: 哈希
  4. Java 理论与实践: 构建一个更好的 HashMap
  5. jdk1.6 ConcurrentHashMap
  6. ConcurrentHashMap之实现细节
  7. 深入理解HashMap
  8. memcached-数据结构
  9. memcached存储管理 数据结构
  10. memcached 来源: [http://www.blogjava.net/xylz/archive/2010/07/20/326584.html](http://www.blogjava.net/xylz/archive/2010/07/20/326584.html)

在上一篇中介绍了HashMap的原理,这一节是ConcurrentMap的最后一节,所以会完整的介绍ConcurrentHashMap的实现。

ConcurrentHashMap原理

读写锁章节部分介绍过一种是用读写锁实现Map的方法。此种方法看起来可以实现Map响应的功能,而且吞吐量也应该不错。但是通过前面对读写锁原理的分析后知道,读写锁的适合场景是读操作>>写操作,也就是读操作应该占据大部分操作,另外读写锁存在一个很严重的问题是读写操作不能同时发生。要想解决读写同时进行问题(至少不同元素的读写分离),那么就只能将锁拆分,不同的元素拥有不同的锁,这种技术就是“锁分离”技术。

默认情况下ConcurrentHashMap是用了16个类似HashMap 的结构,其中每一个HashMap拥有一个独占锁。也就是说最终的效果就是通过某种Hash算法,将任何一个元素均匀的映射到某个HashMap的Map.Entry上面,而对某个一个元素的操作就集中在其分布的HashMap上,与其它HashMap无关。这样就支持最多16个并发的写操作。

image

上图就是ConcurrentHashMap的类图。参考上面的说明和HashMap的原理分析,可以看到ConcurrentHashMap将整个对象列表分为segmentMask+1个片段(Segment)。其中每一个片段是一个类似于HashMap的结构,它有一个HashEntry的数组,数组的每一项又是一个链表,通过HashEntry的next引用串联起来。

这个类图上面的数据结构的定义非常有学问,接下来会一个个有针对性的分析。

首先如何从ConcurrentHashMap定位到HashEntry。在HashMap的原理分析部分说过,对于一个Hash的数据结构来说,为了减少浪费的空间和快速定位数据,那么就需要数据在Hash上的分布比较均匀。对于一次Map的查找来说,首先就需要定位到Segment,然后从过Segment定位到HashEntry链表,最后才是通过遍历链表得到需要的元素。

在不讨论并发的前提下先来讨论如何定位到HashEntry的。在ConcurrentHashMap中是通过hash(key.hashCode())和segmentFor(hash)来得到Segment的。清单1 描述了如何定位Segment的过程。其中hash(int)是将key的hashCode进行二次编码,使之能够在segmentMask+1个Segment上均匀分布(默认是16个)。可以看到的是这里和HashMap还是有点不同的,这里采用的算法叫Wang/Jenkins hash,有兴趣的可以参考资料1参考资料2。总之它的目的就是使元素能够均匀的分布在不同的Segment上,这样才能够支持最多segmentMask+1个并发,这里segmentMask+1是segments的大小。

清单1 定位Segment private static int hash(int h) { // Spread bits to regularize both segment and index locations, // using variant of single-word Wang/Jenkins hash. h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16); } final Segment segmentFor(int hash) { return segments[(hash >>> segmentShift) & segmentMask]; }

显然在不能够对Segment扩容的情况下,segments的大小就应该是固定的。所以在ConcurrentHashMap中segments/segmentMask/segmentShift都是常量,一旦初始化后就不能被再次修改,其中segmentShift是查找Segment的一个常量偏移量。

有了Segment以后再定位HashEntry就和HashMap中定位HashEntry一样了,先将hash值与Segment中HashEntry的大小减1进行与操作定位到HashEntry链表,然后遍历链表就可以完成相应的操作了。

能够定位元素以后ConcurrentHashMap就已经具有了HashMap的功能了,现在要解决的就是如何并发的问题。要解决并发问题,加锁是必不可免的。再回头看Segment的类图,可以看到Segment除了有一个volatile类型的元素大小count外,Segment还是集成自ReentrantLock的。另外在前面的原子操作和锁机制中介绍过,要想最大限度的支持并发,那么能够利用的思路就是尽量读操作不加锁,写操作不加锁。如果是读操作不加锁,写操作加锁,对于竞争资源来说就需要定义为volatile类型的。volatile类型能够保证happens-before法则,所以volatile能够近似保证正确性的情况下最大程度的降低加锁带来的影响,同时还与写操作的锁不产生冲突。

同时为了防止在遍历HashEntry的时候被破坏,那么对于HashEntry的数据结构来说,除了value之外其他属性就应该是常量,否则不可避免的会得到ConcurrentModificationException。这就是为什么HashEntry数据结构中key,hash,next是常量的原因(final类型)。

有了上面的分析和条件后再来看Segment的get/put/remove就容易多了。

get操作


清单2 Segment定位元素 V get(Object key, int hash) { if (count != 0) { // read-volatile HashEntry e = getFirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key)) { V v = e.value; if (v != null) return v; return readValueUnderLock(e); // recheck } e = e.next; } } return null; } HashEntry getFirst(int hash) { HashEntry[] tab = table; return tab[hash & (tab.length - 1)]; }

V readValueUnderLock(HashEntry e) { lock(); try { return e.value; } finally { unlock(); } }

清单2 描述的是Segment如何定位元素。首先判断Segment的大小count>0,Segment的大小描述的是HashEntry不为空(key不为空)的个数。如果Segment中存在元素那么就通过getFirst定位到指定的HashEntry链表的头节点上,然后遍历此节点,一旦找到key对应的元素后就返回其对应的值。但是在清单2 中可以看到拿到HashEntry的value后还进行了一次判断操作,如果为空还需要加锁再读取一次(readValueUnderLock)。为什么会有这样的操作?尽管ConcurrentHashMap不允许将value为null的值加入,但现在仍然能够读到一个为空的value就意味着此值对当前线程还不可见(这是因为HashEntry还没有完全构造完成就赋值导致的,后面还会谈到此机制)。

put操作


清单3 描述的是Segment的put操作。首先就需要加锁了,修改一个竞争资源肯定是要加锁的,这个毫无疑问。需要说明的是Segment集成的是ReentrantLock,所以这里加的锁也就是独占锁,也就是说同一个Segment在同一时刻只有能一个put操作。

接下来来就是检查是否需要扩容,这和HashMap一样,如果需要的话就扩大一倍,同时进行rehash操作。

查找元素就和get操作是一样的,得到元素就直接修改其值就好了。这里onlyIfAbsent只是为了实现ConcurrentMap的putIfAbsent操作而已。需要说明以下几点:

  • 如果找到key对于的HashEntry后直接修改就好了,如果找不到那么就需要构造一个新的HashEntry出来加到hash对于的HashEntry的头部,同时就的头部就加到新的头部后面。这是因为HashEntry的next是final类型的,所以只能修改头节点才能加元素加入链表中。
  • 如果增加了新的操作后,就需要将count+1写回去。前面说过count是volatile类型,而读取操作没有加锁,所以只能把元素真正写回Segment中的时候才能修改count值,这个要放到整个操作的最后。
  • 在将新的HashEntry写入table中时是通过构造函数来设置value值的,这意味对table的赋值可能在设置value之前,也就是说得到了一个半构造完的HashEntry。这就是重排序可能引起的问题。所以在读取操作中,一旦读到了一个value为空的value是就需要加锁重新读取一次。为什么要加锁?加锁意味着前一个写操作的锁释放,也就是前一个锁的数据已经完成写完了了,根据happens-before法则,前一个写操作的结果对当前读线程就可见了。当然在JDK 6.0以后不一定存在此问题。
  • 在Segment中table变量是volatile类型,多次读取volatile类型的开销要不非volatile开销要大,而且编译器也无法优化,所以在put操作中首先建立一个临时变量tab指向table,多次读写tab的效率要比volatile类型的table要高,JVM也能够对此进行优化。

清单3 Segment的put操作 V put(K key, int hash, V value, boolean onlyIfAbsent) { lock(); try { int c = count; if (c++ > threshold) // ensure capacity rehash(); HashEntry[] tab = table; int index = hash & (tab.length - 1); HashEntry first = tab[index]; HashEntry e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next;

    V oldValue;
    if (e != null) {
        oldValue = e.value;
        if (!onlyIfAbsent)
            e.value = value;
    }
    else {
        oldValue = null;
        ++modCount;
        tab[index] = new HashEntry<K,V>(key, hash, first, value);
        count = c; // write-volatile
    }
    return oldValue;
} finally {
    unlock();
}

}

remove 操作

清单4 描述了Segment删除一个元素的过程。同put一样,remove也需要加锁,这是因为对table可能会有变更。由于HashEntry的next节点是final类型的,所以一旦删除链表中间一个元素,就需要将删除之前或者之后的元素重新加入新的链表。而Segment采用的是将删除元素之前的元素一个个重新加入删除之后的元素之前(也就是链表头结点)来完成新链表的构造。

清单4 Segment的remove操作 V remove(Object key, int hash, Object value) { lock(); try { int c = count - 1; HashEntry[] tab = table; int index = hash & (tab.length - 1); HashEntry first = tab[index]; HashEntry e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next;

    V oldValue = null;
    if (e != null) {
        V v = e.value;
        if (value == null || value.equals(v)) {
            oldValue = v;
            // All entries following removed node can stay
            // in list, but all preceding ones need to be
            // cloned.
            ++modCount;
            HashEntry<K,V> newFirst = e.next;
            for (HashEntry<K,V> p = first; p != e; p = p.next)
                newFirst = new HashEntry<K,V>(p.key, p.hash,
                                              newFirst, p.value);
            tab[index] = newFirst;
            count = c; // write-volatile
        }
    }
    return oldValue;
} finally {
    unlock();
}

}

下面的示意图描述了如何删除一个已经存在的元素的。假设我们要删除B3元素。首先定位到B3所在的Segment,然后再定位到Segment的table中的B1元素,也就是Bx所在的链表。然后遍历链表找到B3,找到之后就从头结点B1开始构建新的节点B1(蓝色)加到B4的前面,继续B1后面的节点B2构造B2(蓝色),加到由蓝色的B1和B4构成的新的链表。继续下去,直到遇到B3后终止,这样就构造出来一个新的链表B2(蓝色)->B1(蓝色)->B4->B5,然后将此链表的头结点B2(蓝色)设置到Segment的table中。这样就完成了元素B3的删除操作。需要说明的是,尽管就的链表仍然存在(B1->B2->B3->B4->B5),但是由于没有引用指向此链表,所以此链表中无引用的(B1->B2->B3)最终会被GC回收掉。这样做的一个好处是,如果某个读操作在删除时已经定位到了旧的链表上,那么此操作仍然将能读到数据,只不过读取到的是旧数据而已,这在多线程里面是没有问题的。

imageimage

除了对单个元素操作外,还有对全部的Segment的操作,比如size()操作等。

size操作

size操作涉及到统计所有Segment的大小,这样就会遍历所有的Segment,如果每次加锁就会导致整个Map都被锁住了,任何需要锁的操作都将无法进行。这里用到了一个比较巧妙的方案解决此问题。

在Segment中有一个变量modCount,用来记录Segment结构变更的次数,结构变更包括增加元素和删除元素,每增加一个元素操作就+1,每进行一次删除操作+1,每进行一次清空操作(clear)就+1。也就是说每次涉及到元素个数变更的操作modCount都会+1,而且一直是增大的,不会减小。

遍历两次ConcurrentHashMap中的segments,每次遍历是记录每一个Segment的modCount,比较两次遍历的modCount值的和是否相同,如果相同就返回在遍历过程中获取的Segment的count的和,也就是所有元素的个数。如果不相同就重复再做一次。重复一次还不相同就将所有Segment锁住,一个一个的获取其大小(count),最后将这些count加起来得到总的大小。当然了最后需要将锁一一释放。清单5 描述了这个过程。

这里有一个比较高级的话题是为什么在读取modCount的时候总是先要读取count一下。为什么不是先读取modCount然后再读取count的呢?也就是说下面的两条语句能否交换下顺序? sum += segments[i].count; mcsum += mc[i] = segments[i].modCount;

答案是不能!为什么?这是因为modCount总是在加锁的情况下才发生变化,所以不会发生多线程同时修改的情况,也就是没必要时volatile类型。另外总是在count修改的情况下修改modCount,而count是一个volatile变量。于是这里就充分利用了volatile的特性。

根据happens-before法则,第(3)条:对volatile字段的写入操作happens-before于每一个后续的同一个字段的读操作。也就是说一个操作C在volatile字段的写操作之后,那么volatile写操作之前的所有操作都对此操作C可见。所以修改modCount总是在修改count之前,也就是说如果读取到了一个count的值,那么在count变化之前的modCount也就能够读取到,换句话说就是如果看到了count值的变化,那么就一定看到了modCount值的变化。而如果上面两条语句交换下顺序就无法保证这个结果一定存在了。

在ConcurrentHashMap.containsValue中,可以看到每次遍历segments时都会执行int c = segments[i].count;,但是接下来的语句中又不用此变量c,尽管如此JVM仍然不能将此语句优化掉,因为这是一个volatile字段的读取操作,它保证了一些列操作的happens-before顺序,所以是至关重要的。在这里可以看到: ConcurrentHashMap将volatile发挥到了极致!

另外isEmpty操作于size操作类似,不再累述。

清单5 ConcurrentHashMap的size操作 public int size() { final Segment[] segments = this.segments; long sum = 0; long check = 0; int[] mc = new int[segments.length]; // Try a few times to get accurate count. On failure due to // continuous async changes in table, resort to locking. for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) { check = 0; sum = 0; int mcsum = 0; for (int i = 0; i < segments.length; ++i) { sum += segments[i].count; mcsum += mc[i] = segments[i].modCount; } if (mcsum != 0) { for (int i = 0; i < segments.length; ++i) { check += segments[i].count; if (mc[i] != segments[i].modCount) { check = -1; // force retry break; } } } if (check == sum) break; } if (check != sum) { // Resort to locking all segments sum = 0; for (int i = 0; i < segments.length; ++i) segments[i].lock(); for (int i = 0; i < segments.length; ++i) sum += segments[i].count; for (int i = 0; i < segments.length; ++i) segments[i].unlock(); } if (sum > Integer.MAX_VALUE) return Integer.MAX_VALUE; else return (int)sum; }

ConcurrentSkipListMap/Set

本来打算介绍下ConcurrentSkipListMap的,结果打开源码一看,彻底放弃了。那里面的数据结构和算法我估计研究一周也未必能够完全弄懂。很久以前我看TreeMap的时候就头大,想想那些复杂的“红黑二叉树”我头都大了。这些都归咎于从前没有好好学习《数据结构和算法》,现在再回头看这些复杂的算法感觉非常头疼,为了减少脑细胞的死亡,暂且还是不要惹这些“玩意儿”。有兴趣的可以看看参考资料4 中对TreeMap的介绍。

参考资料:

  1. Hash this
  2. Single-word Wang/Jenkins Hash in ConcurrentHashMap
  3. 指令重排序与happens-before法则
  4. 通过分析 JDK 源代码研究 TreeMap 红黑树算法实现

来源: [http://www.blogjava.net/xylz/archive/2010/07/20/326661.html](http://www.blogjava.net/xylz/archive/2010/07/20/326661.html)

Queue是JDK 5以后引入的新的集合类,它属于Java Collections Framework的成员,在Collection集合中和List/Set是同一级别的接口。通常来讲Queue描述的是一种FIFO的队列,当然不全都是,比如PriorityQueue是按照优先级的顺序(或者说是自然顺序,借助于Comparator接口)。

下图描述了Java Collections Framework中Queue的整个家族体系。

对于Queue而言是在Collection的基础上增加了offer/remove/poll/element/peek方法,另外重新定义了add方法。对于这六个方法,有不同的定义。


抛出异常

返回特殊值

操作描述 插入

add(e)

offer(e)

将元素加入到队列尾部 移除

remove()

poll()

移除队列头部的元素 检查

element()

peek()

返回队列头部的元素而不移除此元素

特别说明的是对于Queue而言,规范并没有规定是线程安全的,为了解决这个问题,引入了可阻塞的队列BlockingQueue。对于BlockingQueue而言所有操作的是线程安全的,并且队列的操作可以被阻塞,直到满足某种条件。Queue的另一个子接口Deque描述的是一个双向的队列。与Queue不同的是,Deque允许在队列的头部增加元素和在队列的尾部删除元素。也就是说Deque是一个双向队列。二者功能都有的队列就是BlockingDeque,这种阻塞队列允许在队列的头和尾部分别操作元素,应该说是Queue中功能最强大的实现。

image

在JDK 5之前LinkedList就已经存在,而且本身实现都是一种双向队列。所以到了JDK 5以后就将LinkedList同时实现Deque接口,这样LinkedList就又属于Queue的一部分了。

通常情况下Queue都是靠链表结构实现的,但是链表意味着有一些而外的引用开销,如果是双向链表开销就更大了。所以为了节省内存,一种方式就是使用固定大小的数组来实现队列。在这种情况下队列的大小是固定,元素的遍历通过数组的索引进行,很显然这是一种双向链表的模型。ArrayDeque就是这样一种实现。

另外ArrayBlockingQueue也是一种数组实现的队列,但是却没有改造成双向,仅仅实现了BlockingQueue的模型。理论上和ArrayDeque一样也应该容易改造成双向的实现。

PriorityQueue和PriorityBlockingQueue实现了一种排序的队列模型。这很类似与SortedSet,通过队列的Comparator接口或者Comparable元素来排序元素。这种情况下元素在队列中的出入就不是按照FIFO的形式,而是根据比较后的自然顺序来进行。

CocurrentLinkedQueue是一种线程安全却非阻塞的FIFO队列,这种队列通常实现起来比较简单,但是却很有效。在接下来的章节会详细的描述它。

SynchronousQueue是一种特别的BlockingQueue,它只是把一个add/offer操作的元素直接移交给remove/take操作。也就是说它本身不会缓存任何元素,所以严格意义上说来讲并不是一种真正的队列。此队列维护一个线程列表,这些线程等待从队列中加入元素或者移除元素。简单的说,至少有一个remove/take操作时add/offer操作才能成功,同样至少有一个add/offer操作时remove/take操作才能成功。这是一种双向等待的队列模型,出队列等待加入等列,而入队列又等待出队列。这种队列的好处在于能够最大线程的保持吞吐量却又是线程安全的。所以对于一个需要快速处理的任务队列,SynchronousQueue是一个不错的选择。

BlockingQueue还有一种实现DelayQueue,这种实现允许每一个元素(Delayed)带有一个延时时间,当调用take/poll的时候会检测队列头元素这个时间是否<=0,如果满足就是说已经超时了,那么此元素就可以被移除了,否则就会等待。特别说明的是这个头元素应该是最先被超时的元素(这个时间是绝对时间)。这个类设计很巧妙,被用于ScheduledFutureTask来进行定时操作。希望后面会开辟一个章节讲讲这里面的想法。实在不行在讲线程池部分肯定会提到这个。 来源: [http://www.blogjava.net/xylz/archive/2010/07/21/326723.html](http://www.blogjava.net/xylz/archive/2010/07/21/326723.html)

ConcurrentLinkedQueue是Queue的一个线程安全实现。先来看一段文档说明。

一个基于链接节点的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。新的元素插入到队列的尾部,队列获取操作从队列头部获得元素。当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许使用 null 元素。

由于ConcurrentLinkedQueue只是简单的实现了一个队列Queue,因此从API的角度讲,没有多少值的介绍,使用起来也很简单,和前面遇到的所有FIFO队列都类似。出队列只能操作头节点,入队列只能操作尾节点,任意节点操作就需要遍历完整的队列。

重点放在解释ConcurrentLinkedQueue的原理和实现上。

在继续探讨之前,结合前面线程安全的相关知识,我来分析设计一个线程安全的队列哪几种方法。

第一种:使用synchronized同步队列,就像Vector或者Collections.synchronizedList/Collection那样。显然这不是一个好的并发队列,这会导致吞吐量急剧下降。

第二种:使用Lock。一种好的实现方式是使用ReentrantReadWriteLock来代替ReentrantLock提高读取的吞吐量。但是显然ReentrantReadWriteLock的实现更为复杂,而且更容易导致出现问题,另外也不是一种通用的实现方式,因为ReentrantReadWriteLock适合哪种读取量远远大于写入量的场合。当然了ReentrantLock是一种很好的实现,结合Condition能够很方便的实现阻塞功能,这在后面介绍BlockingQueue的时候会具体分析。

第三种:使用CAS操作。尽管Lock的实现也用到了CAS操作,但是毕竟是间接操作,而且会导致线程挂起。一个好的并发队列就是采用某种非阻塞算法来取得最大的吞吐量。

ConcurrentLinkedQueue采用的就是第三种策略。它采用了参考资料1 中的算法。

在锁机制中谈到过,要使用非阻塞算法来完成队列操作,那么就需要一种“循环尝试”的动作,就是循环操作队列,直到成功为止,失败就会再次尝试。这在前面的章节中多次介绍过。

针对各种功能深入分析。

在开始之前先介绍下ConcurrentLinkedQueue的数据结构。

image在上面的数据结构中,ConcurrentLinkedQueue只有头结点、尾节点两个元素,而对于一个节点Node而言除了保存队列元素item外,还有一个指向下一个节点的引用next。 看起来整个数据结构还是比较简单的。但是也有几点是需要说明:

  1. 所有结构(head/tail/item/next)都是volatile类型。 这是因为ConcurrentLinkedQueue是非阻塞的,所以只有volatile才能使变量的写操作对后续读操作是可见的(这个是有happens-before法则保证的)。同样也不会导致指令的重排序。
  2. 所有结构的操作都带有原子操作,这是由AtomicReferenceFieldUpdater保证的,这在原子操作中介绍过。它能保证需要的时候对变量的修改操作是原子的。
  3. 由于队列中任何一个节点(Node)只有下一个节点的引用,所以这个队列是单向的,根据FIFO特性,也就是说出队列在头部(head),入队列在尾部(tail)。头部保存有进入队列最长时间的元素,尾部是最近进入的元素。
  4. 没有对队列长度进行计数,所以队列的长度是无限的,同时获取队列的长度的时间不是固定的,这需要遍历整个队列,并且这个计数也可能是不精确的。
  5. 初始情况下队列头和队列尾都指向一个空节点,但是非null,这是为了方便操作,不需要每次去判断head/tail是否为空。但是head却不作为存取元素的节点,tail在不等于head情况下保存一个节点元素。也就是说head.item这个应该一直是空,但是tail.item却不一定是空(如果head!=tail,那么tail.item!=null)。

对于第5点,可以从ConcurrentLinkedQueue的初始化中看到。这种头结点也叫“伪节点”,也就是说它不是真正的节点,只是一标识,就像c中的字符数组后面的\0以后,只是用来标识结束,并不是真正字符数组的一部分。 private transient volatile Node head = new Node(null, null); private transient volatile Node tail = head;

有了上述5点再来解释相关API操作就容易多了。

在上一节中列出了add/offer/remove/poll/element/peek等价方法的区别,所以这里就不再重复了。

清单1 入队列操作 public boolean offer(E e) { if (e == null) throw new NullPointerException(); Node n = new Node(e, null); for (;;) { Node t = tail; Node s = t.getNext(); if (t == tail) { if (s == null) { if (t.casNext(s, n)) { casTail(t, n); return true; } } else { casTail(t, s); } } } }

清单1 描述的是入队列的过程。整个过程是这样的。

  1. 获取尾节点t,以及尾节点的下一个节点s。如果尾节点没有被别人修改,也就是t==tail,进行2,否则进行1。
  2. 如果s不为空,也就是说此时尾节点后面还有元素,那么就需要把尾节点往后移,进行1。否则进行3。
  3. 修改尾节点的下一个节点为新节点,如果成功就修改尾节点,返回true。否则进行1。

从操作3中可以看到是先修改尾节点的下一个节点,然后才修改尾节点位置的,所以这才有操作2中为什么获取到的尾节点的下一个节点不为空的原因。

特别需要说明的是,对尾节点的tail的操作需要换成临时变量t和s,一方面是为了去掉volatile变量的可变性,另一方面是为了减少volatile的性能影响。

清单2 描述的出队列的过程,这个过程和入队列相似,有点意思。

头结点是为了标识队列起始,也为了减少空指针的比较,所以头结点总是一个item为null的非null节点。也就是说head!=null并且head.item==null总是成立。所以实际上获取的是head.next,一旦将头结点head设置为head.next成功就将新head的item设置为null。至于以前就的头结点h,h.item=null并且h.next为新的head,但是由于没有对h的引用,所以最终会被GC回收。这就是整个出队列的过程。

清单2 出队列操作 public E poll() { for (;;) { Node h = head; Node t = tail; Node first = h.getNext(); if (h == head) { if (h == t) { if (first == null) return null; else casTail(t, first); } else if (casHead(h, first)) { E item = first.getItem(); if (item != null) { first.setItem(null); return item; } // else skip over deleted item, continue loop, } } } }

另外对于清单3 描述的获取队列大小的过程,由于没有一个计数器来对队列大小计数,所以获取队列的大小只能通过从头到尾完整的遍历队列,显然这个代价是很大的。所以通常情况下ConcurrentLinkedQueue需要和一个AtomicInteger搭配才能获取队列大小。后面介绍的BlockingQueue正是使用了这种思想。

清单3 遍历队列大小 public int size() { int count = 0; for (Node p = first(); p != null; p = p.getNext()) { if (p.getItem() != null) { // Collections.size() spec says to max out if (++count == Integer.MAX_VALUE) break; } } return count; }

参考资料:

  1. Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms
  2. 多线程基础总结十一—ConcurrentLinkedQueue
  3. 对ConcurrentLinkedQueue进行的并发测试

来源: [http://www.blogjava.net/xylz/archive/2010/07/23/326934.html](http://www.blogjava.net/xylz/archive/2010/07/23/326934.html)

在《并发容器 part 4 并发队列与Queue简介》节中的类图中可以看到,对于Queue来说,BlockingQueue是主要的线程安全版本。这是一个可阻塞的版本,也就是允许添加/删除元素被阻塞,直到成功为止。

BlockingQueue相对于Queue而言增加了两个操作:put/take。下面是一张整理的表格。

image看似简单的API,非常有用。这在控制队列的并发上非常有好处。既然加入队列和移除队列能够被阻塞,这在实现生产者-消费者模型上就简单多了。

清单1 是生产者-消费者模型的一个例子。这个例子是一个真实的场景。服务端(ICE服务)接受客户端的请求(accept),请求计算此人的好友生日,然后将计算的结果存取缓存中(Memcache)中。在这个例子中采用了ExecutorService实现多线程的功能,尽可能的提高吞吐量,这个在后面线程池的部分会详细说明。目前就可以理解为new Thread(r).start()就可以了。另外这里阻塞队列使用的是LinkedBlockingQueue。

清单1 一个生产者-消费者例子 package xylz.study.concurrency;

import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque;

public class BirthdayService {

final int workerNumber;

final Worker[] workers;

final ExecutorService threadPool;

static volatile boolean running = true;

public BirthdayService(int workerNumber, int capacity) {
    if (workerNumber <= 0) throw new IllegalArgumentException();
    this.workerNumber = workerNumber;
    workers = new Worker[workerNumber];
    for (int i = 0; i < workerNumber; i++) {
        workers[i] = new Worker(capacity);
    }
    //
    boolean b = running;// kill the resorting
    threadPool = Executors.newFixedThreadPool(workerNumber);
    for (Worker w : workers) {
        threadPool.submit(w);
    }
}

Worker getWorker(int id) {
    return workers[id % workerNumber];

}

class Worker implements Runnable {

    final BlockingQueue<Integer> queue;

    public Worker(int capacity) {
        queue = new LinkedBlockingQueue<Integer>(capacity);
    }

    public void run() {
        while (true) {
            try {
                consume(queue.take());
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    void put(int id) {
        try {
            queue.put(id);
        } catch (InterruptedException e) {
            return;
        }
    }
}

public void accept(int id) {
    //accept client request
    getWorker(id).put(id);
}

protected void consume(int id) {
    //do the work
    //get the list of friends and save the birthday to cache
}

}

在清单1 中可以看到不管是put()还是get(),都抛出了一个InterruptedException。我们就从这里开始,为什么会抛出这个异常。

上一节中提到实现一个并发队列有三种方式。显然只有第二种 Lock 才能实现阻塞队列。在锁机制中提到过,Lock结合Condition就可以实现线程的阻塞,这在锁机制部分的很多工具中都详细介绍过,而接下来要介绍的LinkedBlockingQueue就是采用这种方式。

LinkedBlockingQueue 原理

image对比ConcurrentLinkedQueue的结构图,LinkedBlockingQueue多了两个ReentrantLock和两个Condition以及用于计数的AtomicInteger,显然这会导致LinkedBlockingQueue的实现有点复杂。对照此结构,有以下几点说明:

  1. 但是整体上讲,LinkedBlockingQueue和ConcurrentLinkedQueue的结构类似,都是采用头尾节点,每个节点指向下一个节点的结构,这表示它们在操作上应该类似。
  2. LinkedBlockingQueue引入了原子计数器count,这意味着获取队列大小size()已经是常量时间了,不再需要遍历队列。每次队列长度有变更时只需要修改count即可。
  3. 有了修改Node指向有了锁,所以不需要volatile特性了。既然有了锁Node的item为什么需要volatile在后面会详细分析,暂且不表。
  4. 引入了两个锁,一个入队列锁,一个出队列锁。当然同时有一个队列不满的Condition和一个队列不空的Condition。其实参照锁机制前面介绍过的生产者-消费者模型就知道,入队列就代表生产者,出队列就代表消费者。为什么需要两个锁?一个锁行不行?其实一个锁完全可以,但是一个锁意味着入队列和出队列同时只能有一个在进行,另一个必须等待其释放锁。而从ConcurrentLinkedQueue的实现原理来看,事实上head和last (ConcurrentLinkedQueue中是tail)是分离的,互相独立的,这意味着入队列实际上是不会修改出队列的数据的,同时出队列也不会修改入队列,也就是说这两个操作是互不干扰的。更通俗的将,这个锁相当于两个写入锁,入队列是一种写操作,操作head,出队列是一种写操作,操作tail。可见它们是无关的。但是并非完全无关,后面详细分析。

在没有揭示入队列和出队列过程前,暂且猜测下实现原理。

根据前面学到的锁机制原理结合ConcurrentLinkedQueue的原理,入队列的阻塞过程大概是这样的:

  1. 获取入队列的锁putLock,检测队列大小,如果队列已满,那么就挂起线程,等待队列不满信号notFull的唤醒。
  2. 将元素加入到队列尾部,同时修改队列尾部引用last。
  3. 队列大小加1。
  4. 释放锁putLock。
  5. 唤醒notEmpty线程(如果有挂起的出队列线程),告诉消费者,已经有了新的产品。

对比入队列,出队列的阻塞过程大概是这样的:

  1. 获取出队列的锁takeLock,检测队列大小,如果队列为空,那么就挂起线程,等待队列不为空notEmpty的唤醒。
  2. 将元素从头部移除,同时修改队列头部引用head。
  3. 队列大小减1。
  4. 释放锁takeLock。
  5. 唤醒notFull线程(如果有挂起的入队列线程),告诉生产者,现在还有空闲的空间。

下面来验证上面的过程。

入队列过程(put/offer)

清单2 阻塞的入队列过程 public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { try { while (count.get() == capacity) notFull.await(); } catch (InterruptedException ie) { notFull.signal(); // propagate to a non-interrupted thread throw ie; } insert(e); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }

清单2 描述的是入队列的阻塞过程。可以看到和上面描述的入队列的过程基本相同。但是也有以下几个问题:

  1. 如果在入队列的时候线程被中断,那么就需要发出一个notFull的信号,表示下一个入队列的线程能够被唤醒(如果阻塞的话)。
  2. 入队列成功后如果队列不满需要补一个notFull的信号。为什么?队列不满的时候其它入队列的阻塞线程难道不知道么?有可能。这是因为为了减少上下文切换的次数,每次唤醒一个线程(不管是入队列还是出队列)都是只随机唤醒一个(notify),而不是唤醒所有的(notifyall())。这会导致其它阻塞的入队列线程不能够即使处理队列不满的情况。
  3. 如果队列不为空并且可能有一个元素的话就唤醒一个出队列线程。这么做说明之前队列一定为空,因为在加入队列之后队列最多只能为1,那么说明未加入之前是0,那么就可能有被阻塞的出队列线程,所以就唤醒一个出队列线程。特别说明的是为什么使用一个临时变量c,而不用count。这是因为读取一个count的开销比读取一个临时一个变量大,而此处c又能够完成确认队列最多只有一个元素的判断。首先c默认为-1,如果加入队列后获取原子计数器的结果为0,说明之前队列为空,不可能消费(出队列),也不可能入队列,因为此时锁还在当前线程上,那么加入一个后队列就不为空了,所以就可以安全的唤醒一个消费(出对立)线程。
  4. 入队列的过程允许被中断,所以总是抛出InterruptedException 异常。

针对第2点,特别补充说明下。本来这属于锁机制中条件队列的范围,由于没有应用场景,所以当时没有提。

前面提高notifyall总是比notify更可靠,因为notify可能丢失通知,为什么不适用notifyall呢?

先解释下notify丢失通知的问题。

notify丢失通知问题

假设线程A因为某种条件在条件队列中等待,同时线程B因为另外一种条件在同一个条件队列中等待,也就是说线程A/B都被同一个Conditon.await()挂起,但是等待的条件不同。现在假设线程B的线程被满足,线程C执行一个notify操作,此时JVM从Conditon.await()的多个线程(A/B)中随机挑选一个唤醒,不幸的是唤醒了A。此时A的条件不满足,于是A继续挂起。而此时B仍然在傻傻的等待被唤醒的信号。也就是说本来给B的通知却被一个无关的线程持有了,真正需要通知的线程B却没有得到通知,而B仍然在等待一个已经发生过的通知。

如果使用notifyall,则能够避免此问题。notifyall会唤醒所有正在等待的线程,线程C发出的通知线程A同样能够收到,但是由于对于A没用,所以A继续挂起,而线程B也收到了此通知,于是线程B正常被唤醒。

既然notifyall能够解决单一notify丢失通知的问题,那么为什么不总是使用notifyall替换notify呢?

假设有N个线程在条件队列中等待,调用notifyall会唤醒所有线程,然后这N个线程竞争同一个锁,最多只有一个线程能够得到锁,于是其它线程又回到挂起状态。这意味每一次唤醒操作可能带来大量的上下文切换(如果N比较大的话),同时有大量的竞争锁的请求。这对于频繁的唤醒操作而言性能上可能是一种灾难。

如果说总是只有一个线程被唤醒后能够拿到锁,那么为什么不使用notify呢?所以某些情况下使用notify的性能是要高于notifyall的。

如果满足下面的条件,可以使用单一的notify取代notifyall操作: 相同的等待者,也就是说等待条件变量的线程操作相同,每一个从wait放回后执行相同的逻辑,同时一个条件变量的通知至多只能唤醒一个线程。

也就是说理论上讲在put/take中如果使用sinallAll唤醒的话,那么在清单2 中的notFull.singal就是多余的。

出队列过程(poll/take)

再来看出队列过程。清单3 描述了出队列的过程。可以看到这和入队列是对称的。从这里可以看到,出队列使用的是和入队列不同的锁,所以入队列、出队列这两个操作才能并行进行。

清单3 阻塞的出队列过程 public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { try { while (count.get() == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to a non-interrupted thread throw ie; }

    x = extract();
    c = count.getAndDecrement();
    if (c > 1)
        notEmpty.signal();
} finally {
    takeLock.unlock();
}
if (c == capacity)
    signalNotFull();
return x;

}

为什么有异常?

有了入队列、出队列的过程后再来回答前面的几个问题。

为什么总是抛出InterruptedException 异常? 这是很大一块内容,其实是Java对线程中断的处理问题,希望能够在系列文章的最后能够对此开辟单独的篇章来谈谈。

在锁机制里面也是总遇到,这是因为,Java里面没有一种直接的方法中断一个挂起的线程,所以通常情况下等于一个处于WAITING状态的线程,允许设置一个中断位,一旦线程检测到这个中断位就会从WAITING状态退出,以一个InterruptedException 的异常返回。所以只要是对一个线程挂起操作都会导致InterruptedException 的可能,比如Thread.sleep()、Thread.join()、Object.wait()。尽管LockSupport.park()不会抛出一个InterruptedException 异常,但是它会将当前线程的的interrupted状态位置上,而对于Lock/Condition而言,当捕捉到interrupted状态后就认为线程应该终止任务,所以就抛出了一个InterruptedException 异常。

又见volatile

还有一个不容易理解的问题。为什么Node.item是volatile类型的?

起初我不大明白,因为对于一个进入队列的Node,它的item是不变,当且仅当出队列的时候会将头结点元素的item 设置为null。尽管在remove(o)的时候也是设置为null,但是那时候是加了putLock/takeLock两个锁的,所以肯定是没有问题的。那么问题出在哪?

我们知道,item的值是在put/offer的时候加入的。这时候都是有putLock锁保证的,也就是说它保证使用putLock锁的读取肯定是没有问题的。那么问题就只可能出在一个不适用putLock却需要读取Node.item的地方。

peek操作时获取头结点的元素而不移除它。显然他不会操作尾节点,所以它不需要putLock锁,也就是说它只有takeLock锁。清单4 描述了这个过程。

清单4 查询队列头元素过程 public E peek() { if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } }

清单4 描述了peek的过程,最后返回一个非null节点的结果是Node.item。这里读取了Node的item值,但是整个过程却是使用了takeLock而非putLock。换句话说putLock对Node.item的操作,peek()线程可能不可见!

清单5 队列尾部加入元素 private void insert(E x) { last = last.next = new Node(x); }

清单5 是入队列offer/put的一部分,这里关键在于last=new Node(x)可能发生重排序。Node构造函数是这样的:Node(E x) { item = x; }。在这一步里面我们可能得到以下一种情况:

  1. 构建一个Node对象n;
  2. 将Node的n赋给last
  3. 初始化n,设置item=x

在执行步骤2 的时候一个peek线程可能拿到了新的Node n,这时候它读取item,得到了一个null。显然这是不可靠的。

对item采用volatile之后,JMM保证对item=x的赋值一定在last=n之前,也就是说last得到的一个是一个已经赋值了的新节点n。这就不会导致读取空元素的问题的。

出对了poll/take和peek都是使用的takeLock锁,所以不会导致此问题。

删除操作和遍历操作由于同时获取了takeLock和putLock,所以也不会导致此问题。

总结:当前仅当元素加入队列时读取此元素才可能导致不一致的问题。采用volatile正式避免此问题。

附加功能

BlockingQueue有一个额外的功能,允许批量从队列中异常元素。这个API是: int drainTo(Collection<? super E> c, int maxElements); 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。

int drainTo(Collection<? super E> c); 移除此队列中所有可用的元素,并将它们添加到给定 collection 中。

清单6 描述的是最多移除指定数量元素的过程。由于批量操作只需要一次获取锁,所以效率会比每次获取锁要高。但是需要说明的,需要同时获取takeLock/putLock两把锁,因为当移除完所有元素后这会涉及到尾节点的修改(last节点仍然指向一个已经移走的节点)。

由于迭代操作contains()/remove()/iterator()也是获取了两个锁,所以迭代操作也是线程安全的。

清单6 批量移除操作 public int drainTo(Collection<? super E> c, int maxElements) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); fullyLock(); try { int n = 0; Node p = head.next; while (p != null && n < maxElements) { c.add(p.item); p.item = null; p = p.next; ++n; } if (n != 0) { head.next = p; assert head.item == null; if (p == null) last = head; if (count.getAndAdd(-n) == capacity) notFull.signalAll(); } return n; } finally { fullyUnlock(); } }

来源: [http://www.blogjava.net/xylz/archive/2010/07/24/326988.html](http://www.blogjava.net/xylz/archive/2010/07/24/326988.html)

上一节中详细分析了LinkedBlockingQueue 的实现原理。实现一个可扩展的队列通常有两种方式:一种方式就像LinkedBlockingQueue一样使用链表,也就是每一个元素带有下一个元素的引用,这样的队列原生就是可扩展的;另外一种就是通过数组实现,一旦队列的大小达到数组的容量的时候就将数组扩充一倍(或者一定的系数倍),从而达到扩容的目的。常见的ArrayList就属于第二种。前面章节介绍过的HashMap确是综合使用了这两种方式。

对于一个Queue而言,同样可以使用数组实现。使用数组的好处在于各个元素之间原生就是通过数组的索引关联起来的,一次元素之间就是有序的,在通过索引操作数组就方便多了。当然也有它不利的一面,扩容起来比较麻烦,同时删除一个元素也比较低效。

ArrayBlockingQueue 就是Queue的一种数组实现。

ArrayBlockingQueue 原理

在没有介绍ArrayBlockingQueue原理之前可以想象下,一个数组如何实现Queue的FIFO特性。首先,数组是固定大小的,这个是毫无疑问的,那么初始化就是所有元素都为null。假设数组一段为头,另一端为尾。那么头和尾之间的元素就是FIFO队列。

  1. 入队列就将尾索引往右移动一个,新元素加入尾索引的位置;
  2. 出队列就将头索引往尾索引方向移动一个,同时将旧头索引元素设为null,返回旧头索引的元素。
  3. 一旦数组已满,那么就不允许添加新元素(除非扩充容量)
  4. 如果尾索引移到了数组的最后(最大索引处),那么就从索引0开始,形成一个“闭合”的数组。
  5. 由于头索引和尾索引之间的元素都不能为空(因为为空不知道take出来的元素为空还是队列为空),所以删除一个头索引和尾索引之间的元素的话,需要移动删除索引前面或者后面的所有元素,以便填充删除索引的位置。
  6. 由于是阻塞队列,那么显然需要一个锁,另外由于只是一份数据(一个数组),所以只能有一个锁,也就是同时只能有一个线程操作队列。

有了上述几点分析,设计一个可阻塞的数组队列就比较容易了。

image

上图描述的ArrayBlockingQueue的数据结构。首先有一个数组E[],用来存储所有的元素。由于ArrayBlockingQueue最终设置为一个不可扩展大小的Queue,所以这里items就是初始化就固定大小的数组(final类型);另外有两个索引,头索引takeIndex,尾索引putIndex;一个队列的大小count;要支持阻塞就必须需要一个锁lock和两个条件(非空、非满),这三个元素都是不可变更类型的(final)。

由于只有一把锁,所以任何时刻对队列的操作都只有一个线程,这意味着对索引和大小的操作都是线程安全的,所以可以看到这个takeIndex/putIndex/count就不需要原子操作和volatile语义了。

清单1 描述的是一个可阻塞的添加元素过程。这与前面介绍的消费者、生产者模型相同。如果队列已经满了就挂起等待,否则就插入元素,同时唤醒一个队列已空的线程。对比清单2 可以看到是完全相反的两个过程。这在前面几种实现生产者-消费者模型的时候都介绍过了。

清单1 可阻塞的添加元素 public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == items.length) notFull.await(); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } insert(e); } finally { lock.unlock(); } }

清单2 可阻塞的移除元素 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } E x = extract(); return x; } finally { lock.unlock(); } }

需要注意到的是,尽管每次加入、移除一个元素使用的都是signal()通知,而不是signalAll()通知。我们参考上一节中notify替换notifyAll的原则:每一个await醒来的动作相同,每次最多唤醒一个线程来操作。显然这里符合这两种条件,因此使用signal要比使用signalAll要高效,并且是可靠的。

image上图描述了take()/put()的索引位置示意图。

一开始takeIndex/putIndex都在E/0位置,然后每加入一个元素offer/put,putIndex都增加1,也就是往后边移动一位;每移除一个元素poll/take,takeIndex都增加1,也是往后边移动一位,显然takeIndex总是在putIndex的“后边”,因为当队列中没有元素的时候takeIndex和putIndex相等,同时当前位置也没有元素,takeIndex也就是无法再往右边移动了;一旦putIndex/takeIndex移动到了最后面,也就是size-1的位置(这里size是指数组的长度),那么就移动到0,继续循环。循环的前提是数组中元素的个数小于数组的长度。整个过程就是这样的。可见putIndex同时指向头元素的下一个位置(如果队列已经满了,那么就是尾元素位置,否则就是一个元素为null的位置)。

比较复杂的操作时删除任意一个元素。清单3 描述的是删除任意一个元素的过程。显然删除任何一个元素需要遍历整个数组,也就是它的复杂度是O(n),这与根据索引从ArrayList中查找一个元素的复杂度O(1)相比开销要大得多。参考声明的结构图,一旦删除的是takeIndex位置的元素,那么只需要将takeIndex往“右边”移动一位即可;如果删除的是takeIndex和putIndex之间的元素怎么办?这时候就从删除的位置i开始,将i后面的所有元素位置都往“左”移动一位,直到putIndex为止。最终的结果是删除位置的所有元素都“后退”了一个位置,同时putIndex也后退了一个位置。

清单3 删除任意一个元素 public boolean remove(Object o) { if (o == null) return false; final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { int i = takeIndex; int k = 0; for (;;) { if (k++ >= count) return false; if (o.equals(items[i])) { removeAt(i); return true; } i = inc(i); }

} finally {
    lock.unlock();
}

} void removeAt(int i) { final E[] items = this.items; // if removing front item, just advance if (i == takeIndex) { items[takeIndex] = null; takeIndex = inc(takeIndex); } else { // slide over all others up through putIndex. for (;;) { int nexti = inc(i); if (nexti != putIndex) { items[i] = items[nexti]; i = nexti; } else { items[i] = null; putIndex = i; break; } } } --count; notFull.signal(); }

对于其他的操作,由于都是带着Lock的操作,所以都比较简单就不再展开了。

下一篇中将介绍另外两个BlockingQueue, PriorityBlockingQueue和SynchronousQueue 然后对这些常见的Queue进行一个小范围的对比。 来源: [http://www.blogjava.net/xylz/archive/2010/07/27/327265.html](http://www.blogjava.net/xylz/archive/2010/07/27/327265.html)

在Set中有一个排序的集合SortedSet,用来保存按照自然顺序排列的对象。Queue中同样引入了一个支持排序的FIFO模型。

并发队列与Queue简介 中介绍了,PriorityQueue和PriorityBlockingQueue就是支持排序的Queue。显然一个支持阻塞的排序Queue要比一个非线程安全的Queue实现起来要复杂的多,因此下面只介绍PriorityBlockingQueue,至于PriorityQueue只需要去掉Blocking功能就基本相同了。

排序的BlockingQueue — PriorityBlockingQueue

先简单介绍下PriorityQueue,因为PriorityBlockingQueue内部就是通过PriorityQueue适配实现的,只不过通过锁进行同步和阻塞而已。

PriorityQueue是一个数组实现的,是一个二叉树的实现,这个二叉树的任意一个节点都比其子节点要小,这样顶点就是最小的节点。每一个元素或者节点要么本身是可比较的(Comparable),或者队列本身带有一个比较器(Comparator<? super E>),所有元素就是靠比较自身的大小来确定顺序的。而数组中顶点就是数组的第0个元素,因此出队列的话总是取第0个元素。对于第0个元素,其子节点是第1个元素和第2个元素,对于第1个元素,其子元素又是第3/4个元素,以此类推,第i个元素的父节点就是(i-1)/2。这样任意一个元素加入队列就从其父节点(i-1)/2开始比较,一旦新节点比父节点小就交换两个节点,然后继续比较新节点与其新的父节点。知道所有节点都是按照父节点一定比子节点小的顺序排列。这是一个有点复杂的算法,此处不再讨论更多的细节。不管是删除还是查找,我们只需要了解的顶点(索引为0的元素)总是最小的。

特别需要说明的是PriorityQueue是一个无界的队列,也就是说一旦元素的个数达到了数组的大小,那么就将数组扩大50%,这样这个数组就是无穷大的。当然了如果达到了整数的最大值就会得到一个OutOfMemoryError,这个是由逻辑保证的。

对于PriorityBlockingQueue而言,由于是无界的,因此就只有非空的信号,也就是说只有take()才能阻塞,put是永远不会阻塞(除非达到Integer.MAX_VALUE直到抛出一个OutOfMemoryError异常)。

只有take()操作的时候才可能因为队列为空而挂起。同时其它需要操作队列变化和大小的只需要使用独占锁ReentrantLock就可以了,非常方便。需要说明的是PriorityBlockingQueue采用了一个公平的锁。

总的来说PriorityBlockingQueue 不是一个FIFO的队列,而是一个有序的队列,这个队列总是取“自然顺序”最小的对象,同时又是一个只能出队列阻塞的BlockingQueue,对于入队列却不是阻塞的。所有操作都是线程安全的。

直接交换的BlockingQueue — SynchronousQueue

这是一个很有意思的阻塞队列,其中每个插入操作必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入操作。因此此队列内部其实没有任何一个元素,或者说容量是0,严格说并不是一种容器。由于队列没有容量,因此不能调用peek操作,因为只有移除元素时才有元素。

一个没有容量的并发队列有什么用了?或者说存在的意义是什么?

SynchronousQueue 的实现非常复杂,当然了如果真要去分析还是能够得到一些经验的,但是前面分析了过多的结构后,发现越来越陷于数据结构与算法里面了。我的初衷是通过研究并发实现的原理来更好的利用并发来最大限度的利用可用资源。所以在后面的章节中尽可能的少研究数据结构和算法,但是为了弄清楚里面的原理,必不可免的会涉及到一些这方面的知识,希望后面能够适可而止。

再回到话题。SynchronousQueue 内部没有容量,但是由于一个插入操作总是对应一个移除操作,反过来同样需要满足。那么一个元素就不会再SynchronousQueue 里面长时间停留,一旦有了插入线程和移除线程,元素很快就从插入线程移交给移除线程。也就是说这更像是一种信道(管道),资源从一个方向快速传递到另一方向。

需要特别说明的是,尽管元素在SynchronousQueue 内部不会“停留”,但是并不意味之SynchronousQueue 内部没有队列。实际上SynchronousQueue 维护者线程队列,也就是插入线程或者移除线程在不同时存在的时候就会有线程队列。既然有队列,同样就有公平性和非公平性特性,公平性保证正在等待的插入线程或者移除线程以FIFO的顺序传递资源。

显然这是一种快速传递元素的方式,也就是说在这种情况下元素总是以最快的方式从插入着(生产者)传递给移除着(消费者),这在多任务队列中是最快处理任务的方式。在线程池的相关章节中还会更多的提到此特性。

事实上在《并发队列与Queue简介》中介绍了还有一种BlockingQueue的实现DelayQueue,它描述的是一种延时队列。这个队列的特性是,队列中的元素都要延迟时间(超时时间),只有一个元素达到了延时时间才能出队列,也就是说每次从队列中获取的元素总是最先到达延时的元素。这种队列的场景就是计划任务。比如以前要完成计划任务,很有可能是使用Timer/TimerTask,这是一种循环检测的方式,也就是在循环里面遍历所有元素总是检测元素是否满足条件,一旦满足条件就执行相关任务。显然这中方式浪费了很多的检测工作,因为大多数时间总是在进行无谓的检测。而DelayQueue 却能避免这种无谓的检测。在线程池的计划任务部分还有更加详细的讨论此队列实现。

下面就对常见的BlockingQueue进行小节下,这里不包括双向的队列,尽管ConcurrentLinkedQueue不是可阻塞的Queue,但是这里还是将其放在一起进行对比。

并发队列比较

如果不需要阻塞队列,优先选择ConcurrentLinkedQueue;如果需要阻塞队列,队列大小固定优先选择ArrayBlockingQueue,队列大小不固定优先选择LinkedBlockingQueue;如果需要对队列进行排序,选择PriorityBlockingQueue;如果需要一个快速交换的队列,选择SynchronousQueue;如果需要对队列中的元素进行延时操作,则选择DelayQueue。 来源: [http://www.blogjava.net/xylz/archive/2010/07/30/327582.html](http://www.blogjava.net/xylz/archive/2010/07/30/327582.html)

有一段时间没有更新了。接着上节继续吧。

Queue除了前面介绍的实现外,还有一种双向的Queue实现Deque。这种队列允许在队列头和尾部进行入队出队操作,因此在功能上比Queue显然要更复杂。下图描述的是Deque的完整体系图。需要说明的是LinkedList也已经加入了Deque的一部分(LinkedList是从jdk1.2 开始就存在数据结构)。

Deque体系结构

Deque在Queue的基础上增加了更多的操作方法。

Deque操作方法

从上图可以看到,Deque不仅具有FIFO的Queue实现,也有FILO的实现,也就是不仅可以实现队列,也可以实现一个堆栈。

同时在Deque的体系结构图中可以看到,实现一个Deque可以使用数组(ArrayDeque),同时也可以使用链表(LinkedList),还可以同实现一个支持阻塞的线程安全版本队列LinkedBlockingDeque。

image对于数组实现的Deque来说,数据结构上比较简单,只需要一个存储数据的数组以及头尾两个索引即可。由于数组是固定长度的,所以很容易就得到数组的头和尾,那么对于数组的操作只需要移动头和尾的索引即可。

特别说明的是ArrayDeque并不是一个固定大小的队列,每次队列满了以后就将队列容量扩大一倍(doubleCapacity()),因此加入一个元素总是能成功,而且也不会抛出一个异常。也就是说ArrayDeque是一个没有容量限制的队列。

同样继续性能的考虑,使用System.arraycopy复制一个数组比循环设置要高效得多。

image

对于LinkedList本身而言,数据结构就更简单了,除了一个size用来记录大小外,只有head一个元素Entry。对比Map和Queue的其它数据结构可以看到这里的Entry有两个引用,是双向的队列。

在示意图中,LinkedList总是有一个“傀儡”节点,用来描述队列“头部”,但是并不表示头部元素,它是一个执行null的空节点。

队列一开始只有head一个空元素,然后从尾部加入E1(add/addLast),head和E1之间建立双向链接。然后继续从尾部加入E2,E2就在head和E1之间建立双向链接。最后从队列的头部加入E3(push/addFirst),于是E3就在E1和head之间链接双向链接。

双向链表的数据结构比较简单,操作起来也比较容易,从事从“傀儡”节点开始,“傀儡”节点的下一个元素就是队列的头部,前一个元素是队列的尾部,换句话说,“傀儡”节点在头部和尾部之间建立了一个通道,是整个队列形成一个循环,这样就可以从任意一个节点的任意一个方向能遍历完整的队列。

同样LinkedList也是一个没有容量限制的队列,因此入队列(不管是从头部还是尾部)总能成功。

上面描述的ArrayDeque和LinkedList是两种不同方式的实现,通常在遍历和节省内存上ArrayDeque更高效(索引更快,另外不需要Entry对象),但是在队列扩容下LinkedList更灵活,因为不需要复制原始的队列,某些情况下可能更高效。

同样需要注意的上述两个实现都不是线程安全的,因此只适合在单线程环境下使用,下面章节要介绍的LinkedBlockingDeque就是线程安全的可阻塞的Deque。事实上也应该是功能最强大的Queue实现,当然了实现起来也许会复杂一点。 来源: [http://www.blogjava.net/xylz/archive/2010/08/12/328587.html](http://www.blogjava.net/xylz/archive/2010/08/12/328587.html)

这个小节介绍Queue的最后一个工具,也是最强大的一个工具。从名称上就可以看到此工具的特点:双向并发阻塞队列。所谓双向是指可以从队列的头和尾同时操作,并发只是线程安全的实现,阻塞允许在入队出队不满足条件时挂起线程,这里说的队列是指支持FIFO/FILO实现的链表。

首先看下LinkedBlockingDeque的数据结构。通常情况下从数据结构上就能看出这种实现的优缺点,这样就知道如何更好的使用工具了。

LinkedBlockingDeque类图

从数据结构和功能需求上可以得到以下结论:

  1. 要想支持阻塞功能,队列的容量一定是固定的,否则无法在入队的时候挂起线程。也就是capacity是final类型的。
  2. 既然是双向链表,每一个结点就需要前后两个引用,这样才能将所有元素串联起来,支持双向遍历。也即需要prev/next两个引用。
  3. 双向链表需要头尾同时操作,所以需要first/last两个节点,当然可以参考LinkedList那样采用一个节点的双向来完成,那样实现起来就稍微麻烦点。
  4. 既然要支持阻塞功能,就需要锁和条件变量来挂起线程。这里使用一个锁两个条件变量来完成此功能。

有了上面的结论再来研究LinkedBlockingDeque的优缺点。

优点当然是功能足够强大,同时由于采用一个独占锁,因此实现起来也比较简单。所有对队列的操作都加锁就可以完成。同时独占锁也能够很好的支持双向阻塞的特性。

凡事有利必有弊。缺点就是由于独占锁,所以不能同时进行两个操作,这样性能上就大打折扣。从性能的角度讲LinkedBlockingDeque要比LinkedBlockingQueue要低很多,比CocurrentLinkedQueue就低更多了,这在高并发情况下就比较明显了。

前面分析足够多的Queue实现后,LinkedBlockingDeque的原理和实现就不值得一提了,无非是在独占锁下对一个链表的普通操作。

有趣的是此类支持序列化,但是Node并不支持序列化,因此fist/last就不能序列化,那么如何完成序列化/反序列化过程呢?

清单1 LinkedBlockingDeque的序列化、反序列化 private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { lock.lock(); try { // Write out capacity and any hidden stuff s.defaultWriteObject(); // Write out all elements in the proper order. for (Node p = first; p != null; p = p.next) s.writeObject(p.item); // Use trailing null as sentinel s.writeObject(null); } finally { lock.unlock(); } }

private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); count = 0; first = null; last = null; // Read in all elements and place in queue for (;;) { E item = (E)s.readObject(); if (item == null) break; add(item); } }

清单1 描述的是LinkedBlockingDeque序列化/反序列化的过程。序列化时将真正的元素写入输出流,最后还写入了一个null。读取的时候将所有对象列表读出来,如果读取到一个null就表示结束。这就是为什么写入的时候写入一个null的原因,因为没有将count写入流,所以就靠null来表示结束,省一个整数空间。

来源: [http://www.blogjava.net/xylz/archive/2010/08/18/329227.html](http://www.blogjava.net/xylz/archive/2010/08/18/329227.html)

可以在对中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给

exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger 可能被视为

SynchronousQueue 的双向形式。

换句话说Exchanger提供的是一个交换服务,允许原子性的交换两个(多个)对象,但同时只有一对才会成功。先看一个简单的实例模型。

Exchanger

在上面的模型中,我们假定一个空的栈(Stack),栈顶(Top)当然是没有元素的。同时我们假定一个数据结构Node,包含一个要交换的元素E和一个要填充的“洞”Node。这时线程T1携带节点node1进入栈(cas_push),当然这是CAS操作,这样栈顶就不为空了。线程T2携带节点node2进入栈,发现栈里面已经有元素了node1,同时发现node1的hold(Node)为空,于是将自己(node2)填充到node1的hold中(cas_fill)。然后将元素node1从栈中弹出(cas_take)。这样线程T1就得到了node1.hold.item也就是node2的元素e2,线程T2就得到了node1.item也就是e1,从而达到了交换的目的。

算法描述就是下图展示的内容。

image

JDK 5就是采用类似的思想实现的Exchanger。JDK 6以后为了支持多线程多对象同时Exchanger了就进行了改造(为了支持更好的并发),采用ConcurrentHashMap的思想,将Stack分割成很多的片段(或者说插槽Slot),线程Id(Thread.getId())hash相同的落在同一个Slot上,这样在默认32个Slot上就有很好的吞吐量。当然会根据机器CPU内核的数量有一定的优化,有兴趣的可以去了解下Exchanger的源码。

至于Exchanger的使用,在JDK文档上有个例子,讲述的是两个线程交换数据缓冲区的例子(实际上仍然可以认为是生产者/消费者模型)。 class FillAndEmpty { Exchanger exchanger = new Exchanger(); DataBuffer initialEmptyBuffer = a made-up type DataBuffer initialFullBuffer = class FillingLoop implements Runnable { public void run() { DataBuffer currentBuffer = initialEmptyBuffer; try { while (currentBuffer != null) { addToBuffer(currentBuffer); if (currentBuffer.isFull()) currentBuffer = exchanger.exchange(currentBuffer); } } catch (InterruptedException ex) { handle } } } class EmptyingLoop implements Runnable { public void run() { DataBuffer currentBuffer = initialFullBuffer; try { while (currentBuffer != null) { takeFromBuffer(currentBuffer); if (currentBuffer.isEmpty()) currentBuffer = exchanger.exchange(currentBuffer); } } catch (InterruptedException ex) { handle } } } void start() { new Thread(new FillingLoop()).start(); new Thread(new EmptyingLoop()).start(); } }

Exchanger实现的是一种数据分片的思想,这在大数据情况下将数据分成一定的片段并且多线程执行的情况下有一定的使用价值。

最近一直推托工作忙,更新频度越来越低了,好在现在的工作还有点个人时间,以后争取多更新下吧,至少也要把这个专辑写完。 来源: [http://www.blogjava.net/xylz/archive/2010/11/22/338733.html](http://www.blogjava.net/xylz/archive/2010/11/22/338733.html)

本小节是《并发容器》的最后一部分,这一个小节描述的是针对List/Set接口的一个线程版本。

在《并发队列与Queue简介》中介绍了并发容器的一个概括,主要描述的是Queue的实现。其中特别提到一点LinkedList是List/Queue的实现,但是LinkedList确实非线程安全的。不管BlockingQueue还是ConcurrentMap的实现,我们发现都是针对链表的实现,当然尽可能的使用CAS或者Lock的特性,同时都有通过锁部分容器来提供并发的特性。而对于List或者Set而言,增、删操作其实都是针对整个容器,因此每次操作都不可避免的需要锁定整个容器空间,性能肯定会大打折扣。要实现一个线程安全的List/Set,只需要在修改操作的时候进行同步即可,比如使用java.util.Collections.synchronizedList(List)或者java.util.Collections.synchronizedSet(Set)。当然也可以使用Lock来实现线程安全的List/Set。

通常情况下我们的高并发都发生在“多读少写”的情况,因此如果能够实现一种更优秀的算法这对生产环境还是很有好处的。ReadWriteLock当然是一种实现。CopyOnWriteArrayList/CopyOnWriteArraySet确实另外一种思路。

CopyOnWriteArrayList/CopyOnWriteArraySet的基本思想是一旦对容器有修改,那么就“复制”一份新的集合,在新的集合上修改,然后将新集合复制给旧的引用。当然了这部分少不了要加锁。显然对于CopyOnWriteArrayList/CopyOnWriteArraySet来说最大的好处就是“读”操作不需要锁了。

我们来看看源码。 /// The array, accessed only via getArray/setArray. /*/ private volatile transient Object[] array; public E get(int index) { return (E)(getArray()[index]); } private static int indexOf(Object o, Object[] elements, int index, int fence) { if (o == null) { for (int i = index; i < fence; i++) if (elements[i] == null) return i; } else { for (int i = index; i < fence; i++) if (o.equals(elements[i])) return i; } return -1; } public Iterator iterator() { return new COWIterator(getArray(), 0); } public void clear() { final ReentrantLock lock = this.lock; lock.lock(); try { setArray(new Object[0]); } finally { lock.unlock(); } }

对于上述代码,有几点说明:

  1. List仍然是基于数组的实现,因为只有数组是最快的。
  2. 为了保证无锁的读操作能够看到写操作的变化,因此数组array是volatile类型的。
  3. get/indexOf/iterator等操作都是无锁的,同时也可以看到所操作的都是某一时刻array的镜像(这得益于数组是不可变化的)
  4. add/set/remove/clear等元素变化的都是需要加锁的,这里使用的是ReentrantLock。

这里有一段有意思的代码片段。 public E set(int index, E element) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); Object oldValue = elements[index]; if (oldValue != element) { int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len); newElements[index] = element; setArray(newElements); } else { // Not quite a no-op; ensures volatile write semantics setArray(elements); } return (E)oldValue; } finally { lock.unlock(); } } final void setArray(Object[] a) { array = a; }

对于set操作,如果元素有变化,修改后setArray(newElements);将新数组赋值还好理解。那么如果一个元素没有变化,也就是上述代码的else部分,为什么还需要进行一个无谓的setArray操作?毕竟setArray操作没有改变任何数据。

对于这个问题也是很有意思,有一封邮件讨论了此问题(123)。 大致的意思是,尽管没有改变任何数据,但是为了保持“volatile”的语义,任何一个读操作都应该是一个写操作的结果,也就是读操作看到的数据一定是某个写操作的结果(尽管写操作没有改变数据本身)。所以这里即使不设置也没有问题,仅仅是为了一个语义上的补充(个人理解)。

这里还有一个有意思的讨论,说什么addIfAbsent在元素没有变化的时候为什么没有setArray操作?这个要看怎么理解addIfAbsent的语义了。如果说addIfAbsent语义是”写“或者”不写“操作,而把”不写“操作当作一次”读“操作的话,那么”读“操作就不需要保持volatile语义了。

对于CopyOnWriteArraySet而言就简单多了,只是持有一个CopyOnWriteArrayList,仅仅在add/addAll的时候检测元素是否存在,如果存在就不加入集合中。 private final CopyOnWriteArrayList al; /// / Creates an empty set. // public CopyOnWriteArraySet() { al = new CopyOnWriteArrayList(); } public boolean add(E e) { return al.addIfAbsent(e); }

在使用上CopyOnWriteArrayList/CopyOnWriteArraySet就简单多了,和List/Set基本相同,这里就不再介绍了。

整个并发容器结束了,接下来好好规划下线程池部分,然后进入最后一部分的梳理。 来源: [http://www.blogjava.net/xylz/archive/2010/11/23/338853.html](http://www.blogjava.net/xylz/archive/2010/11/23/338853.html)

java多线程学习

Posted on

java多线程学习-java.util.concurrent详解

Latch/Barrier

Java1.5提供了一个非常高效实用的多线程包:java.util.concurrent, 提供了大量高级工具,可以帮助开发者编写高效、易维护、结构清晰的Java多线程程序。从这篇blog起,我将跟大家一起共同学习这些新的Java多线程构件

  1. CountDownLatch 我们先来学习一下JDK1.5 API中关于这个类的详细介绍: “一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。 用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。” 这就是说,CountDownLatch可以用来管理一组相关的线程执行,只需在主线程中调用CountDownLatch 的await方法(一直阻塞),让各个线程调用countDown方法。当所有的线程都只需完countDown了,await也顺利返回,不再阻塞了。在这样情况下尤其适用:将一个任务分成若干线程执行,等到所有线程执行完,再进行汇总处理。 下面我举一个非常简单的例子。假设我们要打印1-100,最后再输出“Ok“。1-100的打印顺序不要求统一,只需保证“Ok“是在最后出现即可。 解决方案:我们定义一个CountDownLatch,然后开10个线程分别打印(n-1)/10+1至(n-1)/10+10。主线程中调用await方法等待所有线程的执行完毕,每个线程执行完毕后都调用countDown方法。最后再await返回后打印“Ok”。 具体代码如下(本代码参考了JDK示例代码): Java代码 收藏代码

  2. import java.util.concurrent.CountDownLatch;

  3. ///
  4. /* 示例:CountDownLatch的使用举例
  5. /* Mail: ken@iamcoding.com
  6. /* @author janeky
  7. /*/
  8. public class TestCountDownLatch {
  9. private static final int N = 10;
  10. public static void main(String[] args) throws InterruptedException {
  11. CountDownLatch doneSignal = new CountDownLatch(N);
  12. CountDownLatch startSignal = new CountDownLatch(1);//开始执行信号
  13. for (int i = 1; i <= N; i++) {
  14. new Thread(new Worker(i, doneSignal, startSignal)).start();//线程启动了
  15. }
  16. System.out.println("begin------------");
  17. startSignal.countDown();//开始执行啦
  18. doneSignal.await();//等待所有的线程执行完毕
  19. System.out.println("Ok");
  20. }
  21. static class Worker implements Runnable {
  22. private final CountDownLatch doneSignal;
  23. private final CountDownLatch startSignal;
  24. private int beginIndex;
  25. Worker(int beginIndex, CountDownLatch doneSignal,
  26. CountDownLatch startSignal) {
  27. this.startSignal = startSignal;
  28. this.beginIndex = beginIndex;
  29. this.doneSignal = doneSignal;
  30. }
  31. public void run() {
  32. try {
  33. startSignal.await(); //等待开始执行信号的发布
  34. beginIndex = (beginIndex - 1) /* 10 + 1;
  35. for (int i = beginIndex; i <= beginIndex + 10; i++) {
  36. System.out.println(i);
  37. }
  38. } catch (InterruptedException e) {
  39. e.printStackTrace();
  40. } finally {
  41. doneSignal.countDown();
  42. }
  43. }
  44. }
  45. }
    总结:CounDownLatch对于管理一组相关线程非常有用。上述示例代码中就形象地描述了两种使用情况。第一种是计算器为1,代表了两种状态,开关。第二种是计数器为N,代表等待N个操作完成。今后我们在编写多线程程序时,可以使用这个构件来管理一组独立线程的执行。
  46. CyclicBarrier 我们先来学习一下JDK1.5 API中关于这个类的详细介绍: “一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。 CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作 很有用。 我们在学习CountDownLatch的时候就提到了CyclicBarrier。两者究竟有什么联系呢?引用[JCIP]中的描述“The key difference is that with a barrier, all the threads must come together at a barrier point at the same time in order to proceed. Latches are for waiting for events; barriers are for waiting for other threads。CyclicBarrier等待所有的线程一起完成后再执行某个动作。这个功能CountDownLatch也同样可以实现。但是CountDownLatch更多时候是在等待某个事件的发生。在CyclicBarrier中,所有的线程调用await方法,等待其他线程都执行完。 举一个很简单的例子,今天晚上我们哥们4个去Happy。就互相通知了一下:晚上八点准时到xx酒吧门前集合,不见不散!。有个哥们住的近,早早就到了。有的事务繁忙,刚好踩点到了。无论怎样,先来的都不能独自行动,只能等待所有人 代码如下(参考了网上给的一些教程) Java代码 收藏代码

  47. import java.util.Random;

  48. import java.util.concurrent.BrokenBarrierException;
  49. import java.util.concurrent.CyclicBarrier;
  50. import java.util.concurrent.ExecutorService;
  51. import java.util.concurrent.Executors;
  52. public class TestCyclicBarrier {
  53. public static void main(String[] args) {
  54. ExecutorService exec = Executors.newCachedThreadPool();
  55. final Random random=new Random();
  56. final CyclicBarrier barrier=new CyclicBarrier(4,new Runnable(){
  57. @Override
  58. public void run() {
  59. System.out.println("大家都到齐了,开始happy去");
  60. }});
  61. for(int i=0;i<4;i++){
  62. exec.execute(new Runnable(){
  63. @Override
  64. public void run() {
  65. try {
  66. Thread.sleep(random.nextInt(1000));
  67. } catch (InterruptedException e) {
  68. e.printStackTrace();
  69. }
  70. System.out.println(Thread.currentThread().getName()+"到了,其他哥们呢");
  71. try {
  72. barrier.await();//等待其他哥们
  73. } catch (InterruptedException e) {
  74. e.printStackTrace();
  75. } catch (BrokenBarrierException e) {
  76. e.printStackTrace();
  77. }
  78. }});
  79. }
  80. exec.shutdown();
  81. }
  82. }
    关于await方法要特别注意一下,它有可能在阻塞的过程中由于某些原因被中断 总结:CyclicBarrier就是一个栅栏,等待所有线程到达后再执行相关的操作。barrier 在释放等待线程后可以重用。

  83. Semaphore 我们先来学习一下JDK1.5 API中关于这个类的详细介绍: “一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。” 我们一般用它来控制某个对象的线程访问对象 例如,对于某个容器,我们规定,最多只能容纳n个线程同时操作 使用信号量来模拟实现 具体代码如下(参考 [JCIP]) Java代码 收藏代码

  84. import java.util.Collections;

  85. import java.util.HashSet;
  86. import java.util.Set;
  87. import java.util.concurrent.ExecutorService;
  88. import java.util.concurrent.Executors;
  89. import java.util.concurrent.Semaphore;
  90. public class TestSemaphore {
  91. public static void main(String[] args) {
  92. ExecutorService exec = Executors.newCachedThreadPool();
  93. TestSemaphore t = new TestSemaphore();
  94. final BoundedHashSet set = t.getSet();
  95. for (int i = 0; i < 3; i++) {//三个线程同时操作add
  96. exec.execute(new Runnable() {
  97. public void run() {
  98. try {
  99. set.add(Thread.currentThread().getName());
  100. } catch (InterruptedException e) {
  101. e.printStackTrace();
  102. }
  103. }
  104. });
  105. }
  106. for (int j = 0; j < 3; j++) {//三个线程同时操作remove
  107. exec.execute(new Runnable() {
  108. public void run() {
  109. set.remove(Thread.currentThread().getName());
  110. }
  111. });
  112. }
  113. exec.shutdown();
  114. }
  115. public BoundedHashSet getSet() {
  116. return new BoundedHashSet(2);//定义一个边界约束为2的线程
  117. }
  118. class BoundedHashSet {
  119. private final Set set;
  120. private final Semaphore semaphore;
  121. public BoundedHashSet(int bound) {
  122. this.set = Collections.synchronizedSet(new HashSet());
  123. this.semaphore = new Semaphore(bound, true);
  124. }
  125. public void add(T o) throws InterruptedException {
  126. semaphore.acquire();//信号量控制可访问的线程数目
  127. set.add(o);
  128. System.out.printf("add:%s%n",o);
  129. }
  130. public void remove(T o) {
  131. if (set.remove(o))
  132. semaphore.release();//释放掉信号量
  133. System.out.printf("remove:%s%n",o);
  134. }
  135. }
  136. }
    总结:Semaphore通常用于对象池的控制 4.FutureTask 我们先来学习一下JDK1.5 API中关于这个类的详细介绍: “取消的异步计算。利用开始和取消计算的方法、查询计算是否完成的方法和获取计算结果的方法,此类提供了对 Future 的基本实现。仅在计算完成时才能获取结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。 可使用 FutureTask 包装 Callable 或 Runnable 对象。因为 FutureTask 实现了 Runnable,所以可将 FutureTask 提交给 Executor 执行。 除了作为一个独立的类外,此类还提供了 protected 功能,这在创建自定义任务类时可能很有用。 “ 应用举例:我们的算法中有一个很耗时的操作,在编程的是,我们希望将它独立成一个模块,调用的时候当做它是立刻返回的,并且可以随时取消的 具体代码如下(参考 [JCIP]) Java代码 收藏代码

  137. import java.util.concurrent.Callable;

  138. import java.util.concurrent.ExecutionException;
  139. import java.util.concurrent.ExecutorService;
  140. import java.util.concurrent.Executors;
  141. import java.util.concurrent.FutureTask;
  142. public class TestFutureTask {
  143. public static void main(String[] args) {
  144. ExecutorService exec=Executors.newCachedThreadPool();
  145. FutureTask task=new FutureTask(new Callable(){//FutrueTask的构造参数是一个Callable接口
  146. @Override
  147. public String call() throws Exception {
  148. return Thread.currentThread().getName();//这里可以是一个异步操作
  149. }});
  150. try {
  151. exec.execute(task);//FutureTask实际上也是一个线程
  152. String result=task.get();//取得异步计算的结果,如果没有返回,就会一直阻塞等待
  153. System.out.printf("get:%s%n",result);
  154. } catch (InterruptedException e) {
  155. e.printStackTrace();
  156. } catch (ExecutionException e) {
  157. e.printStackTrace();
  158. }
  159. }
  160. }
    总结:FutureTask其实就是新建了一个线程单独执行,使得线程有一个返回值,方便程序的编写
  161. Exchanger 我们先来学习一下JDK1.5 API中关于这个类的详细介绍: “可以在pair中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger 可能被视为 SynchronousQueue 的双向形式。Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。 “ 应用举例:有两个缓存区,两个线程分别向两个缓存区fill和take,当且仅当一个满了,两个缓存区交换 代码如下(参考了网上给的示例 http://hi.baidu.com/webidea/blog/item/2995e731e53ad5a55fdf0e7d.html) Java代码 收藏代码

  162. import java.util.ArrayList;

  163. import java.util.concurrent.Exchanger;
  164. public class TestExchanger {
  165. public static void main(String[] args) {
  166. final Exchanger> exchanger = new Exchanger>();
  167. final ArrayList buff1 = new ArrayList(10);
  168. final ArrayList buff2 = new ArrayList(10);
  169. new Thread(new Runnable() {
  170. @Override
  171. public void run() {
  172. ArrayList buff = buff1;
  173. try {
  174. while (true) {
  175. if (buff.size() >= 10) {
  176. buff = exchanger.exchange(buff);//开始跟另外一个线程交互数据
  177. System.out.println("exchange buff1");
  178. buff.clear();
  179. }
  180. buff.add((int)(Math.random()/*100));
  181. Thread.sleep((long)(Math.random()/*1000));
  182. }
  183. } catch (InterruptedException e) {
  184. e.printStackTrace();
  185. }
  186. }
  187. }).start();
  188. new Thread(new Runnable(){
  189. @Override
  190. public void run() {
  191. ArrayList buff=buff2;
  192. while(true){
  193. try {
  194. for(Integer i:buff){
  195. System.out.println(i);
  196. }
  197. Thread.sleep(1000);
  198. buff=exchanger.exchange(buff);//开始跟另外一个线程交换数据
  199. System.out.println("exchange buff2");
  200. } catch (InterruptedException e) {
  201. e.printStackTrace();
  202. }
  203. }
  204. }}).start();
  205. }
  206. }
    总结:Exchanger在特定的使用场景比较有用(两个伙伴线程之间的数据交互)

  207. ScheduledThreadPoolExecutor 我们先来学习一下JDK1.5 API中关于这个类的详细介绍: "可另行安排在给定的延迟后运行命令,或者定期执行命令。需要多个辅助线程时,或者要求 ThreadPoolExecutor 具有额外的灵活性或功能时,此类要优于 Timer。 一旦启用已延迟的任务就执行它,但是有关何时启用,启用后何时执行则没有任何实时保证。按照提交的先进先出 (FIFO) 顺序来启用那些被安排在同一执行时间的任务。 虽然此类继承自 ThreadPoolExecutor,但是几个继承的调整方法对此类并无作用。特别是,因为它作为一个使用 corePoolSize 线程和一个无界队列的固定大小的池,所以调整 maximumPoolSize 没有什么效果。" 在JDK1.5之前,我们关于定时/周期操作都是通过Timer来实现的。但是Timer有以下几种危险[JCIP] a. Timer是基于绝对时间的。容易受系统时钟的影响。 b. Timer只新建了一个线程来执行所有的TimeTask。所有TimeTask可能会相关影响 c. Timer不会捕获TimerTask的异常,只是简单地停止。这样势必会影响其他TimeTask的执行。 如果你是使用JDK1.5以上版本,建议用ScheduledThreadPoolExecutor代替Timer。它基本上解决了上述问题。它采用相对时间,用线程池来执行TimerTask,会出来TimerTask异常。 下面通过一个简单的实例来阐述ScheduledThreadPoolExecutor的使用。

    我们定期让定时器抛异常 我们定期从控制台打印系统时间 代码如下(参考了网上的一些代码,在此表示感谢) Java代码 收藏代码

  208. import java.util.concurrent.ScheduledThreadPoolExecutor;

  209. import java.util.concurrent.TimeUnit;
  210. public class TestScheduledThreadPoolExecutor {
  211. public static void main(String[] args) {
  212. ScheduledThreadPoolExecutor exec=new ScheduledThreadPoolExecutor(1);
  213. exec.scheduleAtFixedRate(new Runnable(){//每隔一段时间就触发异常
  214. @Override
  215. public void run() {
  216. throw new RuntimeException();
  217. }}, 1000, 5000, TimeUnit.MILLISECONDS);
  218. exec.scheduleAtFixedRate(new Runnable(){//每隔一段时间打印系统时间,证明两者是互不影响的
  219. @Override
  220. public void run() {
  221. System.out.println(System.nanoTime());
  222. }}, 1000, 2000, TimeUnit.MILLISECONDS);
  223. }
  224. }
    总结:是时候把你的定时器换成 ScheduledThreadPoolExecutor了

7.BlockingQueue “支持两个附加操作的 Queue,这两个操作是:获取元素时等待队列变为非空,以及存储元素时等待空间变得可用。“ 这里我们主要讨论BlockingQueue的最典型实现:LinkedBlockingQueue 和ArrayBlockingQueue。两者的不同是底层的数据结构不够,一个是链表,另外一个是数组。

后面将要单独解释其他类型的BlockingQueue和SynchronousQueue 
BlockingQueue的经典用途是 生产者-消费者模式 
代码如下: 

Java代码 收藏代码

  1. import java.util.Random;
  2. import java.util.concurrent.BlockingQueue;
  3. import java.util.concurrent.LinkedBlockingQueue;
  4. public class TestBlockingQueue {
  5. public static void main(String[] args) {
  6. final BlockingQueue queue=new LinkedBlockingQueue(3);
  7. final Random random=new Random();
  8. class Producer implements Runnable{
  9. @Override
  10. public void run() {
  11. while(true){
  12. try {
  13. int i=random.nextInt(100);
  14. queue.put(i);//当队列达到容量时候,会自动阻塞的
  15. if(queue.size()==3)
  16. {
  17. System.out.println("full");
  18. }
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. }
  24. }
  25. class Consumer implements Runnable{
  26. @Override
  27. public void run() {
  28. while(true){
  29. try {
  30. queue.take();//当队列为空时,也会自动阻塞
  31. Thread.sleep(1000);
  32. } catch (InterruptedException e) {
  33. e.printStackTrace();
  34. }
  35. }
  36. }
  37. }
  38. new Thread(new Producer()).start();
  39. new Thread(new Consumer()).start();
  40. }
  41. }
    总结:BlockingQueue使用时候特别注意take 和 put
  42. DelayQueue 我们先来学习一下JDK1.5 API中关于这个类的详细介绍: “它是包含Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。即使无法使用 take 或 poll 移除未到期的元素,也不会将这些元素作为正常元素对待。例如,size 方法同时返回到期和未到期元素的计数。此队列不允许使用 null 元素。” 在现实生活中,很多DelayQueue的例子。就拿上海的SB会来说明,很多国家地区的开馆时间不同。你很早就来到园区,然后急急忙忙地跑到一些心仪的馆区,发现有些还没开,你吃了闭门羹。 仔细研究DelayQueue,你会发现它其实就是一个PriorityQueue的封装(按照delay时间排序),里面的元素都实现了Delayed接口,相关操作需要判断延时时间是否到了。 在实际应用中,有人拿它来管理跟实际相关的缓存、session等 下面我就通过 “上海SB会的例子来阐述DelayQueue的用法” 代码如下: Java代码 收藏代码

  43. import java.util.Random;

  44. import java.util.concurrent.DelayQueue;
  45. import java.util.concurrent.Delayed;
  46. import java.util.concurrent.TimeUnit;
  47. public class TestDelayQueue {
  48. private class Stadium implements Delayed
  49. {
  50. long trigger;
  51. public Stadium(long i){
  52. trigger=System.currentTimeMillis()+i;
  53. }
  54. @Override
  55. public long getDelay(TimeUnit arg0) {
  56. long n=trigger-System.currentTimeMillis();
  57. return n;
  58. }
  59. @Override
  60. public int compareTo(Delayed arg0) {
  61. return (int)(this.getDelay(TimeUnit.MILLISECONDS)-arg0.getDelay(TimeUnit.MILLISECONDS));
  62. }
  63. public long getTriggerTime(){
  64. return trigger;
  65. }
  66. }
  67. public static void main(String[] args)throws Exception {
  68. Random random=new Random();
  69. DelayQueue queue=new DelayQueue();
  70. TestDelayQueue t=new TestDelayQueue();
  71. for(int i=0;i<5;i++){
  72. queue.add(t.new Stadium(random.nextInt(30000)));
  73. }
  74. Thread.sleep(2000);
  75. while(true){
  76. Stadium s=queue.take();//延时时间未到就一直等待
  77. if(s!=null){
  78. System.out.println(System.currentTimeMillis()-s.getTriggerTime());//基本上是等于0
  79. }
  80. if(queue.size()==0)
  81. break;
  82. }
  83. }
  84. }
    总结:适用于需要延时操作的队列管理
  85. SynchronousQueue 我们先来学习一下JDK1.5 API中关于这个类的详细介绍: “一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。不能在同步队列上进行 peek,因为仅在试图要移除元素时,该元素才存在;除非另一个线程试图移除某个元素,否则也不能(使用任何方法)插入元素;也不能迭代队列,因为其中没有元素可用于迭代。队列的头 是尝试添加到队列中的首个已排队插入线程的元素;如果没有这样的已排队线程,则没有可用于移除的元素并且 poll() 将会返回 null。对于其他 Collection 方法(例如 contains),SynchronousQueue 作为一个空 collection。此队列不允许 null 元素。 同步队列类似于 CSP 和 Ada 中使用的 rendezvous 信道。它非常适合于传递性设计,在这种设计中,在一个线程中运行的对象要将某些信息、事件或任务传递给在另一个线程中运行的对象,它就必须与该对象同步。 “ 看起来很有意思吧。队列竟然是没有内部容量的。这个队列其实是BlockingQueue的一种实现。每个插入操作必须等待另一个线程的对应移除操作,反之亦然。它给我们提供了在线程之间交换单一元素的极轻量级方法 应用举例:我们要在多个线程中传递一个变量。 代码如下(其实就是生产者消费者模式) Java代码 收藏代码

  86. import java.util.Arrays;

  87. import java.util.List;
  88. import java.util.concurrent.BlockingQueue;
  89. import java.util.concurrent.SynchronousQueue;
  90. public class TestSynchronousQueue {
  91. class Producer implements Runnable {
  92. private BlockingQueue queue;
  93. List objects = Arrays.asList("one", "two", "three");
  94. public Producer(BlockingQueue q) {
  95. this.queue = q;
  96. }
  97. @Override
  98. public void run() {
  99. try {
  100. for (String s : objects) {
  101. queue.put(s);// 产生数据放入队列中
  102. System.out.printf("put:%s%n",s);
  103. }
  104. queue.put("Done");// 已完成的标志
  105. } catch (InterruptedException e) {
  106. e.printStackTrace();
  107. }
  108. }
  109. }
  110. class Consumer implements Runnable {
  111. private BlockingQueue queue;
  112. public Consumer(BlockingQueue q) {
  113. this.queue = q;
  114. }
  115. @Override
  116. public void run() {
  117. String obj = null;
  118. try {
  119. while (!((obj = queue.take()).equals("Done"))) {
  120. System.out.println(obj);//从队列中读取对象
  121. Thread.sleep(3000); //故意sleep,证明Producer是put不进去的
  122. }
  123. } catch (InterruptedException e) {
  124. e.printStackTrace();
  125. }
  126. }
  127. }
  128. public static void main(String[] args) {
  129. BlockingQueue q=new SynchronousQueue();
  130. TestSynchronousQueue t=new TestSynchronousQueue();
  131. new Thread(t.new Producer(q)).start();
  132. new Thread(t.new Consumer(q)).start();
  133. }
  134. }
    总结:SynchronousQueue主要用于单个元素在多线程之间的传递

深入浅出 Java Concurrency (1)

Posted on

深入浅出 Java Concurrency (1) : J.U.C的整体认识

最近一直用的比较多的就是java.util.concurrent(J.U.C),实际上这块一直也没有完全深入研究,这次准备花点时间研究下Java里面整个并发体系。初步的设想包括比较大的方便(包括硬件、软件、思想以及误区等等),因此可能会持续较长的时间。这块内容也是Java在多线程方面引以为豪的一部分,深入这一部分不仅对整个Java体系有更深的了解,也对工作、学习的态度有多帮助。

从深入浅出入手,大体内容包括一下几个方面:

(1)J.U.C的API:包括完整的类库结构和样例分析。

(2)J.U.C的硬件原理以及软件思想:这部分也就将自己个人对硬件与程序语言的一些认识与大家分享,主要以总结前人的经验和所谓的理论来进行一些描述。

(3)J.U.C的误区和常见陷阱:包括对J.U.C的一些设计思想和使用上的原则进行说明,同时对可能犯的错误进行一些总结。

下面的图是J.U.C完整的API。完整的图片地址在这里

J.U.C

完整的MindManger图形可以从下面的地址得到(注意,我是用MindManger 8写的,低版本没有测试是否能正常显示)。

J.U.C完整的MindManger图形

在接下来的系列文章中,我们将根据这张API的图形进行完整的分析和研究。 来源: [http://www.blogjava.net/xylz/archive/2010/06/30/324915.html](http://www.blogjava.net/xylz/archive/2010/06/30/324915.html)