diff --git a/apps/api/src/controllers/ai.controller.ts b/apps/api/src/controllers/ai.controller.ts index 6162d036..02ede1ce 100644 --- a/apps/api/src/controllers/ai.controller.ts +++ b/apps/api/src/controllers/ai.controller.ts @@ -9,7 +9,7 @@ import { } from '@/utils/ai-tools'; import { HttpError } from '@/utils/errors'; import { db, getOrganizationByProjectIdCached } from '@openpanel/db'; -import { getProjectAccessCached } from '@openpanel/trpc/src/access'; +import { getProjectAccess } from '@openpanel/trpc/src/access'; import { type Message, appendResponseMessages, streamText } from 'ai'; import type { FastifyReply, FastifyRequest } from 'fastify'; @@ -37,7 +37,7 @@ export async function chat( } const organization = await getOrganizationByProjectIdCached(projectId); - const access = await getProjectAccessCached({ + const access = await getProjectAccess({ projectId, userId: session.userId, }); diff --git a/apps/api/src/controllers/webhook.controller.ts b/apps/api/src/controllers/webhook.controller.ts index c2e1c432..b164c9c3 100644 --- a/apps/api/src/controllers/webhook.controller.ts +++ b/apps/api/src/controllers/webhook.controller.ts @@ -113,6 +113,17 @@ export async function slackWebhook( } } +async function clearOrganizationCache(organizationId: string) { + const projects = await db.project.findMany({ + where: { + organizationId, + }, + }); + for (const project of projects) { + await getOrganizationByProjectIdCached.clear(project.id); + } +} + export async function polarWebhook( request: FastifyRequest<{ Querystring: unknown; @@ -141,8 +152,11 @@ export async function polarWebhook( }, data: { subscriptionPeriodEventsCount: 0, + subscriptionPeriodEventsCountExceededAt: null, }, }); + + await clearOrganizationCache(metadata.organizationId); } break; } @@ -205,15 +219,7 @@ export async function polarWebhook( }, }); - const projects = await db.project.findMany({ - where: { - organizationId: metadata.organizationId, - }, - }); - - for (const project of projects) { - await getOrganizationByProjectIdCached.clear(project.id); - } + await clearOrganizationCache(metadata.organizationId); await publishEvent('organization', 'subscription_updated', { organizationId: metadata.organizationId, diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index cbf28ee3..7592c9de 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -8,14 +8,18 @@ import Fastify from 'fastify'; import metricsPlugin from 'fastify-metrics'; import { generateId } from '@openpanel/common'; -import type { IServiceClientWithProject } from '@openpanel/db'; -import { getRedisPub } from '@openpanel/redis'; +import { + type IServiceClientWithProject, + runWithAlsSession, +} from '@openpanel/db'; +import { getCache, getRedisPub } from '@openpanel/redis'; import type { AppRouter } from '@openpanel/trpc'; import { appRouter, createContext } from '@openpanel/trpc'; import { EMPTY_SESSION, type SessionValidationResult, + decodeSessionToken, validateSessionToken, } from '@openpanel/auth'; import sourceMapSupport from 'source-map-support'; @@ -140,7 +144,14 @@ const startServer = async () => { instance.addHook('onRequest', async (req) => { if (req.cookies?.session) { try { - const session = await validateSessionToken(req.cookies.session); + const sessionId = decodeSessionToken(req.cookies.session); + const session = await runWithAlsSession(sessionId, () => + sessionId + ? getCache(`validateSession:${sessionId}`, 60 * 5, async () => + validateSessionToken(req.cookies.session), + ) + : validateSessionToken(req.cookies.session), + ); if (session.session) { req.session = session; } diff --git a/apps/start/src/hooks/use-session-extension.ts b/apps/start/src/hooks/use-session-extension.ts index fa33e48b..c27163af 100644 --- a/apps/start/src/hooks/use-session-extension.ts +++ b/apps/start/src/hooks/use-session-extension.ts @@ -24,9 +24,11 @@ export function useSessionExtension() { 1000 * 60 * 5, ); - extendSessionFn(); + // Delay initial call a bit to prioritize other requests + const timer = setTimeout(() => extendSessionFn(), 5000); return () => { + clearTimeout(timer); if (intervalRef.current) { clearInterval(intervalRef.current); } diff --git a/packages/auth/src/session.ts b/packages/auth/src/session.ts index 359b3f99..3d4bcebc 100644 --- a/packages/auth/src/session.ts +++ b/packages/auth/src/session.ts @@ -59,8 +59,14 @@ export async function createDemoSession( }; } +export const decodeSessionToken = (token: string): string | null => { + return token + ? encodeHexLowerCase(sha256(new TextEncoder().encode(token))) + : null; +}; + export async function validateSessionToken( - token: string | null, + token: string | null | undefined, ): Promise { if (process.env.DEMO_USER_ID) { return createDemoSession(process.env.DEMO_USER_ID); @@ -69,7 +75,10 @@ export async function validateSessionToken( if (!token) { return EMPTY_SESSION; } - const sessionId = encodeHexLowerCase(sha256(new TextEncoder().encode(token))); + const sessionId = decodeSessionToken(token); + if (!sessionId) { + return EMPTY_SESSION; + } const result = await db.$primary().session.findUnique({ where: { id: sessionId, diff --git a/packages/db/index.ts b/packages/db/index.ts index 623eb6ee..522f1299 100644 --- a/packages/db/index.ts +++ b/packages/db/index.ts @@ -23,3 +23,4 @@ export * from './src/buffers'; export * from './src/types'; export * from './src/clickhouse/query-builder'; export * from './src/services/overview.service'; +export * from './src/session-context'; diff --git a/packages/db/src/logger.ts b/packages/db/src/logger.ts new file mode 100644 index 00000000..6e023bf6 --- /dev/null +++ b/packages/db/src/logger.ts @@ -0,0 +1,3 @@ +import { createLogger } from '@openpanel/logger'; + +export const logger = createLogger({ name: 'db:prisma' }); diff --git a/packages/db/src/prisma-client.ts b/packages/db/src/prisma-client.ts index e7e2d86a..57360d53 100644 --- a/packages/db/src/prisma-client.ts +++ b/packages/db/src/prisma-client.ts @@ -1,11 +1,15 @@ import { createLogger } from '@openpanel/logger'; import { readReplicas } from '@prisma/extension-read-replicas'; -import { type Organization, PrismaClient } from './generated/prisma/client'; +import { + type Organization, + Prisma, + PrismaClient, +} from './generated/prisma/client'; +import { logger } from './logger'; +import { sessionConsistency } from './session-consistency'; export * from './generated/prisma/client'; -const logger = createLogger({ name: 'db' }); - const isWillBeCanceled = ( organization: Pick< Organization, @@ -30,11 +34,6 @@ const getPrismaClient = () => { const prisma = new PrismaClient({ log: ['error'], }) - .$extends( - readReplicas({ - url: process.env.DATABASE_URL_REPLICA ?? process.env.DATABASE_URL!, - }), - ) .$extends({ query: { async $allOperations({ operation, model, args, query }) { @@ -53,6 +52,8 @@ const getPrismaClient = () => { }, }, }) + + .$extends(sessionConsistency()) .$extends({ result: { organization: { @@ -258,7 +259,12 @@ const getPrismaClient = () => { }, }, }, - }); + }) + .$extends( + readReplicas({ + url: process.env.DATABASE_URL_REPLICA ?? process.env.DATABASE_URL!, + }), + ); return prisma; }; diff --git a/packages/db/src/services/organization.service.ts b/packages/db/src/services/organization.service.ts index e20c6185..5a3732ee 100644 --- a/packages/db/src/services/organization.service.ts +++ b/packages/db/src/services/organization.service.ts @@ -61,7 +61,7 @@ export async function getOrganizationByProjectId(projectId: string) { export const getOrganizationByProjectIdCached = cacheable( getOrganizationByProjectId, - 60 * 60 * 24, + 60 * 5, ); export async function getInvites(organizationId: string) { diff --git a/packages/db/src/session-consistency.ts b/packages/db/src/session-consistency.ts new file mode 100644 index 00000000..b833746a --- /dev/null +++ b/packages/db/src/session-consistency.ts @@ -0,0 +1,224 @@ +import { getRedisCache } from '@openpanel/redis'; +import type { Operation } from '@prisma/client/runtime/client'; +import { Prisma, type PrismaClient } from './generated/prisma/client'; +import { logger } from './logger'; +import { getAlsSessionId } from './session-context'; + +type BarePrismaClient = { + $queryRaw: (query: TemplateStringsArray, ...args: unknown[]) => Promise; +}; + +// WAL LSN tracking for read-after-write consistency +const LSN_CACHE_PREFIX = 'db:wal_lsn:'; +const LSN_CACHE_TTL = 5; +const MAX_RETRY_ATTEMPTS = 5; +const INITIAL_RETRY_DELAY_MS = 10; + +const READ_OPERATIONS: Operation[] = [ + 'findUnique', + 'findUniqueOrThrow', + 'findFirst', + 'findFirstOrThrow', + 'findMany', + 'aggregate', + 'groupBy', + 'count', +]; + +const WRITE_OPERATIONS: Operation[] = [ + 'create', + 'update', + 'delete', + 'createMany', + 'createManyAndReturn', + 'updateMany', + 'deleteMany', + 'upsert', +]; + +const isWriteOperation = (operation: string) => + WRITE_OPERATIONS.includes(operation as Operation); + +const isReadOperation = (operation: string) => + READ_OPERATIONS.includes(operation as Operation); + +async function getCurrentWalLsn( + prismaClient: BarePrismaClient, +): Promise { + try { + const result = await prismaClient.$queryRaw<[{ lsn: string }]>` + SELECT pg_current_wal_lsn()::text AS lsn + `; + return result[0]?.lsn || null; + } catch (error) { + logger.error('Failed to get WAL LSN', { error }); + return null; + } +} + +async function cacheWalLsnForSession( + sessionId: string, + lsn: string, +): Promise { + try { + const redis = getRedisCache(); + await redis.setex(`${LSN_CACHE_PREFIX}${sessionId}`, LSN_CACHE_TTL, lsn); + } catch (error) { + logger.error('Failed to cache WAL LSN', { error, sessionId }); + } +} + +async function getCachedWalLsn(sessionId: string): Promise { + try { + const redis = getRedisCache(); + return await redis.get(`${LSN_CACHE_PREFIX}${sessionId}`); + } catch (error) { + logger.error('Failed to get cached WAL LSN', { error, sessionId }); + return null; + } +} + +function compareWalLsn(lsn1: string, lsn2: string): number { + const [x1, y1] = lsn1.split('/').map((x) => BigInt(`0x${x}`)); + const [x2, y2] = lsn2.split('/').map((x) => BigInt(`0x${x}`)); + + const v1 = ((x1 ?? 0n) << 32n) + (y1 ?? 0n); + const v2 = ((x2 ?? 0n) << 32n) + (y2 ?? 0n); + + if (v1 < v2) return -1; + if (v1> v2) return 1; + return 0; +} + +async function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +async function waitForReplicaCatchup( + prismaClient: BarePrismaClient, + sessionId: string, +): Promise { + const expectedLsn = await getCachedWalLsn(sessionId); + + if (!expectedLsn) { + return true; + } + + for (let attempt = 0; attempt < MAX_RETRY_ATTEMPTS; attempt++) { + const currentLsn = await getCurrentWalLsn(prismaClient); + if (!currentLsn) { + return true; + } + + // Check if replica has caught up (current>= expected) + if (compareWalLsn(currentLsn, expectedLsn)>= 0) { + logger.debug('Replica caught up', { + attempt: attempt + 1, + currentLsn, + expectedLsn, + sessionId, + }); + return true; + } + + // Exponential backoff + if (attempt < MAX_RETRY_ATTEMPTS - 1) { + const delayMs = INITIAL_RETRY_DELAY_MS * 2 ** attempt; + logger.debug('Waiting for replica to catch up', { + attempt: attempt + 1, + delayMs, + currentLsn, + expectedLsn, + sessionId, + }); + await sleep(delayMs); + } + } + + logger.warn( + 'Replica did not catch up after max retries, falling back to primary', + { + sessionId, + expectedLsn, + }, + ); + return false; +} + +/** + * Prisma extension for session-based read-after-write consistency. + * + * This extension tracks WAL LSN positions after writes and ensures that + * subsequent reads within the same session see those writes, even when + * using read replicas. + * + * How it works: + * 1. After any write operation with a session ID, it captures the WAL LSN + * 2. Before read operations with a session ID, it checks if the replica has caught up + * 3. If the replica hasn't caught up after retries, it forces the read to the primary + * + */ +export function sessionConsistency() { + return Prisma.defineExtension((client) => + client.$extends({ + name: 'session-consistency', + query: { + $allOperations: async ({ + operation, + model, + args, + query, + // This is a hack to force reads to primary when replica hasn't caught up. + // The readReplicas extension routes queries to primary when in a transaction, + // so we set __internalParams.transaction = true to achieve this. + // @ts-expect-error - __internalParams is not in the types + __internalParams, + }) => { + const sessionId = getAlsSessionId(); + + // For write operations with session: cache WAL LSN after write + if (isWriteOperation(operation)) { + logger.info('Prisma operation', { + operation, + args, + model, + }); + + const result = await query(args); + + if (sessionId) { + // Get current WAL LSN and cache it for this session + const lsn = await getCurrentWalLsn(client); + if (lsn) { + await cacheWalLsnForSession(sessionId, lsn); + logger.debug('Cached WAL LSN after write', { + sessionId, + lsn, + operation, + model, + }); + } + } + + return result; + } + + // For read operations with session: try replica first, fallback to primary + if (isReadOperation(operation) && sessionId) { + const replicaCaughtUp = await waitForReplicaCatchup( + client, + sessionId, + ); + + if (!replicaCaughtUp) { + // This will force readReplicas extension to use primary + __internalParams.transaction = true; + } + } + + return query(args); + }, + }, + }), + ); +} diff --git a/packages/db/src/session-context.ts b/packages/db/src/session-context.ts new file mode 100644 index 00000000..275287bb --- /dev/null +++ b/packages/db/src/session-context.ts @@ -0,0 +1,12 @@ +import { AsyncLocalStorage } from 'node:async_hooks'; + +type Ctx = { sessionId: string | null }; + +export const als = new AsyncLocalStorage(); + +export const runWithAlsSession = ( + sid: string | null | undefined, + fn: () => Promise, +) => als.run({ sessionId: sid || null }, fn); + +export const getAlsSessionId = () => als.getStore()?.sessionId ?? null; diff --git a/packages/redis/cachable.ts b/packages/redis/cachable.ts index ae59750d..55221d33 100644 --- a/packages/redis/cachable.ts +++ b/packages/redis/cachable.ts @@ -1,5 +1,9 @@ import { getRedisCache } from './redis'; +export const deleteCache = async (key: string) => { + return getRedisCache().del(key); +}; + export async function getCache( key: string, expireInSec: number, diff --git a/packages/trpc/src/access.ts b/packages/trpc/src/access.ts index 0cbfefe1..64d9249e 100644 --- a/packages/trpc/src/access.ts +++ b/packages/trpc/src/access.ts @@ -1,64 +1,67 @@ import { db, getProjectById } from '@openpanel/db'; import { cacheable } from '@openpanel/redis'; -export const getProjectAccessCached = cacheable(getProjectAccess, 60 * 5); -export async function getProjectAccess({ - userId, - projectId, -}: { - userId: string; - projectId: string; -}) { - try { - // Check if user has access to the project - const project = await getProjectById(projectId); - if (!project?.organizationId) { - return false; - } +export const getProjectAccess = cacheable( + 'getProjectAccess', + async ({ + userId, + projectId, + }: { + userId: string; + projectId: string; + }) => { + try { + // Check if user has access to the project + const project = await getProjectById(projectId); + if (!project?.organizationId) { + return false; + } - const [projectAccess, member] = await Promise.all([ - db.projectAccess.findMany({ - where: { - userId, - organizationId: project.organizationId, - }, - }), - db.member.findFirst({ - where: { - organizationId: project.organizationId, - userId, - }, - }), - ]); + const [projectAccess, member] = await Promise.all([ + db.$primary().projectAccess.findMany({ + where: { + userId, + organizationId: project.organizationId, + }, + }), + db.$primary().member.findFirst({ + where: { + organizationId: project.organizationId, + userId, + }, + }), + ]); - if (projectAccess.length === 0 && member) { - return true; - } + if (projectAccess.length === 0 && member) { + return true; + } - return projectAccess.find((item) => item.projectId === projectId); - } catch (err) { - return false; - } -} + return projectAccess.find((item) => item.projectId === projectId); + } catch (err) { + return false; + } + }, + 60 * 5, +); -export const getOrganizationAccessCached = cacheable( - getOrganizationAccess, +export const getOrganizationAccess = cacheable( + 'getOrganizationAccess', + async ({ + userId, + organizationId, + }: { + userId: string; + organizationId: string; + }) => { + return db.$primary().member.findFirst({ + where: { + userId, + organizationId, + }, + }); + }, 60 * 5, ); -export async function getOrganizationAccess({ - userId, - organizationId, -}: { - userId: string; - organizationId: string; -}) { - return db.member.findFirst({ - where: { - userId, - organizationId, - }, - }); -} export const getClientAccessCached = cacheable(getClientAccess, 60 * 5); export async function getClientAccess({ diff --git a/packages/trpc/src/routers/auth.ts b/packages/trpc/src/routers/auth.ts index 5d139d3e..ea4d7f8d 100644 --- a/packages/trpc/src/routers/auth.ts +++ b/packages/trpc/src/routers/auth.ts @@ -20,6 +20,7 @@ import { getUserAccount, } from '@openpanel/db'; import { sendEmail } from '@openpanel/email'; +import { deleteCache } from '@openpanel/redis'; import { zRequestResetPassword, zResetPassword, @@ -74,6 +75,7 @@ export const authRouter = createTRPCRouter({ deleteSessionTokenCookie(ctx.setCookie); if (ctx.session?.session?.id) { await invalidateSession(ctx.session.session.id); + await deleteCache(`validateSession:${ctx.session.session.id}`); } }), signInOAuth: publicProcedure @@ -333,6 +335,7 @@ export const authRouter = createTRPCRouter({ const session = await validateSessionToken(token); if (session.session) { + await deleteCache(`validateSession:${session.session.id}`); // Re-set the cookie with updated expiration setSessionTokenCookie(ctx.setCookie, token, session.session.expiresAt); return { diff --git a/packages/trpc/src/routers/chart.ts b/packages/trpc/src/routers/chart.ts index 6fb30572..a86215b7 100644 --- a/packages/trpc/src/routers/chart.ts +++ b/packages/trpc/src/routers/chart.ts @@ -32,7 +32,7 @@ import { differenceInWeeks, formatISO, } from 'date-fns'; -import { getProjectAccessCached } from '../access'; +import { getProjectAccess } from '../access'; import { TRPCAccessError } from '../errors'; import { cacheMiddleware, @@ -363,7 +363,7 @@ export const chartRouter = createTRPCRouter({ .input(zChartInput) .query(async ({ input, ctx }) => { if (ctx.session.userId) { - const access = await getProjectAccessCached({ + const access = await getProjectAccess({ projectId: input.projectId, userId: ctx.session.userId, }); diff --git a/packages/trpc/src/routers/event.ts b/packages/trpc/src/routers/event.ts index d710df97..cd050dfa 100644 --- a/packages/trpc/src/routers/event.ts +++ b/packages/trpc/src/routers/event.ts @@ -26,7 +26,7 @@ import { } from '@openpanel/validation'; import { clone } from 'ramda'; -import { getProjectAccessCached } from '../access'; +import { getProjectAccess } from '../access'; import { TRPCAccessError } from '../errors'; import { createTRPCRouter, protectedProcedure, publicProcedure } from '../trpc'; @@ -262,7 +262,7 @@ export const eventRouter = createTRPCRouter({ ) .query(async ({ input: { projectId, cursor, limit }, ctx }) => { if (ctx.session.userId) { - const access = await getProjectAccessCached({ + const access = await getProjectAccess({ projectId, userId: ctx.session.userId, }); diff --git a/packages/trpc/src/routers/integration.ts b/packages/trpc/src/routers/integration.ts index e2dce666..a4e42d96 100644 --- a/packages/trpc/src/routers/integration.ts +++ b/packages/trpc/src/routers/integration.ts @@ -9,7 +9,7 @@ import { zCreateSlackIntegration, zCreateWebhookIntegration, } from '@openpanel/validation'; -import { getOrganizationAccessCached } from '../access'; +import { getOrganizationAccess } from '../access'; import { TRPCAccessError } from '../errors'; import { createTRPCRouter, protectedProcedure } from '../trpc'; @@ -23,7 +23,7 @@ export const integrationRouter = createTRPCRouter({ }, }); - const access = await getOrganizationAccessCached({ + const access = await getOrganizationAccess({ userId: ctx.session.userId, organizationId: integration.organizationId, }); @@ -124,7 +124,7 @@ export const integrationRouter = createTRPCRouter({ }, }); - const access = await getOrganizationAccessCached({ + const access = await getOrganizationAccess({ userId: ctx.session.userId, organizationId: integration.organizationId, }); diff --git a/packages/trpc/src/trpc.ts b/packages/trpc/src/trpc.ts index a2080d68..1ffd2836 100644 --- a/packages/trpc/src/trpc.ts +++ b/packages/trpc/src/trpc.ts @@ -4,18 +4,15 @@ import { has } from 'ramda'; import superjson from 'superjson'; import { ZodError } from 'zod'; -import { - COOKIE_OPTIONS, - EMPTY_SESSION, - validateSessionToken, -} from '@openpanel/auth'; -import { getCache, getRedisCache } from '@openpanel/redis'; +import { COOKIE_OPTIONS, type SessionValidationResult } from '@openpanel/auth'; +import { runWithAlsSession } from '@openpanel/db'; +import { getRedisCache } from '@openpanel/redis'; import type { ISetCookie } from '@openpanel/validation'; import { createTrpcRedisLimiter, defaultFingerPrint, } from '@trpc-limiter/redis'; -import { getOrganizationAccessCached, getProjectAccessCached } from './access'; +import { getOrganizationAccess, getProjectAccess } from './access'; import { TRPCAccessError } from './errors'; export const rateLimitMiddleware = ({ @@ -44,10 +41,6 @@ export async function createContext({ req, res }: CreateFastifyContextOptions) { }); }; - const session = cookies?.session - ? await validateSessionToken(cookies.session!) - : EMPTY_SESSION; - if (process.env.NODE_ENV !== 'production') { await new Promise((res) => setTimeout(() => res(1), Math.min(Math.random() * 500, 200)), @@ -57,7 +50,7 @@ export async function createContext({ req, res }: CreateFastifyContextOptions) { return { req, res, - session, + session: (req as any).session as SessionValidationResult, // we do not get types for `setCookie` from fastify // so define it here and be safe in routers setCookie, @@ -102,37 +95,39 @@ const enforceUserIsAuthed = t.middleware(async ({ ctx, next }) => { // Only used on protected routes const enforceAccess = t.middleware(async ({ ctx, next, type, getRawInput }) => { - const rawInput = await getRawInput(); - if (type === 'mutation' && process.env.DEMO_USER_ID) { - throw new TRPCError({ - code: 'UNAUTHORIZED', - message: 'You are not allowed to do this in demo mode', - }); - } + return runWithAlsSession(ctx.session.session?.id, async () => { + const rawInput = await getRawInput(); + if (type === 'mutation' && process.env.DEMO_USER_ID) { + throw new TRPCError({ + code: 'UNAUTHORIZED', + message: 'You are not allowed to do this in demo mode', + }); + } - if (has('projectId', rawInput)) { - const access = await getProjectAccessCached({ - userId: ctx.session.userId!, - projectId: rawInput.projectId as string, - }); + if (has('projectId', rawInput)) { + const access = await getProjectAccess({ + userId: ctx.session.userId!, + projectId: rawInput.projectId as string, + }); - if (!access) { - throw TRPCAccessError('You do not have access to this project'); + if (!access) { + throw TRPCAccessError('You do not have access to this project'); + } } - } - if (has('organizationId', rawInput)) { - const access = await getOrganizationAccessCached({ - userId: ctx.session.userId!, - organizationId: rawInput.organizationId as string, - }); + if (has('organizationId', rawInput)) { + const access = await getOrganizationAccess({ + userId: ctx.session.userId!, + organizationId: rawInput.organizationId as string, + }); - if (!access) { - throw TRPCAccessError('You do not have access to this organization'); + if (!access) { + throw TRPCAccessError('You do not have access to this organization'); + } } - } - return next(); + return next(); + }); }); export const createTRPCRouter = t.router; @@ -157,11 +152,21 @@ const loggerMiddleware = t.middleware( }, ); -export const publicProcedure = t.procedure.use(loggerMiddleware); +const sessionScopeMiddleware = t.middleware(async ({ ctx, next }) => { + const sessionId = ctx.session.session?.id ?? null; + return runWithAlsSession(sessionId, async () => { + return next(); + }); +}); + +export const publicProcedure = t.procedure + .use(loggerMiddleware) + .use(sessionScopeMiddleware); export const protectedProcedure = t.procedure .use(enforceUserIsAuthed) .use(enforceAccess) - .use(loggerMiddleware); + .use(loggerMiddleware) + .use(sessionScopeMiddleware); const middlewareMarker = 'middlewareMarker' as 'middlewareMarker' & { __brand: 'middlewareMarker';

AltStyle によって変換されたページ (->オリジナル) /