From 833bc8a05404c131afc723a106188b9ac0dbc97b Mon Sep 17 00:00:00 2001 From: syuilo Date: Sat, 19 Feb 2022 17:15:49 +0900 Subject: [PATCH] wip --- packages/backend/package.json | 2 +- packages/backend/src/db/redis.ts | 27 +++++------ packages/backend/src/misc/antenna-cache.ts | 34 ++++++------- .../backend/src/server/api/service/discord.ts | 24 ++++++---- .../backend/src/server/api/service/github.ts | 24 ++++++---- .../backend/src/server/api/service/twitter.ts | 28 +++++------ .../backend/src/server/api/stream/index.ts | 35 +++++++++----- packages/backend/src/server/api/streaming.ts | 15 +----- packages/backend/yarn.lock | 48 ++++++++++++------- 9 files changed, 124 insertions(+), 113 deletions(-) diff --git a/packages/backend/package.json b/packages/backend/package.json index b9433f7f3..77039b29f 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -139,7 +139,7 @@ "random-seed": "0.3.0", "ratelimiter": "3.4.1", "re2": "1.17.3", - "redis": "3.1.2", + "redis": "4.0.3", "redis-lock": "0.1.4", "reflect-metadata": "0.1.13", "rename": "1.0.4", diff --git a/packages/backend/src/db/redis.ts b/packages/backend/src/db/redis.ts index 9fc2b6241..79b0fa47f 100644 --- a/packages/backend/src/db/redis.ts +++ b/packages/backend/src/db/redis.ts @@ -1,19 +1,18 @@ import * as redis from 'redis'; import config from '@/config/index'; -export function createConnection() { - return redis.createClient( - config.redis.port, - config.redis.host, - { - password: config.redis.pass, - prefix: config.redis.prefix, - db: config.redis.db || 0, - } - ); -} +export const redisClient = redis.createClient({ + socket: { + port: config.redis.port, + host: config.redis.host, + }, + password: config.redis.pass, + prefix: config.redis.prefix, + database: config.redis.db || 0, +}); -export const subsdcriber = createConnection(); -subsdcriber.subscribe(config.host); +redisClient.connect(); -export const redisClient = createConnection(); +export const redisSubscriber = redisClient.duplicate(); + +redisSubscriber.connect(); diff --git a/packages/backend/src/misc/antenna-cache.ts b/packages/backend/src/misc/antenna-cache.ts index a23eeb45e..ff12f8f41 100644 --- a/packages/backend/src/misc/antenna-cache.ts +++ b/packages/backend/src/misc/antenna-cache.ts @@ -1,6 +1,6 @@ import { Antennas } from '@/models/index'; import { Antenna } from '@/models/entities/antenna'; -import { subsdcriber } from '../db/redis'; +import { redisSubscriber } from '../db/redis'; let antennasFetched = false; let antennas: Antenna[] = []; @@ -14,23 +14,19 @@ export async function getAntennas() { return antennas; } -subsdcriber.on('message', async (_, data) => { - const obj = JSON.parse(data); - - if (obj.channel === 'internal') { - const { type, body } = obj.message; - switch (type) { - case 'antennaCreated': - antennas.push(body); - break; - case 'antennaUpdated': - antennas[antennas.findIndex(a => a.id === body.id)] = body; - break; - case 'antennaDeleted': - antennas = antennas.filter(a => a.id !== body.id); - break; - default: - break; - } +redisSubscriber.subscribe('internal', async (message) => { + const { type, body } = JSON.parse(message); + switch (type) { + case 'antennaCreated': + antennas.push(body); + break; + case 'antennaUpdated': + antennas[antennas.findIndex(a => a.id === body.id)] = body; + break; + case 'antennaDeleted': + antennas = antennas.filter(a => a.id !== body.id); + break; + default: + break; } }); diff --git a/packages/backend/src/server/api/service/discord.ts b/packages/backend/src/server/api/service/discord.ts index dd731c422..0c5ed4310 100644 --- a/packages/backend/src/server/api/service/discord.ts +++ b/packages/backend/src/server/api/service/discord.ts @@ -144,11 +144,13 @@ router.get('/dc/cb', async ctx => { return; } - const { redirect_uri, state } = await new Promise((res, rej) => { - redisClient.get(sessid, async (_, state) => { - res(JSON.parse(state)); - }); - }); + const session = await redisClient.get(sessid); + if (session == null) { + ctx.throw(400, 'invalid session'); + return; + } + + const { redirect_uri, state } = JSON.parse(session); if (ctx.query.state !== state) { ctx.throw(400, 'invalid session'); @@ -215,11 +217,13 @@ router.get('/dc/cb', async ctx => { return; } - const { redirect_uri, state } = await new Promise((res, rej) => { - redisClient.get(userToken, async (_, state) => { - res(JSON.parse(state)); - }); - }); + const session = await redisClient.get(userToken); + if (session == null) { + ctx.throw(400, 'invalid session'); + return; + } + + const { redirect_uri, state } = JSON.parse(session); if (ctx.query.state !== state) { ctx.throw(400, 'invalid session'); diff --git a/packages/backend/src/server/api/service/github.ts b/packages/backend/src/server/api/service/github.ts index b23219986..cc7eac534 100644 --- a/packages/backend/src/server/api/service/github.ts +++ b/packages/backend/src/server/api/service/github.ts @@ -142,11 +142,13 @@ router.get('/gh/cb', async ctx => { return; } - const { redirect_uri, state } = await new Promise((res, rej) => { - redisClient.get(sessid, async (_, state) => { - res(JSON.parse(state)); - }); - }); + const session = await redisClient.get(sessid); + if (session == null) { + ctx.throw(400, 'invalid session'); + return; + } + + const { redirect_uri, state } = JSON.parse(session); if (ctx.query.state !== state) { ctx.throw(400, 'invalid session'); @@ -193,11 +195,13 @@ router.get('/gh/cb', async ctx => { return; } - const { redirect_uri, state } = await new Promise((res, rej) => { - redisClient.get(userToken, async (_, state) => { - res(JSON.parse(state)); - }); - }); + const session = await redisClient.get(userToken); + if (session == null) { + ctx.throw(400, 'invalid session'); + return; + } + + const { redirect_uri, state } = JSON.parse(session); if (ctx.query.state !== state) { ctx.throw(400, 'invalid session'); diff --git a/packages/backend/src/server/api/service/twitter.ts b/packages/backend/src/server/api/service/twitter.ts index bca00b792..90708212f 100644 --- a/packages/backend/src/server/api/service/twitter.ts +++ b/packages/backend/src/server/api/service/twitter.ts @@ -123,13 +123,13 @@ router.get('/tw/cb', async ctx => { return; } - const get = new Promise((res, rej) => { - redisClient.get(sessid, async (_, twCtx) => { - res(twCtx); - }); - }); - - const twCtx = await get; + const session = await redisClient.get(sessid); + if (session == null) { + ctx.throw(400, 'invalid session'); + return; + } + + const twCtx = JSON.parse(session); const result = await twAuth!.done(JSON.parse(twCtx), ctx.query.oauth_verifier); @@ -152,13 +152,13 @@ router.get('/tw/cb', async ctx => { return; } - const get = new Promise((res, rej) => { - redisClient.get(userToken, async (_, twCtx) => { - res(twCtx); - }); - }); - - const twCtx = await get; + const session = await redisClient.get(userToken); + if (session == null) { + ctx.throw(400, 'invalid session'); + return; + } + + const twCtx = JSON.parse(session); const result = await twAuth!.done(JSON.parse(twCtx), verifier); diff --git a/packages/backend/src/server/api/stream/index.ts b/packages/backend/src/server/api/stream/index.ts index e0bb8033a..fb578e7e2 100644 --- a/packages/backend/src/server/api/stream/index.ts +++ b/packages/backend/src/server/api/stream/index.ts @@ -5,7 +5,6 @@ import call from '../call'; import readNote from '@/services/note/read'; import Channel from './channel'; import channels from './channels/index'; -import { EventEmitter } from 'events'; import { User } from '@/models/entities/user'; import { Channel as ChannelModel } from '@/models/entities/channel'; import { Users, Followings, Mutings, UserProfiles, ChannelFollowings, Blockings } from '@/models/index'; @@ -16,6 +15,7 @@ import { publishChannelStream, publishGroupMessagingStream, publishMessagingStre import { UserGroup } from '@/models/entities/user-group'; import { StreamEventEmitter, StreamMessages } from './types'; import { Packed } from '@/misc/schema'; +import { redisSubscriber } from '@/db/redis'; /** * Main stream connection @@ -29,27 +29,22 @@ export default class Connection { public followingChannels: Set = new Set(); public token?: AccessToken; private wsConnection: websocket.connection; - public subscriber: StreamEventEmitter; private channels: Channel[] = []; private subscribingNotes: any = {}; private cachedNotes: Packed<'Note'>[] = []; constructor( wsConnection: websocket.connection, - subscriber: EventEmitter, user: User | null | undefined, token: AccessToken | null | undefined ) { this.wsConnection = wsConnection; - this.subscriber = subscriber; if (user) this.user = user; if (token) this.token = token; this.wsConnection.on('message', this.onWsConnectionMessage); - this.subscriber.on('broadcast', data => { - this.onBroadcastMessage(data); - }); + redisSubscriber.subscribe('broadcast', this.onBroadcastMessage); if (this.user) { this.updateFollowing(); @@ -58,12 +53,13 @@ export default class Connection { this.updateFollowingChannels(); this.updateUserProfile(); - this.subscriber.on(`user:${this.user.id}`, this.onUserEvent); + redisSubscriber.subscribe(`user:${this.user.id}`, this.onUserEvent); } } @autobind - private onUserEvent(data: StreamMessages['user']['payload']) { // { type, body }と展開するとそれぞれ型が分離してしまう + private onUserEvent(message: string) { + const data = JSON.parse(message) as StreamMessages['user']['payload']; // { type, body }と展開するとそれぞれ型が分離してしまう switch (data.type) { case 'follow': this.following.add(data.body.id); @@ -145,7 +141,8 @@ export default class Connection { } @autobind - private onBroadcastMessage(data: StreamMessages['broadcast']['payload']) { + private onBroadcastMessage(message: string) { + const data = JSON.parse(message) as StreamMessages['broadcast']['payload']; this.sendMessageToWs(data.type, data.body); } @@ -223,6 +220,8 @@ export default class Connection { private onSubscribeNote(payload: any) { if (!payload.id) return; + // TODO: 購読できる条件を設ける + if (this.subscribingNotes[payload.id] == null) { this.subscribingNotes[payload.id] = 0; } @@ -230,7 +229,7 @@ export default class Connection { this.subscribingNotes[payload.id]++; if (this.subscribingNotes[payload.id] === 1) { - this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage); + redisSubscriber.subscribe(`noteStream:${payload.id}`, this.onNoteStreamMessage); } } @@ -244,12 +243,13 @@ export default class Connection { this.subscribingNotes[payload.id]--; if (this.subscribingNotes[payload.id] <= 0) { delete this.subscribingNotes[payload.id]; - this.subscriber.off(`noteStream:${payload.id}`, this.onNoteStreamMessage); + redisSubscriber.unsubscribe(`noteStream:${payload.id}`, this.onNoteStreamMessage); } } @autobind - private async onNoteStreamMessage(data: StreamMessages['note']['payload']) { + private async onNoteStreamMessage(message: string) { + const data = JSON.parse(message) as StreamMessages['note']['payload']; this.sendMessageToWs('noteUpdated', { id: data.body.id, type: data.type, @@ -418,5 +418,14 @@ export default class Connection { for (const c of this.channels.filter(c => c.dispose)) { if (c.dispose) c.dispose(); } + + for (const id of Object.keys(this.subscribingNotes)) { + redisSubscriber.unsubscribe(`noteStream:${id}`, this.onNoteStreamMessage); + } + + redisSubscriber.unsubscribe('broadcast', this.onBroadcastMessage); + if (this.user) { + redisSubscriber.unsubscribe(`user:${this.user.id}`, this.onUserEvent); + } } } diff --git a/packages/backend/src/server/api/streaming.ts b/packages/backend/src/server/api/streaming.ts index b706b1b8d..4cef2c9aa 100644 --- a/packages/backend/src/server/api/streaming.ts +++ b/packages/backend/src/server/api/streaming.ts @@ -4,8 +4,6 @@ import * as websocket from 'websocket'; import MainStreamConnection from './stream/index'; import { ParsedUrlQuery } from 'querystring'; import authenticate from './authenticate'; -import { EventEmitter } from 'events'; -import { subsdcriber as redisClient } from '../../db/redis'; import { Users } from '@/models/index'; module.exports = (server: http.Server) => { @@ -29,16 +27,7 @@ module.exports = (server: http.Server) => { const connection = request.accept(); - const ev = new EventEmitter(); - - async function onRedisMessage(_: string, data: string) { - const parsed = JSON.parse(data); - ev.emit(parsed.channel, parsed.message); - } - - redisClient.on('message', onRedisMessage); - - const main = new MainStreamConnection(connection, ev, user, app); + const main = new MainStreamConnection(connection, user, app); const intervalId = user ? setInterval(() => { Users.update(user.id, { @@ -52,9 +41,7 @@ module.exports = (server: http.Server) => { } connection.once('close', () => { - ev.removeAllListeners(); main.dispose(); - redisClient.off('message', onRedisMessage); if (intervalId) clearInterval(intervalId); }); diff --git a/packages/backend/yarn.lock b/packages/backend/yarn.lock index c1932a905..217e84c89 100644 --- a/packages/backend/yarn.lock +++ b/packages/backend/yarn.lock @@ -139,11 +139,21 @@ methods "^1.1.2" path-to-regexp "^6.1.0" -"@node-redis/bloom@^1.0.0": +"@node-redis/bloom@1.0.1", "@node-redis/bloom@^1.0.0": version "1.0.1" resolved "https://registry.yarnpkg.com/@node-redis/bloom/-/bloom-1.0.1.tgz#144474a0b7dc4a4b91badea2cfa9538ce0a1854e" integrity sha512-mXEBvEIgF4tUzdIN89LiYsbi6//EdpFA7L8M+DHCvePXg+bfHWi+ct5VI6nHUFQE5+ohm/9wmgihCH3HSkeKsw== +"@node-redis/client@1.0.3": + version "1.0.3" + resolved "https://registry.yarnpkg.com/@node-redis/client/-/client-1.0.3.tgz#ece282b7ee07283d744e6ab1fa72f2d47641402c" + integrity sha512-IXNgOG99PHGL3NxN3/e8J8MuX+H08I+OMNmheGmZBXngE0IntaCQwwrd7NzmiHA+zH3SKHiJ+6k3P7t7XYknMw== + dependencies: + cluster-key-slot "1.1.0" + generic-pool "3.8.2" + redis-parser "3.0.0" + yallist "4.0.0" + "@node-redis/client@^1.0.2": version "1.0.2" resolved "https://registry.yarnpkg.com/@node-redis/client/-/client-1.0.2.tgz#7f09fb739675728fbc6e73536f7cd1be99bf7b8f" @@ -154,17 +164,22 @@ redis-parser "3.0.0" yallist "4.0.0" -"@node-redis/json@^1.0.2": +"@node-redis/graph@1.0.0": + version "1.0.0" + resolved "https://registry.yarnpkg.com/@node-redis/graph/-/graph-1.0.0.tgz#baf8eaac4a400f86ea04d65ec3d65715fd7951ab" + integrity sha512-mRSo8jEGC0cf+Rm7q8mWMKKKqkn6EAnA9IA2S3JvUv/gaWW/73vil7GLNwion2ihTptAm05I9LkepzfIXUKX5g== + +"@node-redis/json@1.0.2", "@node-redis/json@^1.0.2": version "1.0.2" resolved "https://registry.yarnpkg.com/@node-redis/json/-/json-1.0.2.tgz#8ad2d0f026698dc1a4238cc3d1eb099a3bee5ab8" integrity sha512-qVRgn8WfG46QQ08CghSbY4VhHFgaTY71WjpwRBGEuqGPfWwfRcIf3OqSpR7Q/45X+v3xd8mvYjywqh0wqJ8T+g== -"@node-redis/search@^1.0.2": +"@node-redis/search@1.0.2", "@node-redis/search@^1.0.2": version "1.0.2" resolved "https://registry.yarnpkg.com/@node-redis/search/-/search-1.0.2.tgz#8cfc91006ea787df801d41410283e1f59027f818" integrity sha512-gWhEeji+kTAvzZeguUNJdMSZNH2c5dv3Bci8Nn2f7VGuf6IvvwuZDSBOuOlirLVgayVuWzAG7EhwaZWK1VDnWQ== -"@node-redis/time-series@^1.0.1": +"@node-redis/time-series@1.0.1", "@node-redis/time-series@^1.0.1": version "1.0.1" resolved "https://registry.yarnpkg.com/@node-redis/time-series/-/time-series-1.0.1.tgz#703149f8fa4f6fff377c61a0873911e7c1ba5cc3" integrity sha512-+nTn6EewVj3GlUXPuD3dgheWqo219jTxlo6R+pg24OeVvFHx9aFGGiyOgj3vBPhWUdRZ0xMcujXV5ki4fbLyMw== @@ -2190,11 +2205,6 @@ denque@^1.1.0: resolved "https://registry.yarnpkg.com/denque/-/denque-1.4.1.tgz#6744ff7641c148c3f8a69c307e51235c1f4a37cf" integrity sha512-OfzPuSZKGcgr96rf1oODnfjqBFmr1DVoc/TrItj3Ohe0Ah1C5WX5Baquw/9U9KovnQ88EqmJbD66rKYUQYN1tQ== -denque@^1.5.0: - version "1.5.0" - resolved "https://registry.yarnpkg.com/denque/-/denque-1.5.0.tgz#773de0686ff2d8ec2ff92914316a47b73b1c73de" - integrity sha512-CYiCSgIF1p6EUByQPlGkKnP1M9g0ZV3qMIrqMqZqdwazygIA/YP2vrbcyl1h/WppKJTdl1F85cXIle+394iDAQ== - depd@^1.1.2, depd@~1.1.2: version "1.1.2" resolved "https://registry.yarnpkg.com/depd/-/depd-1.1.2.tgz#9bcd52e14c097763e749b274c4346ed2e560b5a9" @@ -5806,7 +5816,7 @@ reconnecting-websocket@^4.4.0: resolved "https://registry.yarnpkg.com/reconnecting-websocket/-/reconnecting-websocket-4.4.0.tgz#3b0e5b96ef119e78a03135865b8bb0af1b948783" integrity sha512-D2E33ceRPga0NvTDhJmphEgJ7FUYF0v4lr1ki0csq06OdlxKfugGzN0dSkxM/NfqCxYELK4KcaTOUOjTV6Dcng== -redis-commands@1.7.0, redis-commands@^1.7.0: +redis-commands@1.7.0: version "1.7.0" resolved "https://registry.yarnpkg.com/redis-commands/-/redis-commands-1.7.0.tgz#15a6fea2d58281e27b1cd1acfb4b293e278c3a89" integrity sha512-nJWqw3bTFy21hX/CPKHth6sfhZbdiHP6bTawSgQBlKOVRG7EZkfHbbHwQJnrE4vsQf0CMNE+3gJ4Fmm16vdVlQ== @@ -5839,15 +5849,17 @@ redis@*: "@node-redis/search" "^1.0.2" "@node-redis/time-series" "^1.0.1" -redis@3.1.2: - version "3.1.2" - resolved "https://registry.yarnpkg.com/redis/-/redis-3.1.2.tgz#766851117e80653d23e0ed536254677ab647638c" - integrity sha512-grn5KoZLr/qrRQVwoSkmzdbw6pwF+/rwODtrOr6vuBRiR/f3rjSTGupbF90Zpqm2oenix8Do6RV7pYEkGwlKkw== +redis@4.0.3: + version "4.0.3" + resolved "https://registry.yarnpkg.com/redis/-/redis-4.0.3.tgz#f60931175de6f5b5727240a08e58a9ed5cf0f9de" + integrity sha512-SJMRXvgiQUYN0HaWwWv002J5ZgkhYXOlbLomzcrL3kP42yRNZ8Jx5nvLYhVpgmf10xcDpanFOxxJkphu2eyIFQ== dependencies: - denque "^1.5.0" - redis-commands "^1.7.0" - redis-errors "^1.2.0" - redis-parser "^3.0.0" + "@node-redis/bloom" "1.0.1" + "@node-redis/client" "1.0.3" + "@node-redis/graph" "1.0.0" + "@node-redis/json" "1.0.2" + "@node-redis/search" "1.0.2" + "@node-redis/time-series" "1.0.1" reflect-metadata@0.1.13, reflect-metadata@^0.1.13: version "0.1.13"