fix: admin gateway singleton

This commit is contained in:
Innei
2022-02-08 23:24:16 +08:00
parent a001921bd7
commit 8f73e61c4f
15 changed files with 180 additions and 70 deletions

View File

@@ -14,6 +14,10 @@
<li>
<a href="./socket-admin.html"> socket-admin </a>
</li>
<li>
<a href="./socket-test.html"> socket-test </a>
</li>
</ul>
</body>
</html>

View File

@@ -132,6 +132,7 @@
"@nestjs/testing": "8.2.6",
"@types/bcrypt": "5.0.0",
"@types/cache-manager": "3.4.2",
"@types/cron": "1.7.3",
"@types/ejs": "3.1.0",
"@types/html-minifier": "4.0.2",
"@types/ioredis": "4.28.7",
@@ -145,6 +146,7 @@
"@types/passport-jwt": "3.0.6",
"@types/ua-parser-js": "0.7.36",
"@vercel/ncc": "0.33.1",
"cron": "*",
"cross-env": "7.0.3",
"eslint": "*",
"husky": "7.0.4",

25
pnpm-lock.yaml generated
View File

@@ -29,6 +29,7 @@ specifiers:
'@typegoose/typegoose': 9.6.0
'@types/bcrypt': 5.0.0
'@types/cache-manager': 3.4.2
'@types/cron': 1.7.3
'@types/ejs': 3.1.0
'@types/html-minifier': 4.0.2
'@types/ioredis': 4.28.7
@@ -54,6 +55,7 @@ specifiers:
class-validator: 0.13.2
consola: '*'
cos-nodejs-sdk-v5: 2.11.6
cron: '*'
cross-env: 7.0.3
dayjs: 1.10.7
ejs: 3.1.6
@@ -182,6 +184,7 @@ devDependencies:
'@nestjs/testing': 8.2.6_732a54a2558f64827b8cc4f9baac4f1f
'@types/bcrypt': 5.0.0
'@types/cache-manager': 3.4.2
'@types/cron': 1.7.3
'@types/ejs': 3.1.0
'@types/html-minifier': 4.0.2
'@types/ioredis': 4.28.7
@@ -195,6 +198,7 @@ devDependencies:
'@types/passport-jwt': 3.0.6
'@types/ua-parser-js': 0.7.36
'@vercel/ncc': 0.33.1
cron: 1.7.2
cross-env: 7.0.3
eslint: 8.8.0
husky: 7.0.4
@@ -2151,6 +2155,13 @@ packages:
/@types/cors/2.8.12:
resolution: {integrity: sha512-vt+kDhq/M2ayberEtJcIN/hxXy1Pk+59g2FV/ZQceeaTyCtCucjL2Q7FXlFjtWn4n15KCr1NE2lNNFhp0lEThw==}
/@types/cron/1.7.3:
resolution: {integrity: sha512-iPmUXyIJG1Js+ldPYhOQcYU3kCAQ2FWrSkm1FJPoii2eYSn6wEW6onPukNTT0bfiflexNSRPl6KWmAIqS+36YA==}
dependencies:
'@types/node': 16.11.22
moment: 2.29.1
dev: true
/@types/ejs/3.1.0:
resolution: {integrity: sha512-DCg+Ka+uDQ31lJ/UtEXVlaeV3d6t81gifaVWKJy4MYVVgvJttyX/viREy+If7fz+tK/gVxTGMtyrFPnm4gjrVA==}
dev: true
@@ -3873,7 +3884,6 @@ packages:
resolution: {integrity: sha512-+SaJ2OfeRvfQqwXQ2kgr0Y5pzBR/lijf5OpnnaruwWnmI799JfWr2jN2ItOV9s3A/+TFOt6mxvKzQq5F0Jp6VQ==}
dependencies:
moment-timezone: 0.5.33
dev: false
/cross-env/7.0.3:
resolution: {integrity: sha512-+/HKd6EgcQCJGh2PSjZuUitQBQynKor4wrFbRg4DtAgS1aWO+gU52xpH7M9ScGgXSYmAVS9bIJ8EzuaGw0oNAw==}
@@ -6750,11 +6760,9 @@ packages:
resolution: {integrity: sha512-PTc2vcT8K9J5/9rDEPe5czSIKgLoGsH8UNpA4qZTVw0Vd/Uz19geE9abbIOQKaAQFcnQ3v5YEXrbSc5BpshH+w==}
dependencies:
moment: 2.29.1
dev: false
/moment/2.29.1:
resolution: {integrity: sha512-kHmoybcPV8Sqy59DwNDY3Jefr64lK/by/da0ViFcuA4DH0vQg5Q6Ze5VimxkfQNSC+Mls/Kx53s7TjP1RhFEDQ==}
dev: false
/mongodb-connection-string-url/2.4.2:
resolution: {integrity: sha512-mZUXF6nUzRWk5J3h41MsPv13ukWlH4jOMSk6astVeoZ1EbdTJyF5I3wxKkvqBAOoVtzLgyEYUvDjrGdcPlKjAw==}
@@ -7597,7 +7605,7 @@ packages:
resolution: {integrity: sha1-hSBLVNuoLVdC4oyWdW70OvUOM4Q=}
engines: {node: '>= 0.10'}
dependencies:
resolve: 1.22.0
resolve: 1.21.0
dev: true
/redis-commands/1.7.0:
@@ -7718,6 +7726,15 @@ packages:
engines: {node: '>=10'}
dev: true
/resolve/1.21.0:
resolution: {integrity: sha512-3wCbTpk5WJlyE4mSOtDLhqQmGFi0/TD9VPwmiolnk8U0wRgMEktqCXd3vy5buTO3tljvalNvKrjHEfrd2WpEKA==}
hasBin: true
dependencies:
is-core-module: 2.8.1
path-parse: 1.0.7
supports-preserve-symlinks-flag: 1.0.0
dev: true
/resolve/1.22.0:
resolution: {integrity: sha512-Hhtrw0nLeSrFQ7phPp4OOcVjLPIeMnRlr5mcnVuMe7M/7eBn98A3hmFRLoFo3DLZkivSYwhRUJTyPyWAk56WLw==}
hasBin: true

View File

@@ -1,4 +1,5 @@
const isTEST = !!process.env.TEST
const isDev = process.env.NODE_ENV === 'development'
import cluster from 'cluster'
import { argv } from 'zx'

View File

@@ -17,7 +17,7 @@ import { ResponseInterceptor } from './common/interceptors/response.interceptor'
import { AttachHeaderTokenMiddleware } from './common/middlewares/attach-auth.middleware'
import {
DATA_DIR,
LOGGER_DIR,
LOG_DIR,
TEMP_DIR,
USER_ASSET_DIR,
} from './constants/path.constant'
@@ -58,8 +58,8 @@ function mkdirs() {
Logger.log(chalk.blue('数据目录已经建好: ' + DATA_DIR))
mkdirSync(TEMP_DIR, { recursive: true })
Logger.log(chalk.blue('临时目录已经建好: ' + TEMP_DIR))
mkdirSync(LOGGER_DIR, { recursive: true })
Logger.log(chalk.blue('日志目录已经建好: ' + LOGGER_DIR))
mkdirSync(LOG_DIR, { recursive: true })
Logger.log(chalk.blue('日志目录已经建好: ' + LOG_DIR))
mkdirSync(USER_ASSET_DIR, { recursive: true })
Logger.log(chalk.blue('资源目录已经建好: ' + USER_ASSET_DIR))
}

View File

@@ -13,7 +13,7 @@ import { FastifyReply, FastifyRequest } from 'fastify'
import { WriteStream } from 'fs'
import { resolve } from 'path'
import { HTTP_REQUEST_TIME } from '~/constants/meta.constant'
import { LOGGER_DIR } from '~/constants/path.constant'
import { LOG_DIR } from '~/constants/path.constant'
import { REFLECTOR } from '~/constants/system.constant'
import { isDev } from '~/utils'
import { getIp } from '../../utils/ip.util'
@@ -53,7 +53,7 @@ export class AllExceptionsFilter implements ExceptionFilter {
if (!isDev) {
this.errorLogPipe =
this.errorLogPipe ??
fs.createWriteStream(resolve(LOGGER_DIR, 'error.log'), {
fs.createWriteStream(resolve(LOG_DIR, 'error.log'), {
flags: 'a+',
encoding: 'utf-8',
})

View File

@@ -11,7 +11,7 @@ export const DATA_DIR = isDev
: join(HOME, '.mx-space')
export const USER_ASSET_DIR = join(DATA_DIR, 'assets')
export const LOGGER_DIR = join(DATA_DIR, 'log')
export const LOG_DIR = join(DATA_DIR, 'log')
export const LOCAL_BOT_LIST_DATA_FILE_PATH = join(DATA_DIR, 'bot_list.json')

View File

@@ -4,7 +4,6 @@ import { PassportModule } from '@nestjs/passport'
import cluster from 'cluster'
import { machineIdSync } from 'node-machine-id'
import { CLUSTER, SECURITY } from '~/app.config'
import { AdminEventsGateway } from '../../processors/gateway/admin/events.gateway'
import { AuthController } from './auth.controller'
import { AuthService } from './auth.service'
import { JwtStrategy } from './jwt.strategy'
@@ -38,7 +37,7 @@ const jwtModule = JwtModule.registerAsync({
})
@Module({
imports: [PassportModule, jwtModule],
providers: [AuthService, JwtStrategy, AdminEventsGateway],
providers: [AuthService, JwtStrategy],
controllers: [AuthController],
exports: [JwtStrategy, AuthService, jwtModule],
})

View File

@@ -19,7 +19,6 @@ import { Readable } from 'stream'
import { Auth } from '~/common/decorator/auth.decorator'
import { HTTPDecorators } from '~/common/decorator/http.decorator'
import { ApiName } from '~/common/decorator/openapi.decorator'
import { CronService } from '~/processors/helper/helper.cron.service'
import { UploadService } from '~/processors/helper/helper.upload.service'
import { BackupService } from './backup.service'
@@ -30,7 +29,6 @@ export class BackupController {
constructor(
private readonly backupService: BackupService,
private readonly uploadService: UploadService,
private readonly cronService: CronService,
) {}
@Get('/new')

View File

@@ -1,7 +1,6 @@
import { CACHE_MANAGER, Inject, Injectable, Logger } from '@nestjs/common'
import { Cache } from 'cache-manager'
import { Redis } from 'ioredis'
import { redisSubPub } from '~/utils/redis-subpub.util'
// Cache 客户端管理器
@@ -44,14 +43,23 @@ export class CacheService {
return this.cache.set(key, value, options)
}
public publish(event: string, data: any) {
public async publish(event: string, data: any) {
const { redisSubPub } = await import('../../utils/redis-subpub.util')
return redisSubPub.publish(event, data)
}
public async subscribe(event: string, callback: (data: any) => void) {
const { redisSubPub } = await import('../../utils/redis-subpub.util')
return redisSubPub.subscribe(event, callback)
}
public async unsubscribe(event: string, callback: (data: any) => void) {
const { redisSubPub } = await import('../../utils/redis-subpub.util')
return redisSubPub.unsubscribe(event, callback)
}
public getClient() {
return this.redisClient
}

View File

@@ -1,4 +1,3 @@
import { Logger } from '@nestjs/common'
import { OnEvent } from '@nestjs/event-emitter'
import { JwtService } from '@nestjs/jwt'
import {
@@ -10,9 +9,12 @@ import {
WebSocketServer,
} from '@nestjs/websockets'
import { Emitter } from '@socket.io/redis-emitter'
import { resolve } from 'path'
import SocketIO, { Socket } from 'socket.io'
import { EventBusEvents } from '~/constants/event.constant'
import { LOG_DIR } from '~/constants/path.constant'
import { CacheService } from '~/processors/cache/cache.service'
import { getTodayLogFilePath } from '~/utils/consola.util'
import { AuthService } from '../../../modules/auth/auth.service'
import { BaseGateway } from '../base.gateway'
import { EventTypes } from '../events.types'
@@ -27,7 +29,6 @@ export class AdminEventsGateway
private readonly cacheService: CacheService,
) {
super()
this.bindStdOut()
}
tokenSocketIdMap = new Map<string, string>()
@@ -77,16 +78,41 @@ export class AdminEventsGateway
this.tokenSocketIdMap.set(token.toString(), sid)
}
subscribeSocketToHandlerMap = new Map<string, Function>()
@SubscribeMessage('log')
async subscribeStdOut(client: Socket) {
if (this.subscribeSocketToHandlerMap.has(client.id)) {
return
}
const queue = [] as Function[]
const handler = (data) => {
queue.push(() =>
client.send(this.gatewayMessageFormat(EventTypes.STDOUT, data)),
)
queue.shift()()
}
this.subscribeSocketToHandlerMap.set(client.id, handler)
this.cacheService.subscribe('log', handler)
fs.createReadStream(resolve(LOG_DIR, getTodayLogFilePath()), {
encoding: 'utf-8',
highWaterMark: 20,
}).on('data', handler)
}
@SubscribeMessage('unlog')
unsubscribeStdOut(client: Socket) {
const idx = this.subscribeStdOutClient.findIndex(
(client_) => client_ === client,
)
Logger.debug(chalk.yellow(client.id, idx))
if (~idx) {
this.subscribeStdOutClient.splice(idx, 1)
const cb = this.subscribeSocketToHandlerMap.get(client.id)
if (cb) {
this.cacheService.unsubscribe('log', cb as any)
}
this.subscribeSocketToHandlerMap.delete(client.id)
}
handleDisconnect(client: SocketIO.Socket) {
super.handleDisconnect(client)
this.unsubscribeStdOut(client)
@@ -106,46 +132,6 @@ export class AdminEventsGateway
return false
}
subscribeStdOutClient: Socket[] = []
@SubscribeMessage('log')
async subscribeStdOut(client: Socket) {
if (
this.subscribeStdOutClient.includes(client) ||
this.subscribeStdOutClient.some((client_) => client_.id === client.id)
) {
return
}
this.subscribeStdOutClient.push(client)
Logger.debug(
chalk.yellow(client.id, this.subscribeStdOutClient.length),
'SubscribeStdOut',
)
}
bindStdOut() {
const handler = (data: any) => {
this.subscribeStdOutClient.forEach((client) => {
client.send(this.gatewayMessageFormat(EventTypes.STDOUT, data))
})
}
const stream = {
stdout: process.stdout.write,
stderr: process.stderr.write,
}
process.stdout.write = function (...rest: any[]) {
handler(rest[0])
return stream.stdout.apply(this, rest)
}
process.stderr.write = function (...rest: any[]) {
handler(rest[0])
return stream.stderr.apply(this, rest)
}
}
broadcast(event: EventTypes, data: any) {
const client = new Emitter(this.cacheService.getClient())
client.of('/admin').emit('message', this.gatewayMessageFormat(event, data))

View File

@@ -4,14 +4,16 @@ import { Cron, CronExpression } from '@nestjs/schedule'
import COS from 'cos-nodejs-sdk-v5'
import dayjs from 'dayjs'
import { existsSync } from 'fs'
import { rm, writeFile } from 'fs/promises'
import { readdir, rm, writeFile } from 'fs/promises'
import mkdirp from 'mkdirp'
import { InjectModel } from 'nestjs-typegoose'
import { join } from 'path'
import { CronDescription } from '~/common/decorator/cron-description.decorator'
import { RedisKeys } from '~/constants/cache.constant'
import { EventBusEvents } from '~/constants/event.constant'
import {
LOCAL_BOT_LIST_DATA_FILE_PATH,
LOG_DIR,
TEMP_DIR,
} from '~/constants/path.constant'
import { AggregateService } from '~/modules/aggregate/aggregate.service'
@@ -189,6 +191,26 @@ export class CronService {
this.logger.log('--> 清理临时文件成功')
}
@Cron(CronExpression.EVERY_WEEKEND, { name: 'cleanTempDirectory' })
@CronDescription('清理日志文件')
async cleanLogFile() {
await rm(LOG_DIR, { recursive: true })
mkdirp.sync(LOG_DIR)
const files = (await readdir(LOG_DIR)).filter(
(file) => file !== 'error.log',
)
for (const file of files) {
const filePath = join(LOG_DIR, file)
const state = fs.statSync(filePath)
const oldThanWeek = dayjs().diff(state.mtime, 'day') > 7
if (oldThanWeek) {
fs.rm(filePath)
}
}
this.logger.log('--> 清理日志文件成功')
}
@Cron(CronExpression.EVERY_DAY_AT_3AM, { name: 'pushToBaiduSearch' })
@CronDescription('推送到百度搜索')
async pushToBaiduSearch() {

View File

@@ -1,8 +1,47 @@
import consola_, { FancyReporter, LogLevel } from 'consola'
import { CronExpression } from '@nestjs/schedule'
import consola_, {
ConsolaReporterArgs,
ConsolaReporterLogObject,
FancyReporter,
LogLevel,
} from 'consola'
import { CronJob } from 'cron'
import dayjs from 'dayjs'
import { createWriteStream, WriteStream } from 'fs'
import { resolve } from 'path'
import { argv } from 'zx'
import { isDev } from './tool.util'
import { LOG_DIR } from '~/constants/path.constant'
import { isDev, isTest } from './tool.util'
export const getTodayLogFilePath = () =>
resolve(LOG_DIR, 'stdout_' + dayjs().format('YYYY-MM-DD') + '.log')
class DateTimeReporter extends FancyReporter {
private fs: WriteStream
private job: CronJob
constructor() {
super()
this.fs = createWriteStream(getTodayLogFilePath(), {
encoding: 'utf-8',
flags: 'a+',
})
this.fs.write(
'\n========================================================\n',
)
this.job = new CronJob(CronExpression.EVERY_DAY_AT_MIDNIGHT, () => {
this.fs.close()
this.fs = createWriteStream(getTodayLogFilePath(), {
encoding: 'utf-8',
flags: 'a+',
})
this.fs.write(
'\n========================================================\n',
)
})
}
formatDate(date: Date) {
return date.toLocaleString(undefined, {
hour12: false,
@@ -10,6 +49,22 @@ class DateTimeReporter extends FancyReporter {
dateStyle: 'short',
})
}
public log(logObj: ConsolaReporterLogObject, args: ConsolaReporterArgs) {
super.log(logObj, args)
if (!isTest) {
import('./redis-subpub.util').then(({ redisSubPub }) => {
// HACK: consola not officially interface of formatLogObj method
const formatOutput =
// @ts-expect-error
super.formatLogObj(logObj, { width: args.columns || 0 }) + '\n'
if (this.fs) {
this.fs.write(formatOutput)
}
redisSubPub.publish('log', formatOutput)
})
}
}
}
const consola = consola_.create({
reporters: [new DateTimeReporter()],

View File

@@ -23,13 +23,31 @@ class RedisSubPub {
await this.pubClient.publish(channel, JSON.stringify(data))
}
ctc = new Map<Function, Callback>()
public async subscribe(event: string, callback: (data: any) => void) {
const channel = this.channelPrefix + event
this.subClient.subscribe(channel)
this.subClient.on('message', (channel, message) => {
const cb = (channel, message) => {
callback(JSON.parse(message))
})
}
this.ctc.set(callback, cb)
this.subClient.on('message', cb)
}
public async unsubscribe(event: string, callback: (data: any) => void) {
const channel = this.channelPrefix + event
this.subClient.unsubscribe(channel)
const cb = this.ctc.get(callback)
if (cb) {
this.subClient.off('message', cb)
this.ctc.delete(callback)
}
}
}
export const redisSubPub = new RedisSubPub()
type Callback = (channel: string, message: string) => void

View File

@@ -30,7 +30,7 @@
}
},
"include": [
"src/*",
"src/**/*",
"*.d.ts"
],
"exclude": [