Compare commits

...

1 Commits

Author SHA1 Message Date
syuilo
833bc8a054 wip 2022-02-19 17:15:49 +09:00
9 changed files with 124 additions and 113 deletions

View File

@ -139,7 +139,7 @@
"random-seed": "0.3.0",
"ratelimiter": "3.4.1",
"re2": "1.17.3",
"redis": "3.1.2",
"redis": "4.0.3",
"redis-lock": "0.1.4",
"reflect-metadata": "0.1.13",
"rename": "1.0.4",

View File

@ -1,19 +1,18 @@
import * as redis from 'redis';
import config from '@/config/index';
export function createConnection() {
return redis.createClient(
config.redis.port,
config.redis.host,
{
export const redisClient = redis.createClient({
socket: {
port: config.redis.port,
host: config.redis.host,
},
password: config.redis.pass,
prefix: config.redis.prefix,
db: config.redis.db || 0,
}
);
}
database: config.redis.db || 0,
});
export const subsdcriber = createConnection();
subsdcriber.subscribe(config.host);
redisClient.connect();
export const redisClient = createConnection();
export const redisSubscriber = redisClient.duplicate();
redisSubscriber.connect();

View File

@ -1,6 +1,6 @@
import { Antennas } from '@/models/index';
import { Antenna } from '@/models/entities/antenna';
import { subsdcriber } from '../db/redis';
import { redisSubscriber } from '../db/redis';
let antennasFetched = false;
let antennas: Antenna[] = [];
@ -14,11 +14,8 @@ export async function getAntennas() {
return antennas;
}
subsdcriber.on('message', async (_, data) => {
const obj = JSON.parse(data);
if (obj.channel === 'internal') {
const { type, body } = obj.message;
redisSubscriber.subscribe('internal', async (message) => {
const { type, body } = JSON.parse(message);
switch (type) {
case 'antennaCreated':
antennas.push(body);
@ -32,5 +29,4 @@ subsdcriber.on('message', async (_, data) => {
default:
break;
}
}
});

View File

@ -144,11 +144,13 @@ router.get('/dc/cb', async ctx => {
return;
}
const { redirect_uri, state } = await new Promise<any>((res, rej) => {
redisClient.get(sessid, async (_, state) => {
res(JSON.parse(state));
});
});
const session = await redisClient.get(sessid);
if (session == null) {
ctx.throw(400, 'invalid session');
return;
}
const { redirect_uri, state } = JSON.parse(session);
if (ctx.query.state !== state) {
ctx.throw(400, 'invalid session');
@ -215,11 +217,13 @@ router.get('/dc/cb', async ctx => {
return;
}
const { redirect_uri, state } = await new Promise<any>((res, rej) => {
redisClient.get(userToken, async (_, state) => {
res(JSON.parse(state));
});
});
const session = await redisClient.get(userToken);
if (session == null) {
ctx.throw(400, 'invalid session');
return;
}
const { redirect_uri, state } = JSON.parse(session);
if (ctx.query.state !== state) {
ctx.throw(400, 'invalid session');

View File

@ -142,11 +142,13 @@ router.get('/gh/cb', async ctx => {
return;
}
const { redirect_uri, state } = await new Promise<any>((res, rej) => {
redisClient.get(sessid, async (_, state) => {
res(JSON.parse(state));
});
});
const session = await redisClient.get(sessid);
if (session == null) {
ctx.throw(400, 'invalid session');
return;
}
const { redirect_uri, state } = JSON.parse(session);
if (ctx.query.state !== state) {
ctx.throw(400, 'invalid session');
@ -193,11 +195,13 @@ router.get('/gh/cb', async ctx => {
return;
}
const { redirect_uri, state } = await new Promise<any>((res, rej) => {
redisClient.get(userToken, async (_, state) => {
res(JSON.parse(state));
});
});
const session = await redisClient.get(userToken);
if (session == null) {
ctx.throw(400, 'invalid session');
return;
}
const { redirect_uri, state } = JSON.parse(session);
if (ctx.query.state !== state) {
ctx.throw(400, 'invalid session');

View File

@ -123,13 +123,13 @@ router.get('/tw/cb', async ctx => {
return;
}
const get = new Promise<any>((res, rej) => {
redisClient.get(sessid, async (_, twCtx) => {
res(twCtx);
});
});
const session = await redisClient.get(sessid);
if (session == null) {
ctx.throw(400, 'invalid session');
return;
}
const twCtx = await get;
const twCtx = JSON.parse(session);
const result = await twAuth!.done(JSON.parse(twCtx), ctx.query.oauth_verifier);
@ -152,13 +152,13 @@ router.get('/tw/cb', async ctx => {
return;
}
const get = new Promise<any>((res, rej) => {
redisClient.get(userToken, async (_, twCtx) => {
res(twCtx);
});
});
const session = await redisClient.get(userToken);
if (session == null) {
ctx.throw(400, 'invalid session');
return;
}
const twCtx = await get;
const twCtx = JSON.parse(session);
const result = await twAuth!.done(JSON.parse(twCtx), verifier);

View File

@ -5,7 +5,6 @@ import call from '../call';
import readNote from '@/services/note/read';
import Channel from './channel';
import channels from './channels/index';
import { EventEmitter } from 'events';
import { User } from '@/models/entities/user';
import { Channel as ChannelModel } from '@/models/entities/channel';
import { Users, Followings, Mutings, UserProfiles, ChannelFollowings, Blockings } from '@/models/index';
@ -16,6 +15,7 @@ import { publishChannelStream, publishGroupMessagingStream, publishMessagingStre
import { UserGroup } from '@/models/entities/user-group';
import { StreamEventEmitter, StreamMessages } from './types';
import { Packed } from '@/misc/schema';
import { redisSubscriber } from '@/db/redis';
/**
* Main stream connection
@ -29,27 +29,22 @@ export default class Connection {
public followingChannels: Set<ChannelModel['id']> = new Set();
public token?: AccessToken;
private wsConnection: websocket.connection;
public subscriber: StreamEventEmitter;
private channels: Channel[] = [];
private subscribingNotes: any = {};
private cachedNotes: Packed<'Note'>[] = [];
constructor(
wsConnection: websocket.connection,
subscriber: EventEmitter,
user: User | null | undefined,
token: AccessToken | null | undefined
) {
this.wsConnection = wsConnection;
this.subscriber = subscriber;
if (user) this.user = user;
if (token) this.token = token;
this.wsConnection.on('message', this.onWsConnectionMessage);
this.subscriber.on('broadcast', data => {
this.onBroadcastMessage(data);
});
redisSubscriber.subscribe('broadcast', this.onBroadcastMessage);
if (this.user) {
this.updateFollowing();
@ -58,12 +53,13 @@ export default class Connection {
this.updateFollowingChannels();
this.updateUserProfile();
this.subscriber.on(`user:${this.user.id}`, this.onUserEvent);
redisSubscriber.subscribe(`user:${this.user.id}`, this.onUserEvent);
}
}
@autobind
private onUserEvent(data: StreamMessages['user']['payload']) { // { type, body }と展開するとそれぞれ型が分離してしまう
private onUserEvent(message: string) {
const data = JSON.parse(message) as StreamMessages['user']['payload']; // { type, body }と展開するとそれぞれ型が分離してしまう
switch (data.type) {
case 'follow':
this.following.add(data.body.id);
@ -145,7 +141,8 @@ export default class Connection {
}
@autobind
private onBroadcastMessage(data: StreamMessages['broadcast']['payload']) {
private onBroadcastMessage(message: string) {
const data = JSON.parse(message) as StreamMessages['broadcast']['payload'];
this.sendMessageToWs(data.type, data.body);
}
@ -223,6 +220,8 @@ export default class Connection {
private onSubscribeNote(payload: any) {
if (!payload.id) return;
// TODO: 購読できる条件を設ける
if (this.subscribingNotes[payload.id] == null) {
this.subscribingNotes[payload.id] = 0;
}
@ -230,7 +229,7 @@ export default class Connection {
this.subscribingNotes[payload.id]++;
if (this.subscribingNotes[payload.id] === 1) {
this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage);
redisSubscriber.subscribe(`noteStream:${payload.id}`, this.onNoteStreamMessage);
}
}
@ -244,12 +243,13 @@ export default class Connection {
this.subscribingNotes[payload.id]--;
if (this.subscribingNotes[payload.id] <= 0) {
delete this.subscribingNotes[payload.id];
this.subscriber.off(`noteStream:${payload.id}`, this.onNoteStreamMessage);
redisSubscriber.unsubscribe(`noteStream:${payload.id}`, this.onNoteStreamMessage);
}
}
@autobind
private async onNoteStreamMessage(data: StreamMessages['note']['payload']) {
private async onNoteStreamMessage(message: string) {
const data = JSON.parse(message) as StreamMessages['note']['payload'];
this.sendMessageToWs('noteUpdated', {
id: data.body.id,
type: data.type,
@ -418,5 +418,14 @@ export default class Connection {
for (const c of this.channels.filter(c => c.dispose)) {
if (c.dispose) c.dispose();
}
for (const id of Object.keys(this.subscribingNotes)) {
redisSubscriber.unsubscribe(`noteStream:${id}`, this.onNoteStreamMessage);
}
redisSubscriber.unsubscribe('broadcast', this.onBroadcastMessage);
if (this.user) {
redisSubscriber.unsubscribe(`user:${this.user.id}`, this.onUserEvent);
}
}
}

View File

@ -4,8 +4,6 @@ import * as websocket from 'websocket';
import MainStreamConnection from './stream/index';
import { ParsedUrlQuery } from 'querystring';
import authenticate from './authenticate';
import { EventEmitter } from 'events';
import { subsdcriber as redisClient } from '../../db/redis';
import { Users } from '@/models/index';
module.exports = (server: http.Server) => {
@ -29,16 +27,7 @@ module.exports = (server: http.Server) => {
const connection = request.accept();
const ev = new EventEmitter();
async function onRedisMessage(_: string, data: string) {
const parsed = JSON.parse(data);
ev.emit(parsed.channel, parsed.message);
}
redisClient.on('message', onRedisMessage);
const main = new MainStreamConnection(connection, ev, user, app);
const main = new MainStreamConnection(connection, user, app);
const intervalId = user ? setInterval(() => {
Users.update(user.id, {
@ -52,9 +41,7 @@ module.exports = (server: http.Server) => {
}
connection.once('close', () => {
ev.removeAllListeners();
main.dispose();
redisClient.off('message', onRedisMessage);
if (intervalId) clearInterval(intervalId);
});

View File

@ -139,11 +139,21 @@
methods "^1.1.2"
path-to-regexp "^6.1.0"
"@node-redis/bloom@^1.0.0":
"@node-redis/bloom@1.0.1", "@node-redis/bloom@^1.0.0":
version "1.0.1"
resolved "https://registry.yarnpkg.com/@node-redis/bloom/-/bloom-1.0.1.tgz#144474a0b7dc4a4b91badea2cfa9538ce0a1854e"
integrity sha512-mXEBvEIgF4tUzdIN89LiYsbi6//EdpFA7L8M+DHCvePXg+bfHWi+ct5VI6nHUFQE5+ohm/9wmgihCH3HSkeKsw==
"@node-redis/client@1.0.3":
version "1.0.3"
resolved "https://registry.yarnpkg.com/@node-redis/client/-/client-1.0.3.tgz#ece282b7ee07283d744e6ab1fa72f2d47641402c"
integrity sha512-IXNgOG99PHGL3NxN3/e8J8MuX+H08I+OMNmheGmZBXngE0IntaCQwwrd7NzmiHA+zH3SKHiJ+6k3P7t7XYknMw==
dependencies:
cluster-key-slot "1.1.0"
generic-pool "3.8.2"
redis-parser "3.0.0"
yallist "4.0.0"
"@node-redis/client@^1.0.2":
version "1.0.2"
resolved "https://registry.yarnpkg.com/@node-redis/client/-/client-1.0.2.tgz#7f09fb739675728fbc6e73536f7cd1be99bf7b8f"
@ -154,17 +164,22 @@
redis-parser "3.0.0"
yallist "4.0.0"
"@node-redis/json@^1.0.2":
"@node-redis/graph@1.0.0":
version "1.0.0"
resolved "https://registry.yarnpkg.com/@node-redis/graph/-/graph-1.0.0.tgz#baf8eaac4a400f86ea04d65ec3d65715fd7951ab"
integrity sha512-mRSo8jEGC0cf+Rm7q8mWMKKKqkn6EAnA9IA2S3JvUv/gaWW/73vil7GLNwion2ihTptAm05I9LkepzfIXUKX5g==
"@node-redis/json@1.0.2", "@node-redis/json@^1.0.2":
version "1.0.2"
resolved "https://registry.yarnpkg.com/@node-redis/json/-/json-1.0.2.tgz#8ad2d0f026698dc1a4238cc3d1eb099a3bee5ab8"
integrity sha512-qVRgn8WfG46QQ08CghSbY4VhHFgaTY71WjpwRBGEuqGPfWwfRcIf3OqSpR7Q/45X+v3xd8mvYjywqh0wqJ8T+g==
"@node-redis/search@^1.0.2":
"@node-redis/search@1.0.2", "@node-redis/search@^1.0.2":
version "1.0.2"
resolved "https://registry.yarnpkg.com/@node-redis/search/-/search-1.0.2.tgz#8cfc91006ea787df801d41410283e1f59027f818"
integrity sha512-gWhEeji+kTAvzZeguUNJdMSZNH2c5dv3Bci8Nn2f7VGuf6IvvwuZDSBOuOlirLVgayVuWzAG7EhwaZWK1VDnWQ==
"@node-redis/time-series@^1.0.1":
"@node-redis/time-series@1.0.1", "@node-redis/time-series@^1.0.1":
version "1.0.1"
resolved "https://registry.yarnpkg.com/@node-redis/time-series/-/time-series-1.0.1.tgz#703149f8fa4f6fff377c61a0873911e7c1ba5cc3"
integrity sha512-+nTn6EewVj3GlUXPuD3dgheWqo219jTxlo6R+pg24OeVvFHx9aFGGiyOgj3vBPhWUdRZ0xMcujXV5ki4fbLyMw==
@ -2190,11 +2205,6 @@ denque@^1.1.0:
resolved "https://registry.yarnpkg.com/denque/-/denque-1.4.1.tgz#6744ff7641c148c3f8a69c307e51235c1f4a37cf"
integrity sha512-OfzPuSZKGcgr96rf1oODnfjqBFmr1DVoc/TrItj3Ohe0Ah1C5WX5Baquw/9U9KovnQ88EqmJbD66rKYUQYN1tQ==
denque@^1.5.0:
version "1.5.0"
resolved "https://registry.yarnpkg.com/denque/-/denque-1.5.0.tgz#773de0686ff2d8ec2ff92914316a47b73b1c73de"
integrity sha512-CYiCSgIF1p6EUByQPlGkKnP1M9g0ZV3qMIrqMqZqdwazygIA/YP2vrbcyl1h/WppKJTdl1F85cXIle+394iDAQ==
depd@^1.1.2, depd@~1.1.2:
version "1.1.2"
resolved "https://registry.yarnpkg.com/depd/-/depd-1.1.2.tgz#9bcd52e14c097763e749b274c4346ed2e560b5a9"
@ -5806,7 +5816,7 @@ reconnecting-websocket@^4.4.0:
resolved "https://registry.yarnpkg.com/reconnecting-websocket/-/reconnecting-websocket-4.4.0.tgz#3b0e5b96ef119e78a03135865b8bb0af1b948783"
integrity sha512-D2E33ceRPga0NvTDhJmphEgJ7FUYF0v4lr1ki0csq06OdlxKfugGzN0dSkxM/NfqCxYELK4KcaTOUOjTV6Dcng==
redis-commands@1.7.0, redis-commands@^1.7.0:
redis-commands@1.7.0:
version "1.7.0"
resolved "https://registry.yarnpkg.com/redis-commands/-/redis-commands-1.7.0.tgz#15a6fea2d58281e27b1cd1acfb4b293e278c3a89"
integrity sha512-nJWqw3bTFy21hX/CPKHth6sfhZbdiHP6bTawSgQBlKOVRG7EZkfHbbHwQJnrE4vsQf0CMNE+3gJ4Fmm16vdVlQ==
@ -5839,15 +5849,17 @@ redis@*:
"@node-redis/search" "^1.0.2"
"@node-redis/time-series" "^1.0.1"
redis@3.1.2:
version "3.1.2"
resolved "https://registry.yarnpkg.com/redis/-/redis-3.1.2.tgz#766851117e80653d23e0ed536254677ab647638c"
integrity sha512-grn5KoZLr/qrRQVwoSkmzdbw6pwF+/rwODtrOr6vuBRiR/f3rjSTGupbF90Zpqm2oenix8Do6RV7pYEkGwlKkw==
redis@4.0.3:
version "4.0.3"
resolved "https://registry.yarnpkg.com/redis/-/redis-4.0.3.tgz#f60931175de6f5b5727240a08e58a9ed5cf0f9de"
integrity sha512-SJMRXvgiQUYN0HaWwWv002J5ZgkhYXOlbLomzcrL3kP42yRNZ8Jx5nvLYhVpgmf10xcDpanFOxxJkphu2eyIFQ==
dependencies:
denque "^1.5.0"
redis-commands "^1.7.0"
redis-errors "^1.2.0"
redis-parser "^3.0.0"
"@node-redis/bloom" "1.0.1"
"@node-redis/client" "1.0.3"
"@node-redis/graph" "1.0.0"
"@node-redis/json" "1.0.2"
"@node-redis/search" "1.0.2"
"@node-redis/time-series" "1.0.1"
reflect-metadata@0.1.13, reflect-metadata@^0.1.13:
version "0.1.13"