Fix: チャンネルのフォロー・アンフォローの反映速度を改善 (#12149)
* チャンネルのフォロー・アンフォローの反映速度を改善 * fix lint * userFollowingChannelsCacheの場所をCacheServiceからChannelFollowingServiceに移動 --------- Co-authored-by: osamu <46447427+sam-osamu@users.noreply.github.com>
This commit is contained in:
parent
5e76675a0c
commit
a8ee67cace
@ -33,6 +33,7 @@
|
||||
- Fix: 自分のフォローしているユーザーの自分のフォローしていないユーザーの visibility: followers な投稿への返信がストリーミングで流れてくる問題を修正
|
||||
- Fix: RedisへのTLキャッシュが有効の場合にHTL/LTL/STLが空になることがある問題を修正
|
||||
- Fix: STLでフォローしていないチャンネルが取得される問題を修正
|
||||
- Fix: フォローしているチャンネルをフォロー解除した時(またはその逆)、タイムラインに反映される間隔を改善
|
||||
|
||||
## 2023.10.2
|
||||
|
||||
|
@ -5,7 +5,7 @@
|
||||
|
||||
import { Inject, Injectable } from '@nestjs/common';
|
||||
import * as Redis from 'ioredis';
|
||||
import type { BlockingsRepository, ChannelFollowingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, MiUserProfile, UserProfilesRepository, UsersRepository, MiFollowing } from '@/models/_.js';
|
||||
import type { BlockingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, MiUserProfile, UserProfilesRepository, UsersRepository, MiFollowing } from '@/models/_.js';
|
||||
import { MemoryKVCache, RedisKVCache } from '@/misc/cache.js';
|
||||
import type { MiLocalUser, MiUser } from '@/models/User.js';
|
||||
import { DI } from '@/di-symbols.js';
|
||||
@ -26,7 +26,6 @@ export class CacheService implements OnApplicationShutdown {
|
||||
public userBlockedCache: RedisKVCache<Set<string>>; // NOTE: 「被」Blockキャッシュ
|
||||
public renoteMutingsCache: RedisKVCache<Set<string>>;
|
||||
public userFollowingsCache: RedisKVCache<Record<string, Pick<MiFollowing, 'withReplies'> | undefined>>;
|
||||
public userFollowingChannelsCache: RedisKVCache<Set<string>>;
|
||||
|
||||
constructor(
|
||||
@Inject(DI.redis)
|
||||
@ -53,9 +52,6 @@ export class CacheService implements OnApplicationShutdown {
|
||||
@Inject(DI.followingsRepository)
|
||||
private followingsRepository: FollowingsRepository,
|
||||
|
||||
@Inject(DI.channelFollowingsRepository)
|
||||
private channelFollowingsRepository: ChannelFollowingsRepository,
|
||||
|
||||
private userEntityService: UserEntityService,
|
||||
) {
|
||||
//this.onMessage = this.onMessage.bind(this);
|
||||
@ -150,13 +146,7 @@ export class CacheService implements OnApplicationShutdown {
|
||||
fromRedisConverter: (value) => JSON.parse(value),
|
||||
});
|
||||
|
||||
this.userFollowingChannelsCache = new RedisKVCache<Set<string>>(this.redisClient, 'userFollowingChannels', {
|
||||
lifetime: 1000 * 60 * 30, // 30m
|
||||
memoryCacheLifetime: 1000 * 60, // 1m
|
||||
fetcher: (key) => this.channelFollowingsRepository.find({ where: { followerId: key }, select: ['followeeId'] }).then(xs => new Set(xs.map(x => x.followeeId))),
|
||||
toRedisConverter: (value) => JSON.stringify(Array.from(value)),
|
||||
fromRedisConverter: (value) => new Set(JSON.parse(value)),
|
||||
});
|
||||
// NOTE: チャンネルのフォロー状況キャッシュはChannelFollowingServiceで行っている
|
||||
|
||||
this.redisForSub.on('message', this.onMessage);
|
||||
}
|
||||
@ -221,7 +211,6 @@ export class CacheService implements OnApplicationShutdown {
|
||||
this.userBlockedCache.dispose();
|
||||
this.renoteMutingsCache.dispose();
|
||||
this.userFollowingsCache.dispose();
|
||||
this.userFollowingChannelsCache.dispose();
|
||||
}
|
||||
|
||||
@bindThis
|
||||
|
104
packages/backend/src/core/ChannelFollowingService.ts
Normal file
104
packages/backend/src/core/ChannelFollowingService.ts
Normal file
@ -0,0 +1,104 @@
|
||||
import { Inject, Injectable, OnModuleInit } from '@nestjs/common';
|
||||
import Redis from 'ioredis';
|
||||
import { DI } from '@/di-symbols.js';
|
||||
import type { ChannelFollowingsRepository } from '@/models/_.js';
|
||||
import { MiChannel } from '@/models/_.js';
|
||||
import { IdService } from '@/core/IdService.js';
|
||||
import { GlobalEvents, GlobalEventService } from '@/core/GlobalEventService.js';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import type { MiLocalUser } from '@/models/User.js';
|
||||
import { RedisKVCache } from '@/misc/cache.js';
|
||||
|
||||
@Injectable()
|
||||
export class ChannelFollowingService implements OnModuleInit {
|
||||
public userFollowingChannelsCache: RedisKVCache<Set<string>>;
|
||||
|
||||
constructor(
|
||||
@Inject(DI.redis)
|
||||
private redisClient: Redis.Redis,
|
||||
@Inject(DI.redisForSub)
|
||||
private redisForSub: Redis.Redis,
|
||||
@Inject(DI.channelFollowingsRepository)
|
||||
private channelFollowingsRepository: ChannelFollowingsRepository,
|
||||
private idService: IdService,
|
||||
private globalEventService: GlobalEventService,
|
||||
) {
|
||||
this.userFollowingChannelsCache = new RedisKVCache<Set<string>>(this.redisClient, 'userFollowingChannels', {
|
||||
lifetime: 1000 * 60 * 30, // 30m
|
||||
memoryCacheLifetime: 1000 * 60, // 1m
|
||||
fetcher: (key) => this.channelFollowingsRepository.find({
|
||||
where: { followerId: key },
|
||||
select: ['followeeId'],
|
||||
}).then(xs => new Set(xs.map(x => x.followeeId))),
|
||||
toRedisConverter: (value) => JSON.stringify(Array.from(value)),
|
||||
fromRedisConverter: (value) => new Set(JSON.parse(value)),
|
||||
});
|
||||
|
||||
this.redisForSub.on('message', this.onMessage);
|
||||
}
|
||||
|
||||
onModuleInit() {
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async follow(
|
||||
requestUser: MiLocalUser,
|
||||
targetChannel: MiChannel,
|
||||
): Promise<void> {
|
||||
await this.channelFollowingsRepository.insert({
|
||||
id: this.idService.gen(),
|
||||
followerId: requestUser.id,
|
||||
followeeId: targetChannel.id,
|
||||
});
|
||||
|
||||
this.globalEventService.publishInternalEvent('followChannel', {
|
||||
userId: requestUser.id,
|
||||
channelId: targetChannel.id,
|
||||
});
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async unfollow(
|
||||
requestUser: MiLocalUser,
|
||||
targetChannel: MiChannel,
|
||||
): Promise<void> {
|
||||
await this.channelFollowingsRepository.delete({
|
||||
followerId: requestUser.id,
|
||||
followeeId: targetChannel.id,
|
||||
});
|
||||
|
||||
this.globalEventService.publishInternalEvent('unfollowChannel', {
|
||||
userId: requestUser.id,
|
||||
channelId: targetChannel.id,
|
||||
});
|
||||
}
|
||||
|
||||
@bindThis
|
||||
private async onMessage(_: string, data: string): Promise<void> {
|
||||
const obj = JSON.parse(data);
|
||||
|
||||
if (obj.channel === 'internal') {
|
||||
const { type, body } = obj.message as GlobalEvents['internal']['payload'];
|
||||
switch (type) {
|
||||
case 'followChannel': {
|
||||
this.userFollowingChannelsCache.refresh(body.userId);
|
||||
break;
|
||||
}
|
||||
case 'unfollowChannel': {
|
||||
this.userFollowingChannelsCache.delete(body.userId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public dispose(): void {
|
||||
this.userFollowingChannelsCache.dispose();
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public onApplicationShutdown(signal?: string | undefined): void {
|
||||
this.dispose();
|
||||
}
|
||||
}
|
@ -63,6 +63,7 @@ import { SearchService } from './SearchService.js';
|
||||
import { ClipService } from './ClipService.js';
|
||||
import { FeaturedService } from './FeaturedService.js';
|
||||
import { FunoutTimelineService } from './FunoutTimelineService.js';
|
||||
import { ChannelFollowingService } from './ChannelFollowingService.js';
|
||||
import { ChartLoggerService } from './chart/ChartLoggerService.js';
|
||||
import FederationChart from './chart/charts/federation.js';
|
||||
import NotesChart from './chart/charts/notes.js';
|
||||
@ -193,6 +194,7 @@ const $SearchService: Provider = { provide: 'SearchService', useExisting: Search
|
||||
const $ClipService: Provider = { provide: 'ClipService', useExisting: ClipService };
|
||||
const $FeaturedService: Provider = { provide: 'FeaturedService', useExisting: FeaturedService };
|
||||
const $FunoutTimelineService: Provider = { provide: 'FunoutTimelineService', useExisting: FunoutTimelineService };
|
||||
const $ChannelFollowingService: Provider = { provide: 'ChannelFollowingService', useExisting: ChannelFollowingService };
|
||||
|
||||
const $ChartLoggerService: Provider = { provide: 'ChartLoggerService', useExisting: ChartLoggerService };
|
||||
const $FederationChart: Provider = { provide: 'FederationChart', useExisting: FederationChart };
|
||||
@ -327,6 +329,7 @@ const $ApQuestionService: Provider = { provide: 'ApQuestionService', useExisting
|
||||
ClipService,
|
||||
FeaturedService,
|
||||
FunoutTimelineService,
|
||||
ChannelFollowingService,
|
||||
ChartLoggerService,
|
||||
FederationChart,
|
||||
NotesChart,
|
||||
@ -454,6 +457,7 @@ const $ApQuestionService: Provider = { provide: 'ApQuestionService', useExisting
|
||||
$ClipService,
|
||||
$FeaturedService,
|
||||
$FunoutTimelineService,
|
||||
$ChannelFollowingService,
|
||||
$ChartLoggerService,
|
||||
$FederationChart,
|
||||
$NotesChart,
|
||||
@ -582,6 +586,7 @@ const $ApQuestionService: Provider = { provide: 'ApQuestionService', useExisting
|
||||
ClipService,
|
||||
FeaturedService,
|
||||
FunoutTimelineService,
|
||||
ChannelFollowingService,
|
||||
FederationChart,
|
||||
NotesChart,
|
||||
UsersChart,
|
||||
@ -708,6 +713,7 @@ const $ApQuestionService: Provider = { provide: 'ApQuestionService', useExisting
|
||||
$ClipService,
|
||||
$FeaturedService,
|
||||
$FunoutTimelineService,
|
||||
$ChannelFollowingService,
|
||||
$FederationChart,
|
||||
$NotesChart,
|
||||
$UsersChart,
|
||||
|
@ -15,6 +15,7 @@ import { bindThis } from '@/decorators.js';
|
||||
import { CacheService } from '@/core/CacheService.js';
|
||||
import { MiLocalUser } from '@/models/User.js';
|
||||
import { UserService } from '@/core/UserService.js';
|
||||
import { ChannelFollowingService } from '@/core/ChannelFollowingService.js';
|
||||
import { AuthenticateService, AuthenticationError } from './AuthenticateService.js';
|
||||
import MainStreamConnection from './stream/Connection.js';
|
||||
import { ChannelsService } from './stream/ChannelsService.js';
|
||||
@ -39,6 +40,7 @@ export class StreamingApiServerService {
|
||||
private channelsService: ChannelsService,
|
||||
private notificationService: NotificationService,
|
||||
private usersService: UserService,
|
||||
private channelFollowingService: ChannelFollowingService,
|
||||
) {
|
||||
}
|
||||
|
||||
@ -93,6 +95,7 @@ export class StreamingApiServerService {
|
||||
this.noteReadService,
|
||||
this.notificationService,
|
||||
this.cacheService,
|
||||
this.channelFollowingService,
|
||||
user, app,
|
||||
);
|
||||
|
||||
|
@ -5,9 +5,9 @@
|
||||
|
||||
import { Inject, Injectable } from '@nestjs/common';
|
||||
import { Endpoint } from '@/server/api/endpoint-base.js';
|
||||
import type { ChannelFollowingsRepository, ChannelsRepository } from '@/models/_.js';
|
||||
import { IdService } from '@/core/IdService.js';
|
||||
import type { ChannelsRepository } from '@/models/_.js';
|
||||
import { DI } from '@/di-symbols.js';
|
||||
import { ChannelFollowingService } from '@/core/ChannelFollowingService.js';
|
||||
import { ApiError } from '../../error.js';
|
||||
|
||||
export const meta = {
|
||||
@ -41,11 +41,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||
constructor(
|
||||
@Inject(DI.channelsRepository)
|
||||
private channelsRepository: ChannelsRepository,
|
||||
|
||||
@Inject(DI.channelFollowingsRepository)
|
||||
private channelFollowingsRepository: ChannelFollowingsRepository,
|
||||
|
||||
private idService: IdService,
|
||||
private channelFollowingService: ChannelFollowingService,
|
||||
) {
|
||||
super(meta, paramDef, async (ps, me) => {
|
||||
const channel = await this.channelsRepository.findOneBy({
|
||||
@ -56,11 +52,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||
throw new ApiError(meta.errors.noSuchChannel);
|
||||
}
|
||||
|
||||
await this.channelFollowingsRepository.insert({
|
||||
id: this.idService.gen(),
|
||||
followerId: me.id,
|
||||
followeeId: channel.id,
|
||||
});
|
||||
await this.channelFollowingService.follow(me, channel);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -5,8 +5,9 @@
|
||||
|
||||
import { Inject, Injectable } from '@nestjs/common';
|
||||
import { Endpoint } from '@/server/api/endpoint-base.js';
|
||||
import type { ChannelFollowingsRepository, ChannelsRepository } from '@/models/_.js';
|
||||
import type { ChannelsRepository } from '@/models/_.js';
|
||||
import { DI } from '@/di-symbols.js';
|
||||
import { ChannelFollowingService } from '@/core/ChannelFollowingService.js';
|
||||
import { ApiError } from '../../error.js';
|
||||
|
||||
export const meta = {
|
||||
@ -40,9 +41,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||
constructor(
|
||||
@Inject(DI.channelsRepository)
|
||||
private channelsRepository: ChannelsRepository,
|
||||
|
||||
@Inject(DI.channelFollowingsRepository)
|
||||
private channelFollowingsRepository: ChannelFollowingsRepository,
|
||||
private channelFollowingService: ChannelFollowingService,
|
||||
) {
|
||||
super(meta, paramDef, async (ps, me) => {
|
||||
const channel = await this.channelsRepository.findOneBy({
|
||||
@ -53,10 +52,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||
throw new ApiError(meta.errors.noSuchChannel);
|
||||
}
|
||||
|
||||
await this.channelFollowingsRepository.delete({
|
||||
followerId: me.id,
|
||||
followeeId: channel.id,
|
||||
});
|
||||
await this.channelFollowingService.unfollow(me, channel);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -13,6 +13,7 @@ import { bindThis } from '@/decorators.js';
|
||||
import { CacheService } from '@/core/CacheService.js';
|
||||
import { MiFollowing, MiUserProfile } from '@/models/_.js';
|
||||
import type { StreamEventEmitter, GlobalEvents } from '@/core/GlobalEventService.js';
|
||||
import { ChannelFollowingService } from '@/core/ChannelFollowingService.js';
|
||||
import type { ChannelsService } from './ChannelsService.js';
|
||||
import type { EventEmitter } from 'events';
|
||||
import type Channel from './channel.js';
|
||||
@ -42,6 +43,7 @@ export default class Connection {
|
||||
private noteReadService: NoteReadService,
|
||||
private notificationService: NotificationService,
|
||||
private cacheService: CacheService,
|
||||
private channelFollowingService: ChannelFollowingService,
|
||||
|
||||
user: MiUser | null | undefined,
|
||||
token: MiAccessToken | null | undefined,
|
||||
@ -56,7 +58,7 @@ export default class Connection {
|
||||
const [userProfile, following, followingChannels, userIdsWhoMeMuting, userIdsWhoBlockingMe, userIdsWhoMeMutingRenotes] = await Promise.all([
|
||||
this.cacheService.userProfileCache.fetch(this.user.id),
|
||||
this.cacheService.userFollowingsCache.fetch(this.user.id),
|
||||
this.cacheService.userFollowingChannelsCache.fetch(this.user.id),
|
||||
this.channelFollowingService.userFollowingChannelsCache.fetch(this.user.id),
|
||||
this.cacheService.userMutingsCache.fetch(this.user.id),
|
||||
this.cacheService.userBlockedCache.fetch(this.user.id),
|
||||
this.cacheService.renoteMutingsCache.fetch(this.user.id),
|
||||
|
Loading…
Reference in New Issue
Block a user