Skip to content

Latest commit

 

History

History
260 lines (201 loc) · 7.44 KB

process、child_process.md

File metadata and controls

260 lines (201 loc) · 7.44 KB

process、child_process

主进程

node 在 c++部分初始化,会将主进程执行状态、执行上下文、打开的文件、根目录、工作目录、收到的信号、信号处理函数、代码段、数据段的信息、进程 id、执行时间、退出码等初始化完毕。

随后会逐步将 env 等属性挂载到主进程上。

子进程

核心 API

  • child_process.spawn()
    • 最基础的创建新进程的方法,接收三个参数:命令、命令参数数组、其它选项
  • child_process.fork()
    • 衍生新的 node.js 进程,父子进程建立 IPC 通信,
  • child_process.exec()
    • 衍生 shell,并在 ishell 中执行命令
    • 数据量不大
  • child_process.execFile()
    • 衍生命令,默认不衍生 shell,效率高一点
  • ChildProcess
    • 调用上述方法之后,会返回一个ChildProcess类的实例

child_process.spawn()

spawn()中会先初始化一个ChildProcess的实例,然后调用这个实例的.spawn()方法。

function spawn(file, args, options) {
  // 创建一个新的子进程实例
  const child = new ChildProcess();
  // 调用子进程实例的spawn方法
  child.spawn(options);
  return child;
}

在 spawn 方法中,会生成用于 IPC 的 stdio 配置。

ChildProcess.prototype.spawn = function(options) {
  let i = 0;

  // 默认以管道的形式进行进程间通信
  let stdio = options.stdio || 'pipe';
  // 创建stdio
  stdio = getValidStdio(stdio, false);

  const ipc = stdio.ipc;
  // ipc文件描述符
  const ipcFd = stdio.ipcFd;
  stdio = options.stdio = stdio.stdio;
  // 默认以json格式进行序列化
  const serialization = options.serialization || 'json';

  if (ipc !== undefined) {
    if (options.envPairs === undefined)
      options.envPairs = [];
    else if (!ArrayIsArray(options.envPairs)) {
      throw new ERR_INVALID_ARG_TYPE('options.envPairs',
                                     'Array',
                                     options.envPairs);
    }
    // 传递ipc文件描述符及序列化方式
    ArrayPrototypePush(options.envPairs, `NODE_CHANNEL_FD=${ipcFd}`);
    ArrayPrototypePush(options.envPairs,
                       `NODE_CHANNEL_SERIALIZATION_MODE=${serialization}`);
  }
  this.spawnfile = options.file;

  // 创建子进程,会调用到c++层,并最终通过libuv的uv_spawn方法创建一个新的进程
  const err = this._handle.spawn(options);

  // 为父进程创建一个.send()方法,.send()方法用于向子进程传递消息。
  // 同时,父进程开始监听IPC消息
  if (ipc !== undefined) setupChannel(this, ipc, serialization);

  return err;
};

child_process.fork()

fork 方法最终会调用到 spawn 方法。

function fork(modulePath /* , args, options */) {


  if (typeof options.stdio === 'string') {
    options.stdio = stdioStringToArray(options.stdio, 'ipc');
  } else if (!ArrayIsArray(options.stdio)) {
    // 默认以继承的方式创建子进程的stdio
    // 如果silent为true,则子进程的stdio输出到父进程
    options.stdio = stdioStringToArray(
      options.silent ? 'pipe' : 'inherit',
      'ipc');
  } else if (!ArrayPrototypeIncludes(options.stdio, 'ipc')) {
    throw new ERR_CHILD_PROCESS_IPC_REQUIRED('options.stdio');
  }

  options.execPath = options.execPath || process.execPath;
  options.shell = false;

  // 最终调用到spawn方法
  return spawn(options.execPath, args, options);
}

进程间通信

父进程在创建子进程之前,会先创建好 IPC,并对这个 IPC 进行监听。随后,将 IPC 的 fd 传递给子进程,子进程根据 fd 连接这个 IPC,从而建立起父子进程的通信机制。

创建 IPC 通道

spawn()方法中会调用getValidStdio()方法来生成 stdio:

ChildProcess.prototype.spawn = function(options) {
  stdio = getValidStdio(stdio, false);
}

getValidStdio()中会创建一个新的 ipc,并返回 ipc、ipcFd。

function getValidStdio(stdio, sync) {
  let ipc;
  let ipcFd;

  stdio = ArrayPrototypeReduce(stdio, (acc, stdio, i) => {

    // ......
    } else if (stdio === 'ipc') {

      // 调用c++层,创建IPC
      ipc = new Pipe(PipeConstants.IPC);
      ipcFd = i;

      ArrayPrototypePush(acc, {
        type: 'pipe',
        handle: ipc,
        ipc: true
      });
    }
    // ......
  }

  return { stdio, ipc, ipcFd };
}

父进程监听 IPC

同样的,在spawn()方法的最后,会调用setupChannel()方法来建立连接。

ChildProcess.prototype.spawn = function(options) {
  stdio = getValidStdio(stdio, false);

  if (ipc !== undefined) setupChannel(this, ipc, serialization);
}

onread()方法中,父进程接收子进程传来的 handel、buffer,并进程处理:

function setupChannel(target, channel, serializationMode) {
  // 主进程处理接收到的数据
  channel.onread = function(arrayBuffer) {
    // 接收子进程传递过来的 handel
    const recvHandle = channel.pendingHandle;
    if (arrayBuffer) {
      // 获取到对应的buffer
      const pool = new Uint8Array(arrayBuffer, offset, nread);
      if (recvHandle)
        pendingHandle = recvHandle;

      for (const message of parseChannelMessages(channel, pool)) {
        // 处理接收到的数据
        handleMessage(message, pendingHandle, true);
      }
    }
  };

  function handleMessage(message, handle, internal) {
    const eventName = (internal ? 'internalMessage' : 'message');
    process.nextTick(emit, eventName, message, handle);
  }
}

父进程通过send()方法向子进程传递消息:

function setupChannel(target, channel, serializationMode) {

  target.send = function(message, handle, options, callback) {
    if (this.connected) {
      return this._send(message, handle, options, callback);
    }
  };

  target._send = function(message, handle, options, callback) {
    // 写入数据
    const err = writeChannelMessage(channel, req, message, handle);
  }
}

writeChannelMessage(channel, req, message, handle) {
  const ser = new ChildProcessSerializer();
  // 进行序列化
  ser.writeValue(message);
  // 写入buffer
  const result = channel.writeBuffer(req, buffer, handle);
},

子进程监听消息

在进程 bootstrap 的阶段,会调用setupChildProcessIpcChannel()方法,会尝试从process.env.NODE_CHANNEL_FD字段中获取 IPC 的 fd。如果存在该 fd,则表明这个进程是一个子进程:

function setupChildProcessIpcChannel() {
  if (process.env.NODE_CHANNEL_FD) {
    const assert = require('internal/assert');
    // 尝试获取 fd
    const fd = NumberParseInt(process.env.NODE_CHANNEL_FD, 10);
    // 如果fd存在
    assert(fd >= 0);

    const serializationMode =
      process.env.NODE_CHANNEL_SERIALIZATION_MODE || 'json';
    delete process.env.NODE_CHANNEL_SERIALIZATION_MODE;
    // 则调用_forkChild方法
    require('child_process')._forkChild(fd, serializationMode);
    assert(process.send);
  }
}

然后再_forkChild()方法中建立与 IPC 的链接:

function _forkChild(fd, serializationMode) {

  const p = new Pipe(PipeConstants.IPC);
  // 子进连接到IPC中
  p.open(fd);
  p.unref();
  // 同样,子进程调用setupChannel方法
  // 添加send方法,向IPC写入数据
  // 添加onread方法,用于监听IPC传递过来的数据
  const control = setupChannel(process, p, serializationMode);
}