diff --git a/debug/socket.io.html b/debug/socket.io.html
new file mode 100644
index 00000000..da3e16c6
--- /dev/null
+++ b/debug/socket.io.html
@@ -0,0 +1,36 @@
+
+
+
+
+
+
+ Document
+
+
+
+
+
+
+
+
diff --git a/package.json b/package.json
index 3a3fb1dd..e5841d46 100644
--- a/package.json
+++ b/package.json
@@ -36,6 +36,7 @@
"start": "cross-env NODE_ENV=development nest start -w --path tsconfig.json",
"start:dev": "nest build --webpack --webpackPath webpack-hmr.config.js --watch",
"start:debug": "cross-env NODE_ENV=development nest start --debug --watch",
+ "start:cluster": "cross-env NODE_ENV=development nest start --watch -- --cluster",
"start:prod": "cross-env NODE_ENV=production node dist/main",
"lint": "eslint \"{src,apps,libs,test}/**/*.ts\" --fix",
"prod": "cross-env NODE_ENV=production pm2-runtime start ecosystem.config.js",
diff --git a/pm2.dev.config.js b/pm2.dev.config.js
index bf50c811..6bb990e9 100644
--- a/pm2.dev.config.js
+++ b/pm2.dev.config.js
@@ -4,13 +4,14 @@ module.exports = {
name: 'mx-server',
script: 'dist/src/main.js',
autorestart: true,
- exec_mode: 'cluster',
+ exec_mode: 'fork',
watch: false,
- instances: 2,
max_memory_restart: '230M',
env: {
NODE_ENV: 'development',
},
+
+ args: '--allowed_origins=dev.* --cluster',
},
],
}
diff --git a/src/app.config.js b/src/app.config.js
index 5a92e88e..04b4a6b2 100644
--- a/src/app.config.js
+++ b/src/app.config.js
@@ -1,10 +1,13 @@
const isDev = process.env.NODE_ENV === 'development'
const isTEST = process.env.NODE_ENV === 'test'
+const cluster = require('cluster')
const { argv } = require('zx')
Object.defineProperty(exports, '__esModule', { value: true })
-console.log(argv)
+if (cluster.isPrimary) {
+ console.log(argv)
+}
exports.PORT = argv.port || process.env.PORT || 2333
exports.API_VERSION = 2
@@ -60,3 +63,8 @@ exports.SECURITY = {
// 跳过登陆鉴权
skipAuth: isTEST ? true : false,
}
+
+exports.CLUSTER = {
+ enable: argv.cluster ?? false,
+ workers: argv.cluster_workers ?? 8,
+}
diff --git a/src/app.module.ts b/src/app.module.ts
index 727cb2a5..b29c2ca6 100644
--- a/src/app.module.ts
+++ b/src/app.module.ts
@@ -1,6 +1,7 @@
import { Logger, MiddlewareConsumer, Module, NestModule } from '@nestjs/common'
import { APP_FILTER, APP_GUARD, APP_INTERCEPTOR } from '@nestjs/core'
import { GraphQLModule } from '@nestjs/graphql'
+import cluster from 'cluster'
import { mkdirSync } from 'fs'
import { join } from 'path'
import { AppController } from './app.controller'
@@ -61,7 +62,10 @@ function mkdirs() {
mkdirSync(USER_ASSET_DIR, { recursive: true })
Logger.log(chalk.blue('资源目录已经建好: ' + USER_ASSET_DIR))
}
-mkdirs()
+
+if (cluster.isPrimary) {
+ mkdirs()
+}
@Module({
imports: [
diff --git a/src/bootstrap.ts b/src/bootstrap.ts
index fe4c4f8c..312de0bc 100644
--- a/src/bootstrap.ts
+++ b/src/bootstrap.ts
@@ -2,9 +2,11 @@ import { Logger, RequestMethod, ValidationPipe } from '@nestjs/common'
import { NestFactory } from '@nestjs/core'
import { NestFastifyApplication } from '@nestjs/platform-fastify'
import { DocumentBuilder, SwaggerModule } from '@nestjs/swagger'
+import cluster from 'cluster'
import { performance } from 'perf_hooks'
-import { API_VERSION, CROSS_DOMAIN, PORT } from './app.config'
+import { API_VERSION, CLUSTER, CROSS_DOMAIN, PORT } from './app.config'
import { AppModule } from './app.module'
+import { Cluster } from './cluster'
import { fastifyApp } from './common/adapters/fastify.adapter'
import { RedisIoAdapter } from './common/adapters/socket.adapter'
import { SpiderGuard } from './common/guard/spider.guard'
@@ -79,12 +81,15 @@ async function bootstrap() {
app.useLogger(app.get(MyLogger))
consola.info('ENV:', process.env.NODE_ENV)
const url = await app.getUrl()
+ const pid = process.pid
+ const env = cluster.isPrimary
+ const prefix = env ? 'P' : 'W'
if (isDev) {
- consola.debug(`OpenApi: ${url}/api-docs`)
- consola.debug(`GraphQL playground: ${url}/graphql`)
+ consola.debug(`[${prefix + pid}] OpenApi: ${url}/api-docs`)
+ consola.debug(`[${prefix + pid}] GraphQL playground: ${url}/graphql`)
}
- consola.success(`Server listen on: ${url}`)
- consola.success(`Admin Dashboard: ${url}/qaqdmin`)
+ consola.success(`[${prefix + pid}] Server listen on: ${url}`)
+ consola.success(`[${prefix + pid}] Admin Dashboard: ${url}/qaqdmin`)
Logger.log(
'Server is up. ' + chalk.yellow('+' + (performance.now() | 0) + 'ms'),
)
@@ -95,4 +100,9 @@ async function bootstrap() {
module.hot.dispose(() => app.close())
}
}
-bootstrap()
+
+if (CLUSTER.enable) {
+ Cluster.register(+CLUSTER.workers, bootstrap)
+} else {
+ bootstrap()
+}
diff --git a/src/cluster.ts b/src/cluster.ts
new file mode 100644
index 00000000..aae93595
--- /dev/null
+++ b/src/cluster.ts
@@ -0,0 +1,36 @@
+import cluster from 'cluster'
+import os from 'os'
+
+export class Cluster {
+ static register(workers: Number, callback: Function): void {
+ if (cluster.isPrimary) {
+ consola.info(`Primary server started on ${process.pid}`)
+
+ //ensure workers exit cleanly
+ process.on('SIGINT', function () {
+ consola.info('Cluster shutting down...')
+ for (const id in cluster.workers) {
+ cluster.workers[id].kill()
+ }
+ // exit the master process
+ process.exit(0)
+ })
+
+ const cpus = os.cpus().length
+ if (workers > cpus) workers = cpus
+
+ for (let i = 0; i < workers; i++) {
+ cluster.fork()
+ }
+ cluster.on('online', function (worker) {
+ consola.info('Worker %s is online', worker.process.pid)
+ })
+ cluster.on('exit', (worker, code, signal) => {
+ consola.info(`Worker ${worker.process.pid} died. Restarting`)
+ cluster.fork()
+ })
+ } else {
+ callback()
+ }
+ }
+}
diff --git a/src/common/interceptors/analyze.interceptor.ts b/src/common/interceptors/analyze.interceptor.ts
index 84dc3935..b7c467fc 100644
--- a/src/common/interceptors/analyze.interceptor.ts
+++ b/src/common/interceptors/analyze.interceptor.ts
@@ -23,7 +23,6 @@ import { LOCAL_BOT_LIST_DATA_FILE_PATH } from '~/constants/path.constant'
import { AnalyzeModel } from '~/modules/analyze/analyze.model'
import { OptionModel } from '~/modules/configs/configs.model'
import { CacheService } from '~/processors/cache/cache.service'
-import { TaskQueueService } from '~/processors/helper/helper.tq.service'
import { getIp } from '~/utils/ip.util'
import { getRedisKey } from '~/utils/redis.util'
@@ -38,7 +37,6 @@ export class AnalyzeInterceptor implements NestInterceptor {
@InjectModel(OptionModel)
private readonly options: ReturnModelType,
private readonly cacheService: CacheService,
- private readonly taskService: TaskQueueService,
) {
this.init()
}
diff --git a/src/modules/aggregate/aggregate.service.ts b/src/modules/aggregate/aggregate.service.ts
index 47ca3c21..0d10ff79 100644
--- a/src/modules/aggregate/aggregate.service.ts
+++ b/src/modules/aggregate/aggregate.service.ts
@@ -304,12 +304,11 @@ export class AggregateService {
}
async getCounts() {
- const online = this.gateway.currentClientCount
-
const redisClient = this.cacheService.getClient()
const dateFormat = dayjs().format('YYYY-MM-DD')
const [
+ online,
posts,
notes,
pages,
@@ -322,6 +321,7 @@ export class AggregateService {
categories,
recently,
] = await Promise.all([
+ this.gateway.getcurrentClientCount(),
this.postService.model.countDocuments(),
this.noteService.model.countDocuments(),
this.categoryService.model.countDocuments(),
diff --git a/src/modules/auth/auth.module.ts b/src/modules/auth/auth.module.ts
index f15d08e5..969f97c8 100644
--- a/src/modules/auth/auth.module.ts
+++ b/src/modules/auth/auth.module.ts
@@ -1,6 +1,7 @@
import { Module } from '@nestjs/common'
import { JwtModule } from '@nestjs/jwt'
import { PassportModule } from '@nestjs/passport'
+import cluster from 'cluster'
import { machineIdSync } from 'node-machine-id'
import { SECURITY } from '~/app.config'
import { AdminEventsGateway } from '../../processors/gateway/admin/events.gateway'
@@ -10,17 +11,20 @@ import { JwtStrategy } from './jwt.strategy'
const getMachineId = () => {
const id = machineIdSync()
- consola.log('machine-id: ', id)
+ if (cluster.isPrimary) consola.log('machine-id: ', id)
return id
}
export const __secret: any =
SECURITY.jwtSecret ||
Buffer.from(getMachineId()).toString('base64').slice(0, 15) ||
'asjhczxiucipoiopiqm2376'
-consola.log(
- 'JWT Secret start with :',
- __secret.slice(0, 5) + '*'.repeat(__secret.length - 5),
-)
+
+if (cluster.isPrimary) {
+ consola.log(
+ 'JWT Secret start with :',
+ __secret.slice(0, 5) + '*'.repeat(__secret.length - 5),
+ )
+}
const jwtModule = JwtModule.registerAsync({
useFactory() {
diff --git a/src/processors/gateway/web/events.gateway.ts b/src/processors/gateway/web/events.gateway.ts
index f13a351f..74fc42df 100644
--- a/src/processors/gateway/web/events.gateway.ts
+++ b/src/processors/gateway/web/events.gateway.ts
@@ -36,7 +36,7 @@ export class WebEventsGateway
async sendOnlineNumber() {
return {
- online: this.currentClientCount,
+ online: await this.getcurrentClientCount(),
timestamp: new Date().toISOString(),
}
}
@@ -55,10 +55,10 @@ export class WebEventsGateway
})
}
- get currentClientCount() {
+ async getcurrentClientCount() {
const server = this.namespace.server
- const clientCount = server.of('/web').adapter.rooms.size
- return clientCount
+ const sockets = await server.of('/web').adapter.sockets(new Set())
+ return sockets.size
}
async handleConnection(socket: SocketIO.Socket) {
this.broadcast(EventTypes.VISITOR_ONLINE, await this.sendOnlineNumber())
@@ -74,7 +74,7 @@ export class WebEventsGateway
)) || 0
await redisClient.set(
getRedisKey(RedisKeys.MaxOnlineCount, dateFormat),
- Math.max(maxOnlineCount, this.currentClientCount),
+ Math.max(maxOnlineCount, await this.getcurrentClientCount()),
)
const key = getRedisKey(RedisKeys.MaxOnlineCount, dateFormat) + '_total'
const totalCount = +(await redisClient.get(key)) || 0
diff --git a/src/processors/helper/helper.cron.service.ts b/src/processors/helper/helper.cron.service.ts
index 32681b1b..5112a494 100644
--- a/src/processors/helper/helper.cron.service.ts
+++ b/src/processors/helper/helper.cron.service.ts
@@ -316,8 +316,4 @@ export class CronService {
throw err
}
}
-
- private get nowStr() {
- return dayjs().format('YYYY-MM-DD-HH:mm:ss')
- }
}
diff --git a/src/processors/logger/logger.service.ts b/src/processors/logger/logger.service.ts
index 6eb63964..656cac61 100644
--- a/src/processors/logger/logger.service.ts
+++ b/src/processors/logger/logger.service.ts
@@ -1,5 +1,6 @@
/* eslint-disable prefer-rest-params */
import { ConsoleLogger, ConsoleLoggerOptions } from '@nestjs/common'
+import cluster from 'cluster'
import { performance } from 'perf_hooks'
export class MyLogger extends ConsoleLogger {
@@ -68,16 +69,26 @@ export class MyLogger extends ConsoleLogger {
const print = consola[level]
const formatMessage = this.formatMessage(message, level)
const diff = this._updateAndGetTimestampDiff()
+
+ const workerPrefix = cluster.isWorker
+ ? chalk.hex('#fab1a0')(`*Worker - ${cluster.worker.id}*`)
+ : ''
if (context && !argv.length) {
- print(`[${chalk.yellow(context)}] `, formatMessage, diff)
+ print(`${workerPrefix} [${chalk.yellow(context)}] `, formatMessage, diff)
} else if (!argv.length) {
- print(this.defaultContextPrefix, formatMessage, diff)
+ print(`${workerPrefix} ` + this.defaultContextPrefix, formatMessage, diff)
} else {
- print(this.defaultContextPrefix, message, context, ...argv, diff)
+ print(
+ `${workerPrefix} ` + this.defaultContextPrefix,
+ message,
+ context,
+ ...argv,
+ diff,
+ )
}
}
private defaultContextPrefix = this.context
? `[${chalk.yellow(this.context)}] `
- : `[${chalk.hex('#fd79a8')('MixSpaceServer')}] `
+ : `[${chalk.hex('#fd79a8')('MServer')}] `
}