diff --git a/ecosystem.config.js b/ecosystem.config.js index 15cfbc5a..cea4ced5 100644 --- a/ecosystem.config.js +++ b/ecosystem.config.js @@ -4,9 +4,11 @@ module.exports = { name: 'mx-server', script: 'index.js', autorestart: true, - exec_mode: 'fork', + exec_mode: 'cluster', watch: false, + instances: 2, max_memory_restart: '230M', + env: { NODE_ENV: 'production', }, diff --git a/pm2.dev.config.js b/pm2.dev.config.js index 6bb990e9..22c8261e 100644 --- a/pm2.dev.config.js +++ b/pm2.dev.config.js @@ -8,7 +8,7 @@ module.exports = { watch: false, max_memory_restart: '230M', env: { - NODE_ENV: 'development', + NODE_ENV: 'production', }, args: '--allowed_origins=dev.* --cluster', diff --git a/src/app.config.js b/src/app.config.js index 12eabf88..0019675b 100644 --- a/src/app.config.js +++ b/src/app.config.js @@ -5,10 +5,6 @@ const cluster = require('cluster') const { argv } = require('zx') Object.defineProperty(exports, '__esModule', { value: true }) -if (cluster.isPrimary) { - console.log(argv) -} - exports.PORT = argv.port || process.env.PORT || 2333 exports.API_VERSION = 2 exports.CROSS_DOMAIN = { @@ -65,6 +61,10 @@ exports.SECURITY = { } exports.CLUSTER = { - enable: argv.cluster ?? true, - workers: argv.cluster_workers ?? 2, + enable: argv.cluster ?? false, + workers: argv.cluster_workers, +} + +if (!exports.CLUSTER.enable || cluster.isPrimary) { + console.log(argv) } diff --git a/src/app.module.ts b/src/app.module.ts index b29c2ca6..e6726d16 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -4,6 +4,7 @@ import { GraphQLModule } from '@nestjs/graphql' import cluster from 'cluster' import { mkdirSync } from 'fs' import { join } from 'path' +import { CLUSTER } from './app.config' import { AppController } from './app.controller' import { AppResolver } from './app.resolver' import { AllExceptionsFilter } from './common/filters/any-exception.filter' @@ -63,7 +64,7 @@ function mkdirs() { Logger.log(chalk.blue('资源目录已经建好: ' + USER_ASSET_DIR)) } -if (cluster.isPrimary) { +if (!CLUSTER.enable || cluster.isPrimary) { mkdirs() } diff --git a/src/bootstrap.ts b/src/bootstrap.ts index 312de0bc..70517831 100644 --- a/src/bootstrap.ts +++ b/src/bootstrap.ts @@ -1,7 +1,6 @@ import { Logger, RequestMethod, ValidationPipe } from '@nestjs/common' import { NestFactory } from '@nestjs/core' import { NestFastifyApplication } from '@nestjs/platform-fastify' -import { DocumentBuilder, SwaggerModule } from '@nestjs/swagger' import cluster from 'cluster' import { performance } from 'perf_hooks' import { API_VERSION, CLUSTER, CROSS_DOMAIN, PORT } from './app.config' @@ -63,6 +62,7 @@ async function bootstrap() { app.useWebSocketAdapter(new RedisIoAdapter(app)) if (isDev) { + const { DocumentBuilder, SwaggerModule } = await import('@nestjs/swagger') const options = new DocumentBuilder() .setTitle('API') .setDescription('The blog API description') @@ -102,7 +102,7 @@ async function bootstrap() { } if (CLUSTER.enable) { - Cluster.register(+CLUSTER.workers, bootstrap) + Cluster.register(parseInt(CLUSTER.workers) || os.cpus().length, bootstrap) } else { bootstrap() } diff --git a/src/cluster.ts b/src/cluster.ts index 1e134347..febd8c50 100644 --- a/src/cluster.ts +++ b/src/cluster.ts @@ -36,8 +36,10 @@ export class Cluster { consola.info('Worker %s is online', worker.process.pid) }) cluster.on('exit', (worker, code, signal) => { - consola.info(`Worker ${worker.process.pid} died. Restarting`) - cluster.fork() + if (code !== 0) { + consola.info(`Worker ${worker.process.pid} died. Restarting`) + cluster.fork() + } }) } else { callback() diff --git a/src/modules/auth/auth.module.ts b/src/modules/auth/auth.module.ts index a95c8a3c..4199903b 100644 --- a/src/modules/auth/auth.module.ts +++ b/src/modules/auth/auth.module.ts @@ -3,7 +3,7 @@ import { JwtModule } from '@nestjs/jwt' import { PassportModule } from '@nestjs/passport' import cluster from 'cluster' import { machineIdSync } from 'node-machine-id' -import { SECURITY } from '~/app.config' +import { CLUSTER, SECURITY } from '~/app.config' import { AdminEventsGateway } from '../../processors/gateway/admin/events.gateway' import { AuthController } from './auth.controller' import { AuthService } from './auth.service' @@ -18,7 +18,7 @@ export const __secret: any = Buffer.from(getMachineId()).toString('base64').slice(0, 15) || 'asjhczxiucipoiopiqm2376' -if (cluster.isPrimary) { +if (!CLUSTER.enable || cluster.isPrimary) { consola.log( 'JWT Secret start with :', __secret.slice(0, 5) + '*'.repeat(__secret.length - 5), diff --git a/src/modules/health/health.controller.ts b/src/modules/health/health.controller.ts index c083de28..5027d3f6 100644 --- a/src/modules/health/health.controller.ts +++ b/src/modules/health/health.controller.ts @@ -83,7 +83,7 @@ export class HealthController { if (!isString(name)) { throw new BadRequestException('name must be string') } - const task = this.taskQueue.get(name) + const task = await this.taskQueue.get(name) if (!task) { throw new BadRequestException(`${name} is not a cron in task queue`) } diff --git a/src/processors/helper/helper.tq.service.ts b/src/processors/helper/helper.tq.service.ts index 8e4f61d5..90c94dee 100644 --- a/src/processors/helper/helper.tq.service.ts +++ b/src/processors/helper/helper.tq.service.ts @@ -1,7 +1,10 @@ import { Injectable } from '@nestjs/common' +import IORedis from 'ioredis' import { isAsyncFunction } from 'util/types' +import { safeJSONParse } from '~/utils' +import { CacheService } from '../cache/cache.service' -type ITask = Map< +type ITask = RedisMap< string, { status: 'pending' | 'fulfill' | 'reject' @@ -13,8 +16,8 @@ type ITask = Map< @Injectable() export class TaskQueueService { tasks: ITask - constructor() { - this.tasks = new Map() + constructor(private readonly redis: CacheService) { + this.tasks = new RedisMap(redis.getClient(), 'tq') } add(name: string, task: () => Promise) { @@ -50,12 +53,27 @@ export class TaskQueueService { } } - get(name: string) { - const task = this.tasks.get(name) + async get(name: string) { + const task = await this.tasks.get(name) return !task ? null : { ...task } } +} - get length() { - return this.tasks.size +class RedisMap { + constructor( + private readonly redis: IORedis.Redis, + private readonly hashName: string, + ) { + this.hashName = RedisMap.key + `${hashName}#` + } + + static key = 'redis-map#' + async get(key: K) { + const res = await this.redis.hget(this.hashName, key) + + return safeJSONParse(res) as V | null + } + set(key: K, data: V) { + return this.redis.hset(this.hashName, key, JSON.stringify(data)) } } diff --git a/src/utils/consola.util.ts b/src/utils/consola.util.ts index e19a3a08..949cd150 100644 --- a/src/utils/consola.util.ts +++ b/src/utils/consola.util.ts @@ -11,10 +11,8 @@ class DateTimeReporter extends FancyReporter { }) } } -const consola = consola_.create({ +export const consola = consola_.create({ reporters: [new DateTimeReporter()], level: isDev || argv.verbose ? LogLevel.Trace : LogLevel.Info, }) consola.wrapAll() - -export { consola } diff --git a/src/utils/tool.util.ts b/src/utils/tool.util.ts index 056681ee..532889c5 100644 --- a/src/utils/tool.util.ts +++ b/src/utils/tool.util.ts @@ -54,3 +54,11 @@ export function deleteKeys( return target } + +export const safeJSONParse = (p: any) => { + try { + return JSON.parse(p) + } catch { + return null + } +}