Compare commits
1 Commits
master
...
node-redis
Author | SHA1 | Date | |
---|---|---|---|
|
833bc8a054 |
@ -139,7 +139,7 @@
|
||||
"random-seed": "0.3.0",
|
||||
"ratelimiter": "3.4.1",
|
||||
"re2": "1.17.3",
|
||||
"redis": "3.1.2",
|
||||
"redis": "4.0.3",
|
||||
"redis-lock": "0.1.4",
|
||||
"reflect-metadata": "0.1.13",
|
||||
"rename": "1.0.4",
|
||||
|
@ -1,19 +1,18 @@
|
||||
import * as redis from 'redis';
|
||||
import config from '@/config/index';
|
||||
|
||||
export function createConnection() {
|
||||
return redis.createClient(
|
||||
config.redis.port,
|
||||
config.redis.host,
|
||||
{
|
||||
export const redisClient = redis.createClient({
|
||||
socket: {
|
||||
port: config.redis.port,
|
||||
host: config.redis.host,
|
||||
},
|
||||
password: config.redis.pass,
|
||||
prefix: config.redis.prefix,
|
||||
db: config.redis.db || 0,
|
||||
}
|
||||
);
|
||||
}
|
||||
database: config.redis.db || 0,
|
||||
});
|
||||
|
||||
export const subsdcriber = createConnection();
|
||||
subsdcriber.subscribe(config.host);
|
||||
redisClient.connect();
|
||||
|
||||
export const redisClient = createConnection();
|
||||
export const redisSubscriber = redisClient.duplicate();
|
||||
|
||||
redisSubscriber.connect();
|
||||
|
@ -1,6 +1,6 @@
|
||||
import { Antennas } from '@/models/index';
|
||||
import { Antenna } from '@/models/entities/antenna';
|
||||
import { subsdcriber } from '../db/redis';
|
||||
import { redisSubscriber } from '../db/redis';
|
||||
|
||||
let antennasFetched = false;
|
||||
let antennas: Antenna[] = [];
|
||||
@ -14,11 +14,8 @@ export async function getAntennas() {
|
||||
return antennas;
|
||||
}
|
||||
|
||||
subsdcriber.on('message', async (_, data) => {
|
||||
const obj = JSON.parse(data);
|
||||
|
||||
if (obj.channel === 'internal') {
|
||||
const { type, body } = obj.message;
|
||||
redisSubscriber.subscribe('internal', async (message) => {
|
||||
const { type, body } = JSON.parse(message);
|
||||
switch (type) {
|
||||
case 'antennaCreated':
|
||||
antennas.push(body);
|
||||
@ -32,5 +29,4 @@ subsdcriber.on('message', async (_, data) => {
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -144,11 +144,13 @@ router.get('/dc/cb', async ctx => {
|
||||
return;
|
||||
}
|
||||
|
||||
const { redirect_uri, state } = await new Promise<any>((res, rej) => {
|
||||
redisClient.get(sessid, async (_, state) => {
|
||||
res(JSON.parse(state));
|
||||
});
|
||||
});
|
||||
const session = await redisClient.get(sessid);
|
||||
if (session == null) {
|
||||
ctx.throw(400, 'invalid session');
|
||||
return;
|
||||
}
|
||||
|
||||
const { redirect_uri, state } = JSON.parse(session);
|
||||
|
||||
if (ctx.query.state !== state) {
|
||||
ctx.throw(400, 'invalid session');
|
||||
@ -215,11 +217,13 @@ router.get('/dc/cb', async ctx => {
|
||||
return;
|
||||
}
|
||||
|
||||
const { redirect_uri, state } = await new Promise<any>((res, rej) => {
|
||||
redisClient.get(userToken, async (_, state) => {
|
||||
res(JSON.parse(state));
|
||||
});
|
||||
});
|
||||
const session = await redisClient.get(userToken);
|
||||
if (session == null) {
|
||||
ctx.throw(400, 'invalid session');
|
||||
return;
|
||||
}
|
||||
|
||||
const { redirect_uri, state } = JSON.parse(session);
|
||||
|
||||
if (ctx.query.state !== state) {
|
||||
ctx.throw(400, 'invalid session');
|
||||
|
@ -142,11 +142,13 @@ router.get('/gh/cb', async ctx => {
|
||||
return;
|
||||
}
|
||||
|
||||
const { redirect_uri, state } = await new Promise<any>((res, rej) => {
|
||||
redisClient.get(sessid, async (_, state) => {
|
||||
res(JSON.parse(state));
|
||||
});
|
||||
});
|
||||
const session = await redisClient.get(sessid);
|
||||
if (session == null) {
|
||||
ctx.throw(400, 'invalid session');
|
||||
return;
|
||||
}
|
||||
|
||||
const { redirect_uri, state } = JSON.parse(session);
|
||||
|
||||
if (ctx.query.state !== state) {
|
||||
ctx.throw(400, 'invalid session');
|
||||
@ -193,11 +195,13 @@ router.get('/gh/cb', async ctx => {
|
||||
return;
|
||||
}
|
||||
|
||||
const { redirect_uri, state } = await new Promise<any>((res, rej) => {
|
||||
redisClient.get(userToken, async (_, state) => {
|
||||
res(JSON.parse(state));
|
||||
});
|
||||
});
|
||||
const session = await redisClient.get(userToken);
|
||||
if (session == null) {
|
||||
ctx.throw(400, 'invalid session');
|
||||
return;
|
||||
}
|
||||
|
||||
const { redirect_uri, state } = JSON.parse(session);
|
||||
|
||||
if (ctx.query.state !== state) {
|
||||
ctx.throw(400, 'invalid session');
|
||||
|
@ -123,13 +123,13 @@ router.get('/tw/cb', async ctx => {
|
||||
return;
|
||||
}
|
||||
|
||||
const get = new Promise<any>((res, rej) => {
|
||||
redisClient.get(sessid, async (_, twCtx) => {
|
||||
res(twCtx);
|
||||
});
|
||||
});
|
||||
const session = await redisClient.get(sessid);
|
||||
if (session == null) {
|
||||
ctx.throw(400, 'invalid session');
|
||||
return;
|
||||
}
|
||||
|
||||
const twCtx = await get;
|
||||
const twCtx = JSON.parse(session);
|
||||
|
||||
const result = await twAuth!.done(JSON.parse(twCtx), ctx.query.oauth_verifier);
|
||||
|
||||
@ -152,13 +152,13 @@ router.get('/tw/cb', async ctx => {
|
||||
return;
|
||||
}
|
||||
|
||||
const get = new Promise<any>((res, rej) => {
|
||||
redisClient.get(userToken, async (_, twCtx) => {
|
||||
res(twCtx);
|
||||
});
|
||||
});
|
||||
const session = await redisClient.get(userToken);
|
||||
if (session == null) {
|
||||
ctx.throw(400, 'invalid session');
|
||||
return;
|
||||
}
|
||||
|
||||
const twCtx = await get;
|
||||
const twCtx = JSON.parse(session);
|
||||
|
||||
const result = await twAuth!.done(JSON.parse(twCtx), verifier);
|
||||
|
||||
|
@ -5,7 +5,6 @@ import call from '../call';
|
||||
import readNote from '@/services/note/read';
|
||||
import Channel from './channel';
|
||||
import channels from './channels/index';
|
||||
import { EventEmitter } from 'events';
|
||||
import { User } from '@/models/entities/user';
|
||||
import { Channel as ChannelModel } from '@/models/entities/channel';
|
||||
import { Users, Followings, Mutings, UserProfiles, ChannelFollowings, Blockings } from '@/models/index';
|
||||
@ -16,6 +15,7 @@ import { publishChannelStream, publishGroupMessagingStream, publishMessagingStre
|
||||
import { UserGroup } from '@/models/entities/user-group';
|
||||
import { StreamEventEmitter, StreamMessages } from './types';
|
||||
import { Packed } from '@/misc/schema';
|
||||
import { redisSubscriber } from '@/db/redis';
|
||||
|
||||
/**
|
||||
* Main stream connection
|
||||
@ -29,27 +29,22 @@ export default class Connection {
|
||||
public followingChannels: Set<ChannelModel['id']> = new Set();
|
||||
public token?: AccessToken;
|
||||
private wsConnection: websocket.connection;
|
||||
public subscriber: StreamEventEmitter;
|
||||
private channels: Channel[] = [];
|
||||
private subscribingNotes: any = {};
|
||||
private cachedNotes: Packed<'Note'>[] = [];
|
||||
|
||||
constructor(
|
||||
wsConnection: websocket.connection,
|
||||
subscriber: EventEmitter,
|
||||
user: User | null | undefined,
|
||||
token: AccessToken | null | undefined
|
||||
) {
|
||||
this.wsConnection = wsConnection;
|
||||
this.subscriber = subscriber;
|
||||
if (user) this.user = user;
|
||||
if (token) this.token = token;
|
||||
|
||||
this.wsConnection.on('message', this.onWsConnectionMessage);
|
||||
|
||||
this.subscriber.on('broadcast', data => {
|
||||
this.onBroadcastMessage(data);
|
||||
});
|
||||
redisSubscriber.subscribe('broadcast', this.onBroadcastMessage);
|
||||
|
||||
if (this.user) {
|
||||
this.updateFollowing();
|
||||
@ -58,12 +53,13 @@ export default class Connection {
|
||||
this.updateFollowingChannels();
|
||||
this.updateUserProfile();
|
||||
|
||||
this.subscriber.on(`user:${this.user.id}`, this.onUserEvent);
|
||||
redisSubscriber.subscribe(`user:${this.user.id}`, this.onUserEvent);
|
||||
}
|
||||
}
|
||||
|
||||
@autobind
|
||||
private onUserEvent(data: StreamMessages['user']['payload']) { // { type, body }と展開するとそれぞれ型が分離してしまう
|
||||
private onUserEvent(message: string) {
|
||||
const data = JSON.parse(message) as StreamMessages['user']['payload']; // { type, body }と展開するとそれぞれ型が分離してしまう
|
||||
switch (data.type) {
|
||||
case 'follow':
|
||||
this.following.add(data.body.id);
|
||||
@ -145,7 +141,8 @@ export default class Connection {
|
||||
}
|
||||
|
||||
@autobind
|
||||
private onBroadcastMessage(data: StreamMessages['broadcast']['payload']) {
|
||||
private onBroadcastMessage(message: string) {
|
||||
const data = JSON.parse(message) as StreamMessages['broadcast']['payload'];
|
||||
this.sendMessageToWs(data.type, data.body);
|
||||
}
|
||||
|
||||
@ -223,6 +220,8 @@ export default class Connection {
|
||||
private onSubscribeNote(payload: any) {
|
||||
if (!payload.id) return;
|
||||
|
||||
// TODO: 購読できる条件を設ける
|
||||
|
||||
if (this.subscribingNotes[payload.id] == null) {
|
||||
this.subscribingNotes[payload.id] = 0;
|
||||
}
|
||||
@ -230,7 +229,7 @@ export default class Connection {
|
||||
this.subscribingNotes[payload.id]++;
|
||||
|
||||
if (this.subscribingNotes[payload.id] === 1) {
|
||||
this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage);
|
||||
redisSubscriber.subscribe(`noteStream:${payload.id}`, this.onNoteStreamMessage);
|
||||
}
|
||||
}
|
||||
|
||||
@ -244,12 +243,13 @@ export default class Connection {
|
||||
this.subscribingNotes[payload.id]--;
|
||||
if (this.subscribingNotes[payload.id] <= 0) {
|
||||
delete this.subscribingNotes[payload.id];
|
||||
this.subscriber.off(`noteStream:${payload.id}`, this.onNoteStreamMessage);
|
||||
redisSubscriber.unsubscribe(`noteStream:${payload.id}`, this.onNoteStreamMessage);
|
||||
}
|
||||
}
|
||||
|
||||
@autobind
|
||||
private async onNoteStreamMessage(data: StreamMessages['note']['payload']) {
|
||||
private async onNoteStreamMessage(message: string) {
|
||||
const data = JSON.parse(message) as StreamMessages['note']['payload'];
|
||||
this.sendMessageToWs('noteUpdated', {
|
||||
id: data.body.id,
|
||||
type: data.type,
|
||||
@ -418,5 +418,14 @@ export default class Connection {
|
||||
for (const c of this.channels.filter(c => c.dispose)) {
|
||||
if (c.dispose) c.dispose();
|
||||
}
|
||||
|
||||
for (const id of Object.keys(this.subscribingNotes)) {
|
||||
redisSubscriber.unsubscribe(`noteStream:${id}`, this.onNoteStreamMessage);
|
||||
}
|
||||
|
||||
redisSubscriber.unsubscribe('broadcast', this.onBroadcastMessage);
|
||||
if (this.user) {
|
||||
redisSubscriber.unsubscribe(`user:${this.user.id}`, this.onUserEvent);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -4,8 +4,6 @@ import * as websocket from 'websocket';
|
||||
import MainStreamConnection from './stream/index';
|
||||
import { ParsedUrlQuery } from 'querystring';
|
||||
import authenticate from './authenticate';
|
||||
import { EventEmitter } from 'events';
|
||||
import { subsdcriber as redisClient } from '../../db/redis';
|
||||
import { Users } from '@/models/index';
|
||||
|
||||
module.exports = (server: http.Server) => {
|
||||
@ -29,16 +27,7 @@ module.exports = (server: http.Server) => {
|
||||
|
||||
const connection = request.accept();
|
||||
|
||||
const ev = new EventEmitter();
|
||||
|
||||
async function onRedisMessage(_: string, data: string) {
|
||||
const parsed = JSON.parse(data);
|
||||
ev.emit(parsed.channel, parsed.message);
|
||||
}
|
||||
|
||||
redisClient.on('message', onRedisMessage);
|
||||
|
||||
const main = new MainStreamConnection(connection, ev, user, app);
|
||||
const main = new MainStreamConnection(connection, user, app);
|
||||
|
||||
const intervalId = user ? setInterval(() => {
|
||||
Users.update(user.id, {
|
||||
@ -52,9 +41,7 @@ module.exports = (server: http.Server) => {
|
||||
}
|
||||
|
||||
connection.once('close', () => {
|
||||
ev.removeAllListeners();
|
||||
main.dispose();
|
||||
redisClient.off('message', onRedisMessage);
|
||||
if (intervalId) clearInterval(intervalId);
|
||||
});
|
||||
|
||||
|
@ -139,11 +139,21 @@
|
||||
methods "^1.1.2"
|
||||
path-to-regexp "^6.1.0"
|
||||
|
||||
"@node-redis/bloom@^1.0.0":
|
||||
"@node-redis/bloom@1.0.1", "@node-redis/bloom@^1.0.0":
|
||||
version "1.0.1"
|
||||
resolved "https://registry.yarnpkg.com/@node-redis/bloom/-/bloom-1.0.1.tgz#144474a0b7dc4a4b91badea2cfa9538ce0a1854e"
|
||||
integrity sha512-mXEBvEIgF4tUzdIN89LiYsbi6//EdpFA7L8M+DHCvePXg+bfHWi+ct5VI6nHUFQE5+ohm/9wmgihCH3HSkeKsw==
|
||||
|
||||
"@node-redis/client@1.0.3":
|
||||
version "1.0.3"
|
||||
resolved "https://registry.yarnpkg.com/@node-redis/client/-/client-1.0.3.tgz#ece282b7ee07283d744e6ab1fa72f2d47641402c"
|
||||
integrity sha512-IXNgOG99PHGL3NxN3/e8J8MuX+H08I+OMNmheGmZBXngE0IntaCQwwrd7NzmiHA+zH3SKHiJ+6k3P7t7XYknMw==
|
||||
dependencies:
|
||||
cluster-key-slot "1.1.0"
|
||||
generic-pool "3.8.2"
|
||||
redis-parser "3.0.0"
|
||||
yallist "4.0.0"
|
||||
|
||||
"@node-redis/client@^1.0.2":
|
||||
version "1.0.2"
|
||||
resolved "https://registry.yarnpkg.com/@node-redis/client/-/client-1.0.2.tgz#7f09fb739675728fbc6e73536f7cd1be99bf7b8f"
|
||||
@ -154,17 +164,22 @@
|
||||
redis-parser "3.0.0"
|
||||
yallist "4.0.0"
|
||||
|
||||
"@node-redis/json@^1.0.2":
|
||||
"@node-redis/graph@1.0.0":
|
||||
version "1.0.0"
|
||||
resolved "https://registry.yarnpkg.com/@node-redis/graph/-/graph-1.0.0.tgz#baf8eaac4a400f86ea04d65ec3d65715fd7951ab"
|
||||
integrity sha512-mRSo8jEGC0cf+Rm7q8mWMKKKqkn6EAnA9IA2S3JvUv/gaWW/73vil7GLNwion2ihTptAm05I9LkepzfIXUKX5g==
|
||||
|
||||
"@node-redis/json@1.0.2", "@node-redis/json@^1.0.2":
|
||||
version "1.0.2"
|
||||
resolved "https://registry.yarnpkg.com/@node-redis/json/-/json-1.0.2.tgz#8ad2d0f026698dc1a4238cc3d1eb099a3bee5ab8"
|
||||
integrity sha512-qVRgn8WfG46QQ08CghSbY4VhHFgaTY71WjpwRBGEuqGPfWwfRcIf3OqSpR7Q/45X+v3xd8mvYjywqh0wqJ8T+g==
|
||||
|
||||
"@node-redis/search@^1.0.2":
|
||||
"@node-redis/search@1.0.2", "@node-redis/search@^1.0.2":
|
||||
version "1.0.2"
|
||||
resolved "https://registry.yarnpkg.com/@node-redis/search/-/search-1.0.2.tgz#8cfc91006ea787df801d41410283e1f59027f818"
|
||||
integrity sha512-gWhEeji+kTAvzZeguUNJdMSZNH2c5dv3Bci8Nn2f7VGuf6IvvwuZDSBOuOlirLVgayVuWzAG7EhwaZWK1VDnWQ==
|
||||
|
||||
"@node-redis/time-series@^1.0.1":
|
||||
"@node-redis/time-series@1.0.1", "@node-redis/time-series@^1.0.1":
|
||||
version "1.0.1"
|
||||
resolved "https://registry.yarnpkg.com/@node-redis/time-series/-/time-series-1.0.1.tgz#703149f8fa4f6fff377c61a0873911e7c1ba5cc3"
|
||||
integrity sha512-+nTn6EewVj3GlUXPuD3dgheWqo219jTxlo6R+pg24OeVvFHx9aFGGiyOgj3vBPhWUdRZ0xMcujXV5ki4fbLyMw==
|
||||
@ -2190,11 +2205,6 @@ denque@^1.1.0:
|
||||
resolved "https://registry.yarnpkg.com/denque/-/denque-1.4.1.tgz#6744ff7641c148c3f8a69c307e51235c1f4a37cf"
|
||||
integrity sha512-OfzPuSZKGcgr96rf1oODnfjqBFmr1DVoc/TrItj3Ohe0Ah1C5WX5Baquw/9U9KovnQ88EqmJbD66rKYUQYN1tQ==
|
||||
|
||||
denque@^1.5.0:
|
||||
version "1.5.0"
|
||||
resolved "https://registry.yarnpkg.com/denque/-/denque-1.5.0.tgz#773de0686ff2d8ec2ff92914316a47b73b1c73de"
|
||||
integrity sha512-CYiCSgIF1p6EUByQPlGkKnP1M9g0ZV3qMIrqMqZqdwazygIA/YP2vrbcyl1h/WppKJTdl1F85cXIle+394iDAQ==
|
||||
|
||||
depd@^1.1.2, depd@~1.1.2:
|
||||
version "1.1.2"
|
||||
resolved "https://registry.yarnpkg.com/depd/-/depd-1.1.2.tgz#9bcd52e14c097763e749b274c4346ed2e560b5a9"
|
||||
@ -5806,7 +5816,7 @@ reconnecting-websocket@^4.4.0:
|
||||
resolved "https://registry.yarnpkg.com/reconnecting-websocket/-/reconnecting-websocket-4.4.0.tgz#3b0e5b96ef119e78a03135865b8bb0af1b948783"
|
||||
integrity sha512-D2E33ceRPga0NvTDhJmphEgJ7FUYF0v4lr1ki0csq06OdlxKfugGzN0dSkxM/NfqCxYELK4KcaTOUOjTV6Dcng==
|
||||
|
||||
redis-commands@1.7.0, redis-commands@^1.7.0:
|
||||
redis-commands@1.7.0:
|
||||
version "1.7.0"
|
||||
resolved "https://registry.yarnpkg.com/redis-commands/-/redis-commands-1.7.0.tgz#15a6fea2d58281e27b1cd1acfb4b293e278c3a89"
|
||||
integrity sha512-nJWqw3bTFy21hX/CPKHth6sfhZbdiHP6bTawSgQBlKOVRG7EZkfHbbHwQJnrE4vsQf0CMNE+3gJ4Fmm16vdVlQ==
|
||||
@ -5839,15 +5849,17 @@ redis@*:
|
||||
"@node-redis/search" "^1.0.2"
|
||||
"@node-redis/time-series" "^1.0.1"
|
||||
|
||||
redis@3.1.2:
|
||||
version "3.1.2"
|
||||
resolved "https://registry.yarnpkg.com/redis/-/redis-3.1.2.tgz#766851117e80653d23e0ed536254677ab647638c"
|
||||
integrity sha512-grn5KoZLr/qrRQVwoSkmzdbw6pwF+/rwODtrOr6vuBRiR/f3rjSTGupbF90Zpqm2oenix8Do6RV7pYEkGwlKkw==
|
||||
redis@4.0.3:
|
||||
version "4.0.3"
|
||||
resolved "https://registry.yarnpkg.com/redis/-/redis-4.0.3.tgz#f60931175de6f5b5727240a08e58a9ed5cf0f9de"
|
||||
integrity sha512-SJMRXvgiQUYN0HaWwWv002J5ZgkhYXOlbLomzcrL3kP42yRNZ8Jx5nvLYhVpgmf10xcDpanFOxxJkphu2eyIFQ==
|
||||
dependencies:
|
||||
denque "^1.5.0"
|
||||
redis-commands "^1.7.0"
|
||||
redis-errors "^1.2.0"
|
||||
redis-parser "^3.0.0"
|
||||
"@node-redis/bloom" "1.0.1"
|
||||
"@node-redis/client" "1.0.3"
|
||||
"@node-redis/graph" "1.0.0"
|
||||
"@node-redis/json" "1.0.2"
|
||||
"@node-redis/search" "1.0.2"
|
||||
"@node-redis/time-series" "1.0.1"
|
||||
|
||||
reflect-metadata@0.1.13, reflect-metadata@^0.1.13:
|
||||
version "0.1.13"
|
||||
|
Loading…
Reference in New Issue
Block a user