feat: taskqueue

This commit is contained in:
Innei
2021-09-19 17:17:18 +08:00
parent 1e9ad69422
commit 2dea189ac5
10 changed files with 104 additions and 26 deletions

View File

@@ -1,4 +1,4 @@
import { Controller, Delete, Get, HttpCode, Query } from '@nestjs/common'
import { Controller, Delete, Get, HttpCode, Query, Scope } from '@nestjs/common'
import dayjs from 'dayjs'
import { Auth } from '~/common/decorator/auth.decorator'
import { Paginator } from '~/common/decorator/http.decorator'
@@ -11,7 +11,7 @@ import { getTodayEarly, getWeekStart } from '~/utils/time.util'
import { AnalyzeDto } from './analyze.dto'
import { AnalyzeService } from './analyze.service'
@Controller('analyze')
@Controller({ path: 'analyze', scope: Scope.REQUEST })
@ApiName
@Auth()
export class AnalyzeController {

View File

@@ -3,6 +3,7 @@ import {
Injectable,
InternalServerErrorException,
Logger,
Scope,
} from '@nestjs/common'
import {
existsSync,
@@ -22,13 +23,14 @@ import { AdminEventsGateway } from '~/processors/gateway/admin/events.gateway'
import { EventTypes } from '~/processors/gateway/events.types'
import { getFolderSize } from '~/utils/system.util'
@Injectable()
@Injectable({ scope: Scope.REQUEST })
export class BackupService {
private logger: Logger
constructor(private readonly adminGateway: AdminEventsGateway) {
this.logger = new Logger(BackupService.name)
}
async list() {
const backupPath = BACKUP_DIR
if (!existsSync(backupPath)) {

View File

@@ -19,7 +19,7 @@ const generateDefaultConfig: () => IConfig = () => ({
description: 'Hello World~',
},
url: {
wsUrl: 'http://localhost:8080', //todo
wsUrl: 'http://localhost:2333', //todo
adminUrl: 'http://localhost:9528',
serverUrl: 'http://localhost:2333',
webUrl: 'http://localhost:2323',

View File

@@ -18,6 +18,7 @@ import { ApiName } from '~/common/decorator/openapi.decorator'
import { CRON_DESCRIPTION } from '~/constants/meta.constant'
import { SCHEDULE_CRON_OPTIONS } from '~/constants/system.constant'
import { CronService } from '~/processors/helper/helper.cron.service'
import { TaskQueueService } from '~/processors/helper/helper.tq.service'
import { PM2QueryDto } from './health.dto'
@Controller({
@@ -31,6 +32,7 @@ export class HealthController {
private schedulerRegistry: SchedulerRegistry,
private readonly cronService: CronService,
private readonly reflector: Reflector,
private readonly taskQueue: TaskQueueService,
) {}
@Get('/cron')
@@ -71,9 +73,22 @@ export class HealthController {
if (!hasMethod) {
throw new BadRequestException(`${name} is not a cron`)
}
process.nextTick(async () => {
await this.cronService[name]()
})
this.taskQueue.add(name, async () =>
this.cronService[name].call(this.cronService),
)
}
@Get('/cron/task/:name')
async getCronTaskStatus(@Param('name') name: string) {
if (!isString(name)) {
throw new BadRequestException('name must be string')
}
const task = this.taskQueue.get(name)
if (!task) {
throw new BadRequestException(`${name} is not a cron in task queue`)
}
return task
}
@Get('/log/list/pm2')

View File

@@ -1,9 +1,10 @@
import { Controller, Get } from '@nestjs/common'
import { Controller, Get, Scope } from '@nestjs/common'
import { ApiName } from '~/common/decorator/openapi.decorator'
import { UserService } from '../user/user.service'
@Controller({
path: '/init',
scope: Scope.REQUEST,
})
@ApiName
export class InitController {

View File

@@ -29,7 +29,7 @@ export class PageProxyController {
if (!adminExtra.enableAdminProxy) {
return '<h1>Admin Proxy is disabled</h1>'
}
const indexEntryUrl = `https://cdn.jsdelivr.net/gh/mx-space/admin-next@gh-pages/index.html`
const indexEntryUrl = `https://raw.githubusercontent.com/mx-space/admin-next/gh-pages/index.html`
let entry = await (await fetch(indexEntryUrl)).text()
entry = entry.replace(
`<!-- injectable script -->`,

View File

@@ -5,12 +5,6 @@ import { NoteModel } from '~/modules/note/note.model'
import { PageModel } from '~/modules/page/page.model'
import { PostModel } from '~/modules/post/post.model'
declare enum ModelRefTypes {
Post,
Note,
Page,
}
@Injectable()
export class DatabaseService {
constructor(

View File

@@ -22,7 +22,6 @@ import { ConfigsService } from '~/modules/configs/configs.service'
import { NoteService } from '~/modules/note/note.service'
import { PageService } from '~/modules/page/page.service'
import { PostService } from '~/modules/post/post.service'
import { isDev } from '~/utils/index.util'
import { getRedisKey } from '~/utils/redis.util'
import { CacheService } from '../cache/cache.service'
import { HttpService } from './helper.http.service'
@@ -69,8 +68,9 @@ export class CronService {
})
return json
} catch {
} catch (err) {
this.logger.warn('更新 Bot 列表错误')
throw err
}
}
@@ -93,14 +93,11 @@ export class CronService {
this.logger.log('--> 备份成功')
} catch (e) {
if (isDev) {
console.log(e)
}
this.logger.error(
'--> 备份失败, 请确保已安装 zip 或 mongo-tools, mongo-tools 的版本需要与 mongod 版本一致, ' +
e.message,
)
return
throw e
}
// 开始上传 COS
@@ -142,6 +139,7 @@ export class CronService {
this.logger.log('--> 上传成功')
} else {
this.logger.error('--> 上传失败了')
throw err
}
},
)
@@ -184,7 +182,7 @@ export class CronService {
@Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT, {
name: 'resetLikedOrReadArticleRecord',
})
@CronDescription('清理喜欢数成功')
@CronDescription('清理喜欢数')
async resetLikedOrReadArticleRecord() {
const redis = this.cacheService.getClient()
@@ -211,14 +209,17 @@ export class CronService {
@Cron(CronExpression.EVERY_DAY_AT_3AM, { name: 'pushToBaiduSearch' })
@CronDescription('推送到百度搜索')
async pushToBaiduSearch() {
const configs = this.configs.get('baiduSearchOptions')
const {
url: { webUrl },
baiduSearchOptions: configs,
} = await this.configs.waitForConfigReady()
if (configs.enable) {
const token = configs.token
if (!token) {
this.logger.error('[BaiduSearchPushTask] token 为空')
return
}
const siteUrl = this.configs.get('url').webUrl
const pushUrls = await this.aggregateService.getSiteMapContent()
const urls = pushUrls
@@ -229,7 +230,7 @@ export class CronService {
try {
const res = await this.http.axiosRef.post(
`http://data.zz.baidu.com/urls?site=${siteUrl}&token=${token}`,
`http://data.zz.baidu.com/urls?site=${webUrl}&token=${token}`,
urls,
{
headers: {
@@ -241,6 +242,7 @@ export class CronService {
return res.data
} catch (e) {
this.logger.error('百度推送错误: ' + e.message)
throw e
}
}
return null
@@ -321,8 +323,9 @@ export class CronService {
autoGenerateObjectIDIfNotExist: false,
})
this.logger.log('--> 推送到 algoliasearch 成功')
} catch {
} catch (err) {
Logger.error('algolia推送错误', 'AlgoliaSearch')
throw err
}
}
}

View File

@@ -9,6 +9,7 @@ import { CronService } from './helper.cron.service'
import { EmailService } from './helper.email.service'
import { HttpService } from './helper.http.service'
import { ImageService } from './helper.image.service'
import { TaskQueueService } from './helper.tq.service'
import { UploadService } from './helper.upload.service'
import { AssetService } from './hepler.asset.service'
@@ -20,6 +21,7 @@ const providers: Provider<any>[] = [
CountingService,
UploadService,
AssetService,
TaskQueueService,
]
@Module({

View File

@@ -0,0 +1,61 @@
import { Injectable } from '@nestjs/common'
import { isAsyncFunction } from 'util/types'
type ITask = Map<
string,
{
status: 'pending' | 'fulfill' | 'reject'
updatedAt: Date
message?: string
}
>
@Injectable()
export class TaskQueueService {
tasks: ITask
constructor() {
this.tasks = new Map()
}
add(name: string, task: () => Promise<any>) {
this.tasks.set(name, { status: 'pending', updatedAt: new Date() })
if (isAsyncFunction(task)) {
task()
.then(() => {
this.tasks.set(name, { status: 'fulfill', updatedAt: new Date() })
})
.catch((err) => {
console.debug(err)
this.tasks.set(name, {
status: 'reject',
updatedAt: new Date(),
message: err.message,
})
})
} else {
try {
task()
this.tasks.set(name, { status: 'fulfill', updatedAt: new Date() })
} catch (err) {
console.debug(err)
this.tasks.set(name, {
status: 'reject',
updatedAt: new Date(),
message: err.message,
})
}
}
}
get(name: string) {
const task = this.tasks.get(name)
return !task ? null : { ...task }
}
get length() {
return this.tasks.size
}
}