Atomics 对象

多线程共享内存,最大的问题就是如何防止两个线程同时修改某个地址,或者说,当一个线程修改共享内存以后,必须有一个机制让其他线程同步。SharedArrayBuffer API 提供Atomics对象,保证所有共享内存的操作都是“原子性”的,并且可以在所有线程内同步。

什么叫“原子性操作”呢?现代编程语言中,一条普通的命令被编译器处理以后,会变成多条机器指令。如果是单线程运行,这是没有问题的;多线程环境并且共享内存时,就会出问题,因为这一组机器指令的运行期间,可能会插入其他线程的指令,从而导致运行结果出错。请看下面的例子。

  1. // 主线程
  2. ia[42] = 314159; // 原先的值 191
  3. ia[37] = 123456; // 原先的值 163
  4. // Worker 线程
  5. console.log(ia[37]);
  6. console.log(ia[42]);
  7. // 可能的结果
  8. // 123456
  9. // 191

上面代码中,主线程的原始顺序是先对 42 号位置赋值,再对 37 号位置赋值。但是,编译器和 CPU 为了优化,可能会改变这两个操作的执行顺序(因为它们之间互不依赖),先对 37 号位置赋值,再对 42 号位置赋值。而执行到一半的时候,Worker 线程可能就会来读取数据,导致打印出123456191

下面是另一个例子。

  1. // 主线程
  2. const sab = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * 100000);
  3. const ia = new Int32Array(sab);
  4. for (let i = 0; i < ia.length; i++) {
  5. ia[i] = primes.next(); // 将质数放入 ia
  6. }
  7. // worker 线程
  8. ia[112]++; // 错误
  9. Atomics.add(ia, 112, 1); // 正确

上面代码中,Worker 线程直接改写共享内存ia[112]++是不正确的。因为这行语句会被编译成多条机器指令,这些指令之间无法保证不会插入其他进程的指令。请设想如果两个线程同时ia[112]++,很可能它们得到的结果都是不正确的。

Atomics对象就是为了解决这个问题而提出,它可以保证一个操作所对应的多条机器指令,一定是作为一个整体运行的,中间不会被打断。也就是说,它所涉及的操作都可以看作是原子性的单操作,这可以避免线程竞争,提高多线程共享内存时的操作安全。所以,ia[112]++要改写成Atomics.add(ia, 112, 1)

Atomics对象提供多种方法。

(1)Atomics.store(),Atomics.load()

store()方法用来向共享内存写入数据,load()方法用来从共享内存读出数据。比起直接的读写操作,它们的好处是保证了读写操作的原子性。

此外,它们还用来解决一个问题:多个线程使用共享内存的某个位置作为开关(flag),一旦该位置的值变了,就执行特定操作。这时,必须保证该位置的赋值操作,一定是在它前面的所有可能会改写内存的操作结束后执行;而该位置的取值操作,一定是在它后面所有可能会读取该位置的操作开始之前执行。store方法和load方法就能做到这一点,编译器不会为了优化,而打乱机器指令的执行顺序。

  1. Atomics.load(array, index)
  2. Atomics.store(array, index, value)

store方法接受三个参数:SharedBuffer 的视图、位置索引和值,返回sharedArray[index]的值。load方法只接受两个参数:SharedBuffer 的视图和位置索引,也是返回sharedArray[index]的值。

  1. // 主线程 main.js
  2. ia[42] = 314159; // 原先的值 191
  3. Atomics.store(ia, 37, 123456); // 原先的值是 163
  4. // Worker 线程 worker.js
  5. while (Atomics.load(ia, 37) == 163);
  6. console.log(ia[37]); // 123456
  7. console.log(ia[42]); // 314159

上面代码中,主线程的Atomics.store向 42 号位置的赋值,一定是早于 37 位置的赋值。只要 37 号位置等于 163,Worker 线程就不会终止循环,而对 37 号位置和 42 号位置的取值,一定是在Atomics.load操作之后。

下面是另一个例子。

  1. // 主线程
  2. const worker = new Worker('worker.js');
  3. const length = 10;
  4. const size = Int32Array.BYTES_PER_ELEMENT * length;
  5. // 新建一段共享内存
  6. const sharedBuffer = new SharedArrayBuffer(size);
  7. const sharedArray = new Int32Array(sharedBuffer);
  8. for (let i = 0; i < 10; i++) {
  9. // 向共享内存写入 10 个整数
  10. Atomics.store(sharedArray, i, 0);
  11. }
  12. worker.postMessage(sharedBuffer);

上面代码中,主线程用Atomics.store()方法写入数据。下面是 Worker 线程用Atomics.load()方法读取数据。

  1. // worker.js
  2. self.addEventListener('message', (event) => {
  3. const sharedArray = new Int32Array(event.data);
  4. for (let i = 0; i < 10; i++) {
  5. const arrayValue = Atomics.load(sharedArray, i);
  6. console.log(`The item at array index ${i} is ${arrayValue}`);
  7. }
  8. }, false);

(2)Atomics.exchange()

Worker 线程如果要写入数据,可以使用上面的Atomics.store()方法,也可以使用Atomics.exchange()方法。它们的区别是,Atomics.store()返回写入的值,而Atomics.exchange()返回被替换的值。

  1. // Worker 线程
  2. self.addEventListener('message', (event) => {
  3. const sharedArray = new Int32Array(event.data);
  4. for (let i = 0; i < 10; i++) {
  5. if (i % 2 === 0) {
  6. const storedValue = Atomics.store(sharedArray, i, 1);
  7. console.log(`The item at array index ${i} is now ${storedValue}`);
  8. } else {
  9. const exchangedValue = Atomics.exchange(sharedArray, i, 2);
  10. console.log(`The item at array index ${i} was ${exchangedValue}, now 2`);
  11. }
  12. }
  13. }, false);

上面代码将共享内存的偶数位置的值改成1,奇数位置的值改成2

(3)Atomics.wait(),Atomics.wake()

使用while循环等待主线程的通知,不是很高效,如果用在主线程,就会造成卡顿,Atomics对象提供了wait()wake()两个方法用于等待通知。这两个方法相当于锁内存,即在一个线程进行操作时,让其他线程休眠(建立锁),等到操作结束,再唤醒那些休眠的线程(解除锁)。

  1. // Worker 线程
  2. self.addEventListener('message', (event) => {
  3. const sharedArray = new Int32Array(event.data);
  4. const arrayIndex = 0;
  5. const expectedStoredValue = 50;
  6. Atomics.wait(sharedArray, arrayIndex, expectedStoredValue);
  7. console.log(Atomics.load(sharedArray, arrayIndex));
  8. }, false);

上面代码中,Atomics.wait()方法等同于告诉 Worker 线程,只要满足给定条件(sharedArray0号位置等于50),就在这一行 Worker 线程进入休眠。

主线程一旦更改了指定位置的值,就可以唤醒 Worker 线程。

  1. // 主线程
  2. const newArrayValue = 100;
  3. Atomics.store(sharedArray, 0, newArrayValue);
  4. const arrayIndex = 0;
  5. const queuePos = 1;
  6. Atomics.wake(sharedArray, arrayIndex, queuePos);

上面代码中,sharedArray0号位置改为100,然后就执行Atomics.wake()方法,唤醒在sharedArray0号位置休眠队列里的一个线程。

Atomics.wait()方法的使用格式如下。

  1. Atomics.wait(sharedArray, index, value, timeout)

它的四个参数含义如下。

  • sharedArray:共享内存的视图数组。
  • index:视图数据的位置(从0开始)。
  • value:该位置的预期值。一旦实际值等于预期值,就进入休眠。
  • timeout:整数,表示过了这个时间以后,就自动唤醒,单位毫秒。该参数可选,默认值是Infinity,即无限期的休眠,只有通过Atomics.wake()方法才能唤醒。

Atomics.wait()的返回值是一个字符串,共有三种可能的值。如果sharedArray[index]不等于value,就返回字符串not-equal,否则就进入休眠。如果Atomics.wake()方法唤醒,就返回字符串ok;如果因为超时唤醒,就返回字符串timed-out

Atomics.wake()方法的使用格式如下。

  1. Atomics.wake(sharedArray, index, count)

它的三个参数含义如下。

  • sharedArray:共享内存的视图数组。
  • index:视图数据的位置(从0开始)。
  • count:需要唤醒的 Worker 线程的数量,默认为Infinity

Atomics.wake()方法一旦唤醒休眠的 Worker 线程,就会让它继续往下运行。

请看一个例子。

  1. // 主线程
  2. console.log(ia[37]); // 163
  3. Atomics.store(ia, 37, 123456);
  4. Atomics.wake(ia, 37, 1);
  5. // Worker 线程
  6. Atomics.wait(ia, 37, 163);
  7. console.log(ia[37]); // 123456

上面代码中,视图数组ia的第 37 号位置,原来的值是163。Worker 线程使用Atomics.wait()方法,指定只要ia[37]等于163,就进入休眠状态。主线程使用Atomics.store()方法,将123456写入ia[37],然后使用Atomics.wake()方法唤醒 Worker 线程。

另外,基于waitwake这两个方法的锁内存实现,可以看 Lars T Hansen 的 js-lock-and-condition 这个库。

注意,浏览器的主线程不宜设置休眠,这会导致用户失去响应。而且,主线程实际上会拒绝进入休眠。

(4)运算方法

共享内存上面的某些运算是不能被打断的,即不能在运算过程中,让其他线程改写内存上面的值。Atomics 对象提供了一些运算方法,防止数据被改写。

  1. Atomics.add(sharedArray, index, value)

Atomics.add用于将value加到sharedArray[index],返回sharedArray[index]旧的值。

  1. Atomics.sub(sharedArray, index, value)

Atomics.sub用于将valuesharedArray[index]减去,返回sharedArray[index]旧的值。

  1. Atomics.and(sharedArray, index, value)

Atomics.and用于将valuesharedArray[index]进行位运算and,放入sharedArray[index],并返回旧的值。

  1. Atomics.or(sharedArray, index, value)

Atomics.or用于将valuesharedArray[index]进行位运算or,放入sharedArray[index],并返回旧的值。

  1. Atomics.xor(sharedArray, index, value)

Atomic.xor用于将vaulesharedArray[index]进行位运算xor,放入sharedArray[index],并返回旧的值。

(5)其他方法

Atomics对象还有以下方法。

  • Atomics.compareExchange(sharedArray, index, oldval, newval):如果sharedArray[index]等于oldval,就写入newval,返回oldval
  • Atomics.isLockFree(size):返回一个布尔值,表示Atomics对象是否可以处理某个size的内存锁定。如果返回false,应用程序就需要自己来实现锁定。

Atomics.compareExchange的一个用途是,从 SharedArrayBuffer 读取一个值,然后对该值进行某个操作,操作结束以后,检查一下 SharedArrayBuffer 里面原来那个值是否发生变化(即被其他线程改写过)。如果没有改写过,就将它写回原来的位置,否则读取新的值,再重头进行一次操作。