From 0593a0d0d0710b799d34d479e346bfc9ac02152f Mon Sep 17 00:00:00 2001 From: Innei Date: Tue, 18 Jan 2022 19:02:47 +0800 Subject: [PATCH] refactor: use redis subpib instead of cluster post --- pm2.dev.config.js | 6 ++- src/common/adapters/socket.adapter.ts | 7 ++-- src/modules/configs/configs.service.ts | 38 ++++++++----------- src/processors/cache/cache.service.ts | 9 +++++ src/processors/helper/helper.email.service.ts | 16 ++------ src/utils/cluster.util.ts | 16 -------- src/utils/consola.util.ts | 4 +- src/utils/index.ts | 2 +- src/utils/redis-subpub.ts | 26 +++++++++++++ 9 files changed, 65 insertions(+), 59 deletions(-) delete mode 100644 src/utils/cluster.util.ts create mode 100644 src/utils/redis-subpub.ts diff --git a/pm2.dev.config.js b/pm2.dev.config.js index 22c8261e..c6374d14 100644 --- a/pm2.dev.config.js +++ b/pm2.dev.config.js @@ -4,11 +4,13 @@ module.exports = { name: 'mx-server', script: 'dist/src/main.js', autorestart: true, - exec_mode: 'fork', + exec_mode: 'cluster', + instances: 2, watch: false, + max_memory_restart: '230M', env: { - NODE_ENV: 'production', + NODE_ENV: 'development', }, args: '--allowed_origins=dev.* --cluster', diff --git a/src/common/adapters/socket.adapter.ts b/src/common/adapters/socket.adapter.ts index 3874219c..a401e4f5 100644 --- a/src/common/adapters/socket.adapter.ts +++ b/src/common/adapters/socket.adapter.ts @@ -1,13 +1,12 @@ import { IoAdapter } from '@nestjs/platform-socket.io' import { createAdapter } from '@socket.io/redis-adapter' -import IORedis from 'ioredis' -import { REDIS } from '~/app.config' +import { redisSubPub } from '~/utils' export class RedisIoAdapter extends IoAdapter { createIOServer(port: number, options?: any) { const server = super.createIOServer(port, options) - const pubClient = new IORedis({ host: REDIS.host, port: REDIS.port }) - const subClient = pubClient.duplicate() + + const { pubClient, subClient } = redisSubPub const redisAdapter = createAdapter(pubClient, subClient) server.adapter(redisAdapter) diff --git a/src/modules/configs/configs.service.ts b/src/modules/configs/configs.service.ts index 4801c427..66d73343 100644 --- a/src/modules/configs/configs.service.ts +++ b/src/modules/configs/configs.service.ts @@ -18,7 +18,7 @@ import { API_VERSION } from '~/app.config' import { RedisKeys } from '~/constants/cache.constant' import { EventBusEvents } from '~/constants/event.constant' import { CacheService } from '~/processors/cache/cache.service' -import { sleep, workerEmit } from '~/utils' +import { sleep } from '~/utils' import { getRedisKey } from '~/utils/redis.util' import * as optionDtos from '../configs/configs.dto' import { UserModel } from '../user/user.model' @@ -89,30 +89,22 @@ export class ConfigsService { await redis.set(getRedisKey(RedisKeys.ConfigCache), JSON.stringify(config)) } - public waitForConfigReady() { - // eslint-disable-next-line no-async-promise-executor - return new Promise>(async (r, j) => { - // 开始等待, 后续调用直接返回 + public async waitForConfigReady() { + if (this.configInitd) { + return await this.getConfig() + } + + const maxCount = 10 + let curCount = 0 + do { if (this.configInitd) { - r(await this.getConfig()) - return + return await this.getConfig() } + await sleep(100) + curCount += 1 + } while (curCount < maxCount) - const maxCount = 10 - let curCount = 0 - do { - if (this.configInitd) { - r(await this.getConfig()) - return - } - await sleep(100) - curCount += 1 - } while (curCount < maxCount) - - j(`重试 ${curCount} 次获取配置失败, 请检查数据库连接`) - - return - }) + throw `重试 ${curCount} 次获取配置失败, 请检查数据库连接` } public get defaultConfig() { @@ -205,7 +197,7 @@ export class ConfigsService { if (cluster.isPrimary) { this.eventEmitter.emit(EventBusEvents.EmailInit) } else { - workerEmit(EventBusEvents.EmailInit) + this.redis.publish(EventBusEvents.EmailInit, '') } } diff --git a/src/processors/cache/cache.service.ts b/src/processors/cache/cache.service.ts index 9cd05c67..a70deb30 100755 --- a/src/processors/cache/cache.service.ts +++ b/src/processors/cache/cache.service.ts @@ -1,6 +1,7 @@ import { CACHE_MANAGER, Inject, Injectable, Logger } from '@nestjs/common' import { Cache } from 'cache-manager' import { Redis } from 'ioredis' +import { redisSubPub } from '~/utils' // Cache 客户端管理器 @@ -43,6 +44,14 @@ export class CacheService { return this.cache.set(key, value, options) } + public publish(event: string, data: any) { + return redisSubPub.publish(event, data) + } + + public async subscribe(event: string, callback: (data: any) => void) { + return redisSubPub.subscribe(event, callback) + } + public getClient() { return this.redisClient } diff --git a/src/processors/helper/helper.email.service.ts b/src/processors/helper/helper.email.service.ts index d763f943..abee4ff3 100644 --- a/src/processors/helper/helper.email.service.ts +++ b/src/processors/helper/helper.email.service.ts @@ -6,6 +6,7 @@ import { createTransport } from 'nodemailer' import { EventBusEvents } from '~/constants/event.constant' import { ConfigsService } from '~/modules/configs/configs.service' import { LinkModel } from '~/modules/link/link.model' +import { CacheService } from '../cache/cache.service' import { AssetService } from './helper.asset.service' export enum ReplyMailType { @@ -25,23 +26,14 @@ export class EmailService { constructor( private readonly configsService: ConfigsService, private readonly assetService: AssetService, + private readonly cacheService: CacheService, ) { this.init() this.logger = new Logger(EmailService.name) if (cluster.isWorker) { - // listen email option change in node cluster - process.on('message', (message: any) => { - const { event } = message - - switch (event) { - case EventBusEvents.EmailInit: - this.init() - break - - default: - break - } + cacheService.subscribe(EventBusEvents.EmailInit, () => { + this.init() }) } } diff --git a/src/utils/cluster.util.ts b/src/utils/cluster.util.ts deleted file mode 100644 index 1278e326..00000000 --- a/src/utils/cluster.util.ts +++ /dev/null @@ -1,16 +0,0 @@ -/** - * 服务集群工具 - */ - -import cluster from 'cluster' -import { EventBusEvents } from '~/constants/event.constant' -/** - * Worker 事件 Emit - * @param event - * @param data - */ -export const workerEmit = (event: EventBusEvents, data?: any) => { - if (cluster.isWorker) { - process.send({ event, data, workerId: cluster.worker.id }) - } -} diff --git a/src/utils/consola.util.ts b/src/utils/consola.util.ts index 949cd150..278fe68a 100644 --- a/src/utils/consola.util.ts +++ b/src/utils/consola.util.ts @@ -11,8 +11,10 @@ class DateTimeReporter extends FancyReporter { }) } } -export const consola = consola_.create({ +const consola = consola_.create({ reporters: [new DateTimeReporter()], level: isDev || argv.verbose ? LogLevel.Trace : LogLevel.Info, }) +// HINT: must be called before any other log calls, export it in the end of your file consola.wrapAll() +export { consola } diff --git a/src/utils/index.ts b/src/utils/index.ts index d196fe35..2937e2d4 100644 --- a/src/utils/index.ts +++ b/src/utils/index.ts @@ -1,9 +1,9 @@ -export * from './cluster.util' export * from './crud.util' export * from './dayjs.util' export * from './ip.util' export * from './nest.util' export * from './query.util' +export * from './redis-subpub' export * from './redis.util' export * from './system.util' export * from './time.util' diff --git a/src/utils/redis-subpub.ts b/src/utils/redis-subpub.ts new file mode 100644 index 00000000..b1b2255d --- /dev/null +++ b/src/utils/redis-subpub.ts @@ -0,0 +1,26 @@ +import IORedis from 'ioredis' +import { REDIS } from '~/app.config' +class RedisSubPub { + public pubClient: IORedis.Redis + public subClient: IORedis.Redis + constructor(private channelPrefix: string = 'mx-channel#') { + const pubClient = new IORedis({ host: REDIS.host, port: REDIS.port }) + const subClient = pubClient.duplicate() + this.pubClient = pubClient + this.subClient = subClient + } + public async publish(event: string, data: any) { + const channel = this.channelPrefix + event + await this.pubClient.publish(channel, JSON.stringify(data)) + } + + public async subscribe(event: string, callback: (data: any) => void) { + const channel = this.channelPrefix + event + this.subClient.subscribe(channel) + this.subClient.on('message', (channel, message) => { + callback(JSON.parse(message)) + }) + } +} + +export const redisSubPub = new RedisSubPub()