Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/social-features-plan.md
Original file line number Diff line number Diff line change
Expand Up @@ -2895,7 +2895,7 @@ Each milestone creates only the DB tables and types it needs, and delivers testa
- Admin freezes a climb → no new proposals allowed
- Community status badges appear on climb cards

### Milestone 7: New Climb Feed + Subscriptions
### Milestone 7: New Climb Feed + Subscriptions [COMPLETED]

**User value**: "I can see new climbs being set on my board type and get notified."

Expand Down
29 changes: 29 additions & 0 deletions packages/backend/src/events/feed-fanout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,32 @@ export async function fanoutFeedItems(event: SocialEvent): Promise<void> {
await db.insert(dbSchema.feedItems).values(batch);
}
}

/**
* Fan out new climb feed items to followers of the setter.
*/
export async function fanoutNewClimbFeedItems(event: SocialEvent): Promise<void> {
const followers = await db
.select({ followerId: dbSchema.userFollows.followerId })
.from(dbSchema.userFollows)
.where(eq(dbSchema.userFollows.followingId, event.actorId));

if (followers.length === 0) return;

const metadata = buildFeedItemMetadata(event);

const rows = followers.map((f) => ({
recipientId: f.followerId,
actorId: event.actorId,
type: 'new_climb' as const,
entityType: 'climb' as SocialEntityType,
entityId: event.entityId,
boardUuid: null,
metadata,
}));

for (let i = 0; i < rows.length; i += FANOUT_BATCH_SIZE) {
const batch = rows.slice(i, i + FANOUT_BATCH_SIZE);
await db.insert(dbSchema.feedItems).values(batch);
}
}
131 changes: 130 additions & 1 deletion packages/backend/src/events/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ import { pubsub } from '../pubsub/index';
import { db } from '../db/client';
import * as dbSchema from '@boardsesh/db/schema';
import { eq, and, sql } from 'drizzle-orm';
import { fanoutFeedItems } from './feed-fanout';
import { fanoutFeedItems, fanoutNewClimbFeedItems } from './feed-fanout';
import crypto from 'crypto';
import {
resolveClimbCreatedFollowerRecipients,
resolveClimbCreatedSubscriptionRecipients,
} from './recipient-resolution';

export const eventBroker = new EventBroker();

Expand Down Expand Up @@ -130,6 +134,131 @@ async function createInlineNotification(event: SocialEvent): Promise<void> {
// Multi-recipient notification (all climbers) — handled by NotificationWorker only
return;
}
case 'climb.created': {
const boardType = event.metadata.boardType;
const layoutId = parseInt(event.metadata.layoutId || '0', 10);
if (!boardType || !layoutId) return;

const followerRecipients = await resolveClimbCreatedFollowerRecipients(event.actorId);
const subscriberRecipients = await resolveClimbCreatedSubscriptionRecipients(
boardType,
layoutId,
event.actorId,
);
const followerIds = new Set(followerRecipients.map((r) => r.recipientId));
const recipients = [
...followerRecipients,
...subscriberRecipients.filter((r) => !followerIds.has(r.recipientId)),
].filter((r) => r.recipientId !== event.actorId);

if (recipients.length === 0) {
await fanoutNewClimbFeedItems(event);
return;
}

const actorRows = await db
.select({
name: dbSchema.users.name,
image: dbSchema.users.image,
displayName: dbSchema.userProfiles.displayName,
avatarUrl: dbSchema.userProfiles.avatarUrl,
})
.from(dbSchema.users)
.leftJoin(dbSchema.userProfiles, eq(dbSchema.users.id, dbSchema.userProfiles.userId))
.where(eq(dbSchema.users.id, event.actorId))
.limit(1);
const actor = actorRows[0];

for (const recipient of recipients) {
const uuid = crypto.randomUUID();
await db
.insert(dbSchema.notifications)
.values({
uuid,
recipientId: recipient.recipientId,
actorId: event.actorId,
type: recipient.notificationType,
entityType: 'climb',
entityId: event.entityId,
});

pubsub.publishNotificationEvent(recipient.recipientId, {
notification: {
uuid,
type: recipient.notificationType as NotificationType,
actorId: event.actorId,
actorDisplayName: actor?.displayName || actor?.name || undefined,
actorAvatarUrl: actor?.avatarUrl || actor?.image || undefined,
entityType: 'climb',
entityId: event.entityId,
climbName: event.metadata.climbName || undefined,
climbUuid: event.entityId,
boardType,
isRead: false,
createdAt: new Date().toISOString(),
},
});
}

await fanoutNewClimbFeedItems(event);

const [climb] = await db
.select({
uuid: dbSchema.boardClimbs.uuid,
name: dbSchema.boardClimbs.name,
layoutId: dbSchema.boardClimbs.layoutId,
angle: dbSchema.boardClimbs.angle,
frames: dbSchema.boardClimbs.frames,
createdAt: dbSchema.boardClimbs.createdAt,
setterDisplayName: dbSchema.userProfiles.displayName,
setterAvatarUrl: dbSchema.userProfiles.avatarUrl,
difficultyName: dbSchema.boardDifficultyGrades.boulderName,
})
.from(dbSchema.boardClimbs)
.leftJoin(dbSchema.users, eq(dbSchema.boardClimbs.userId, dbSchema.users.id))
.leftJoin(dbSchema.userProfiles, eq(dbSchema.users.id, dbSchema.userProfiles.userId))
.leftJoin(
dbSchema.boardClimbStats,
and(
eq(dbSchema.boardClimbStats.boardType, dbSchema.boardClimbs.boardType),
eq(dbSchema.boardClimbStats.climbUuid, dbSchema.boardClimbs.uuid),
eq(dbSchema.boardClimbStats.angle, dbSchema.boardClimbs.angle),
),
)
.leftJoin(
dbSchema.boardDifficultyGrades,
and(
eq(dbSchema.boardDifficultyGrades.boardType, dbSchema.boardClimbs.boardType),
eq(dbSchema.boardDifficultyGrades.difficulty, dbSchema.boardClimbStats.displayDifficulty),
),
)
.where(
and(
eq(dbSchema.boardClimbs.uuid, event.entityId),
eq(dbSchema.boardClimbs.boardType, boardType),
),
)
.limit(1);

if (climb) {
const channelKey = `${boardType}:${climb.layoutId}`;
pubsub.publishNewClimbEvent(channelKey, {
climb: {
uuid: climb.uuid,
name: climb.name ?? event.metadata.climbName,
boardType,
layoutId: climb.layoutId,
setterDisplayName: climb.setterDisplayName ?? actor?.displayName ?? actor?.name ?? undefined,
setterAvatarUrl: climb.setterAvatarUrl ?? actor?.avatarUrl ?? actor?.image ?? undefined,
angle: climb.angle ?? null,
frames: climb.frames ?? null,
difficultyName: climb.difficultyName ?? event.metadata.difficultyName ?? null,
createdAt: climb.createdAt ?? new Date().toISOString(),
},
});
}
return;
}
case 'ascent.logged': {
await fanoutFeedItems(event);
// No notification for ascent.logged - it's feed-only
Expand Down
128 changes: 124 additions & 4 deletions packages/backend/src/events/notification-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import {
resolveProposalApprovalRecipients,
resolveProposalRejectionRecipients,
resolveProposalCreatedRecipients,
resolveClimbCreatedFollowerRecipients,
resolveClimbCreatedSubscriptionRecipients,
} from './recipient-resolution';
import { fanoutFeedItems } from './feed-fanout';
import { fanoutFeedItems, fanoutNewClimbFeedItems } from './feed-fanout';
import crypto from 'crypto';

export class NotificationWorker {
Expand Down Expand Up @@ -59,6 +61,9 @@ export class NotificationWorker {
case 'proposal.created':
await this.handleProposalCreated(event);
break;
case 'climb.created':
await this.handleClimbCreated(event);
break;
default:
break;
}
Expand Down Expand Up @@ -236,6 +241,101 @@ export class NotificationWorker {
}
}

/**
* Handle climb.created events: notify followers and layout subscribers,
* fan out feed items, and publish realtime new-climb events.
*/
private async handleClimbCreated(event: SocialEvent): Promise<void> {
const boardType = event.metadata.boardType;
const layoutId = parseInt(event.metadata.layoutId || '0', 10);
const climbName = event.metadata.climbName || '';

if (!boardType || !layoutId) return;

const followerRecipients = await resolveClimbCreatedFollowerRecipients(event.actorId);
const subscriberRecipients = await resolveClimbCreatedSubscriptionRecipients(
boardType,
layoutId,
event.actorId,
);

const followerIds = new Set(followerRecipients.map((r) => r.recipientId));
const allRecipients = [
...followerRecipients,
...subscriberRecipients.filter((r) => !followerIds.has(r.recipientId)),
];

for (const recipient of allRecipients) {
await this.createAndPublishNotification(
recipient.recipientId,
event.actorId,
recipient.notificationType,
'climb',
event.entityId,
);
}

// Fan out feed items to followers only (not global subscribers)
await fanoutNewClimbFeedItems(event);

// Publish realtime new climb event to the board+layout channel
const [climb] = await db
.select({
uuid: dbSchema.boardClimbs.uuid,
name: dbSchema.boardClimbs.name,
layoutId: dbSchema.boardClimbs.layoutId,
angle: dbSchema.boardClimbs.angle,
frames: dbSchema.boardClimbs.frames,
createdAt: dbSchema.boardClimbs.createdAt,
setterDisplayName: dbSchema.userProfiles.displayName,
setterAvatarUrl: dbSchema.userProfiles.avatarUrl,
difficultyName: dbSchema.boardDifficultyGrades.boulderName,
})
.from(dbSchema.boardClimbs)
.leftJoin(dbSchema.users, eq(dbSchema.boardClimbs.userId, dbSchema.users.id))
.leftJoin(dbSchema.userProfiles, eq(dbSchema.users.id, dbSchema.userProfiles.userId))
.leftJoin(
dbSchema.boardClimbStats,
and(
eq(dbSchema.boardClimbStats.boardType, dbSchema.boardClimbs.boardType),
eq(dbSchema.boardClimbStats.climbUuid, dbSchema.boardClimbs.uuid),
eq(dbSchema.boardClimbStats.angle, dbSchema.boardClimbs.angle),
),
)
.leftJoin(
dbSchema.boardDifficultyGrades,
and(
eq(dbSchema.boardDifficultyGrades.boardType, dbSchema.boardClimbs.boardType),
eq(dbSchema.boardDifficultyGrades.difficulty, dbSchema.boardClimbStats.displayDifficulty),
),
)
.where(
and(
eq(dbSchema.boardClimbs.uuid, event.entityId),
eq(dbSchema.boardClimbs.boardType, boardType),
),
)
.limit(1);

if (climb) {
const channelKey = `${boardType}:${climb.layoutId}`;
pubsub.publishNewClimbEvent(channelKey, {
climb: {
uuid: climb.uuid,
name: climb.name ?? climbName,
boardType,
layoutId: climb.layoutId,
setterDisplayName: climb.setterDisplayName ?? event.metadata.setterDisplayName ?? null,
setterAvatarUrl: climb.setterAvatarUrl ?? event.metadata.setterAvatarUrl ?? null,
angle: climb.angle ?? null,
frames: climb.frames ?? null,
difficultyName: climb.difficultyName ?? event.metadata.difficultyName ?? null,
createdAt: climb.createdAt ?? new Date().toISOString(),
},
});
}
}

private async isDuplicate(
actorId: string,
recipientId: string,
Expand Down Expand Up @@ -336,6 +436,26 @@ export class NotificationWorker {
}
}

let climbName: string | undefined;
let climbUuid: string | undefined;
let climbBoardType: string | undefined;

if (type === 'new_climb' || type === 'new_climb_global') {
const [climb] = await db
.select({
name: dbSchema.boardClimbs.name,
boardType: dbSchema.boardClimbs.boardType,
})
.from(dbSchema.boardClimbs)
.where(eq(dbSchema.boardClimbs.uuid, entityId))
.limit(1);
if (climb) {
climbName = climb.name ?? undefined;
climbUuid = entityId;
climbBoardType = climb.boardType;
}
}

return {
uuid,
type,
Expand All @@ -345,9 +465,9 @@ export class NotificationWorker {
entityType: entityType as dbSchema.SocialEntityType,
entityId,
commentBody,
climbName: undefined,
climbUuid: undefined,
boardType: undefined,
climbName,
climbUuid,
boardType: climbBoardType,
isRead: false,
createdAt: new Date().toISOString(),
};
Expand Down
43 changes: 43 additions & 0 deletions packages/backend/src/events/recipient-resolution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,46 @@ export function resolveFollowRecipient(
notificationType: 'new_follower',
};
}

/**
* Resolve recipients when a user creates a climb: all followers of the setter.
*/
export async function resolveClimbCreatedFollowerRecipients(
setterId: string,
): Promise<RecipientInfo[]> {
const followers = await db
.select({ followerId: dbSchema.userFollows.followerId })
.from(dbSchema.userFollows)
.where(eq(dbSchema.userFollows.followingId, setterId));

return followers.map((f) => ({
recipientId: f.followerId,
notificationType: 'new_climb',
}));
}

/**
* Resolve recipients subscribed to a board type + layout for new climb notifications.
*/
export async function resolveClimbCreatedSubscriptionRecipients(
boardType: string,
layoutId: number,
excludeUserId?: string,
): Promise<RecipientInfo[]> {
const rows = await db
.select({ userId: dbSchema.newClimbSubscriptions.userId })
.from(dbSchema.newClimbSubscriptions)
.where(
and(
eq(dbSchema.newClimbSubscriptions.boardType, boardType),
eq(dbSchema.newClimbSubscriptions.layoutId, layoutId),
),
);

return rows
.filter((r) => r.userId !== excludeUserId)
.map((r) => ({
recipientId: r.userId,
notificationType: 'new_climb_global',
}));
}
Loading
Loading