URL特殊字符详解

Posted on

URL特殊字符详解 - 没有所谓的失败!除非你不再尝试! - PHPChina 开源社区门户 - Powered by X-Space

没有所谓的失败!除非你不再尝试!

copy Bookmark http://smarty.blog.phpchina.com

空间管理 您的位置: PHPChina 开源社区门户 » 没有所谓的失败!除非你不再尝试! » 日志

每天都在进步一点点,在这里我将记下我的点点滴滴,希望和大家一起进步,一起学习,一起共同成长PHPer之路! PHPer成长群(49052575)欢迎各界精英加入,互相学习,互补不足。空间不再更新:如有需要请进 http://www.phpcake.cn

URL特殊字符详解

上一篇 / 下一篇 2008-04-11 19:48:57 / 个人分类:url特殊字符 查看( 386 ) / 评论( 2 ) / 评分( 1 / 0 )

有些符号在URL中是不能直接传递的,如果要在URL中传递这些特殊符号,那么就要使用他们的编码了。编码的格式为:%加字符的ASCII码,即一个百分号%,后面跟对应字符的ASCII(16进制)码值。例如 空格的编码值是”%20″。 下表中列出了一些URL特殊符号及编码 十六进制值 1. + URL 中+号表示空格 %2B 2. 空格 URL中的空格可以用+号或者编码 %20 3. / 分隔目录和子目录 %2F 4. ? 分隔实际的 URL 和参数 %3F 5. % 指定特殊字符 %25 6. /# 表示书签 %23 7. & URL 中指定的参数间的分隔符 %26 8. = URL 中指定参数的值 %3D

例:要传递字符串“this%is/#te=st&o k?+/”作为参数t传给te.asp,则URL可以是: te.asp?t=this%25is%23te%3Dst%26o%20k%3F%2B%2F 或者 te.asp?t=this%25is%23te%3Dst%26o+k%3F%2B%2F (空格可以用%20或+代替)java中URL 的编码和解码函数 java.net.URLEncoder.encode(String s)和java.net.URLDecoder.decode(String s); 注:encode(String s)方法过期了,现在一般用encode(String s, “GBK”);

在javascrīpt 中URL 的编码和解码函数 escape(String s)和unescape(String s) ;

相关阅读:

导入论坛 收藏 分享给好友 管理 举报

TAG: url escape unescape url特殊字符 没有所谓的失败!除非你不再尝试! 引用 删除 design_dd / 2008-04-19 16:11:49 谢谢支持^_^ 引用 删除 废墟 / 2008-04-17 21:36:42 评 1 分 不错。。怎么没人顶啊 。。这些人 就知道看 也不顶。。

查看全部评论

-5 -3 -1 - +1 +3 +5

评分:0

我来说两句

显示全部

:loveliness: :handshake :victory: :funk: :time: :kiss: :call: :hug: :lol :'( :Q :L ;P :$ :P :o :@ :D :( :)

内容

昵称

加入事件

提交评论

design_dd

design_dd

用户菜单

我的存档

RSS订阅

  • RSS订阅

清空Cookie - 联系我们 - PHPChina 开源社区门户 - 交流论坛 - 空间列表 - 站点存档 - 升级自己的空间

Powered by X-Space 4.0 UC © 2001-2008 Comsenz Inc. 京ICP备07504665号 Open Toolbar

深入浅出 Java Concurrency (28)

Posted on

深入浅出 Java Concurrency (28): 线程池

简介

从这一节开始正式进入线程池的部分。其实整个体系已经拖了很长的时间,因此后面的章节会加快速度,甚至只是一个半成品或者简单化,以后有时间的慢慢补充、完善。

其实线程池是并发包里面很重要的一部分,在实际情况中也是使用很多的一个重要组件。

下图描述的是线程池API的一部分。广义上的完整线程池可能还包括Thread/Runnable、Timer/TimerTask等部分。这里只介绍主要的和高级的API以及架构和原理。

ThreadPool2

大多数并发应用程序是围绕执行任务(Task)进行管理的。所谓任务就是抽象、离散的工作单元(unit of work)。把一个应用程序的工作(work)分离到任务中,可以简化程序的管理;这种分离还在不同事物间划分了自然的分界线,可以方便程序在出现错误时进行恢复;同时这种分离还可以为并行工作提供一个自然的结构,有利于提高程序的并发性。[1]

并发执行任务的一个很重要前提是拆分任务。把一个大的过程或者任务拆分成很多小的工作单元,每一个工作单元可能相关、也可能无关,这些单元在一定程度上可以充分利用CPU的特性并发的执行,从而提高并发性(性能、响应时间、吞吐量等)。

所谓的任务拆分就是确定每一个执行任务(工作单元)的边界。理想情况下独立的工作单元有最大的吞吐量,这些工作单元不依赖于其它工作单元的状态、结果或者其他资源等。因此将任务尽可能的拆分成一个个独立的工作单元有利于提高程序的并发性。

对于有依赖关系以及资源竞争的工作单元就涉及到任务的调度和负载均衡。工作单元的状态、结果或者其他资源等有关联的工作单元就需要有一个总体的调度者来协调资源和执行顺序。同样在有限的资源情况下,大量的任务也需要一个协调各个工作单元的调度者。这就涉及到任务执行的策略问题。

任务的执行策略包括4W3H部分:

  • 任务在什么(What)线程中执行
  • 任务以什么(What)顺序执行(FIFO/LIFO/优先级等)
  • 同时有多少个(How Many)任务并发执行
  • 允许有多少个(How Many)个任务进入执行队列
  • 系统过载时选择放弃哪一个(Which)任务,如何(How)通知应用程序这个动作
  • 任务执行的开始、结束应该做什么(What)处理

在后面的章节中会详细分写这些策略是如何实现的。我们先来简单回答些如何满足上面的条件。

  1. 首先明确一定是在Java里面可以供使用者调用的启动线程类是Thread。因此Runnable或者Timer/TimerTask等都是要依赖Thread来启动的,因此在ThreadPool里面同样也是靠Thread来启动多线程的。
  2. 默认情况下Runnable接口执行完毕后是不能拿到执行结果的,因此在ThreadPool里就定义了一个Callable接口来处理执行结果。
  3. 为了异步阻塞的获取结果,Future可以帮助调用线程获取执行结果。
  4. Executor解决了向线程池提交任务的入口问题,同时ScheduledExecutorService解决了如何进行重复调用任务的问题。
  5. CompletionService解决了如何按照执行完毕的顺序获取结果的问题,这在某些情况下可以提高任务执行的并发,调用线程不必在长时间任务上等待过多时间。
  6. 显然线程的数量是有限的,而且也不宜过多,因此合适的任务队列是必不可少的,BlockingQueue的容量正好可以解决此问题。
  7. 固定任务容量就意味着在容量满了以后需要一定的策略来处理过多的任务(新任务),RejectedExecutionHandler正好解决此问题。
  8. 一定时间内阻塞就意味着有超时,因此TimeoutException就是为了描述这种现象。TimeUnit是为了描述超时时间方便的一个时间单元枚举类。
  9. 有上述问题就意味了配置一个合适的线程池是很复杂的,因此Executors默认的一些线程池配置可以减少这个操作。

线程池的基本策略大致就这些,从下一节开始就从线程池的基本原理和执行方法开始描述。

[1] Java Concurrency in Practice 来源: [http://www.blogjava.net/xylz/archive/2010/12/19/341098.html](http://www.blogjava.net/xylz/archive/2010/12/19/341098.html) Executor 以及Executors Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。

下面这张图完整描述了线程池的类体系结构。

Executor-class

首先Executor的execute方法只是执行一个Runnable的任务,当然了从某种角度上将最后的实现类也是在线程中启动此任务的。根据线程池的执行策略最后这个任务可能在新的线程中执行,或者线程池中的某个线程,甚至是调用者线程中执行(相当于直接运行Runnable的run方法)。这点在后面会详细说明。

ExecutorService在Executor的基础上增加了一些方法,其中有两个核心的方法:

  • Future<?> submit(Runnable task)
  • Future submit(Callable task)

这两个方法都是向线程池中提交任务,它们的区别在于Runnable在执行完毕后没有结果,Callable执行完毕后有一个结果。这在多个线程中传递状态和结果是非常有用的。另外他们的相同点在于都返回一个Future对象。Future对象可以阻塞线程直到运行完毕(获取结果,如果有的话),也可以取消任务执行,当然也能够检测任务是否被取消或者是否执行完毕。

在没有Future之前我们检测一个线程是否执行完毕通常使用Thread.join()或者用一个死循环加状态位来描述线程执行完毕。现在有了更好的方法能够阻塞线程,检测任务执行完毕甚至取消执行中或者未开始执行的任务。

ScheduledExecutorService描述的功能和Timer/TimerTask类似,解决那些需要任务重复执行的问题。这包括延迟时间一次性执行、延迟时间周期性执行以及固定延迟时间周期性执行等。当然了继承ExecutorService的ScheduledExecutorService拥有ExecutorService的全部特性。

ThreadPoolExecutor是ExecutorService的默认实现,其中的配置、策略也是比较复杂的,在后面的章节中会有详细的分析。

ScheduledThreadPoolExecutor是继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现,在后面的章节中会有详细的分析。

这里需要稍微提一下的是CompletionService接口,它是用于描述顺序获取执行结果的一个线程池包装器。它依赖一个具体的线程池调度,但是能够根据任务的执行先后顺序得到执行结果,这在某些情况下可能提高并发效率。

要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在Executors类里面提供了一些静态工厂,生成一些常用的线程池。

  • newSingleThreadExecutor:创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
  • newFixedThreadPool:创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
  • newCachedThreadPool:创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
  • newScheduledThreadPool:创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
  • newSingleThreadScheduledExecutor:创建一个单线程的线程池。此线程池支持定时以及周期性执行任务的需求。

在详细讲解ThreadPoolExecutor的时候会具体讨论上述参数配置后的意义和原理。

线程池是一个复杂的任务调度工具,因此它涉及到任务、线程池等的生命周期问题,在下一节中来探讨下这个问题。 来源: [http://www.blogjava.net/xylz/archive/2010/12/21/341281.html](http://www.blogjava.net/xylz/archive/2010/12/21/341281.html) Executor 生命周期

我们知道线程是有多种执行状态的,同样管理线程的线程池也有多种状态。JVM会在所有线程(非后台daemon线程)全部终止后才退出,为了节省资源和有效释放资源关闭一个线程池就显得很重要。有时候无法正确的关闭线程池,将会阻止JVM的结束。

线程池Executor是异步的执行任务,因此任何时刻不能够直接获取提交的任务的状态。这些任务有可能已经完成,也有可能正在执行或者还在排队等待执行。因此关闭线程池可能出现一下几种情况:

  • 平缓关闭:已经启动的任务全部执行完毕,同时不再接受新的任务
  • 立即关闭:取消所有正在执行和未执行的任务

另外关闭线程池后对于任务的状态应该有相应的反馈信息。

图1 描述了线程池的4种状态。

  • 线程池在构造前(new操作)是初始状态,一旦构造完成线程池就进入了执行状态RUNNING。严格意义上讲线程池构造完成后并没有线程被立即启动,只有进行“预启动”或者接收到任务的时候才会启动线程。这个会后面线程池的原理会详细分析。但是线程池是出于运行状态,随时准备接受任务来执行。
  • 线程池运行中可以通过shutdown()和shutdownNow()来改变运行状态。shutdown()是一个平缓的关闭过程,线程池停止接受新的任务,同时等待已经提交的任务执行完毕,包括那些进入队列还没有开始的任务,这时候线程池处于SHUTDOWN状态;shutdownNow()是一个立即关闭过程,线程池停止接受新的任务,同时线程池取消所有执行的任务和已经进入队列但是还没有执行的任务,这时候线程池处于STOP状态。
  • 一旦shutdown()或者shutdownNow()执行完毕,线程池就进入TERMINATED状态,此时线程池就结束了。
  • isTerminating()描述的是SHUTDOWN和STOP两种状态。
  • isShutdown()描述的是非RUNNING状态,也就是SHUTDOWN/STOP/TERMINATED三种状态。

Executor-Lifecycle

图1

线程池的API如下:

ExecutorService-LifeCycle

图2

其中shutdownNow()会返回那些已经进入了队列但是还没有执行的任务列表。awaitTermination描述的是等待线程池关闭的时间,如果等待时间线程池还没有关闭将会抛出一个超时异常。

对于关闭线程池期间发生的任务提交情况就会触发一个拒绝执行的操作。这是java.util.concurrent.RejectedExecutionHandler描述的任务操作。下一个小结中将描述这些任务被拒绝后的操作。

总结下这个小节:

  1. 线程池有运行、关闭、停止、结束四种状态,结束后就会释放所有资源
  2. 平缓关闭线程池使用shutdown()
  3. 立即关闭线程池使用shutdownNow(),同时得到未执行的任务列表
  4. 检测线程池是否正处于关闭中,使用isShutdown()
  5. 检测线程池是否已经关闭使用isTerminated()
  6. 定时或者永久等待线程池关闭结束使用awaitTermination()操作

来源: [http://www.blogjava.net/xylz/archive/2011/01/04/342316.html](http://www.blogjava.net/xylz/archive/2011/01/04/342316.html)

线程池数据结构与线程构造方法

由于已经看到了ThreadPoolExecutor的源码,因此很容易就看到了ThreadPoolExecutor线程池的数据结构。图1描述了这种数据结构。

ThreadPoolExecutor

图1 ThreadPoolExecutor 数据结构

其实,即使没有上述图形描述ThreadPoolExecutor的数据结构,我们根据线程池的要求也很能够猜测出其数据结构出来。

  • 线程池需要支持多个线程并发执行,因此有一个线程集合Collection来执行线程任务;
  • 涉及任务的异步执行,因此需要有一个集合来缓存任务队列Collection
  • 很显然在多个线程之间协调多个任务,那么就需要一个线程安全的任务集合,同时还需要支持阻塞、超时操作,那么BlockingQueue是必不可少的;
  • 既然是线程池,出发点就是提高系统性能同时降低资源消耗,那么线程池的大小就有限制,因此需要有一个核心线程池大小(线程个数)和一个最大线程池大小(线程个数),有一个计数用来描述当前线程池大小;
  • 如果是有限的线程池大小,那么长时间不使用的线程资源就应该销毁掉,这样就需要一个线程空闲时间的计数来描述线程何时被销毁;
  • 前面描述过线程池也是有生命周期的,因此需要有一个状态来描述线程池当前的运行状态;
  • 线程池的任务队列如果有边界,那么就需要有一个任务拒绝策略来处理过多的任务,同时在线程池的销毁阶段也需要有一个任务拒绝策略来处理新加入的任务;
  • 上面种的线程池大小、线程空闲实际那、线程池运行状态等等状态改变都不是线程安全的,因此需要有一个全局的锁(mainLock)来协调这些竞争资源;
  • 除了以上数据结构以外,ThreadPoolExecutor还有一些状态用来描述线程池的运行计数,例如线程池运行的任务数、曾经达到的最大线程数,主要用于调试和性能分析。

对于ThreadPoolExecutor而言,一个线程就是一个Worker对象,它与一个线程绑定,当Worker执行完毕就是线程执行完毕,这个在后面详细讨论线程池中线程的运行方式。

既然是线程池,那么就首先研究下线程的构造方法。 public interface ThreadFactory { Thread newThread(Runnable r); }

ThreadPoolExecutor使用一个线程工厂来构造线程。线程池都是提交一个任务Runnable,然后在某一个线程Thread中执行,ThreadFactory 负责如何创建一个新线程。

在J.U.C中有一个通用的线程工厂java.util.concurrent.Executors.DefaultThreadFactory,它的构造方式如下: static class DefaultThreadFactory implements ThreadFactory { static final AtomicInteger poolNumber = new AtomicInteger(1); final ThreadGroup group; final AtomicInteger threadNumber = new AtomicInteger(1); final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null)? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }

在这个线程工厂中,同一个线程池的所有线程属于同一个线程组,也就是创建线程池的那个线程组,同时线程池的名称都是“pool--thread-”,其中poolNum是线程池的数量序号,threadNum是此线程池中的线程数量序号。这样如果使用jstack的话很容易就看到了系统中线程池的数量和线程池中线程的数量。另外对于线程池中的所有线程默认都转换为非后台线程,这样主线程退出时不会直接退出JVM,而是等待线程池结束。还有一点就是默认将线程池中的所有线程都调为同一个级别,这样在操作系统角度来看所有系统都是公平的,不会导致竞争堆积。

线程池中线程生命周期

一个线程Worker被构造出来以后就开始处于运行状态。以下是一个线程执行的简版逻辑。 private final class Worker implements Runnable { private final ReentrantLock runLock = new ReentrantLock(); private Runnable firstTask; Thread thread; Worker(Runnable firstTask) { this.firstTask = firstTask; } private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { task.run(); } finally { runLock.unlock(); } } public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } } }

ThreadPoolExecutor-Worker

当提交一个任务时,如果需要创建一个线程(何时需要在下一节中探讨)时,就调用线程工厂创建一个线程,同时将线程绑定到Worker工作队列中。需要说明的是,Worker队列构造的时候带着一个任务Runnable,因此Worker创建时总是绑定着一个待执行任务。换句话说,创建线程的前提是有必要创建线程(任务数已经超出了线程或者强制创建新的线程,至于为何强制创建新的线程后面章节会具体分析),不会无缘无故创建一堆空闲线程等着任务。这是节省资源的一种方式。

一旦线程池启动线程后(调用线程run())方法,那么线程工作队列Worker就从第1个任务开始执行(这时候发现构造Worker时传递一个任务的好处了),一旦第1个任务执行完毕,就从线程池的任务队列中取出下一个任务进行执行。循环如此,直到线程池被关闭或者任务抛出了一个RuntimeException。

由此可见,线程池的基本原理其实也很简单,无非预先启动一些线程,线程进入死循环状态,每次从任务队列中获取一个任务进行执行,直到线程池被关闭。如果某个线程因为执行某个任务发生异常而终止,那么重新创建一个新的线程而已。如此反复。

其实,线程池原理看起来简单,但是复杂的是各种策略,例如何时该启动一个线程,何时该终止、挂起、唤醒一个线程,任务队列的阻塞与超时,线程池的生命周期以及任务拒绝策略等等。下一节将研究这些策略问题。

来源: [http://www.blogjava.net/xylz/archive/2011/01/18/343183.html](http://www.blogjava.net/xylz/archive/2011/01/18/343183.html)

线程池任务执行流程

我们从一个API开始接触Executor是如何处理任务队列的。

java.util.concurrent.Executor.execute(Runnable) Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread. If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the task is handled by the current RejectedExecutionHandler.

线程池中所有任务执行都依赖于此接口。这段话有以下几个意思:

  1. 任务可能在将来某个时刻被执行,有可能不是立即执行。为什么这里有两个“可能”?继续往下面看。
  2. 任务可能在一个新的线程中执行或者线程池中存在的一个线程中执行。
  3. 任务无法被提交执行有以下两个原因:线程池已经关闭或者线程池已经达到了容量限制。
  4. 所有失败的任务都将被“当前”的任务拒绝策略RejectedExecutionHandler 处理。

回答上面两个“可能“。任务可能被执行,那不可能的情况就是上面说的情况3;可能不是立即执行,是因为任务可能还在队列中排队,因此还在等待分配线程执行。了解完了字面上的问题,我们再来看具体的实现。 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } }

这一段代码看起来挺简单的,其实这就是线程池最重要的一部分,如果能够完全理解这一块,线程池还是挺容易的。整个执行流程是这样的:

  1. 如果任务command为空,则抛出空指针异常,返回。否则进行2。
  2. 如果当前线程池大小 大于或等于 核心线程池大小,进行4。否则进行3。
  3. 创建一个新工作队列(线程,参考上一节),成功直接返回,失败进行4。
  4. 如果线程池正在运行并且任务加入线程池队列成功,进行5,否则进行7。
  5. 如果线程池已经关闭或者线程池大小为0,进行6,否则直接返回。
  6. 如果线程池已经关闭则执行拒绝策略返回,否则启动一个新线程来进行执行任务,返回。
  7. 如果线程池大小 不大于 最大线程池数量,则启动新线程来进行执行,否则进行拒绝策略,结束。

文字描述步骤不够简单?下面图形详细表述了此过程。

Executor.execute

老实说这个图比上面步骤更难以理解,那么从何入手呢。

流程的入口很简单,我们就是要执行一个任务(Runnable command),那么它的结束点在哪或者有哪几个?

根据左边这个图我们知道可能有以下几种出口:

(1)图中的P1、P7,我们根据这条路径可以看到,仅仅是将任务加入任务队列(offer(command))了;

(2)图中的P3,这条路径不将任务加入任务队列,但是启动了一个新工作线程(Worker)进行扫尾操作,用户处理为空的任务队列;

(3)图中的P4,这条路径没有将任务加入任务队列,但是启动了一个新工作线程(Worker),并且工作现场的第一个任务就是当前任务;

(4)图中的P5、P6,这条路径没有将任务加入任务队列,也没有启动工作线程,仅仅是抛给了任务拒绝策略。P2是任务加入了任务队列却因为线程池已经关闭于是又从任务队列中删除,并且抛给了拒绝策略。

如果上面的解释还不清楚,可以去研究下面两段代码: java.util.concurrent.ThreadPoolExecutor.addIfUnderCorePoolSize(Runnable) java.util.concurrent.ThreadPoolExecutor.addIfUnderMaximumPoolSize(Runnable) java.util.concurrent.ThreadPoolExecutor.ensureQueuedTaskHandled(Runnable)

那么什么时候一个任务被立即执行呢?

在线程池运行状态下,如果线程池大小 小于 核心线程池大小或者线程池已满(任务队列已满)并且线程池大小 小于 最大线程池大小(此时线程池大小 大于 核心线程池大小的),用程序描述为: runState == RUNNING && ( poolSize < corePoolSize || poolSize < maxnumPoolSize && workQueue.isFull())

上面的条件就是一个任务能够被立即执行的条件。

有了execute的基础,我们看看ExecutorService中的几个submit方法的实现。 public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task, null); execute(ftask); return ftask; } public Future submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task, result); execute(ftask); return ftask; } public Future submit(Callable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task); execute(ftask); return ftask; }

很简单,不是么?对于一个线程池来说复杂的地方也就在execute方法的执行流程。在下一节中我们来讨论下如何获取任务的执行结果,也就是Future类的使用和原理。

来源: [http://www.blogjava.net/xylz/archive/2011/02/11/344091.html](http://www.blogjava.net/xylz/archive/2011/02/11/344091.html)

线程池任务执行结果

这一节来探讨下线程池中任务执行的结果以及如何阻塞线程、取消任务等等。 1 package info.imxylz.study.concurrency.future; 2 3 public class SleepForResultDemo implements Runnable { 4 5 static boolean result = false; 6 7 static void sleepWhile(long ms) { 8 try { 9 Thread.sleep(ms); 10 } catch (Exception e) {} 11 } 12 13 @Override 14 public void run() { 15 //do work 16 System.out.println("Hello, sleep a while."); 17 sleepWhile(2000L); 18 result = true; 19 } 20 21 public static void main(String[] args) { 22 SleepForResultDemo demo = new SleepForResultDemo(); 23 Thread t = new Thread(demo); 24 t.start(); 25 sleepWhile(3000L); 26 System.out.println(result); 27 } 28 29 } 30

在没有线程池的时代里面,使用Thread.sleep(long)去获取线程执行完毕的场景很多。显然这种方式很笨拙,他需要你事先知道任务可能的执行时间,并且还会阻塞主线程,不管任务有没有执行完毕。

1 package info.imxylz.study.concurrency.future; 2 3 public class SleepLoopForResultDemo implements Runnable { 4 5 boolean result = false; 6 7 volatile boolean finished = false; 8 9 static void sleepWhile(long ms) { 10 try { 11 Thread.sleep(ms); 12 } catch (Exception e) {} 13 } 14 15 @Override 16 public void run() { 17 //do work 18 try { 19 System.out.println("Hello, sleep a while."); 20 sleepWhile(2000L); 21 result = true; 22 } finally { 23 finished = true; 24 } 25 } 26 27 public static void main(String[] args) { 28 SleepLoopForResultDemo demo = new SleepLoopForResultDemo(); 29 Thread t = new Thread(demo); 30 t.start(); 31 while (!demo.finished) { 32 sleepWhile(10L); 33 } 34 System.out.println(demo.result); 35 } 36 37 } 38

使用volatile与while死循环的好处就是等待的时间可以稍微小一点,但是依然有CPU负载高并且阻塞主线程的问题。最简单的降低CPU负载的方式就是使用Thread.join().

    SleepLoopForResultDemo demo = new SleepLoopForResultDemo();
    Thread t = new Thread(demo);
    t.start();
    t.join();
    System.out.println(demo.result);

显然这也是一种不错的方式,另外还有自己写锁使用wait/notify的方式。其实join()从本质上讲就是利用while和wait来实现的。

上面的方式中都存在一个问题,那就是会阻塞主线程并且任务不能被取消。为了解决这个问题,线程池中提供了一个Future接口。

ThreadPoolExecutor-Future

在Future接口中提供了5个方法。

  • V get() throws InterruptedException, ExecutionException: 等待计算完成,然后获取其结果。
  • V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException。最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。
  • boolean cancel(boolean mayInterruptIfRunning):试图取消对此任务的执行。
  • boolean isCancelled():如果在任务正常完成前将其取消,则返回 true。
  • boolean isDone():如果任务已完成,则返回 true。 可能由于正常终止、异常或取消而完成,在所有这些情况中,此方法都将返回 true。

API看起来容易,来研究下异常吧。get()请求获取一个结果会阻塞当前进程,并且可能抛出以下三种异常:

  • InterruptedException:执行任务的线程被中断则会抛出此异常,此时不能知道任务是否执行完毕,因此其结果是无用的,必须处理此异常。
  • ExecutionException:任务执行过程中(Runnable/#run())方法可能抛出RuntimeException,如果提交的是一个java.util.concurrent.Callable接口任务,那么java.util.concurrent.Callable.call()方法有可能抛出任意异常。
  • CancellationException:实际上get()方法还可能抛出一个CancellationException的RuntimeException,也就是任务被取消了但是依然去获取结果。

对于get(long timeout, TimeUnit unit)而言,除了get()方法的异常外,由于有超时机制,因此还可能得到一个TimeoutException。

boolean cancel(boolean mayInterruptIfRunning)方法比较复杂,各种情况比较多:

  1. 如果任务已经执行完毕,那么返回false。
  2. 如果任务已经取消,那么返回false。
  3. 循环直到设置任务为取消状态,对于未启动的任务将永远不再执行,对于正在运行的任务,将根据mayInterruptIfRunning是否中断其运行,如果不中断那么任务将继续运行直到结束。
  4. 此方法返回后任务要么处于运行结束状态,要么处于取消状态。isDone()将永远返回true,如果cancel()方法返回true,isCancelled()始终返回true。

来看看Future接口的实现类java.util.concurrent.FutureTask具体是如何操作的。

在FutureTask中使用了一个AQS数据结构来完成各种状态以及加锁、阻塞的实现。

在此AQS类java.util.concurrent.FutureTask.Sync中一个任务用4中状态:

ThreadPoolExecutor-FutureTask-state

初始情况下任务状态state=0,任务执行(innerRun)后状态变为运行状态RUNNING(state=1),执行完毕后变成运行结束状态RAN(state=2)。任务在初始状态或者执行状态被取消后就变为状态CANCELLED(state=4)。AQS最擅长无锁情况下处理几种简单的状态变更的。 void innerRun() { if (!compareAndSetState(0, RUNNING)) return; try { runner = Thread.currentThread(); if (getState() == RUNNING) // recheck after setting thread innerSet(callable.call()); else releaseShared(0); // cancel } catch (Throwable ex) { innerSetException(ex); } }

执行一个任务有四步:设置运行状态、设置当前线程(AQS需要)、执行任务(Runnable/#run或者Callable/#call)、设置执行结果。这里也可以看到,一个任务只能执行一次,因为执行完毕后它的状态不在为初始值0,要么为CANCELLED,要么为RAN。

取消一个任务(cancel)又是怎样进行的呢?对比下前面取消任务的描述是不是很简单,这里无非利用AQS的状态来改变任务的执行状态,最终达到放弃未启动或者正在执行的任务的目的。 boolean innerCancel(boolean mayInterruptIfRunning) { for (;;) { int s = getState(); if (ranOrCancelled(s)) return false; if (compareAndSetState(s, CANCELLED)) break; } if (mayInterruptIfRunning) { Thread r = runner; if (r != null) r.interrupt(); } releaseShared(0); done(); return true; }

到目前为止我们依然没有说明到底是如何阻塞获取一个结果的。下面四段代码描述了这个过程。

1 V innerGet() throws InterruptedException, ExecutionException { 2 acquireSharedInterruptibly(0); 3 if (getState() == CANCELLED) 4 throw new CancellationException(); 5 if (exception != null) 6 throw new ExecutionException(exception); 7 return result; 8 } 9 //AQS/#acquireSharedInterruptibly 10 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { 11 if (Thread.interrupted()) 12 throw new InterruptedException(); 13 if (tryAcquireShared(arg) < 0) 14 doAcquireSharedInterruptibly(arg); //park current Thread for result 15 } 16 protected int tryAcquireShared(int ignore) { 17 return innerIsDone()? 1 : -1; 18 } 19 20 boolean innerIsDone() { 21 return ranOrCancelled(getState()) && runner == null; 22 }

当调用Future/#get()的时候尝试去获取一个共享变量。这就涉及到AQS的使用方式了。这里获取一个共享变量的状态是任务是否结束(innerIsDone()),也就是任务是否执行完毕或者被取消。如果不满足条件,那么在AQS中就会doAcquireSharedInterruptibly(arg)挂起当前线程,直到满足条件。AQS前面讲过,挂起线程使用的是LockSupport的park方式,因此性能消耗是很低的。

至于将Runnable接口转换成Callable接口,java.util.concurrent.Executors.callable(Runnable, T)也提供了一个简单实现。 static final class RunnableAdapter implements Callable { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }

延迟、周期性任务调度的实现

java.util.concurrent.ScheduledThreadPoolExecutor是默认的延迟、周期性任务调度的实现。

有了整个线程池的实现,再回头来看延迟、周期性任务调度的实现应该就很简单了,因为所谓的延迟、周期性任务调度,无非添加一系列有序的任务队列,然后按照执行顺序的先后来处理整个任务队列。如果是周期性任务,那么在执行完毕的时候加入下一个时间点的任务即可。

由此可见,ScheduledThreadPoolExecutor和ThreadPoolExecutor的唯一区别在于任务是有序(按照执行时间顺序)的,并且需要到达时间点(临界点)才能执行,并不是任务队列中有任务就需要执行的。也就是说唯一不同的就是任务队列BlockingQueue workQueue不一样。ScheduledThreadPoolExecutor的任务队列是java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue,它是基于java.util.concurrent.DelayQueue队列的实现。

DelayQueue是基于有序队列PriorityQueue实现的。PriorityQueue 也叫优先级队列,按照自然顺序对元素进行排序,类似于TreeMap/Collections.sort一样。

同样是有序队列,DelayQueue和PriorityQueue区别在什么地方?

由于DelayQueue在获取元素时需要检测元素是否“可用”,也就是任务是否达到“临界点”(指定时间点),因此加入元素和移除元素会有一些额外的操作。

典型的,移除元素需要检测元素是否达到“临界点”,增加元素的时候如果有一个元素比“头元素”更早达到临界点,那么就需要通知任务队列。因此这需要一个条件变量final Condition available 。

移除元素(出队列)的过程是这样的:

  • 总是检测队列的头元素(顺序最小元素,也是最先达到临界点的元素)
  • 检测头元素与当前时间的差,如果大于0,表示还未到底临界点,因此等待响应时间(使用条件变量available)
  • 如果小于或者等于0,说明已经到底临界点或者已经过了临界点,那么就移除头元素,并且唤醒其它等待任务队列的线程。 public E take() throws InterruptedException {
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();
      try {
          for (;;) {
              E first = q.peek();
              if (first == null) {
                  available.await();
              } else {
                  long delay =  first.getDelay(TimeUnit.NANOSECONDS);
                  if (delay > 0) {
                      long tl = available.awaitNanos(delay);
                  } else {
                      E x = q.poll();
                      assert x != null;
                      if (q.size() != 0)
                          available.signalAll(); // wake up other takers
                      return x;
                  }
              }
          }
      } finally {
          lock.unlock();
      }
    
    }

同样加入元素也会有相应的条件变量操作。当前仅当队列为空或者要加入的元素比队列中的头元素还小的时候才需要唤醒“等待线程”去检测元素。因为头元素都没有唤醒那么比头元素更延迟的元素就更加不会唤醒。

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        q.offer(e);
        if (first == null || e.compareTo(first) < 0)
            available.signalAll();
        return true;
    } finally {
        lock.unlock();
    }
}

有了任务队列后再来看Future在ScheduledThreadPoolExecutor中是如何操作的。

java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask是继承java.util.concurrent.FutureTask的,区别在于执行任务是否是周期性的。 private void runPeriodic() { boolean ok = ScheduledFutureTask.super.runAndReset(); boolean down = isShutdown(); // Reschedule if not cancelled and not shutdown or policy allows if (ok && (!down || (getContinueExistingPeriodicTasksAfterShutdownPolicy() && !isStopped()))) { long p = period; if (p > 0) time += p; else time = now() - p; ScheduledThreadPoolExecutor.super.getQueue().add(this); } // This might have been the final executed delayed // task. Wake up threads to check. else if (down) interruptIdleWorkers(); } /// / Overrides FutureTask version so as to reset/requeue if periodic. // public void run() { if (isPeriodic()) runPeriodic(); else ScheduledFutureTask.super.run(); } }

如果不是周期性任务调度,那么就和java.util.concurrent.FutureTask.Sync的调度方式是一样的。如果是周期性任务(isPeriodic())那么就稍微有所不同的。

ScheduledThreadPoolExecutor-ScheduledFutureTask

先从功能/结构上分析下。第一种情况假设提交的任务每次执行花费10s,间隔(delay/period)为20s,对于scheduleAtFixedRate而言,每次执行开始时间20s,对于scheduleWithFixedDelay来说每次执行开始时间30s。第二种情况假设提交的任务每次执行时间花费20s,间隔(delay/period)为10s,对于scheduleAtFixedRate而言,每次执行开始时间10s,对于scheduleWithFixedDelay来说每次执行开始时间30s。(具体分析可以参考这里

也就是说scheduleWithFixedDelay的执行开始时间为(delay+cost),而对于scheduleAtFixedRate来说执行开始时间为max(period,cost)。

回头再来看上面源码runPeriodic()就很容易了。但特别要提醒的,如果任务的任何一个执行遇到异常,则后续执行都会被取消,这从runPeriodic()就能看出。要强调的第二点就是同一个周期性任务不会被同时执行。就比如说尽管上面第二种情况的scheduleAtFixedRate任务每隔10s执行到达一个时间点,但是由于每次执行时间花费为20s,因此每次执行间隔为20s,只不过执行的任务次数会多一点。但从本质上讲就是每隔20s执行一次,如果任务队列不取消的话。

为什么不会同时执行?

这是因为ScheduledFutureTask执行的时候会将任务从队列中移除来,执行完毕以后才会添加下一个同序列的任务,因此任务队列中其实最多只有同序列的任务的一份副本,所以永远不会同时执行(尽管要执行的时间在过去)。

ScheduledThreadPoolExecutor使用一个无界(容量无限,整数的最大值)的容器(DelayedWorkQueue队列),根据ThreadPoolExecutor的原理,只要当容器满的时候才会启动一个大于corePoolSize的线程数。因此实际上ScheduledThreadPoolExecutor是一个固定线程大小的线程池,固定大小为corePoolSize,构造函数里面的Integer.MAX_VALUE其实是不生效的(尽管PriorityQueue使用数组实现有PriorityQueue大小限制,如果你的任务数超过了2147483647就会导致OutOfMemoryError,这个参考PriorityQueue的grow方法)。

再回头看scheduleAtFixedRate等方法就容易多了。无非就是往任务队列中添加一个未来某一时刻的ScheduledFutureTask任务,如果是scheduleAtFixedRate那么period/delay就是正数,如果是scheduleWithFixedDelay那么period/delay就是一个负数,如果是0那么就是一次性任务。直接调用父类ThreadPoolExecutor的execute/submit等方法就相当于period/delay是0,并且initialDelay也是0。 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); if (initialDelay < 0) initialDelay = 0; long triggerTime = now() + unit.toNanos(initialDelay); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask(command, null, triggerTime, unit.toNanos(period))); delayedExecute(t); return t; }

另外需要补充说明的一点,前面说过java.util.concurrent.FutureTask.Sync任务只能执行一次,那么在runPeriodic()里面怎么又将执行过的任务加入队列中呢?这是因为java.util.concurrent.FutureTask.Sync提供了一个innerRunAndReset()方法,此方法不仅执行任务还将任务的状态还原成0(初始状态)了,所以此任务就可以重复执行。这就是为什么runPeriodic()里面调用runAndRest()的缘故。

    boolean innerRunAndReset() {
        if (!compareAndSetState(0, RUNNING))
            return false;
        try {
            runner = Thread.currentThread();
            if (getState() == RUNNING)
                callable.call(); // don't set result
            runner = null;
            return compareAndSetState(RUNNING, 0);
        } catch (Throwable ex) {
            innerSetException(ex);
            return false;
        }
    }

后话

整个并发实践原理和实现(源码)上的东西都讲完了,后面几个小节是一些总结和扫尾的工作,包括超时机制、异常处理等一些细节问题。也就是说大部分只需要搬出一些理论和最佳实践知识出来就好了,不会有大量费脑筋的算法分析和原理、思想探讨之类的。后面的章节也会加快一些进度。

老实说从刚开始的好奇到中间的兴奋,再到现在的彻悟,收获还是很多,个人觉得这是最认真、最努力也是自我最满意的一次技术研究和探讨,同时在这个过程中将很多技术细节都串联起来了,慢慢就有了那种技术相通的感觉。原来有了理论以后再去实践、再去分析问题、解决问题和那种纯解决问题得到的经验完全不一样。整个专辑下来不仅仅是并发包这一点点知识,设计到硬件、软件、操作系统、网络、安全、性能、算法、理论等等,总的来说这也算是一次比较成功的研究切入点,这比Guice那次探讨要深入和持久的多。 来源: [http://www.blogjava.net/xylz/archive/2011/02/13/344207.html](http://www.blogjava.net/xylz/archive/2011/02/13/344207.html)

并发操作异常体系

并发包引入的工具类很多方法都会抛出一定的异常,这些异常描述了任务在线程池中执行时发生的例外情况,而通常这些例外需要应用程序进行捕捉和处理。

例如在Future接口中有如下一个API:

java.util.concurrent.Future.get(long, TimeUnit) throws InterruptedException, ExecutionException, TimeoutException;

前面的章节中描述了Future类的具体实现原理。这里不再讨论,但是比较好奇的抛出的三个异常。

这里有一篇文章(Java 理论与实践: 处理 InterruptedException)描述了InterruptedException的来源和处理方式。简单的说就是线程在执行的过程中被自己或者别人中断了。这时候为了响应中断就需要处理当前的异常。

对于java.lang.Thread而言,InterruptedException也是一个很诡异的问题。

中断一个线程Thread.interrupt()时会触发下面一种情况: 如果线程在调用 Object 类的 wait()、wait(long) 或 wait(long, int) 方法,或者该类的 join()、join(long)、join(long, int)、sleep(long) 或 sleep(long, int) 方法过程中受阻,则其中断状态将被清除,它还将收到一个 InterruptedException。

检测一个线程的中断状态描述是这样的Thread.interrupted():

测试当前线程是否已经中断。线程的中断状态 由该方法清除。换句话说,如果连续两次调用该方法,则第二次调用将返回 false(在第一次调用已清除了其中断状态之后,且第二次调用检验完中断状态前,当前线程再次中断的情况除外)。

也就是说如果检测到一个线程已经被中断了,那么线程的使用方(挂起、等待或者正在执行)都将应该得到一个中断异常,同时将会清除异常中断状态。

V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException { if (!tryAcquireSharedNanos(0, nanosTimeout)) throw new TimeoutException(); if (getState() == CANCELLED) throw new CancellationException(); if (exception != null) throw new ExecutionException(exception); return result; }

上面获取任务结果的方法实现中,将在获取锁的过程中得到一个中断异常。代码java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(int, long)描述了这种情况: public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }

这里在获取锁的时候检测线程中断情况,如果被中断则清除中断位,同时抛出一个中断异常。为什么如此做?因为我们的线程在线程池中是被重复执行的,所以一旦线程被中断后并不会退出线程,而是设置中断位,等候任务队列自己处理线程,从而达到线程被重复利用的目的。有兴趣的可以参考代码java.util.concurrent.ThreadPoolExecutor.Worker.runTask(Runnable)。这里在关闭线程池时就会导致中断所有线程。

除了InterruptedException 异常我们还发现了一个全新的异常java.util.concurrent.TimeoutException,此异常是用来描述任务执行时间超过了期望等待时间,也许是一直没有获取到锁,也许是还没有执行完成。

在innerGet代码片段中我们看到,如果线程在指定的时间无法获取到锁,那么就会得到一个超时异常。这个很好理解,比如如果执行一个非常耗时的网络任务,我们不希望任务一直等待从而占用大量的资源,可能在一定时间后就会希望取消此操作。此时超时异常很好的描述了这种需求。

与此同时,如果取消了一个任务,那么再次从任务中获取执行结果,那么将会得到一个任务被取消的异常java.util.concurrent.CancellationException。

除了上述异常外,还将得到一个java.util.concurrent.ExecutionException异常,

这是因为我们的提交的任务java.util.concurrent.Callable在call()方法中允许抛出任何异常,另外常规的线程执行也可能抛出一个RuntimeException,所以这里简单包装了下所有异常,当作执行过程中发生的异常ExecutionException抛出。

以上就是整个异常体系,所有并发操作的异常都可以归结于上述几类。

很多情况下处理时间长度都是用java.util.concurrent.TimeUnit,这是一个枚举类型,用来描述时间长度。其中内置了一些长度的单位。其中包括纳秒、微秒、毫秒、秒、分、时、天。例如超时操作5秒,可以使用

Future.get(5,TimeUnit.SECONDS) 或者 Future.get(5000L,TimeUnit.MILLISECONDS)

当然一种单位的时间转换成另一种单位的时间也是非常方便的。另外还有线程的sleep/join以及对象的wait操作的便捷操作。 来源: [http://www.blogjava.net/xylz/archive/2011/07/12/354206.html](http://www.blogjava.net/xylz/archive/2011/07/12/354206.html)

深入浅出 Java Concurrency (37)

Posted on

深入浅出 Java Concurrency (37): 并发总结

深入浅出 Java Concurrency (37): 并发总结 part 1 死锁与活跃度

死锁与活跃度

前面谈了很多并发的特性和工具,但是大部分都是和锁有关的。我们使用锁来保证线程安全,但是这也会引起一些问题。

  • 锁顺序死锁(lock-ordering deadlock):多个线程试图通过不同的顺序获得多个相同的资源,则发生的循环锁依赖现象。
  • 动态的锁顺序死锁(Dynamic Lock Order Deadlocks):多个线程通过传递不同的锁造成的锁顺序死锁问题。
  • 资源死锁(Resource Deadlocks):线程间相互等待对方持有的锁,并且谁都不会释放自己持有的锁发生的死锁。也就是说当现场持有和等待的目标成为资源,就有可能发生此死锁。这和锁顺序死锁不一样的地方是,竞争的资源之间并没有严格先后顺序,仅仅是相互依赖而已。

锁顺序死锁

最经典的锁顺序死锁就是LeftRightDeadLock.

public class LeftRightDeadLock { final Object left = new Object(); final Object right = new Object(); public void doLeftRight() { synchronized (left) { synchronized (right) { execute1(); } } } public void doRightLeft() { synchronized (right) { synchronized (left) { execute2(); } } } private void execute2() { } private void execute1() { } }

这个例子很简单,当两个线程分别获取到left和right锁时,互相等待对方释放其对应的锁,很显然双方都陷入了绝境。

动态的锁顺序死锁

与锁顺序死锁不同的是动态的锁顺序死锁只是将静态的锁变成了动态锁。 一个比较生动的例子是这样的。

public void transferMoney(Account fromAccount,// Account toAccount,// int amount ) { synchronized (fromAccount) { synchronized (toAccount) { fromAccount.decr(amount); toAccount.add(amount); } } } 当我们银行转账的时候,我们期望锁住双方的账户,这样保证是原子操作。 看起来很合理,可是如果双方同时在进行转账操作,那么就有可能发生死锁的可能性。

很显然,动态的锁顺序死锁的解决方案应该看起来和锁顺序死锁解决方案差不多。 但是一个比较特殊的解决方式是纠正这种顺序。 例如可以调整成这样: Object lock = new Object(); public void transferMoney(Account fromAccount,// Account toAccount,// int amount ) { int order = fromAccount.name().compareTo(toAccount.name()); Object lockFirst = order>0?toAccount:fromAccount; Object lockSecond = order>0?fromAccount:toAccount; if(order==0){ synchronized(lock){ synchronized(lockFirst){ synchronized(lockSecond){ //do work } } } }else{ synchronized(lockFirst){ synchronized(lockSecond){ //do work } } } }

这个挺有意思的。比较两个账户的顺序,保证此两个账户之间的传递顺序总是按照某一种锁的顺序进行的, 即使多个线程同时发生,也会遵循一次操作完释放完锁才进行下一次操作的顺序,从而可以避免死锁的发生。

资源死锁

资源死锁比较容易理解,就是需要的资源远远大于已有的资源,这样就有可能线程间的资源竞争从而发生死锁。 一个简单的场景是,应用同时从两个连接池中获取资源,两个线程都在等待对方释放连接池的资源以便能够同时获取 到所需要的资源,从而发生死锁。

资源死锁除了这种资源之间的直接依赖死锁外,还有一种叫线程饥饿死锁(thread-starvation deadlock)。 严格意义上讲,这种死锁更像是活跃度问题。例如提交到线程池中的任务由于总是不能够抢到线程从而一直不被执行, 造成任务的“假死”状况。

除了上述几种问题外,还有协作对象间的死锁以及开发调用的问题。这个描述起来会比较困难,也不容易看出死锁来。

避免和解决死锁

通常发生死锁后程序难以自恢复。但也不是不能避免的。 有一些技巧和原则是可以降低死锁可能性的。

最简单的原则是尽可能的减少锁的范围。锁的范围越小,那么竞争的可能性也越小。 尽快释放锁也有助于避开锁顺序。如果一个线程每次最多只能够获取一个锁,那么就不会产生锁顺序死锁。尽管应用中比较困难,但是减少锁的边界有助于分析程序的设计和简化流程。 减少锁之间的依赖以及遵守获取锁的顺序是避免锁顺序死锁的有效途径。

另外尽可能的使用定时的锁有助于程序从死锁中自恢复。 例如对于上述顺序锁死锁中,使用定时锁很容易解决此问题。

public void doLeftRight() throws Exception { boolean over = false; while (!over) { if (left.tryLock(1, TimeUnit.SECONDS)) { try { if (right.tryLock(1, TimeUnit.SECONDS)) { try { execute1(); } finally { right.unlock(); over = true; } } } finally { left.unlock(); } } } } public void doRightLeft() throws Exception { boolean over = false; while (!over) { if (right.tryLock(1, TimeUnit.SECONDS)) { try { if (left.tryLock(1, TimeUnit.SECONDS)) { try { execute2(); } finally { left.unlock(); over = true; } } } finally { right.unlock(); } } } } 看起来代码会比较复杂,但是这是避免死锁的有效方式。

活跃度

对于多线程来说,死锁是非常严重的系统问题,必须修正。除了死锁,遇到很多的就是活跃度问题了。 活跃度问题主要包括:饥饿,丢失信号,和活锁等。

饥饿

饥饿是指线程需要访问的资源被永久拒绝,以至于不能在继续进行。 比如说:某个权重比较低的线程可能一直不能够抢到CPU周期,从而一直不能够被执行。

也有一些场景是比较容易理解的。对于一个固定大小的连接池中,如果连接一直被用完,那么过多的任务可能由于一直无法抢占到连接从而不能够被执行。这也是饥饿的一种表现。

对于饥饿而言,就需要平衡资源的竞争,例如线程的优先级,任务的权重,执行的周期等等。总之,当空闲的资源较多的情况下,发生饥饿的可能性就越小。

弱响应性

弱响应是指,线程最终能够得到有效的执行,只是等待的响应时间较长。 最常见的莫过于GUI的“假死”了。很多时候GUI的响应只是为了等待后台数据的处理,如果线程协调不好,很有可能就会发生“失去响应”的现象。

另外,和饥饿很类似的情况。如果一个线程长时间独占一个锁,那么其它需要此锁的线程很有可能就会被迫等待。

活锁

活锁(Livelock)是指线程虽然没有被阻塞,但是由于某种条件不满足,一直尝试重试,却终是失败。

考虑一个场景,我们从队列中拿出一个任务来执行,如果任务执行失败,那么将任务重新加入队列,继续执行。假如任务总是执行失败,或者某种依赖的条件总是不满足,那么线程一直在繁忙却没有任何结果。

错误的循环引用和判断也有可能导致活锁。当某些条件总是不能满足的时候,可能陷入死循环的境地。

线程间的协同也有可能导致活锁。例如如果两个线程发生了某些条件的碰撞后重新执行,那么如果再次尝试后依然发生了碰撞,长此下去就有可能发生活锁。

解决活锁的一种方案是对重试机制引入一些随机性。例如如果检测到冲突,那么就暂停随机的一定时间进行重试。这回大大减少碰撞的可能性。

另外为了避免可能的死锁,适当加入一定的重试次数也是有效的解决办法。尽管这在业务上会引起一些复杂的逻辑处理。 来源: [http://www.blogjava.net/xylz/archive/2011/12/29/365149.html](http://www.blogjava.net/xylz/archive/2011/12/29/365149.html)

常见的并发场景

线程池

并发最常见用于线程池,显然使用线程池可以有效的提高吞吐量。

最常见、比较复杂一个场景是Web容器的线程池。Web容器使用线程池同步或者异步处理HTTP请求,同时这也可以有效的复用HTTP连接,降低资源申请的开销。通常我们认为HTTP请求时非常昂贵的,并且也是比较耗费资源和性能的,所以线程池在这里就扮演了非常重要的角色。

线程池的章节中非常详细的讨论了线程池的原理和使用,同时也提到了,线程池的配置和参数对性能的影响是巨大的。不尽如此,受限于资源(机器的性能、网络的带宽等等)、依赖的服务,客户端的响应速度等,线程池的威力也不会一直增长。达到了线程池的瓶颈后,性能和吞吐量都会大幅度降低。

一直增加机器的性能或者增大线程的个数,并不一定能有效的提高吞吐量。高并发的情况下,机器的负载会大幅提升,这时候机器的稳定性、服务的可靠性都会下降。

尽管如此,线程池依然是提高吞吐量的一个有效措施,配合合适的参数能够有效的充分利用资源,提高资源的利用率。

任务队列

除了线程池是比较发杂的并发场景外,任务队列也是一个不错的并发工具。JDK内部有大量的队列(Queue),这些工具不仅能够方便使用,提高生产力,也能够进行组合适应于不同的场景。即使线程池内部,也是用了任务队列来处理任务的积压,平衡资源的消耗。

安全的任务队列能够有效的平衡机器的复杂,抵消由于峰值和波动带来的不稳定,有效提高服务的可靠性。同时任务队列的处理也有助于统计和分析服务的状况。

任务队列也可以在多个线程之间传递数据,有助于并行处理任务。例如经典的“生产者-消费者”模型就可以有效的提高多个线程的并行处理能力。在IO延时比较大的服务中尤其有效。 我最喜欢的一个案例是导数据是,一个线程负责往固定大小的任务队列中压入大量的数据,队列满了以后就暂停,另外几个线程负责从任务队列中获取数据并消费。这将串行的“生产-消费”,变成了并行的“生产-消费”。实践证明极大的节省任务处理时间。

异步处理

线程池也是异步处理的一种表现形式,除此之外,使用异步处理的目的也是为了提高服务的处理速度。 例如AOP的一个例子就是使用切面来记录日志,如果说我们要远程收集日志,显然不希望由于收集日志而影响服务本身。这时候就将日志收集的过程进行异步处理。

如今大量的开源组件都喜欢使用异步处理来提高IO的效率,某些不需要同步返回的操作使用异步处理后能够有效的提高吞吐量。

当然,异步也不总是令人满意的,也会有相应的问题。例如引入异步设计后的复杂性,线程中断后的处理机制,失败后的处理策略,产生的消息比消费的还快时怎么办,关闭程序时如何关闭异步处理逻辑等等。这都会增加系统的复杂性。

尽管大量的服务、业务使用异步来处理,但是很显然需要有保障机制能够保证异步处理的逻辑正确性。如果认为异步处理的任务不是特别重要,或者说主业务不能因为附属业务的逻辑出错而崩溃,那么使用异步处理是正确的选择。

同步操作

并发操作的同时还需要维护数据的一致性,或多或少的会涉及到同步操作。正确的使用原子操作,合理的使用独占锁和读写锁也是一个很大的挑战。

线程间的协调与通信,尤其是状态的同步都是比较困难的。我们看到线程池ThreadPoolExecutor的实现为了解决各个线程的执行状态,引入的很多的同步操作。线程越来越多的情况下,同步的成本会越来越高,同时也有可能引入死锁的情况。

尽管如此,单个JVM内部的多线程同步还是比较容易控制的。JDK内部也提供了大量的工具来方便完成数据的同步。例如Lock/Condition/CountDownLatch/CyclicBarrier/Semaphore/Exchanger等等。

分布式锁

分布式的并发问题更难以处理,根据CAP的原理,基本上没有一个至善至美的方案。 分布式资源协调使用分布式锁是一个不错的选择。Google的分布式锁(建立在BigTable之上),Zookeeper的分布式锁,甚至简单的利用memcache的add操作或者redis的setnx操作建立伪分布式锁也可以解决类似的问题。 来源: [http://www.blogjava.net/xylz/archive/2011/12/29/367480.html](http://www.blogjava.net/xylz/archive/2011/12/29/367480.html)

常见的并发陷阱

volatile

volatile只能强调数据的可见性,并不能保证原子操作和线程安全,因此volatile不是万能的。参考指令重排序

volatile最常见于下面两种场景。

a. 循环检测机制 volatile boolean done = false; while( ! done ){ dosomething(); }

b. 单例模型 (http://www.blogjava.net/xylz/archive/2009/12/18/306622.html)

public class DoubleLockSingleton { private static volatile DoubleLockSingleton instance = null; private DoubleLockSingleton() { } public static DoubleLockSingleton getInstance() { if (instance == null) { synchronized (DoubleLockSingleton.class) { if (instance == null) { instance = new DoubleLockSingleton(); } } } return instance; } }

synchronized/Lock

看起来Lock有更好的性能以及更灵活的控制,是否完全可以替换synchronized?

锁的一些其它问题中说过,synchronized的性能随着JDK版本的升级会越来越高,而Lock优化的空间受限于CPU的性能,很有限。另外JDK内部的工具(线程转储)对synchronized是有一些支持的(方便发现死锁等),而对Lock是没有任何支持的。

也就说简单的逻辑使用synchronized完全没有问题,随着机器的性能的提高,这点开销是可以忽略的。而且从代码结构上讲是更简单的。简单就是美。

对于复杂的逻辑,如果涉及到读写锁、条件变量、更高的吞吐量以及更灵活、动态的用法,那么就可以考虑使用Lock。当然这里尤其需要注意Lock的正确用法。 Lock lock = lock.lock(); try{ //do something }finally{ lock.unlock(); }

一定要将Lock的释放放入finally块中,否则一旦发生异常或者逻辑跳转,很有可能会导致锁没有释放,从而发生死锁。而且这种死锁是难以排查的。

如果需要synchronized无法做到的尝试锁机制,或者说担心发生死锁无法自恢复,那么使用tryLock()是一个比较明智的选择的。 Lock lock = if(lock.tryLock()){ try{ //do something }finally{ lock.unlock(); } }

甚至可以使用获取锁一段时间内超时的机制Lock.tryLock(long,TimeUnit)。 锁的使用可以参考前面文章的描述和建议。

锁的边界

一个流行的错误是这样的。 ConcurrentMap map = new ConcurrentHashMap(); if(!map.containsKey(key)){ map.put(key,value); }

看起来很合理的,对于一个线程安全的Map实现,要存取一个不重复的结果,先检测是否存在然后加入。 其实我们知道两个原子操作和在一起的指令序列不代表就是线程安全的。 割裂的多个原子操作放在一起在多线程的情况下就有可能发生错误。

实际上ConcurrentMap提供了putIfAbsent(K, V)的“原子操作”机制,这等价于下面的逻辑: if(map.containsKey(key)){ return map.get(key); }else{ return map.put(k,v); }

除了putIfAbsent还有replace(K, V)以及replace(K, V, V)两种机制来完成组合的操作。

提到Map,这里有一篇谈HashMap读写并发的问题。

构造函数启动线程

下面的实例是在构造函数中启动一个线程。 public class Runner{ int x,y; Thread thread; public Runner(){ this.x=1; this.y=2; this.thread=new MyThread(); this.thread.start(); } }

这里可能存在的陷阱是如果此类被继承,那么启动的线程可能无法正确读取子类的初始化操作。

因此一个简单的原则是,禁止在构造函数中启动线程,可以考虑但是提供一个方法来启动线程。如果非要这么做,最好将类设置为final,禁止继承。

丢失通知的问题

这篇文章里面提到过notify丢失通知的问题。

对于wait/notify/notifyAll以及await/singal/singalAll,如果不确定到底是否能够正确的收到消息,担心丢失通知,简单一点就是总是通知所有。

如果担心只收到一次消息,使用循环一直监听是不错的选择。

非常主用性能的系统,可能就需要区分到底是通知单个还是通知所有的挂起者。

线程数

并不是线程数越多越好,在下一篇文章里面会具体了解下性能和可伸缩性。 简单的说,线程数多少没有一个固定的结论,受限于CPU的内核数,IO的性能以及依赖的服务等等。因此选择一个合适的线程数有助于提高吞吐量。

对于CPU密集型应用,线程数和CPU的内核数一致有助于提高吞吐量,所有CPU都很繁忙,效率就很高。 对于IO密集型应用,线程数受限于IO的性能,某些时候单线程可能比多线程效率更高。但通常情况下适当提高线程数,有利于提高网络IO的效率,因为我们总是认为网络IO的效率比较低。

对于线程池而言,选择合适的线程数以及任务队列是提高线程池效率的手段。 public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

对于线程池来说,如果任务总是有积压,那么可以适当提高corePoolSize大小;如果机器负载较低,那么可以适当提高maximumPoolSize的大小;任务队列不长的情况下减小keepAliveTime的时间有助于降低负载;另外任务队列的长度以及任务队列的拒绝策略也会对任务的处理有一些影响。 来源: [http://www.blogjava.net/xylz/archive/2011/12/30/367592.html](http://www.blogjava.net/xylz/archive/2011/12/30/367592.html)

性能与伸缩性

使用线程的一种说法是为了提高性能。多线程可以使程序充分利用闲置的资源,提高资源的利用率,同时能够并行处理任务,提高系统的响应性。 但是很显然,引入线程的同时也引入了系统的复杂性。另外系统的性能并不是总是随着线程数的增加而总是提高。

性能与伸缩性

性能的提升通常意味着可以用更少的资源做更多的事情。这里资源是包括我们常说的CPU周期、内存、网络带宽、磁盘IO、数据库、WEB服务等等。 引入多线程可以充分利用多核的优势,充分利用IO阻塞带来的延迟,也可以降低网络开销带来的影响,从而提高单位时间内的响应效率。

为了提高性能,需要有效的利用我们现有的处理资源,同时也要开拓新的可用资源。例如,对于CPU而言,理想状况下希望CPU能够满负荷工作。当然这里满负荷工作是指做有用的事情,而不是无谓的死循环或者等待。受限于CPU的计算能力,如果CPU达到了极限,那么很显然我们充分利用了计算能力。对于IO而言(内存、磁盘、网络等),如果达到了其对于的带宽,这些资源的利用率也就上去了。理想状况下所有资源的能力都被用完了,那么这个系统的性能达到了最大值。

为了衡量系统的性能,有一些指标用于定性、定量的分析。例如服务时间、等待时间、吞吐量、效率、可伸缩性、生成量等等。服务时间、等待时间等用于衡量系统的效率,即到底有多快。吞吐量、生成量等用于衡量系统的容量,即能够处理多少数据。除此之外,有效服务时间、中断时间等用于能力系统的可靠性和稳定性等。

可伸缩性的意思是指增加计算资源,吞吐量和生产量相应得到的改进。 从算法的角度讲,通常用复杂度来衡量其对应的性能。例如时间复杂度、空间复杂度等。

Amdahl定律

并行的任务增加资源显然能够提高性能,但是如果是串行的任务,增加资源并不一定能够得到合理的性能提升。 Amdahl定律描述的在一个系统中,增加处理器资源对系统行的提升比率。 假定在一个系统中,F是必须串行化执行的比重,N是处理器资源,那么随着N的增加最多增加的加速比:

理论上,当N趋近于无穷大时,加速比最大值无限趋近于1/F。 这意味着如果一个程序的串行化比重为50%,那么并行化后最大加速比为2倍。

加速比除了可以用于加速的比率外,也可以用于衡量CPU资源的利用率。如果每一个CPU的资源利用率为100%,那么CPU的资源每次翻倍时,加速比也应该翻倍。 事实上,在拥有10个处理器的系统中,程序如果有10%是串行化的,那么最多可以加速1/(0.1+(1-0.1)/10)=5.3倍,换句话说CPU的利用率只用5.3/10=53%。而如果处理器增加到100倍,那么加速比为9.2倍,也就是说CPU的利用率只有个9.3%。

显然增加CPU的数量并不能提高CPU的利用率。下图描述的是随着CPU的数量增加,不同串行化比重的系统的加速比。

很显然,串行比重越大,增加CPU资源的效果越不明显。

性能提升

性能的提升可以从以下几个方面入手。

系统平台的资源利用率

一个程序对系统平台的资源利用率是指某一个设备繁忙且服务于此程序的时间占所有时间的比率。从物理学的角度讲类似于有用功的比率。简单的说就是:资源利用率=有效繁忙时间/总耗费时间。

也就说尽可能的让设备做有用的功,同时榨取其最大值。无用的循环可能会导致CPU 100%的使用率,但不一定是有效的工作。有效性通常难以衡量,通常只能以主观来评估,或者通过被优化的程序的行为来判断是否提高了有效性。

延迟

延迟描述的是完成任务所耗费的时间。延迟有时候也成为响应时间。如果有多个并行的操作,那么延迟取决于耗费时间最大的任务。

多处理

多处理是指在单一系统上同时执行多个进程或者多个程序的能力。多处理能力的好处是可以提高吞吐量。多处理可以有效利用多核CPU的资源。

多线程

多线程描述的是同一个地址空间内同时执行多个线程的过程。这些线程都有不同的执行路径和不同的栈结构。我们说的并发性更多的是指针对线程。

并发性

同时执行多个程序或者任务称之为并发。单程序内的多任务处理或者多程序间的多任务处理都认为是并发。

吞吐量

吞吐量衡量系统在单位之间内可以完成的工作总量。对于硬件系统而言,吞吐量是物理介质的上限。在没有达到物理介质之前,提高系统的吞吐量也可以大幅度改进性能。同时吞吐量也是衡量性能的一个指标。

瓶颈

程序运行过程中性能最差的地方。通常而言,串行的IO、磁盘IO、内存单元分配、网络IO等都可能造成瓶颈。某些使用太频繁的算法也有可能成为瓶颈。

可扩展性

这里的可扩展性主要是指程序或系统通过增加可使用的资源而增加性能的能力。

线程开销

假设引入的多线程都用于计算,那么性能一定会有很大的提升么? 其实引入多线程以后也会引入更多的开销。

切换上下文

如果可运行的线程数大于CPU的内核数,那么OS会根据一定的调度算法,强行切换正在运行的线程,从而使其它线程能够使用CPU周期。

切换线程会导致上下文切换。线程的调度会导致CPU需要在操作系统和进程间花费更多的时间片段,这样真正执行应用程序的时间就减少了。另外上下文切换也会导致缓存的频繁进出,对于一个刚被切换的线程来说,可能由于高速缓冲中没有数据而变得更慢,从而导致更多的IO开销。

内存同步

不同线程间要进行数据同步,synchronized以及volatile提供的可见性都会导致缓存失效。线程栈之间的数据要和主存进行同步,这些同步有一些小小的开销。如果线程间同时要进行数据同步,那么这些同步的线程可能都会受阻。

阻塞

当发生锁竞争时,失败的线程会导致阻塞。通常阻塞的线程可能在JVM内部进行自旋等待,或者被操作系统挂起。自旋等待可能会导致更多的CPU切片浪费,而操作系统挂起则会导致更多的上下文切换。

了解了性能的提升的几个方面,也了解性能的开销后,应用程序就要根据实际的场景进行取舍和评估。没有一劳永逸的优化方案,不断的进行小范围改进和调整是提高性能的有效手段。当前一些大的架构调整也会导致较大的性能的提升。

简单的原则是在保证逻辑正确的情况小,找到性能瓶颈,小步改进和优化。

参考资料

深入浅出 Java Concurrency (3)

Posted on

深入浅出 Java Concurrency (3): 锁机制

前面的章节主要谈谈原子操作,至于与原子操作一些相关的问题或者说陷阱就放到最后的总结篇来整体说明。从这一章开始花少量的篇幅谈谈锁机制。

上一个章节中谈到了锁机制,并且针对于原子操作谈了一些相关的概念和设计思想。接下来的文章中,尽可能的深入研究锁机制,并且理解里面的原理和实际应用场合。

尽管synchronized在语法上已经足够简单了,在JDK 5之前只能借助此实现,但是由于是独占锁,性能却不高,因此JDK 5以后就开始借助于JNI来完成更高级的锁实现。

JDK 5中的锁是接口java.util.concurrent.locks.Lock。另外java.util.concurrent.locks.ReadWriteLock提供了一对可供读写并发的锁。根据前面的规则,我们从java.util.concurrent.locks.Lock的API开始。

void lock();

获取锁。

如果锁不可用,出于线程调度目的,将禁用当前线程,并且在获得锁之前,该线程将一直处于休眠状态。

void lockInterruptibly() throws InterruptedException;

如果当前线程未被中断,则获取锁。

如果锁可用,则获取锁,并立即返回。

如果锁不可用,出于线程调度目的,将禁用当前线程,并且在发生以下两种情况之一以前,该线程将一直处于休眠状态:

  • 锁由当前线程获得;或者
  • 其他某个线程中断)当前线程,并且支持对锁获取的中断。

如果当前线程:

  • 在进入此方法时已经设置了该线程的中断状态;或者
  • 在获取锁时被中断),并且支持对锁获取的中断, 则将抛出

InterruptedException ,并清除当前线程的已中断状态。

Condition newCondition();

返回绑定到此

Lock 实例的新

Condition 实例。下一小节中会重点谈Condition,此处不做过多的介绍。

boolean tryLock();

仅在调用时锁为空闲状态才获取该锁。

如果锁可用,则获取锁,并立即返回值

true 。如果锁不可用,则此方法将立即返回值

false 。

通常对于那些不是必须获取锁的操作可能有用。

boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

如果锁在给定的等待时间内空闲,并且当前线程未被中断,则获取锁。

如果锁可用,则此方法将立即返回值

true 。如果锁不可用,出于线程调度目的,将禁用当前线程,并且在发生以下三种情况之一前,该线程将一直处于休眠状态:

  • 锁由当前线程获得;或者
  • 其他某个线程中断当前线程,并且支持对锁获取的中断;或者
  • 已超过指定的等待时间

如果获得了锁,则返回值

true 。

如果当前线程:

  • 在进入此方法时已经设置了该线程的中断状态;或者
  • 在获取锁时被中断,并且支持对锁获取的中断, 则将抛出

InterruptedException ,并会清除当前线程的已中断状态。

如果超过了指定的等待时间,则将返回值

false 。如果 time 小于等于 0,该方法将完全不等待。

void unlock();

释放锁。对应于lock()、tryLock()、tryLock(xx)、lockInterruptibly()等操作,如果成功的话应该对应着一个unlock(),这样可以避免死锁或者资源浪费。

相对于比较空洞的API,来看一个实际的例子。下面的代码实现了一个类似于AtomicInteger的操作。 package xylz.study.concurrency.lock;

import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;

public class AtomicIntegerWithLock {

private int value;

private Lock lock = new ReentrantLock();

public AtomicIntegerWithLock() {
    super();
}

public AtomicIntegerWithLock(int value) {
    this.value = value;
}

public final int get() {
    lock.lock();
    try {
        return value;
    } finally {
        lock.unlock();
    }
}

public final void set(int newValue) {
    lock.lock();
    try {
        value = newValue;
    } finally {
        lock.unlock();
    }

}

public final int getAndSet(int newValue) {
    lock.lock();
    try {
        int ret = value;
        value = newValue;
        return ret;
    } finally {
        lock.unlock();
    }
}

public final boolean compareAndSet(int expect, int update) {
    lock.lock();
    try {
        if (value == expect) {
            value = update;
            return true;
        }
        return false;
    } finally {
        lock.unlock();
    }
}

public final int getAndIncrement() {
    lock.lock();
    try {
        return value++;
    } finally {
        lock.unlock();
    }
}

public final int getAndDecrement() {
    lock.lock();
    try {
        return value--;
    } finally {
        lock.unlock();
    }
}

public final int incrementAndGet() {
    lock.lock();
    try {
        return ++value;
    } finally {
        lock.unlock();
    }
}

public final int decrementAndGet() {
    lock.lock();
    try {
        return --value;
    } finally {
        lock.unlock();
    }
}

public String toString() {
    return Integer.toString(get());
}

}

AtomicIntegerWithLock是线程安全的,此结构中大量使用了Lock对象的lock/unlock方法对。同样可以看到的是对于自增和自减操作使用了++/--。之所以能够保证线程安全,是因为Lock对象的lock()方法保证了只有一个线程能够只有此锁。需要说明的是对于任何一个lock()方法,都需要一个unlock()方法与之对于,通常情况下为了保证unlock方法总是能够得到执行,unlock方法被置于finally块中。另外这里使用了java.util.concurrent.locks.ReentrantLock.ReentrantLock对象,下一个小节中会具体描述此类作为Lock的唯一实现是如何设计和实现的。

尽管synchronized实现Lock的相同语义,并且在语法上比Lock要简单多,但是前者却比后者的开销要大得多。做一个简单的测试。 public static void main(String[] args) throws Exception{ final int max = 10; final int loopCount = 100000; long costTime = 0; for (int m = 0; m < max; m++) { long start1 = System.nanoTime(); final AtomicIntegerWithLock value1 = new AtomicIntegerWithLock(0); Thread[] ts = new Thread[max]; for(int i=0;i<max;i++) { ts[i] = new Thread() { public void run() { for (int i = 0; i < loopCount; i++) { value1.incrementAndGet(); } } }; } for(Thread t:ts) { t.start(); } for(Thread t:ts) { t.join(); } long end1 = System.nanoTime(); costTime += (end1-start1); } System.out.println("cost1: " + (costTime)); // System.out.println(); costTime = 0; // final Object lock = new Object(); for (int m = 0; m < max; m++) { staticValue=0; long start1 = System.nanoTime(); Thread[] ts = new Thread[max]; for(int i=0;i<max;i++) { ts[i] = new Thread() { public void run() { for (int i = 0; i < loopCount; i++) { synchronized(lock) { ++staticValue; } } } }; } for(Thread t:ts) { t.start(); } for(Thread t:ts) { t.join(); } long end1 = System.nanoTime(); costTime += (end1-start1); } // System.out.println("cost2: " + (costTime)); }

static int staticValue = 0;

在这个例子中每次启动10个线程,每个线程计算100000次自增操作,重复测试10次,下面是某此测试的结果:

cost1: 624071136

cost2: 2057847833

尽管上面的例子不是非常正式的测试案例,但上面的例子在于说明,Lock的性能比synchronized的要好得多。如果可以的话总是使用Lock替代synchronized是一个明智的选择。

来源: [http://www.blogjava.net/xylz/archive/2010/07/05/325274.html](http://www.blogjava.net/xylz/archive/2010/07/05/325274.html)

在理解J.U.C原理以及锁机制之前,我们来介绍J.U.C框架最核心也是最复杂的一个基础类:java.util.concurrent.locks.AbstractQueuedSynchronizer

AQS

AbstractQueuedSynchronizer,简称AQS,是J.U.C最复杂的一个类,导致绝大多数讲解并发原理或者实战的时候都不会提到此类。但是虚心的作者愿意借助自己有限的能力和精力来探讨一二(参考资源中也有一些作者做了部分的分析。)。

首先从理论知识开始,在了解了相关原理后会针对源码进行一些分析,最后加上一些实战来描述。

image

上面的继承体系中,AbstractQueuedSynchronizer是CountDownLatch/FutureTask/ReentrantLock/RenntrantReadWriteLock/Semaphore的基础,因此AbstractQueuedSynchronizer是Lock/Executor实现的前提。公平锁、不公平锁、Condition、CountDownLatch、Semaphore等放到后面的篇幅中说明。

完整的设计原理可以参考Doug Lea的论文 The java.util.concurrent Synchronizer Framework ,这里做一些简要的分析。

基本的思想是表现为一个同步器,支持下面两个操作:

获取锁:首先判断当前状态是否允许获取锁,如果是就获取锁,否则就阻塞操作或者获取失败,也就是说如果是独占锁就可能阻塞,如果是共享锁就可能失败。另外如果是阻塞线程,那么线程就需要进入阻塞队列。当状态位允许获取锁时就修改状态,并且如果进了队列就从队列中移除。 while(synchronization state does not allow acquire){

enqueue current thread if not already queued;

possibly block current thread;

}

dequeue current thread if it was queued;

释放锁:这个过程就是修改状态位,如果有线程因为状态位阻塞的话就唤醒队列中的一个或者更多线程。

update synchronization state;

if(state may permit a blocked thread to acquire)

unlock one or more queued threads;

要支持上面两个操作就必须有下面的条件:

  • 原子性操作同步器的状态位
  • 阻塞和唤醒线程
  • 一个有序的队列

目标明确,要解决的问题也清晰了,那么剩下的就是解决上面三个问题。

状态位的原子操作

这里使用一个32位的整数来描述状态位,前面章节的原子操作的理论知识整好派上用场,在这里依然使用CAS操作来解决这个问题。事实上这里还有一个64位版本的同步器(AbstractQueuedLongSynchronizer),这里暂且不谈。

阻塞和唤醒线程

标准的JAVA API里面是无法挂起(阻塞)一个线程,然后在将来某个时刻再唤醒它的。JDK 1.0的API里面有Thread.suspend和Thread.resume,并且一直延续了下来。但是这些都是过时的API,而且也是不推荐的做法。

在JDK 5.0以后利用JNI在LockSupport类中实现了此特性。 LockSupport.park() LockSupport.park(Object) LockSupport.parkNanos(Object, long) LockSupport.parkNanos(long) LockSupport.parkUntil(Object, long) LockSupport.parkUntil(long) LockSupport.unpark(Thread)

上面的API中park()是在当前线程中调用,导致线程阻塞,带参数的Object是挂起的对象,这样监视的时候就能够知道此线程是因为什么资源而阻塞的。由于park()立即返回,所以通常情况下需要在循环中去检测竞争资源来决定是否进行下一次阻塞。park()返回的原因有三:

  • 其他某个线程调用将当前线程作为目标调用 unpark );
  • 其他某个线程中断)当前线程;
  • 该调用不合逻辑地(即毫无理由地)返回。

其实第三条就决定了需要循环检测了,类似于通常写的while(checkCondition()){Thread.sleep(time);}类似的功能。

有序队列

在AQS中采用CHL列表来解决有序的队列的问题。

imageAQS采用的CHL模型采用下面的算法完成FIFO的入队列和出队列过程。

对于入队列(enqueue):采用CAS操作,每次比较尾结点是否一致,然后插入的到尾结点中。 do {

    pred = tail;

}while ( !compareAndSet(pred,tail,node) );

对于出队列(dequeue):由于每一个节点也缓存了一个状态,决定是否出队列,因此当不满足条件时就需要自旋等待,一旦满足条件就将头结点设置为下一个节点。

while (pred.status != RELEASED) ;

head = node;

实际上这里自旋等待也是使用LockSupport.park()来实现的。

AQS里面有三个核心字段: private volatile int state;

private transient volatile Node head;

private transient volatile Node tail;

其中state描述的有多少个线程取得了锁,对于互斥锁来说state<=1。head/tail加上CAS操作就构成了一个CHL的FIFO队列。下面是Node节点的属性。

volatile int waitStatus; 节点的等待状态,一个节点可能位于以下几种状态:

  • CANCELLED = 1: 节点操作因为超时或者对应的线程被interrupt。节点不应该留在此状态,一旦达到此状态将从CHL队列中踢出。
  • SIGNAL = -1: 节点的继任节点是(或者将要成为)BLOCKED状态(例如通过LockSupport.park()操作),因此一个节点一旦被释放(解锁)或者取消就需要唤醒(LockSupport.unpack())它的继任节点。
  • CONDITION = -2:表明节点对应的线程因为不满足一个条件(Condition)而被阻塞。
  • 0: 正常状态,新生的非CONDITION节点都是此状态。
  • 非负值标识节点不需要被通知(唤醒)。

volatile Node prev;此节点的前一个节点。节点的waitStatus依赖于前一个节点的状态。

volatile Node next;此节点的后一个节点。后一个节点是否被唤醒(uppark())依赖于当前节点是否被释放。

volatile Thread thread;节点绑定的线程。

Node nextWaiter;下一个等待条件(Condition)的节点,由于Condition是独占模式,因此这里有一个简单的队列来描述Condition上的线程节点。

AQS 在J.U.C里面是一个非常核心的工具,而且也非常复杂,里面考虑到了非常多的逻辑实现,所以在后面的章节中总是不断的尝试介绍AQS的特性和实现。

这一个小节主要介绍了一些理论背景和相关的数据结构,在下一个小节中将根据以上知识来了解Lock.lock/unlock是如何实现的。

参考资料:

(1)ReentrantLock代码剖析之ReentrantLock.lock ReentrantLock代码剖析之ReentrantLock.unlock ReentrantLock代码剖析之ReentrantLock.lockInterruptibly

(2)java多线程--java.util.concurrent.locks.AbstractQueuedSynchronizer解析(只包含多线程同步示例)

(3)处理 InterruptedException

(4)AbstractQueuedSynchronizer源码解析之ReentrantLock(一) AbstractQueuedSynchronizer源码解析之ReentrantLock(二)

(5)The java.util.concurrent Synchronizer Framework 来源: [http://www.blogjava.net/xylz/archive/2010/07/06/325390.html](http://www.blogjava.net/xylz/archive/2010/07/06/325390.html)

接上篇,这篇从Lock.lock/unlock开始。特别说明在没有特殊情况下所有程序、API、文档都是基于JDK 6.0的。

public void java.util.concurrent.locks.ReentrantLock.lock() 获取锁。

如果该锁没有被另一个线程保持,则获取该锁并立即返回,将锁的保持计数设置为 1。

如果当前线程已经保持该锁,则将保持计数加 1,并且该方法立即返回。

如果该锁被另一个线程保持,则出于线程调度的目的,禁用当前线程,并且在获得锁之前,该线程将一直处于休眠状态,此时锁保持计数被设置为 1。

从上面的文档可以看出ReentrantLock是可重入锁的实现。而内部是委托java.util.concurrent.locks.ReentrantLock.Sync.lock()实现的。java.util.concurrent.locks.ReentrantLock.Sync是抽象类,有java.util.concurrent.locks.ReentrantLock.FairSync和java.util.concurrent.locks.ReentrantLock.NonfairSync两个实现,也就是常说的公平锁和不公平锁。

公平锁和非公平锁 如果获取一个锁是按照请求的顺序得到的,那么就是公平锁,否则就是非公平锁。

在没有深入了解内部机制及实现之前,先了解下为什么会存在公平锁和非公平锁。公平锁保证一个阻塞的线程最终能够获得锁,因为是有序的,所以总是可以按照请求的顺序获得锁。不公平锁意味着后请求锁的线程可能在其前面排列的休眠线程恢复前拿到锁,这样就有可能提高并发的性能。这是因为通常情况下挂起的线程重新开始与它真正开始运行,二者之间会产生严重的延时。因此非公平锁就可以利用这段时间完成操作。这是非公平锁在某些时候比公平锁性能要好的原因之一。

二者在实现上的区别会在后面介绍,我们先从公平锁(FairSync)开始。

前面说过java.util.concurrent.locks.AbstractQueuedSynchronizer (AQS)是Lock的基础,对于一个FairSync而言,lock()就直接调用AQS的acquire(int arg); public final void acquire(int arg) 以独占模式获取对象,忽略中断。通过至少调用一次 tryAcquire(int) ) 来实现此方法,并在成功时返回。否则在成功之前,一直调用 tryAcquire(int) ) 将线程加入队列,线程可能重复被阻塞或不被阻塞。

在介绍实现之前先要补充上一节的知识,对于一个AQS的实现而言,通常情况下需要实现以下方法来描述如何锁定线程。

  • tryAcquire(int) 试图在独占模式下获取对象状态。此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它。

此方法总是由执行 acquire 的线程来调用。如果此方法报告失败,则 acquire 方法可以将线程加入队列(如果还没有将它加入队列),直到获得其他某个线程释放了该线程的信号。也就是说此方法是一种尝试性方法,如果成功获取锁那最好,如果没有成功也没有关系,直接返回false。

  • tryRelease(int) 试图设置状态来反映独占模式下的一个释放。 此方法总是由正在执行释放的线程调用。释放锁可能失败或者抛出异常,这个在后面会具体分析。
  • tryAcquireShared(int) 试图在共享模式下获取对象状态。
  • tryReleaseShared(int) 试图设置状态来反映共享模式下的一个释放。
  • isHeldExclusively() 如果对于当前(正调用的)线程,同步是以独占方式进行的,则返回 true 。

除了tryAcquire(int)外,其它方法会在后面具体介绍。首先对于ReentrantLock而言,不管是公平锁还是非公平锁,都是独占锁,也就是说同时能够有一个线程持有锁。因此对于acquire(int arg)而言,arg==1。在AQS中acquire的实现如下:

public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

这个看起来比较复杂,我们分解以下4个步骤。

  1. 如果tryAcquire(arg)成功,那就没有问题,已经拿到锁,整个lock()过程就结束了。如果失败进行操作2。
  2. 创建一个独占节点(Node)并且此节点加入CHL队列末尾。进行操作3。
  3. 自旋尝试获取锁,失败根据前一个节点来决定是否挂起(park()),直到成功获取到锁。进行操作4。
  4. 如果当前线程已经中断过,那么就中断当前线程(清除中断位)。

这是一个比较复杂的过程,我们按部就班一个一个分析。

tryAcquire(acquires)

对于公平锁而言,它的实现方式如下: protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (isFirst(current) && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }

在这段代码中,前面说明对于AQS存在一个state来描述当前有多少线程持有锁。由于AQS支持共享锁(例如读写锁,后面会继续讲),所以这里state>=0,但是由于ReentrantLock是独占锁,所以这里不妨理解为0<=state,acquires=1。isFirst(current)是一个很复杂的逻辑,包括踢出无用的节点等复杂过程,这里暂且不提,大体上的意思是说判断AQS是否为空或者当前线程是否在队列头(为了区分公平与非公平锁)。

  1. 如果当前锁有其它线程持有,c!=0,进行操作2。否则,如果当前线程在AQS队列头部,则尝试将AQS状态state设为acquires(等于1),成功后将AQS独占线程设为当前线程返回true,否则进行2。这里可以看到compareAndSetState就是使用了CAS操作。
  2. 判断当前线程与AQS的独占线程是否相同,如果相同,那么就将当前状态位加1(这里+1后结果为负数后面会讲,这里暂且不理它),修改状态位,返回true,否则进行3。这里之所以不是将当前状态位设置为1,而是修改为旧值+1呢?这是因为ReentrantLock是可重入锁,同一个线程每持有一次就+1。
  3. 返回false。

比较非公平锁的tryAcquire实现java.util.concurrent.locks.ReentrantLock.Sync.nonfairTryAcquire(int),公平锁多了一个判断当前节点是否在队列头,这个就保证了是否按照请求锁的顺序来决定获取锁的顺序(同一个线程的多次获取锁除外)。

现在再回头看公平锁和非公平锁的lock()方法。公平锁只有一句acquire(1);而非公平锁的调用如下: final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }

很显然,非公平锁在第一次获取锁,或者其它线程释放锁后(可能等待),优先采用compareAndSetState(0,1)然后设置AQS独占线程而持有锁,这样有时候比acquire(1)顺序检查锁持有而要高效。即使在重入锁上,也就是compareAndSetState(0,1)失败,但是是当前线程持有锁上,非公平锁也没有问题。

addWaiter(mode)

tryAcquire失败就意味着入队列了。此时AQS的队列中节点Node就开始发挥作用了。一般情况下AQS支持独占锁和共享锁,而独占锁在Node中就意味着条件(Condition)队列为空(上一篇中介绍过相关概念)。在java.util.concurrent.locks.AbstractQueuedSynchronizer.Node中有两个常量, static final Node EXCLUSIVE = null; //独占节点模式

static final Node SHARED = new Node(); //共享节点模式

addWaiter(mode)中的mode就是节点模式,也就是共享锁还是独占锁模式。

前面一再强调ReentrantLock是独占锁模式。 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }

上面是节点如队列的一部分。当前仅当队列不为空并且将新节点插入尾部成功后直接返回新节点。否则进入enq(Node)进行操作。

private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize Node h = new Node(); // Dummy header h.next = node; node.prev = h; if (compareAndSetHead(h)) { tail = node; return h; } } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

enq(Node)去队列操作实现了CHL队列的算法,如果为空就创建头结点,然后同时比较节点尾部是否是改变来决定CAS操作是否成功,当且仅当成功后才将为不节点的下一个节点指向为新节点。可以看到这里仍然是CAS操作。

acquireQueued(node,arg)

自旋请求锁,如果可能的话挂起线程,直到得到锁,返回当前线程是否中断过(如果park()过并且中断过的话有一个interrupted中断位)。 final boolean acquireQueued(final Node node, int arg) { try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } }

下面的分析就需要用到上节节点的状态描述了。acquireQueued过程是这样的:

  1. 如果当前节点是AQS队列的头结点(如果第一个节点是DUMP节点也就是傀儡节点,那么第二个节点实际上就是头结点了),就尝试在此获取锁tryAcquire(arg)。如果成功就将头结点设置为当前节点(不管第一个结点是否是DUMP节点),返回中断位。否则进行2。
  2. 检测当前节点是否应该park(),如果应该park()就挂起当前线程并且返回当前线程中断位。进行操作1。

一个节点是否该park()是关键,这是由方法java.util.concurrent.locks.AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire(Node, Node)实现的。 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int s = pred.waitStatus; if (s < 0) return true; if (s > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else compareAndSetWaitStatus(pred, 0, Node.SIGNAL); return false; }

  1. 如果前一个节点的等待状态waitStatus<0,也就是前面的节点还没有获得到锁,那么返回true,表示当前节点(线程)就应该park()了。否则进行2。
  2. 如果前一个节点的等待状态waitStatus>0,也就是前一个节点被CANCELLED了,那么就将前一个节点去掉,递归此操作直到所有前一个节点的waitStatus<=0,进行4。否则进行3。
  3. 前一个节点等待状态waitStatus=0,修改前一个节点状态位为SINGAL,表示后面有节点等待你处理,需要根据它的等待状态来决定是否该park()。进行4。
  4. 返回false,表示线程不应该park()。

selfInterrupt() private static void selfInterrupt() { Thread.currentThread().interrupt(); }

如果线程曾经中断过(或者阻塞过)(比如手动interrupt()或者超时等等,那么就再中断一次,中断两次的意思就是清除中断位)。

大体上整个Lock.lock()就这样一个流程。除了lock()方法外,还有lockInterruptibly()/tryLock()/unlock()/newCondition()等,在接下来的章节中会一一介绍。 来源: [http://www.blogjava.net/xylz/archive/2010/07/07/325410.html](http://www.blogjava.net/xylz/archive/2010/07/07/325410.html)

本小节介绍锁释放Lock.unlock()。

Release/TryRelease

unlock操作实际上就调用了AQS的release操作,释放持有的锁。 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }

前面提到过tryRelease(arg)操作,此操作里面总是尝试去释放锁,如果成功,说明锁确实被当前线程持有,那么就看AQS队列中的头结点是否为空并且能否被唤醒,如果可以的话就唤醒继任节点(下一个非CANCELLED节点,下面会具体分析)。

对于独占锁而言,java.util.concurrent.locks.ReentrantLock.Sync.tryRelease(int)展示了如何尝试释放锁(tryRelease)操作。 protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }

整个tryRelease操作是这样的:

  1. 判断持有锁的线程是否是当前线程,如果不是就抛出IllegalMonitorStateExeception(),因为一个线程是不能释放另一个线程持有的锁(否则锁就失去了意义)。否则进行2。
  2. 将AQS状态位减少要释放的次数(对于独占锁而言总是1),如果剩余的状态位0(也就是没有线程持有锁),那么当前线程就是最后一个持有锁的线程,清空AQS持有锁的独占线程。进行3。
  3. 将剩余的状态位写回AQS,如果没有线程持有锁就返回true,否则就是false。

参考上一节的分析就可以知道,这里c==0决定了是否完全释放了锁。由于ReentrantLock是可重入锁,因此同一个线程可能多重持有锁,那么当且仅当最后一个持有锁的线程释放锁是才能将AQS中持有锁的独占线程清空,这样接下来的操作才需要唤醒下一个需要锁的AQS节点(Node),否则就只是减少锁持有的计数器,并不能改变其他操作。

tryRelease操作成功后(也就是完全释放了锁),release操作才能检查是否需要唤醒下一个继任节点。这里的前提是AQS队列的头结点需要锁(waitStatus!=0),如果头结点需要锁,就开始检测下一个继任节点是否需要锁操作。

在上一节中说道acquireQueued操作完成后(拿到了锁),会将当前持有锁的节点设为头结点,所以一旦头结点释放锁,那么就需要寻找头结点的下一个需要锁的继任节点,并唤醒它。 private void unparkSuccessor(Node node) { //此时node是需要是需要释放锁的头结点

    //清空头结点的waitStatus,也就是不再需要锁了
    compareAndSetWaitStatus(node, Node.SIGNAL, 0);

    //从头结点的下一个节点开始寻找继任节点,当且仅当继任节点的waitStatus<=0才是有效继任节点,否则将这些waitStatus>0(也就是CANCELLED的节点)从AQS队列中剔除  
   //这里并没有从head->tail开始寻找,而是从tail->head寻找最后一个有效节点。
   //解释在这里 http://www.blogjava.net/xylz/archive/2010/07/08/325540.html/#377512

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }

    //如果找到一个有效的继任节点,就唤醒此节点线程
    if (s != null)
        LockSupport.unpark(s.thread);
}

这里再一次把acquireQueued的过程找出来。对比unparkSuccessor,一旦头节点的继任节点被唤醒,那么继任节点就会尝试去获取锁(在acquireQueued中node就是有效的继任节点,p就是唤醒它的头结点),如果成功就会将头结点设置为自身,并且将头结点的前任节点清空,这样前任节点(已经过时了)就可以被GC释放了。

final boolean acquireQueued(final Node node, int arg) { try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } }

setHead中,将头结点的前任节点清空并且将头结点的线程清空就是为了更好的GC,防止内存泄露。

private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }

对比lock()操作,unlock()操作还是比较简单的,主要就是释放响应的资源,并且唤醒AQS队列中有效的继任节点。这样所就按照请求的顺序去尝试获取锁了。

整个lock()/unlock()过程完成了,我们再回头看公平锁(FairSync)和非公平锁(NonfairSync)。

公平锁和非公平锁只是在获取锁的时候有差别,其它都是一样的。 final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }

在上面非公平锁的代码中总是优先尝试当前是否有线程持有锁,一旦没有任何线程持有锁,那么非公平锁就霸道的尝试将锁“占为己有”。如果在抢占锁的时候失败就和公平锁一样老老实实的去排队。

也即是说公平锁和非公平锁只是在入AQSCLH队列之前有所差别,一旦进入了队列,所有线程都是按照队列中先来后到的顺序请求锁。

Condition

条件变量很大一个程度上是为了解决Object.wait/notify/notifyAll难以使用的问题。

条件(也称为条件队列条件变量)为线程提供了一个含义,以便在某个状态条件现在可能为 true 的另一个线程通知它之前,一直挂起该线程(即让其“等待”)。因为访问此共享状态信息发生在不同的线程中,所以它必须受保护,因此要将某种形式的锁与该条件相关联。等待提供一个条件的主要属性是:以原子方式 释放相关的锁,并挂起当前线程,就像

Object.wait 做的那样。

上述API说明表明条件变量需要与锁绑定,而且多个Condition需要绑定到同一锁上。前面的Lock中提到,获取一个条件变量的方法是Lock.newCondition()。 void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll();

以上是Condition接口定义的方法,await/**对应于Object.waitsignal对应于Object.notifysignalAll对应于Object.notifyAll。特别说明的是Condition的接口改变名称就是为了避免与Object中的wait/notify/notifyAll的语义和使用上混淆,因为Condition同样有wait/notify/notifyAll*方法。

每一个Lock可以有任意数据的Condition对象,Condition是与Lock绑定的,所以就有Lock的公平性特性:如果是公平锁,线程为按照FIFO的顺序从Condition.await中释放,如果是非公平锁,那么后续的锁竞争就不保证FIFO顺序了。

一个使用Condition实现生产者消费者的模型例子如下。 package xylz.study.concurrency.lock;

import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;

public class ProductQueue {

private final T[] items;

private final Lock lock = new ReentrantLock();

private Condition notFull = lock.newCondition();

private Condition notEmpty = lock.newCondition();

//
private int head, tail, count;

public ProductQueue(int maxSize) {
    items = (T[]) new Object[maxSize];
}

public ProductQueue() {
    this(10);
}

public void put(T t) throws InterruptedException {
    lock.lock();
    try {
        while (count == getCapacity()) {
            notFull.await();
        }
        items[tail] = t;
        if (++tail == getCapacity()) {
            tail = 0;
        }
        ++count;
        notEmpty.signalAll();
    } finally {
        lock.unlock();
    }
}

public T take() throws InterruptedException {
    lock.lock();
    try {
        while (count == 0) {
            notEmpty.await();
        }
        T ret = items[head];
        items[head] = null;//GC
        //
        if (++head == getCapacity()) {
            head = 0;
        }
        --count;
        notFull.signalAll();
        return ret;
    } finally {
        lock.unlock();
    }
}

public int getCapacity() {
    return items.length;
}

public int size() {
    lock.lock();
    try {
        return count;
    } finally {
        lock.unlock();
    }
}

}

在这个例子中消费take()需要 队列不为空,如果为空就挂起(await()),直到收到notEmpty的信号;生产put()需要队列不满,如果满了就挂起(await()),直到收到notFull的信号。

可能有人会问题,如果一个线程lock()对象后被挂起还没有unlock,那么另外一个线程就拿不到锁了(lock()操作会挂起),那么就无法通知(notify)前一个线程,这样岂不是“死锁”了?

await/* 操作

上一节中说过多次ReentrantLock是独占锁,一个线程拿到锁后如果不释放,那么另外一个线程肯定是拿不到锁,所以在lock.lock()lock.unlock()之间可能有一次释放锁的操作(同样也必然还有一次获取锁的操作)。我们再回头看代码,不管take()还是put(),在进入lock.lock()后唯一可能释放锁的操作就是await()了。也就是说await()操作实际上就是释放锁,然后挂起线程,一旦条件满足就被唤醒,再次获取锁! public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }

上面是await()的代码片段。上一节中说过,AQS在获取锁的时候需要有一个CHL的FIFO队列,所以对于一个Condition.await()而言,如果释放了锁,要想再一次获取锁那么就需要进入队列,等待被通知获取锁。完整的await()操作是安装如下步骤进行的:

  1. 将当前线程加入Condition锁队列。特别说明的是,这里不同于AQS的队列,这里进入的是Condition的FIFO队列。后面会具体谈到此结构。进行2。
  2. 释放锁。这里可以看到将锁释放了,否则别的线程就无法拿到锁而发生死锁。进行3。
  3. 自旋(while)挂起,直到被唤醒或者超时或者CACELLED等。进行4。
  4. 获取锁(acquireQueued)。并将自己从Condition的FIFO队列中释放,表明自己不再需要锁(我已经拿到锁了)。

这里再回头介绍Condition的数据结构。我们知道一个Condition可以在多个地方被await/(),那么就需要一个FIFO的结构将这些Condition串联起来,然后根据需要唤醒一个或者多个(通常是所有)。所以在Condition*内部就需要一个FIFO的队列。 private transient Node firstWaiter; private transient Node lastWaiter;

上面的两个节点就是描述一个FIFO的队列。我们再结合前面提到的节点(Node)数据结构。我们就发现Node.nextWaiter就派上用场了!nextWaiter就是将一系列的Condition.await/*串联起来组成一个FIFO的队列。

signal/signalAll 操作

await/()清楚了,现在再来看signal/signalAll就容易多了。按照signal/signalAll的需求,就是要将Condition.await/()中FIFO队列中第一个Node唤醒(或者全部Node)唤醒。尽管所有Node可能都被唤醒,但是要知道的是仍然只有一个线程能够拿到锁,其它没有拿到锁的线程仍然需要自旋等待,就上上面提到的第4步(acquireQueued)。 private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }

private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }

上面的代码很容易看出来,signal就是唤醒Condition队列中的第一个非CANCELLED节点线程,而signalAll就是唤醒所有非CANCELLED节点线程。当然了遇到CANCELLED线程就需要将其从FIFO队列中剔除。

final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false;

Node p = enq(node);
int c = p.waitStatus;
if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
    LockSupport.unpark(node.thread);
return true;

}

上面就是唤醒一个await/()线程的过程,根据前面的小节介绍的,如果要unpark线程,并使线程拿到锁,那么就需要线程节点进入AQS的队列。所以可以看到在LockSupport.unpark之前调用了enq(node)操作,将当前节点加入到*AQS队列。

整个锁机制的原理就介绍完了,从下一节开始就进入了锁机制的应用了。

此小节介绍几个与锁有关的有用工具。

闭锁(Latch)

闭锁(Latch):一种同步方法,可以延迟线程的进度直到线程到达某个终点状态。通俗的讲就是,一个闭锁相当于一扇大门,在大门打开之前所有线程都被阻断,一旦大门打开所有线程都将通过,但是一旦大门打开,所有线程都通过了,那么这个闭锁的状态就失效了,门的状态也就不能变了,只能是打开状态。也就是说闭锁的状态是一次性的,它确保在闭锁打开之前所有特定的活动都需要在闭锁打开之后才能完成。

CountDownLatch是JDK 5+里面闭锁的一个实现,允许一个或者多个线程等待某个事件的发生。CountDownLatch有一个正数计数器,countDown方法对计数器做减操作,await方法等待计数器达到0。所有await的线程都会阻塞直到计数器为0或者等待线程中断或者超时。

CountDownLatch的API如下。

  • public void await() throws InterruptedException
  • public boolean await(long timeout, TimeUnit unit) throws InterruptedException
  • public void countDown()
  • public long getCount()

其中getCount()描述的是当前计数,通常用于调试目的。

下面的例子中描述了闭锁的两种常见的用法。 package xylz.study.concurrency.lock;

import java.util.concurrent.CountDownLatch;

public class PerformanceTestTool {

public long timecost(final int times, final Runnable task) throws InterruptedException {
    if (times <= 0) throw new IllegalArgumentException();
    final CountDownLatch startLatch = new CountDownLatch(1);
    final CountDownLatch overLatch = new CountDownLatch(times);
    for (int i = 0; i < times; i++) {
        new Thread(new Runnable() {
            public void run() {
                try {
                    startLatch.await();
                    //
                    task.run();
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                } finally {
                    overLatch.countDown();
                }
            }
        }).start();
    }
    //
    long start = System.nanoTime();
    startLatch.countDown();
    overLatch.await();
    return System.nanoTime() - start;
}

}

在上面的例子中使用了两个闭锁,第一个闭锁确保在所有线程开始执行任务前,所有准备工作都已经完成,一旦准备工作完成了就调用startLatch.countDown()打开闭锁,所有线程开始执行。第二个闭锁在于确保所有任务执行完成后主线程才能继续进行,这样保证了主线程等待所有任务线程执行完成后才能得到需要的结果。在第二个闭锁当中,初始化了一个N次的计数器,每个任务执行完成后都会将计数器减一,所有任务完成后计数器就变为了0,这样主线程闭锁overLatch拿到此信号后就可以继续往下执行了。

根据前面的happend-before法则可以知道闭锁有以下特性: **内存一致性效果:线程中调用

countDown() 之前的操作 happen-before** 紧跟在从另一个线程中对应

await() 成功返回的操作。**

在上面的例子中第二个闭锁相当于把一个任务拆分成N份,每一份独立完成任务,主线程等待所有任务完成后才能继续执行。这个特性在后面的线程池框架中会用到,其实FutureTask就可以看成一个闭锁。后面的章节还会具体分析FutureTask的。

同样基于探索精神,仍然需要“窥探”下CountDownLatch里面到底是如何实现await/**和countDown*的。

首先,研究下await()方法。内部直接调用了AQSacquireSharedInterruptibly(1)。 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }

前面一直提到的都是独占锁(排它锁、互斥锁),现在就用到了另外一种锁,共享锁。

所谓共享锁是说所有共享锁的线程共享同一个资源,一旦任意一个线程拿到共享资源,那么所有线程就都拥有的同一份资源。也就是通常情况下共享锁只是一个标志,所有线程都等待这个标识是否满足,一旦满足所有线程都被激活(相当于所有线程都拿到锁一样)。这里的闭锁CountDownLatch就是基于共享锁的实现。

闭锁中关于AQStryAcquireShared的实现是如下代码(java.util.concurrent.CountDownLatch.Sync.tryAcquireShared): public int tryAcquireShared(int acquires) { return getState() == 0? 1 : -1; }

在这份逻辑中,对于闭锁而言第一次await时tryAcquireShared应该总是-1,因为对于闭锁CountDownLatch而言state的值就是初始化的count值。这也就解释了为什么在countDown调用之前闭锁的count总是>0。

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) break; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } // Arrive here only if interrupted cancelAcquire(node); throw new InterruptedException(); }

上面的逻辑展示了如何通过await将所有线程串联并挂起,直到被唤醒或者条件满足或者被中断。整个过程是这样的:

  1. 将当前线程节点以共享模式加入AQSCLH队列中(相关概念参考这里这里)。进行2。
  2. 检查当前节点的前任节点,如果是头结点并且当前闭锁计数为0就将当前节点设置为头结点,唤醒继任节点,返回(结束线程阻塞)。否则进行3。
  3. 检查线程是否该阻塞,如果应该就阻塞(park),直到被唤醒(unpark)。重复2。
  4. 如果2、3有异常就抛出异常(结束线程阻塞)。

这里有一点值得说明下,设置头结点并唤醒继任节点setHeadAndPropagate。由于前面tryAcquireShared总是返回1或者-1,而进入setHeadAndPropagate时总是propagate>=0,所以这里propagate==1。后面唤醒继任节点操作就非常熟悉了。 private void setHeadAndPropagate(Node node, int propagate) { setHead(node); if (propagate > 0 && node.waitStatus != 0) { Node s = node.next; if (s == null || s.isShared()) unparkSuccessor(node); } }

从上面的所有逻辑可以看出countDown应该就是在条件满足(计数为0)时唤醒头结点(时间最长的一个节点),然后头结点就会根据FIFO队列唤醒整个节点列表(如果有的话)。

CountDownLatchcountDown代码中看到,直接调用的是AQSreleaseShared(1),参考前面的知识,这就印证了上面的说法。

tryReleaseShared中正是采用CAS操作减少计数(每次减-1)。 public boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }

整个CountDownLatch就是这个样子的。其实有了前面原子操作和AQS的原理及实现,分析CountDownLatch还是比较容易的。

来源: [http://www.blogjava.net/xylz/archive/2010/07/09/325612.html](http://www.blogjava.net/xylz/archive/2010/07/09/325612.html)

如果说CountDownLatch是一次性的,那么CyclicBarrier正好可以循环使用。它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。所谓屏障点就是一组任务执行完毕的时刻。

清单1 一个使用CyclicBarrier的例子 package xylz.study.concurrency.lock;

import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {

final CyclicBarrier barrier;

final int MAX_TASK;

public CyclicBarrierDemo(int cnt) {
    barrier = new CyclicBarrier(cnt + 1);
    MAX_TASK = cnt;
}

public void doWork(final Runnable work) {
    new Thread() {

        public void run() {
            work.run();
            try {
                int index = barrier.await();
                doWithIndex(index);
            } catch (InterruptedException e) {
                return;
            } catch (BrokenBarrierException e) {
                return;
            }
        }
    }.start();
}

private void doWithIndex(int index) {
    if (index == MAX_TASK / 3) {
        System.out.println("Left 30%.");
    } else if (index == MAX_TASK / 2) {
        System.out.println("Left 50%");
    } else if (index == 0) {
        System.out.println("run over");
    }
}

public void waitForNext() {
    try {
        doWithIndex(barrier.await());
    } catch (InterruptedException e) {
        return;
    } catch (BrokenBarrierException e) {
        return;
    }
}

public static void main(String[] args) {
    final int count = 10;
    CyclicBarrierDemo demo = new CyclicBarrierDemo(count);
    for (int i = 0; i < 100; i++) {
        demo.doWork(new Runnable() {

            public void run() {
                //do something
                try {
                    Thread.sleep(1000L);
                } catch (Exception e) {
                    return;
                }
            }
        });
        if ((i + 1) % count == 0) {
            demo.waitForNext();
        }
    }
}

}

清单1描述的是一个周期性处理任务的例子,在这个例子中有一对的任务(100个),希望每10个为一组进行处理,当前仅当上一组任务处理完成后才能进行下一组,另外在每一组任务中,当任务剩下50%,30%以及所有任务执行完成时向观察者发出通知。

在这个例子中,CyclicBarrierDemo 构建了一个count+1的任务组(其中一个任务时为了外界方便挂起主线程)。每一个子任务里,人物本身执行完毕后都需要等待同组内其它任务执行完成后才能继续。同时在剩下任务50%、30%已经0时执行特殊的其他任务(发通知)。

很显然CyclicBarrier有以下几个特点:

  • await()方法将挂起线程,直到同组的其它线程执行完毕才能继续
  • await()方法返回线程执行完毕的索引,注意,索引时从任务数-1开始的,也就是第一个执行完成的任务索引为parties-1,最后一个为0,这个parties为总任务数,清单中是cnt+1
  • CyclicBarrier 是可循环的,显然名称说明了这点。在清单1中,每一组任务执行完毕就能够执行下一组任务。

另外除了CyclicBarrier除了以上特点外,还有以下几个特点:

  • 如果屏障操作不依赖于挂起的线程,那么任何线程都可以执行屏障操作。在清单1中可以看到并没有指定那个线程执行50%、30%、0%的操作,而是一组线程(cnt+1)个中任何一个线程只要到达了屏障点都可以执行相应的操作
  • CyclicBarrier 的构造函数允许携带一个任务,这个任务将在0%屏障点执行,它将在await()==0后执行。
  • CyclicBarrier 如果在await时因为中断、失败、超时等原因提前离开了屏障点,那么任务组中的其他任务将立即被中断,以InterruptedException异常离开线程。
  • 所有await()之前的操作都将在屏障点之前运行,也就是CyclicBarrier 的内存一致性效果

CyclicBarrier 的所有API如下:

  • public CyclicBarrier(int parties) 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。
  • public CyclicBarrier(int parties, Runnable barrierAction) 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。
  • public int await() throws InterruptedException, BrokenBarrierException 在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
  • public int await(long timeout,TimeUnit unit) throws InterruptedException, BrokenBarrierException,TimeoutException 在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。
  • public int getNumberWaiting() 返回当前在屏障处等待的参与者数目。此方法主要用于调试和断言。
  • public int getParties() 返回要求启动此 barrier 的参与者数目。
  • public boolean isBroken() 查询此屏障是否处于损坏状态。
  • public void reset() 将屏障重置为其初始状态。

针对以上API,下面来探讨下CyclicBarrier 的实现原理,以及为什么有这样的API。

清单2 CyclicBarrier.await/()的实现片段* private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException();

    if (Thread.interrupted()) {
        breakBarrier();
        throw new InterruptedException();
    }

   int index = --count;
   if (index == 0) {  // tripped
       boolean ranAction = false;
       try {
           final Runnable command = barrierCommand;
           if (command != null)
               command.run();
           ranAction = true;
           nextGeneration();
           return 0;
       } finally {
           if (!ranAction)
               breakBarrier();
       }
   }

    // loop until tripped, broken, interrupted, or timed out
    for (;;) {
        try {
            if (!timed)
                trip.await();
            else if (nanos > 0L)
                nanos = trip.awaitNanos(nanos);
        } catch (InterruptedException ie) {
            if (g == generation && ! g.broken) {
                breakBarrier();
                throw ie;
            } else {
                Thread.currentThread().interrupt();
            }
        }

        if (g.broken)
            throw new BrokenBarrierException();

        if (g != generation)
            return index;

        if (timed && nanos <= 0L) {
            breakBarrier();
            throw new TimeoutException();
        }
    }
} finally {
    lock.unlock();
}

}

清单2有点复杂,这里一点一点的剖析,并且还原到最原始的状态。

利用前面学到的知识,我们知道要想让线程等待其他线程执行完毕,那么已经执行完毕的线程(进入await/*()方法)就需要park(),直到超时或者被中断,或者被其它线程唤醒。

前面说过CyclicBarrier 的特点是要么大家都正常执行完毕,要么大家都异常被中断,不会其中有一个被中断而其它正常执行完毕的现象存在。这种特点叫all-or-none。类似的概念是原子操作中的要么大家都执行完,要么一个操作都不执行完。当前这其实是两个概念了。要完成这样的特点就必须有一个状态来描述曾经是否有过线程被中断(broken)了,这样后面执行完的线程就该知道是否需要继续等待了。而在CyclicBarrier 中Generation 就是为了完成这件事情的。Generation的定义非常简单,整个结构就只有一个变量boolean broken = false;,定义是否发生了broken操作。

由于有竞争资源的存在(broken/index),所以毫无疑问需要一把锁lock。拿到锁后整个过程是这样的:

  1. 检查是否存在中断位(broken),如果存在就立即以BrokenBarrierException异常返回。此异常描述的是线程进入屏障被破坏的等待状态。否则进行2。
  2. 检查当前线程是否被中断,如果是那么就设置中断位(使其它将要进入等待的线程知道),另外唤醒已经等待的线程,同时以InterruptedException异常返回,表示线程要处理中断。否则进行3。
  3. 将剩余任务数减1,如果此时剩下的任务数为0,也就是达到了公共屏障点,那么就执行屏障点任务(如果有的话),同时创建新的Generation(在这个过程中会唤醒其它所有线程,因此当前线程是屏障点线程,那么其它线程就都应该在等待状态)。否则进行4。
  4. 到这里说明还没有到达屏障点,那么此时线程就应该park()。很显然在下面的for循环中就是要park线程。这里park线程采用的是Condition.await()方法。也就是trip.await/()。为什么需要Condition?因为所有的await/()其实等待的都是一个条件,一旦条件满足就应该都被唤醒,所以Condition整好满足这个特点。所以到这里就会明白为什么在步骤3中到达屏障点时创建新的Generation的时候是一定要唤醒其它线程的原因了。

上面4个步骤其实只是描述主体结构,事实上整个过程中有非常多的逻辑来处理异常引发的问题,比如执行屏障点任务引发的异常,park线程超时引发的中断异常和超时异常等等。所以对于await()而言,异常的处理比业务逻辑的处理更复杂,这就解释了为什么await()的时候可能引发InterruptedException,BrokenBarrierException,TimeoutException 三种异常。

清单3 生成下一个循环周期并唤醒其它线程 private void nextGeneration() { trip.signalAll(); count = parties; generation = new Generation(); }

清单3 描述了如何生成下一个循环周期的过程,在这个过程中当然需要使用Condition.signalAll()唤醒所有已经执行完成并且正在等待的线程。另外这里count描述的是还有多少线程需要执行,是为了线程执行完毕索引计数。

isBroken() 方法描述的就是generation.broken,也即线程组是否发生了异常。这里再一次解释下为什么要有这个状态的存在。

如果一个将要位于屏障点或者已经位于屏障点的而执行屏障点任务的线程发生了异常,那么即使唤醒了其它等待的线程,其它等待的线程也会因为循环等待而“死去”,因为再也没有一个线程来唤醒这些第二次进行park的线程了。还有一个意图是,如果屏障点都已经损坏了,那么其它将要等待屏障点的再线程挂起就没有意义了。 写到这里的时候非常不幸,用了4年多了台灯终于“寿终正寝了”。

其实CyclicBarrier 还有一个reset方法,描述的是手动立即将所有线程中断,恢复屏障点,进行下一组任务的执行。也就是与重新创建一个新的屏障点相比,可能维护的代价要小一些(减少同步,减少上一个CyclicBarrier 的管理等等)。

本来是想和Semaphore 一起将的,最后发现铺开后就有点长了,而且也不利于理解和吸收,所以放到下一篇吧。

参考资料:

  1. 使用 CyclicBarrier 做线程间同步
  2. CyclicBarrier And CountDownLatch Tutorial
  3. 线程—CyclicBarrier
  4. Java线程学习笔记(十)CountDownLatch 和CyclicBarrier
  5. 关于多线程同步的初步教程--Barrier的设计及使用
  6. Thread coordination with CountDownLatch and CyclicBarrier
  7. 如何充分利用多核CPU,计算很大的List中所有整数的和 来源: [http://www.blogjava.net/xylz/archive/2010/07/12/325913.html](http://www.blogjava.net/xylz/archive/2010/07/12/325913.html)

Semaphore 是一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个

acquire() ,然后再获取该许可。每个

release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,

Semaphore 只对可用许可的号码进行计数,并采取相应的行动。

说白了,Semaphore是一个计数器,在计数器不为0的时候对线程就放行,一旦达到0,那么所有请求资源的新线程都会被阻塞,包括增加请求到许可的线程,也就是说Semaphore不是可重入的。每一次请求一个许可都会导致计数器减少1,同样每次释放一个许可都会导致计数器增加1,一旦达到了0,新的许可请求线程将被挂起。

缓存池整好使用此思想来实现的,比如链接池、对象池等。

清单1 对象池 package xylz.study.concurrency.lock;

import java.util.concurrent.Semaphore; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;

public class ObjectCache {

public interface ObjectFactory<T> {

    T makeObject();
}

class Node {

    T obj;

    Node next;
}

final int capacity;

final ObjectFactory<T> factory;

final Lock lock = new ReentrantLock();

final Semaphore semaphore;

private Node head;

private Node tail;

public ObjectCache(int capacity, ObjectFactory<T> factory) {
    this.capacity = capacity;
    this.factory = factory;
    this.semaphore = new Semaphore(this.capacity);
    this.head = null;
    this.tail = null;
}

public T getObject() throws InterruptedException {
    semaphore.acquire();
    return getNextObject();
}

private T getNextObject() {
    lock.lock();
    try {
        if (head == null) {
            return factory.makeObject();
        } else {
            Node ret = head;
            head = head.next;
            if (head == null) tail = null;
            ret.next = null;//help GC
            return ret.obj;
        }
    } finally {
        lock.unlock();
    }
}

private void returnObjectToPool(T t) {
    lock.lock();
    try {
        Node node = new Node();
        node.obj = t;
        if (tail == null) {
            head = tail = node;
        } else {
            tail.next = node;
            tail = node;
        }

    } finally {
        lock.unlock();
    }
}

public void returnObject(T t) {
    returnObjectToPool(t);
    semaphore.release();
}

}

清单1描述了一个基于信号量Semaphore的对象池实现。此对象池最多支持capacity个对象,这在构造函数中传入。对象池有一个基于FIFO的队列,每次从对象池的头结点开始取对象,如果头结点为空就直接构造一个新的对象返回。否则将头结点对象取出,并且头结点往后移动。特别要说明的如果对象的个数用完了,那么新的线程将被阻塞,直到有对象被返回回来。返还对象时将对象加入FIFO的尾节点并且释放一个空闲的信号量,表示对象池中增加一个可用对象。

实际上对象池、线程池的原理大致上就是这样的,只不过真正的对象池、线程池要处理比较复杂的逻辑,所以实现起来还需要做很多的工作,例如超时机制,自动回收机制,对象的有效期等等问题。

这里特别说明的是信号量只是在信号不够的时候挂起线程,但是并不能保证信号量足够的时候获取对象和返还对象是线程安全的,所以在清单1中仍然需要锁Lock来保证并发的正确性。

将信号量初始化为 1,使得它在使用时最多只有一个可用的许可,从而可用作一个相互排斥的锁。这通常也称为二进制信号量,因为它只能有两种状态:一个可用的许可,或零个可用的许可。按此方式使用时,二进制信号量具有某种属性(与很多

Lock 实现不同),即可以由线程释放“锁”,而不是由所有者(因为信号量没有所有权的概念)。在某些专门的上下文(如死锁恢复)中这会很有用。

上面这段话的意思是说当某个线程A持有信号量数为1的信号量时,其它线程只能等待此线程释放资源才能继续,这时候持有信号量的线程A就相当于持有了“锁”,其它线程的继续就需要这把锁,于是线程A的释放才能决定其它线程的运行,相当于扮演了“锁”的角色。

另外同公平锁非公平锁一样,信号量也有公平性。如果一个信号量是公平的表示线程在获取信号量时按FIFO的顺序得到许可,也就是按照请求的顺序得到释放。这里特别说明的是:所谓请求的顺序是指在请求信号量而进入FIFO队列的顺序,有可能某个线程先请求信号而后进去请求队列,那么次线程获取信号量的顺序就会晚于其后请求但是先进入请求队列的线程。这个在公平锁和非公平锁中谈过很多。

除了acquire以外,Semaphore还有几种类似的acquire方法,这些方法可以更好的处理中断和超时或者异步等特性,可以参考JDK API。

按照同样的学习原则,下面对主要的实现进行分析。Semaphore的acquire方法实际上访问的是AQSacquireSharedInterruptibly(arg)方法。这个可以参考CountDownLatch一节或者AQS一节。

所以Semaphore的await实现也是比较简单的。与CountDownLatch不同的是,Semaphore区分公平信号和非公平信号。

清单2 公平信号获取方法 protected int tryAcquireShared(int acquires) { Thread current = Thread.currentThread(); for (;;) { Thread first = getFirstQueuedThread(); if (first != null && first != current) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }

清单3 非公平信号获取方法

protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }

final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }

对比清单2和清单3可以看到,公平信号和非公平信号在于第一次尝试能否获取信号时,公平信号量总是将当前线程进入AQS的CLH队列进行排队(因为第一次尝试时队列的头结点线程很有可能不是当前线程,当然不排除同一个线程第二次进入信号量),从而根据AQS的CLH队列的顺序FIFO依次获取信号量;而对于非公平信号量,第一次立即尝试能否拿到信号量,一旦信号量的剩余数available大于请求数(acquires通常为1),那么线程就立即得到了释放,而不需要进行AQS队列进行排队。只有remaining<0的时候(也就是信号量不够的时候)才会进入AQS队列。

所以非公平信号量的吞吐量总是要比公平信号量的吞吐量要大,但是需要强调的是非公平信号量和非公平锁一样存在“饥渴死”的现象,也就是说活跃线程可能总是拿到信号量,而非活跃线程可能难以拿到信号量。而对于公平信号量由于总是靠请求的线程的顺序来获取信号量,所以不存在此问题。

参考资料:

  1. 信号量(Semaphore)在生产者和消费者模式的使用
  2. What is mutex and semaphore in Java ? What is the main difference ?
  3. 关于 java.util.concurrent 您不知道的 5 件事,第 2 部分
  4. Semahores

来源: [http://www.blogjava.net/xylz/archive/2010/07/13/326021.html](http://www.blogjava.net/xylz/archive/2010/07/13/326021.html)

从这一节开始介绍锁里面的最后一个工具:读写锁(ReadWriteLock)。

ReentrantLock 实现了标准的互斥操作,也就是一次只能有一个线程持有锁,也即所谓独占锁的概念。前面的章节中一直在强调这个特点。显然这个特点在一定程度上面减低了吞吐量,实际上独占锁是一种保守的锁策略,在这种情况下任何“读/读”,“写/读”,“写/写”操作都不能同时发生。但是同样需要强调的一个概念是,锁是有一定的开销的,当并发比较大的时候,锁的开销就比较客观了。所以如果可能的话就尽量少用锁,非要用锁的话就尝试看能否改造为读写锁。

ReadWriteLock描述的是:一个资源能够被多个读线程访问,或者被一个写线程访问,但是不能同时存在读写线程。也就是说读写锁使用的场合是一个共享资源被大量读取操作,而只有少量的写操作(修改数据)。清单1描述了ReadWriteLock的API。

清单1 ReadWriteLock 接口 public interface ReadWriteLock { Lock readLock(); Lock writeLock(); }

清单1描述的ReadWriteLock结构,这里需要说明的是ReadWriteLock并不是Lock的子接口,只不过ReadWriteLock借助Lock来实现读写两个视角。在ReadWriteLock中每次读取共享数据就需要读取锁,当需要修改共享数据时就需要写入锁。看起来好像是两个锁,但其实不尽然,在下一节中的分析中会解释这点奥秘。

在JDK 6里面ReadWriteLock的实现是ReentrantReadWriteLock。

清单2 SimpleConcurrentMap package xylz.study.concurrency.lock;

import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;

public class SimpleConcurrentMap implements Map {

final ReadWriteLock lock = new ReentrantReadWriteLock();

final Lock r = lock.readLock();

final Lock w = lock.writeLock();

final Map<K, V> map;

public SimpleConcurrentMap(Map<K, V> map) {
    this.map = map;
    if (map == null) throw new NullPointerException();
}

public void clear() {
    w.lock();
    try {
        map.clear();
    } finally {
        w.unlock();
    }
}

public boolean containsKey(Object key) {
    r.lock();
    try {
        return map.containsKey(key);
    } finally {
        r.unlock();
    }
}

public boolean containsValue(Object value) {
    r.lock();
    try {
        return map.containsValue(value);
    } finally {
        r.unlock();
    }
}

public Set<java.util.Map.Entry<K, V>> entrySet() {
    throw new UnsupportedOperationException();
}

public V get(Object key) {
    r.lock();
    try {
        return map.get(key);
    } finally {
        r.unlock();
    }
}

public boolean isEmpty() {
    r.lock();
    try {
        return map.isEmpty();
    } finally {
        r.unlock();
    }
}

public Set<K> keySet() {
    r.lock();
    try {
        return new HashSet<K>(map.keySet());
    } finally {
        r.unlock();
    }
}

public V put(K key, V value) {
    w.lock();
    try {
        return map.put(key, value);
    } finally {
        w.unlock();
    }
}

public void putAll(Map<? extends K, ? extends V> m) {
    w.lock();
    try {
        map.putAll(m);
    } finally {
        w.unlock();
    }
}

public V remove(Object key) {
    w.lock();
    try {
        return map.remove(key);
    } finally {
        w.unlock();
    }
}

public int size() {
    r.lock();
    try {
        return map.size();
    } finally {
        r.unlock();
    }
}

public Collection<V> values() {
    r.lock();
    try {
        return new ArrayList<V>(map.values());
    } finally {
        r.unlock();
    }
}

}

清单2描述的是用读写锁实现的一个线程安全的Map。其中需要特别说明的是并没有实现entrySet()方法,这是因为实现这个方法比较复杂,在后面章节中讲到ConcurrentHashMap的时候会具体谈这些细节。另外这里keySet()和values()也没有直接返回Map的视图,而是一个映射原有元素的新视图,其实这个entrySet()一样,是为了保护原始Map的数据逻辑,防止不正确的修改导致原始Map发生数据错误。特别说明的是在没有特别需求的情况下没有必要按照清单2写一个线程安全的Map实现,因为ConcurrentHashMap已经完成了此操作。

ReadWriteLock需要严格区分读写操作,如果读操作使用了写入锁,那么降低读操作的吞吐量,如果写操作使用了读取锁,那么就可能发生数据错误。

另外ReentrantReadWriteLock还有以下几个特性:

  • 公平性

  • 非公平锁(默认) 这个和独占锁的非公平性一样,由于读线程之间没有锁竞争,所以读操作没有公平性和非公平性,写操作时,由于写操作可能立即获取到锁,所以会推迟一个或多个读操作或者写操作。因此非公平锁的吞吐量要高于公平锁。

  • 公平锁 利用AQS的CLH队列,释放当前保持的锁(读锁或者写锁)时,优先为等待时间最长的那个写线程分配写入锁,当前前提是写线程的等待时间要比所有读线程的等待时间要长。同样一个线程持有写入锁或者有一个写线程已经在等待了,那么试图获取公平锁的(非重入)所有线程(包括读写线程)都将被阻塞,直到最先的写线程释放锁。如果读线程的等待时间比写线程的等待时间还有长,那么一旦上一个写线程释放锁,这一组读线程将获取锁。
  • 重入性

  • 读写锁允许读线程和写线程按照请求锁的顺序重新获取读取锁或者写入锁。当然了只有写线程释放了锁,读线程才能获取重入锁。

  • 写线程获取写入锁后可以再次获取读取锁,但是读线程获取读取锁后却不能获取写入锁。
  • 另外读写锁最多支持65535个递归写入锁和65535个递归读取锁。
  • 锁降级

  • 写线程获取写入锁后可以获取读取锁,然后释放写入锁,这样就从写入锁变成了读取锁,从而实现锁降级的特性。

  • 锁升级

  • 读取锁是不能直接升级为写入锁的。因为获取一个写入锁需要释放所有读取锁,所以如果有两个读取锁视图获取写入锁而都不释放读取锁时就会发生死锁。

  • 锁获取中断

  • 读取锁和写入锁都支持获取锁期间被中断。这个和独占锁一致。

  • 条件变量

  • 写入锁提供了条件变量(Condition)的支持,这个和独占锁一致,但是读取锁却不允许获取条件变量,将得到一个

UnsupportedOperationException 异常。

  • 重入数

  • 读取锁和写入锁的数量最大分别只能是65535(包括重入数)。这在下节中有介绍。

上面几个特性对读写锁的理解很有帮助,而且也是必要的,另外在下一节中讲ReadWriteLock的实现会用到这些知识的。 来源: [http://www.blogjava.net/xylz/archive/2010/07/14/326080.html](http://www.blogjava.net/xylz/archive/2010/07/14/326080.html)

这一节主要是谈谈读写锁的实现。

上一节中提到,ReadWriteLock看起来有两个锁:readLock/writeLock。如果真的是两个锁的话,它们之间又是如何相互影响的呢?

事实上在ReentrantReadWriteLock里锁的实现是靠java.util.concurrent.locks.ReentrantReadWriteLock.Sync完成的。这个类看起来比较眼熟,实际上它是AQS的一个子类,这中类似的结构在CountDownLatch、ReentrantLock、Semaphore里面都存在。同样它也有两种实现:公平锁和非公平锁,也就是java.util.concurrent.locks.ReentrantReadWriteLock.FairSync和java.util.concurrent.locks.ReentrantReadWriteLock.NonfairSync。这里暂且不提。

在ReentrantReadWriteLock里面的锁主体就是一个Sync,也就是上面提到的FairSync或者NonfairSync,所以说实际上只有一个锁,只是在获取读取锁和写入锁的方式上不一样,所以前面才有读写锁是独占锁的两个不同视图一说。

ReentrantReadWriteLock里面有两个类:ReadLock/WriteLock,这两个类都是Lock的实现。

清单1 ReadLock 片段 public static class ReadLock implements Lock, java.io.Serializable { private final Sync sync;

protected ReadLock(ReentrantReadWriteLock lock) {
    sync = lock.sync;
}

public void lock() {
    sync.acquireShared(1);
}

public void lockInterruptibly() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public  boolean tryLock() {
    return sync.tryReadLock();
}

public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public  void unlock() {
    sync.releaseShared(1);
}

public Condition newCondition() {
    throw new UnsupportedOperationException();
}

}

清单2 WriteLock 片段

public static class WriteLock implements Lock, java.io.Serializable { private final Sync sync; protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; } public void lock() { sync.acquire(1); }

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

public boolean tryLock( ) {
    return sync.tryWriteLock();
}

public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

public void unlock() {
    sync.release(1);
}

public Condition newCondition() {
    return sync.newCondition();
}

public boolean isHeldByCurrentThread() {
    return sync.isHeldExclusively();
}

public int getHoldCount() {
    return sync.getWriteHoldCount();
}

}

清单1描述的是读锁的实现,清单2描述的是写锁的实现。显然WriteLock就是一个独占锁,这和ReentrantLock里面的实现几乎相同,都是使用了AQS的acquire/release操作。当然了在内部处理方式上与ReentrantLock还是有一点不同的。对比清单1和清单2可以看到,ReadLock获取的是共享锁,WriteLock获取的是独占锁。

在AQS章节中介绍到AQS中有一个state字段(int类型,32位)用来描述有多少线程获持有锁。在独占锁的时代这个值通常是0或者1(如果是重入的就是重入的次数),在共享锁的时代就是持有锁的数量。在上一节中谈到,ReadWriteLock的读、写锁是相关但是又不一致的,所以需要两个数来描述读锁(共享锁)和写锁(独占锁)的数量。显然现在一个state就不够用了。于是在ReentrantReadWrilteLock里面将这个字段一分为二,高位16位表示共享锁的数量,低位16位表示独占锁的数量(或者重入数量)。2^16-1=65536,这就是上节中提到的为什么共享锁和独占锁的数量最大只能是65535的原因了。

有了上面的知识后再来分析读写锁的获取和释放就容易多了。

清单3 写入锁获取片段 protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); } if ((w == 0 && writerShouldBlock(current)) || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }

清单3 是写入锁获取的逻辑片段,整个工作流程是这样的:

  1. 持有锁线程数非0(c=getState()不为0),如果写线程数(w)为0(那么读线程数就不为0)或者独占锁线程(持有锁的线程)不是当前线程就返回失败,或者写入锁的数量(其实是重入数)大于65535就抛出一个Error异常。否则进行2。
  2. 如果当且写线程数位0(那么读线程也应该为0,因为步骤1已经处理c!=0的情况),并且当前线程需要阻塞那么就返回失败;如果增加写线程数失败也返回失败。否则进行3。
  3. 设置独占线程(写线程)为当前线程,返回true。

清单3 中 exclusiveCount(c)就是获取写线程数(包括重入数),也就是state的低16位值。另外这里有一段逻辑是当前写线程是否需要阻塞writerShouldBlock(current)。清单4 和清单5 就是公平锁和非公平锁中是否需要阻塞的片段。很显然对于非公平锁而言总是不阻塞当前线程,而对于公平锁而言如果AQS队列不为空或者当前线程不是在AQS的队列头那么就阻塞线程,直到队列前面的线程处理完锁逻辑。

清单4 公平读写锁写线程是否阻塞 final boolean writerShouldBlock(Thread current) { return !isFirst(current); }

清单5 非公平读写锁写线程是否阻塞

final boolean writerShouldBlock(Thread current) { return false; }

写入锁的获取逻辑清楚后,释放锁就比较简单了。清单6 描述的写入锁释放逻辑片段,其实就是检测下剩下的写入锁数量,如果是0就将独占锁线程清空(意味着没有线程获取锁),否则就是说当前是重入锁的一次释放,所以不能将独占锁线程清空。然后将剩余线程状态数写回AQS。

清单6 写入锁释放逻辑片段 protected final boolean tryRelease(int releases) { int nextc = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); if (exclusiveCount(nextc) == 0) { setExclusiveOwnerThread(null); setState(nextc); return true; } else { setState(nextc); return false; } }

清单3~6 描述的写入锁的获取释放过程。读取锁的获取和释放过程要稍微复杂些。 清单7描述的是读取锁的获取过程。

清单7 读取锁获取过程片段 protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (!readerShouldBlock(current) && compareAndSetState(c, c + SHARED_UNIT)) { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) cachedHoldCounter = rh = readHolds.get(); rh.count++; return 1; } return fullTryAcquireShared(current); }

final int fullTryAcquireShared(Thread current) { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) rh = readHolds.get(); for (;;) { int c = getState(); int w = exclusiveCount(c); if ((w != 0 && getExclusiveOwnerThread() != current) || ((rh.count | w) == 0 && readerShouldBlock(current))) return -1; if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { cachedHoldCounter = rh; // cache for release rh.count++; return 1; } } }

读取锁获取的过程是这样的:

  1. 如果写线程持有锁(也就是独占锁数量不为0),并且独占线程不是当前线程,那么就返回失败。因为允许写入线程获取锁的同时获取读取锁。否则进行2。
  2. 如果读线程请求锁数量达到了65535(包括重入锁),那么就跑出一个错误Error,否则进行3。
  3. 如果读线程不用等待(实际上是是否需要公平锁),并且增加读取锁状态数成功,那么就返回成功,否则进行4。
  4. 步骤3失败的原因是CAS操作修改状态数失败,那么就需要循环不断尝试去修改状态直到成功或者锁被写入线程占有。实际上是过程3的不断尝试直到CAS计数成功或者被写入线程占有锁。

在清单7 中有一个对象HoldCounter,这里暂且不提这是什么结构和为什么存在这样一个结构。

接下来根据清单8 我们来看如何释放一个读取锁。同样先不理HoldCounter,关键的在于for循环里面,其实就是一个不断尝试的CAS操作,直到修改状态成功。前面说过state的高16位描述的共享锁(读取锁)的数量,所以每次都需要减去2^16,这样就相当于读取锁数量减1。实际上SHARED_UNIT=1<<16。

清单8 读取锁释放过程 protected final boolean tryReleaseShared(int unused) { HoldCounter rh = cachedHoldCounter; Thread current = Thread.currentThread(); if (rh == null || rh.tid != current.getId()) rh = readHolds.get(); if (rh.tryDecrement() <= 0) throw new IllegalMonitorStateException(); for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } }

好了,现在回头看HoldCounter到底是一个什么东西。首先我们可以看到只有在获取共享锁(读取锁)的时候加1,也只有在释放共享锁的时候减1有作用,并且在释放锁的时候抛出了一个IllegalMonitorStateException异常。而我们知道IllegalMonitorStateException通常描述的是一个线程操作一个不属于自己的监视器对象的引发的异常。也就是说这里的意思是一个线程释放了一个不属于自己或者不存在的共享锁。

前面的章节中一再强调,对于共享锁,其实并不是锁的概念,更像是计数器的概念。一个共享锁就相对于一次计数器操作,一次获取共享锁相当于计数器加1,释放一个共享锁就相当于计数器减1。显然只有线程持有了共享锁(也就是当前线程携带一个计数器,描述自己持有多少个共享锁或者多重共享锁),才能释放一个共享锁。否则一个没有获取共享锁的线程调用一次释放操作就会导致读写锁的state(持有锁的线程数,包括重入数)错误。

明白了HoldCounter的作用后我们就可以猜到它的作用其实就是当前线程持有共享锁(读取锁)的数量,包括重入的数量。那么这个数量就必须和线程绑定在一起。

在Java里面将一个对象和线程绑定在一起,就只有ThreadLocal才能实现了。所以毫无疑问HoldCounter就应该是绑定到线程上的一个计数器。

清单9 线程持有读取锁数量的计数器 static final class HoldCounter { int count; final long tid = Thread.currentThread().getId(); int tryDecrement() { int c = count; if (c > 0) count = c - 1; return c; } }

static final class ThreadLocalHoldCounter extends ThreadLocal { public HoldCounter initialValue() { return new HoldCounter(); } }

清单9 描述的是线程持有读取锁数量的计数器。可以看到这里使用ThreadLocal将HoldCounter绑定到当前线程上,同时HoldCounter也持有线程Id,这样在释放锁的时候才能知道ReadWriteLock里面缓存的上一个读取线程(cachedHoldCounter)是否是当前线程。这样做的好处是可以减少ThreadLocal.get()的次数,因为这也是一个耗时操作。需要说明的是这样HoldCounter绑定线程id而不绑定线程对象的原因是避免HoldCounter和ThreadLocal互相绑定而GC难以释放它们(尽管GC能够智能的发现这种引用而回收它们,但是这需要一定的代价),所以其实这样做只是为了帮助GC快速回收对象而已。

除了readLock()和writeLock()外,Lock对象还允许tryLock(),那么ReadLock和WriteLock的tryLock()不一样。清单10 和清单11 分别描述了读取锁的tryLock()和写入锁的tryLock()。

读取锁tryLock()也就是tryReadLock()成功的条件是:没有写入锁或者写入锁是当前线程,并且读线程共享锁数量没有超过65535个。

写入锁tryLock()也就是tryWriteLock()成功的条件是: 没有写入锁或者写入锁是当前线程,并且尝试一次修改state成功。

清单10 读取锁的tryLock() final boolean tryReadLock() { Thread current = Thread.currentThread(); for (;;) { int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return false; if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) cachedHoldCounter = rh = readHolds.get(); rh.count++; return true; } } }

清单11 写入锁的tryLock()

final boolean tryWriteLock() { Thread current = Thread.currentThread(); int c = getState(); if (c != 0) { int w = exclusiveCount(c); if (w == 0 ||current != getExclusiveOwnerThread()) return false; if (w == MAX_COUNT) throw new Error("Maximum lock count exceeded"); } if (!compareAndSetState(c, c + 1)) return false; setExclusiveOwnerThread(current); return true; }

整个读写锁的逻辑大概就这么多,其实真正研究起来也不是很复杂,真正复杂的东西都在AQS里面。

锁部分的原理和思想都介绍完了,下一节里面会对锁机进行小节,并对线程并发也会有一些简单的小节。 来源: [http://www.blogjava.net/xylz/archive/2010/07/15/326152.html](http://www.blogjava.net/xylz/archive/2010/07/15/326152.html)

主要谈谈锁的性能以及其它一些理论知识,内容主要的出处是《Java Concurrency in Practice》,结合自己的理解和实际应用对锁机制进行一个小小的总结。

首先需要强调的一点是:所有锁(包括内置锁和高级锁)都是有性能消耗的,也就是说在高并发的情况下,由于锁机制带来的上下文切换、资源同步等消耗是非常可观的。在某些极端情况下,线程在锁上的消耗可能比线程本身的消耗还要多。所以如果可能的话,在任何情况下都尽量少用锁,如果不可避免那么采用非阻塞算法是一个不错的解决方案,但是却也不是绝对的。

内部锁

Java语言通过synchronized关键字来保证原子性。这是因为每一个Object都有一个隐含的锁,这个也称作监视器对象。在进入synchronized之前自动获取此内部锁,而一旦离开此方式(不管通过和中方式离开此方法)都会自动释放锁。显然这是一个独占锁,每个锁请求之间是互斥的。相对于前面介绍的众多高级锁(Lock/ReadWriteLock等),synchronized的代价都比后者要高。但是synchronized的语法比较简单,而且也比较容易使用和理解,不容易写法上的错误。而我们知道Lock一旦调用了lock()方法获取到锁而未正确释放的话很有可能就死锁了。所以Lock的释放操作总是跟在finally代码块里面,这在代码结构上也是一次调整和冗余。另外前面介绍中说过Lock的实现已经将硬件资源用到了极致,所以未来可优化的空间不大,除非硬件有了更高的性能。但是synchronized只是规范的一种实现,这在不同的平台不同的硬件还有很高的提升空间,未来Java在锁上的优化也会主要在这上面。

性能

由于锁总是带了性能影响,所以是否使用锁和使用锁的场合就变得尤为重要。如果在一个高并发的Web请求中使用了强制的独占锁,那么就可以发现Web的吞吐量将急剧下降。

为了利用并发来提高性能,出发点就是:更有效的利用现有的资源,同时让程序尽可能的开拓更多可用的资源。这意味着机器尽可能的处于忙碌的状态,通常意义是说CPU忙于计算,而不是等待。当然CPU要做有用的事情,而不是进行无谓的循环。当然在实践中通常会预留一些资源出来以便应急特殊情况,这在以后的线程池并发中可以看到很多例子。

线程阻塞

锁机制的实现通常需要操作系统提供支持,显然这会增加开销。当锁竞争的时候,失败的线程必然会发生阻塞。JVM既能自旋等待(不断尝试,知道成功,很多CAS就是这样实现的),也能够在操作系统中挂起阻塞的线程,直到超时或者被唤醒。通常情况下这取决于上下文切换的开销以及与获取锁需要等待的时间二者之间的关系。自旋等待适合于比较短的等待,而挂起线程比较适合那些比较耗时的等待。

挂起一个线程可能是因为无法获取到锁,或者需要某个特定的条件,或者耗时的I/O操作。挂起一个线程需要两次额外的上下文切换以及操作系统、缓存等多资源的配合:如果线程被提前换出,那么一旦拿到锁或者条件满足,那么又需要将线程换回执行队列,这对线程而言,两次上下文切换可能比较耗时。


锁竞争

影响锁竞争性的条件有两个:锁被请求的频率和每次持有锁的时间。显然当而这二者都很小的时候,锁竞争不会成为主要的瓶颈。但是如果锁使用不当,导致二者都比较大,那么很有可能CPU不能有效的处理任务,任务被大量堆积。

所以减少锁竞争的方式有下面三种:

  1. 减少锁持有的时间
  2. 减少锁请求的频率
  3. 采用共享锁取代独占锁

死锁

如果一个线程永远不释放另外一个线程需要的资源那么就会导致死锁。这有两种情况:一种情况是线程A永远不释放锁,结果B一直拿不到锁,所以线程B就“死掉”了;第二种情况下,线程A拥有线程B需要的锁Y,同时线程B拥有线程A需要的锁X,那么这时候线程A/B互相依赖对方释放锁,于是二者都“死掉”了。

还有一种情况为发生死锁,如果一个线程总是不能被调度,那么等待此线程结果的线程可能就死锁了。这种情况叫做线程饥饿死锁。比如说在前面介绍的非公平锁中,如果某些线程非常活跃,在高并发情况下这类线程可能总是拿到锁,那么那些活跃度低的线程可能就一直拿不到锁,这样就发生了“饥饿死”。

避免死锁的解决方案是:尽可能的按照锁的使用规范请求锁,另外锁的请求粒度要小(不要在不需要锁的地方占用锁,锁不用了尽快释放);在高级锁里面总是使用tryLock或者定时机制(这个以后会讲,就是指定获取锁超时的时间,如果时间到了还没有获取到锁那么就放弃)。高级锁(Lock)里面的这两种方式可以有效的避免死锁。

活锁

活锁描述的是线程总是尝试某项操作却总是失败的情况。这种情况下尽管线程没有被阻塞,但是人物却总是不能被执行。比如在一个死循环里面总是尝试做某件事,结果却总是失败,现在线程将永远不能跳出这个循环。另外一种情况是在一个队列中每次从队列头取出一个任务来执行,每次都失败,然后将任务放入队列头,接下来再一次从队列头取出任务执行,仍然失败。

还有一种活锁方式发生在“碰撞协让”情况下:两个人过独木桥,如果在半路相撞,双方礼貌退出去然后再试一次。如果总是失败,那么这两个任务将一直无法得到执行。

总之解决锁问题的关键就是:从简单的开始,先保证正确,然后再开始优化。 来源: [http://www.blogjava.net/xylz/archive/2010/07/16/326246.html](http://www.blogjava.net/xylz/archive/2010/07/16/326246.html)

聊聊并发(四)——深入分析ConcurrentHashMap

Posted on

聊聊并发(四)——深入分析ConcurrentHashMap

术语定义

术语 英文 解释 哈希算法 hash algorithm 是一种将任意内容的输入转换成相同长度输出的加密方式,其输出被称为哈希值。 哈希表 hash table 根据设定的哈希函数H(key)和处理冲突方法将一组关键字映象到一个有限的地址区间上,并以关键字在地址区间中的象作为记录在表中的存储位置,这种表称为哈希表或散列,所得存储位置称为哈希地址或散列地址。

线程不安全的HashMap

因为多线程环境下,使用HashMap进行put操作会引起死循环,导致CPU利用率接近100%,所以在并发情况下不能使用HashMap,如以下代码 final HashMap map = new HashMap(2);

Thread t = new Thread(new Runnable() { @Override

public void run() { for (int i = 0; i < 10000; i++) {

new Thread(new Runnable() { @Override

public void run() { map.put(UUID.randomUUID().toString(), "");

} }, "ftf" + i).start();

} }

}, "ftf"); t.start();

t.join();

效率低下的HashTable容器

HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下。因为当一个线程访问HashTable的同步方法时,其他线程访问HashTable的同步方法时,可能会进入阻塞或轮询状态。如线程1使用put进行添加元素,线程2不但不能使用put方法添加元素,并且也不能使用get方法来获取元素,所以竞争越激烈效率越低。

锁分段技术

HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的线程都必须竞争同一把锁,那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效的提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。

ConcurrentHashMap的结构

我们通过ConcurrentHashMap的类图来分析ConcurrentHashMap的结构。

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构, 一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素, 每个Segment守护者一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。

ConcurrentHashMap的初始化

ConcurrentHashMap初始化方法是通过initialCapacity,loadFactor, concurrencyLevel几个参数来初始化segments数组,段偏移量segmentShift,段掩码segmentMask和每个segment里的HashEntry数组 。

初始化segments数组。让我们来看一下初始化segmentShift,segmentMask和segments数组的源代码。 if (concurrencyLevel > MAX_SEGMENTS)

concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments

int sshift = 0; int ssize = 1;

while (ssize < concurrencyLevel) { ++sshift;

ssize <<= 1; }

segmentShift = 32 - sshift; segmentMask = ssize - 1;

this.segments = Segment.newArray(ssize);

由上面的代码可知segments数组的长度ssize通过concurrencyLevel计算得出。为了能通过按位与的哈希算法来定位segments数组的索引,必须保证segments数组的长度是2的N次方(power-of-two size),所以必须计算出一个是大于或等于concurrencyLevel的最小的2的N次方值来作为segments数组的长度。假如concurrencyLevel等于14,15或16,ssize都会等于16,即容器里锁的个数也是16。注意concurrencyLevel的最大大小是65535,意味着segments数组的长度最大为65536,对应的二进制是16位。

初始化segmentShift和segmentMask。这两个全局变量在定位segment时的哈希算法里需要使用,sshift等于ssize从1向左移位的次数,在默认情况下concurrencyLevel等于16,1需要向左移位移动4次,所以sshift等于4。segmentShift用于定位参与hash运算的位数,segmentShift等于32减sshift,所以等于28,这里之所以用32是因为ConcurrentHashMap里的hash()方法输出的最大数是32位的,后面的测试中我们可以看到这点。segmentMask是哈希运算的掩码,等于ssize减1,即15,掩码的二进制各个位的值都是1。因为ssize的最大长度是65536,所以segmentShift最大值是16,segmentMask最大值是65535,对应的二进制是16位,每个位都是1。

初始化每个Segment。输入参数initialCapacity是ConcurrentHashMap的初始化容量,loadfactor是每个segment的负载因子,在构造方法里需要通过这两个参数来初始化数组中的每个segment。 if (initialCapacity > MAXIMUM_CAPACITY)

initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize;

if (c /* ssize < initialCapacity) ++c;

int cap = 1; while (cap < c)

cap <<= 1; for (int i = 0; i < this.segments.length; ++i)

this.segments[i] = new Segment(cap, loadFactor);

上面代码中的变量cap就是segment里HashEntry数组的长度,它等于initialCapacity除以ssize的倍数c,如果c大于1,就会取大于等于c的2的N次方值,所以cap不是1,就是2的N次方。segment的容量threshold=(int)cap/*loadFactor,默认情况下initialCapacity等于16,loadfactor等于0.75,通过运算cap等于1,threshold等于零。

定位Segment

既然ConcurrentHashMap使用分段锁Segment来保护不同段的数据,那么在插入和获取元素的时候,必须先通过哈希算法定位到Segment。可以看到ConcurrentHashMap会首先使用Wang/Jenkins hash的变种算法对元素的hashCode进行一次再哈希。 private static int hash(int h) {

h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10);

h += (h << 3); h ^= (h >>> 6);

h += (h << 2) + (h << 14); return h ^ (h >>> 16);

}

之所以进行再哈希,其目的是为了减少哈希冲突,使元素能够均匀的分布在不同的Segment上,从而提高容器的存取效率。假如哈希的质量差到极点,那么所有的元素都在一个Segment中,不仅存取元素缓慢,分段锁也会失去意义。我做了一个测试,不通过再哈希而直接执行哈希计算。

System.out.println(Integer.parseInt("0001111", 2) & 15);

System.out.println(Integer.parseInt("0011111", 2) & 15); System.out.println(Integer.parseInt("0111111", 2) & 15);

System.out.println(Integer.parseInt("1111111", 2) & 15);

计算后输出的哈希值全是15,通过这个例子可以发现如果不进行再哈希,哈希冲突会非常严重,因为只要低位一样,无论高位是什么数,其哈希值总是一样。我们再把上面的二进制数据进行再哈希后结果如下,为了方便阅读,不足32位的高位补了0,每隔四位用竖线分割下。

0100|0111|0110|0111|1101|1010|0100|1110

1111|0111|0100|0011|0000|0001|1011|1000 0111|0111|0110|1001|0100|0110|0011|1110

1000|0011|0000|0000|1100|1000|0001|1010

可以发现每一位的数据都散列开了,通过这种再哈希能让数字的每一位都能参加到哈希运算当中,从而减少哈希冲突。ConcurrentHashMap通过以下哈希算法定位segment。

final Segment segmentFor(int hash) {

return segments[(hash >>> segmentShift) & segmentMask]; }

默认情况下segmentShift为28,segmentMask为15,再哈希后的数最大是32位二进制数据,向右无符号移动28位,意思是让高4位参与到hash运算中, (hash >>> segmentShift) & segmentMask的运算结果分别是4,15,7和8,可以看到hash值没有发生冲突。

ConcurrentHashMap的get操作

Segment的get操作实现非常简单和高效。先经过一次再哈希,然后使用这个哈希值通过哈希运算定位到segment,再通过哈希算法定位到元素,代码如下: public V get(Object key) {

int hash = hash(key.hashCode()); return segmentFor(hash).get(key, hash);

}

get操作的高效之处在于整个get过程不需要加锁,除非读到的值是空的才会加锁重读,我们知道HashTable容器的get方法是需要加锁的,那么ConcurrentHashMap的get操作是如何做到不加锁的呢?原因是它的get方法里将要使用的共享变量都定义成volatile,如用于统计当前Segement大小的count字段和用于存储值的HashEntry的value。定义成volatile的变量,能够在线程之间保持可见性,能够被多线程同时读,并且保证不会读到过期的值,但是只能被单线程写(有一种情况可以被多线程写,就是写入的值不依赖于原值),在get操作里只需要读不需要写共享变量count和value,所以可以不用加锁。之所以不会读到过期的值,是根据java内存模型的happen before原则,对volatile字段的写入操作先于读操作,即使两个线程同时修改和获取volatile变量,get操作也能拿到最新的值,这是用volatile替换锁的经典应用场景。

transient volatile int count;

volatile V value;

在定位元素的代码里我们可以发现定位HashEntry和定位Segment的哈希算法虽然一样,都与数组的长度减去一相与,但是相与的值不一样,定位Segment使用的是元素的hashcode通过再哈希后得到的值的高位,而定位HashEntry直接使用的是再哈希后的值。其目的是避免两次哈希后的值一样,导致元素虽然在Segment里散列开了,但是却没有在HashEntry里散列开。

hash >>> segmentShift) & segmentMask//定位Segment所使用的hash算法

int index = hash & (tab.length - 1);// 定位HashEntry所使用的hash算法

ConcurrentHashMap的Put操作

由于put方法里需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量时必须得加锁。Put方法首先定位到Segment,然后在Segment里进行插入操作。插入操作需要经历两个步骤,第一步判断是否需要对Segment里的HashEntry数组进行扩容,第二步定位添加元素的位置然后放在HashEntry数组里。

是否需要扩容。在插入元素前会先判断Segment里的HashEntry数组是否超过容量(threshold),如果超过阀值,数组进行扩容。值得一提的是,Segment的扩容判断比HashMap更恰当,因为HashMap是在插入元素后判断元素是否已经到达容量的,如果到达了就进行扩容,但是很有可能扩容之后没有新元素插入,这时HashMap就进行了一次无效的扩容。

如何扩容。扩容的时候首先会创建一个两倍于原容量的数组,然后将原数组里的元素进行再hash后插入到新的数组里。为了高效ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment进行扩容。

ConcurrentHashMap的size操作

如果我们要统计整个ConcurrentHashMap里元素的大小,就必须统计所有Segment里元素的大小后求和。Segment里的全局变量count是一个volatile变量,那么在多线程场景下,我们是不是直接把所有Segment的count相加就可以得到整个ConcurrentHashMap大小了呢?不是的,虽然相加时可以获取每个Segment的count的最新值,但是拿到之后可能累加前使用的count发生了变化,那么统计结果就不准了。所以最安全的做法,是在统计size的时候把所有Segment的put,remove和clean方法全部锁住,但是这种做法显然非常低效。 因为在累加count操作过程中,之前累加过的count发生变化的几率非常小,所以ConcurrentHashMap的做法是先尝试2次通过不锁住Segment的方式来统计各个Segment大小,如果统计的过程中,容器的count发生了变化,则再采用加锁的方式来统计所有Segment的大小。

那么ConcurrentHashMap是如何判断在统计的时候容器是否发生了变化呢?使用modCount变量,在put , remove和clean方法里操作元素前都会将变量modCount进行加1,那么在统计size前后比较modCount是否发生变化,从而得知容器的大小是否发生变化。

参考资料

  1. JDK1.6源代码。
  2. 《Java并发编程实践》。
  3. Java并发编程之ConcurrentHashMap

作者介绍

方腾飞,花名清英,淘宝资深开发工程师,关注并发编程,目前在广告技术部从事无线广告联盟的开发和设计工作。个人博客:http://ifeve.com 微博:http://weibo.com/kirals 欢迎通过我的微博进行技术交流。

感谢张龙对本文的审校。 来源: [http://www.infoq.com/cn/articles/ConcurrentHashMap](http://www.infoq.com/cn/articles/ConcurrentHashMap)