new importer #214
Conversation
Walkthrough

Implements a comprehensive provider-based import system for external analytics sources, including a new importer package, database schema updates (PostgreSQL and ClickHouse), a worker job handler, TRPC API routes, validation schemas, and a frontend UI for managing data imports from Umami, Plausible, and Mixpanel.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    actor User
    participant UI as Frontend<br/>(AddImport Modal)
    participant TRPC as TRPC Router<br/>(import.create)
    participant DB as PostgreSQL<br/>(imports table)
    participant Queue as Redis Queue<br/>(importQueue)
    participant Worker as Import Worker
    participant Provider as BaseImportProvider<br/>(e.g., Umami)
    participant Staging as ClickHouse<br/>(events_imports)
    participant Prod as ClickHouse<br/>(events)

    User->>UI: Configure import (provider, config)
    UI->>TRPC: import.create({provider, config})
    TRPC->>DB: Create import record (status: pending)
    DB-->>TRPC: Return importId
    TRPC->>Queue: Enqueue job with importId
    TRPC->>DB: Update import with jobId
    TRPC-->>UI: Success toast, refresh list

    Note over Worker: Job Processing
    Worker->>DB: Load import record
    Worker->>Provider: Initialize provider (Umami, Plausible, Mixpanel)

    rect rgb(200, 220, 240)
        Note over Worker,Staging: Phase 1: Load & Batch
        Worker->>Provider: parseSource() async generator
        Provider-->>Worker: Yields raw events (stream)
        Worker->>Provider: validate(rawEvent)
        Worker->>Provider: transformEvent(rawEvent)
        Worker->>Staging: insertImportBatch (chunk of 5000)
        Staging-->>Worker: Return batch result
        Worker->>DB: updateImportProgress()
    end

    rect rgb(220, 240, 220)
        Note over Worker,Prod: Phase 2-4: Monthly Processing
        loop For each month-spanning slice
            Worker->>Staging: reconstructSessions(importId, from, to)
            Staging->>Staging: Build session_start/end events
            Staging-->>Worker: Sessions reconstructed
            Worker->>Staging: migrateImportToProduction(importId, from, to)
            Staging->>Prod: Bulk insert migrated events
            Prod-->>Staging: Events migrated
            Worker->>DB: updateImportProgress()
        end
    end

    rect rgb(240, 220, 220)
        Note over Worker,DB: Phase 5: Completion
        Worker->>DB: markImportComplete()
        DB-->>Worker: Status: completed
        Worker->>DB: getImportProgress()
    end
```
Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

The changes introduce a multi-layered import system spanning databases (PostgreSQL + ClickHouse), provider abstraction with generics, worker job orchestration, API routes with access control, and frontend UI. While individual components follow consistent patterns (straightforward schema migrations, service functions, TRPC procedures), the heterogeneity of components and the density of new abstractions (BaseImportProvider lifecycle, session reconstruction logic, batch staging workflow) demand careful cross-layer reasoning and validation of the end-to-end data flow.
Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 28
🧹 Nitpick comments (35)
apps/start/src/modals/index.tsx (1)
8-8: Consider lazy-loading AddImport to keep initial bundle slim

AddImport brings TRPC/React Query; load it on demand.

Apply:

```diff
+import { lazy } from 'react';
-import AddImport from './add-import';
+const AddImport = lazy(() => import('./add-import'));
```

Ensure ModalProvider or the modal surface renders within a `<Suspense>` boundary.
Also applies to: 41-41
packages/importer/src/base-provider.ts (1)

7-8: Make core hooks async-friendly and identifiers immutable

Allow providers to perform async transforms/validation; mark identifiers readonly.

```diff
+// Utility type to allow sync or async implementations
+type MaybePromise<T> = T | Promise<T>;
+
 export abstract class BaseImportProvider<
   TRawEvent extends BaseRawEvent = BaseRawEvent,
 > {
-  abstract provider: string;
-  abstract version: string;
+  abstract readonly provider: string;
+  abstract readonly version: string;

   /**
    * Stream-read and parse source (file/API) → yields raw events
    * This should be implemented as an async generator to handle large files efficiently
    */
   abstract parseSource(): AsyncGenerator<TRawEvent, void, unknown>;

   /**
    * Convert provider format → IClickhouseEvent
    */
-  abstract transformEvent(rawEvent: TRawEvent): IClickhouseEvent;
+  abstract transformEvent(rawEvent: TRawEvent): MaybePromise<IClickhouseEvent>;

   /**
    * Validate raw event structure
    */
-  abstract validate(rawEvent: TRawEvent): boolean;
+  abstract validate(rawEvent: TRawEvent): MaybePromise<boolean>;
```

Also applies to: 19-24
packages/importer/src/utils/referrer-parser.ts (3)
5-35: Type the map and normalize hosts; reduce duplicates

Strong types + normalization avoid listing www/m. variants twice and improve coverage.

```diff
 import { stripTrailingSlash } from '@openpanel/common';

+type ReferrerType = 'search' | 'social' | 'referral';
+type ReferrerEntry = { type: ReferrerType; name: string };
+const normalizeHost = (h: string) => h.replace(/^(www\.|m\.)/i, '').toLowerCase();
+
-const referrers: Record<string, { type: string; name: string }> = {
+const referrers: Record<string, ReferrerEntry> = {
   // Search engines
   'google.com': { type: 'search', name: 'Google' },
-  'www.google.com': { type: 'search', name: 'Google' },
   'bing.com': { type: 'search', name: 'Bing' },
-  'www.bing.com': { type: 'search', name: 'Bing' },
   'yahoo.com': { type: 'search', name: 'Yahoo!' },
-  'www.yahoo.com': { type: 'search', name: 'Yahoo!' },
   'duckduckgo.com': { type: 'search', name: 'DuckDuckGo' },
-  'www.duckduckgo.com': { type: 'search', name: 'DuckDuckGo' },
   'baidu.com': { type: 'search', name: 'Baidu' },
-  'www.baidu.com': { type: 'search', name: 'Baidu' },

   // Social media
   'facebook.com': { type: 'social', name: 'Facebook' },
-  'www.facebook.com': { type: 'social', name: 'Facebook' },
   'twitter.com': { type: 'social', name: 'Twitter' },
-  'www.twitter.com': { type: 'social', name: 'Twitter' },
   'x.com': { type: 'social', name: 'X' },
-  'www.x.com': { type: 'social', name: 'X' },
   'linkedin.com': { type: 'social', name: 'LinkedIn' },
-  'www.linkedin.com': { type: 'social', name: 'LinkedIn' },
   'instagram.com': { type: 'social', name: 'Instagram' },
-  'www.instagram.com': { type: 'social', name: 'Instagram' },
   'tiktok.com': { type: 'social', name: 'TikTok' },
-  'www.tiktok.com': { type: 'social', name: 'TikTok' },
   'youtube.com': { type: 'social', name: 'YouTube' },
-  'www.youtube.com': { type: 'social', name: 'YouTube' },
   'reddit.com': { type: 'social', name: 'Reddit' },
-  'www.reddit.com': { type: 'social', name: 'Reddit' },
+  't.co': { type: 'social', name: 'Twitter' },
+  'lnkd.in': { type: 'social', name: 'LinkedIn' },
 };

 export function parseReferrer(url: string | undefined) {
-  const hostname = getHostname(url);
-  const match = referrers[hostname] ?? referrers[hostname.replace('www.', '')];
+  const hostname = normalizeHost(getHostname(url));
+  const match = referrers[hostname];
```

Also applies to: 49-52
49-57: Guard output URL to http(s) only

Avoid echoing non-web schemes (e.g., javascript:) back to callers.

```diff
 export function parseReferrer(url: string | undefined) {
   const hostname = getHostname(url);
   const match = referrers[hostname] ?? referrers[hostname.replace('www.', '')];
-  return {
+  let safeUrl = '';
+  try {
+    if (url) {
+      const u = new URL(url);
+      if (u.protocol === 'http:' || u.protocol === 'https:') {
+        safeUrl = stripTrailingSlash(url);
+      }
+    }
+  } catch {
+    // ignore
+  }
+
+  return {
     name: match?.name ?? '',
     type: match?.type ?? 'referral',
-    url: stripTrailingSlash(url ?? ''),
+    url: safeUrl,
   };
 }
```
60-91: Minor: normalize source before lookup and keep types consistent

Lower-case once; reuse normalizeHost for the direct key match; return a typed result.

```diff
-export function getReferrerWithQuery(
-  query: Record<string, string> | undefined,
-) {
+export function getReferrerWithQuery(
+  query: Record<string, string> | undefined,
+): { name: string; type: ReferrerType; url: string } | null {
   if (!query) {
     return null;
   }

-  const source = query.utm_source ?? query.ref ?? query.utm_referrer ?? '';
+  const source = (query.utm_source ?? query.ref ?? query.utm_referrer ?? '').trim();
   if (source === '') {
     return null;
   }

-  const match =
-    Object.values(referrers).find(
-      (referrer) => referrer.name.toLowerCase() === source.toLowerCase(),
-    ) || referrers[source];
+  const lower = source.toLowerCase();
+  const match =
+    Object.values(referrers).find((r) => r.name.toLowerCase() === lower) ||
+    referrers[normalizeHost(lower)];

   if (match) {
     return {
       name: match.name,
       type: match.type,
       url: '',
     };
   }
```

packages/db/prisma/migrations/20251018214030_fix/migration.sql (1)

1-13: Consider more descriptive migration naming.

The migration name "fix" is not descriptive. Consider renaming it to something like "remove_redundant_import_columns" or "migrate_import_config_to_json" to better indicate the purpose of this schema change.
packages/db/code-migrations/5-add-imports-table.sql (1)
32-32: Consider using an Enum for import_status.

The `import_status` field uses `LowCardinality(String)` with a default of 'pending'. For better type safety and query performance, consider using an Enum type instead:

```sql
`import_status` LowCardinality(Enum8('pending' = 1, 'processing' = 2, 'completed' = 3, 'failed' = 4)) DEFAULT 'pending'
```

This ensures only valid status values can be stored and provides better compression.
packages/db/prisma/schema.prisma (1)
470-486: Consider using an enum for the status field.

The `status` field is defined as a `String` with a comment indicating it should be one of `'pending' | 'processing' | 'completed' | 'failed'`. For better type safety and database constraints, define a Prisma enum:

```prisma
enum ImportStatus {
  pending
  processing
  completed
  failed
}

model Import {
  // ...
  status ImportStatus // instead of String
  // ...
}
```

This provides compile-time type checking and database-level validation.
packages/trpc/src/root.ts (1)
7-7: Router key named "import" (reserved keyword) — consider renaming for DX.

Valid in JS/TS, but it can confuse tooling/usages (e.g., appRouter.import vs dynamic import()). Consider renaming to imports/importer for clarity.
Also applies to: 44-44
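The naming concern can be illustrated with a minimal sketch (plain objects standing in for the actual tRPC router, so the snippet is self-contained):

```typescript
// A key named `import` is legal as a property, but it collides visually with
// the dynamic-import keyword and reads ambiguously at call sites.
const appRouter = {
  import: { list: (): string[] => [] }, // appRouter.import.list() is easy to misread
};

// Renamed for clarity; callers become appRouter.imports.list()
const appRouterRenamed = {
  imports: { list: (): string[] => [] },
};

console.log(Object.keys(appRouter)[0], Object.keys(appRouterRenamed)[0]);
```

Both compile fine; the rename is purely for readability and tooling ergonomics.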
packages/queue/src/queues.ts (1)
163-176: Harden import jobs: add attempts/backoff/timeout; optionally expose QueueEvents.

Imports call external providers; transient failures are expected. Provide sensible defaults and a metrics hook.

Apply:

```diff
 export const importQueue = new Queue<ImportQueuePayload>('import', {
   connection: getRedisQueue(),
   defaultJobOptions: {
     removeOnComplete: 10,
-    removeOnFail: 50,
+    removeOnFail: 50,
+    attempts: 3,
+    backoff: { type: 'exponential', delay: 30_000 },
+    timeout: 1000 * 60 * 30, // 30 min; tune as needed
   },
 });
+
+// Optional: enable events for monitoring dashboards/Bull Board progress
+export const importQueueEvents = new QueueEvents('import', {
+  connection: getRedisQueue(),
+});
```
138-142: Also drain the import queue before shutdown to avoid cutting imports mid-run.

```diff
-  await waitForQueueToEmpty(cronQueue);
+  await waitForQueueToEmpty(cronQueue);
+  await waitForQueueToEmpty(importQueue, 10 * 60_000); // 10 min cap; tune as needed
```

155-167: Dev path skips graceful shutdown — can leave jobs in an inconsistent state.

Unify behavior and call the same exit handler in dev.

```diff
-    process.on(evt, (code) => {
-      if (process.env.NODE_ENV === 'production') {
-        exitHandler(evt, code);
-      } else {
-        logger.info('Shutting down for development', {
-          event: evt,
-          code,
-        });
-        process.exit(0);
-      }
-    });
+    process.on(evt, (code) => {
+      exitHandler(evt, code ?? 0);
+    });
```

4-13: Remove unused imports and dead/commented code for clarity.

eventsGroupQueue, GroupWorker, eventsJob, and incomingEventPure are no longer used; the commented worker block can be dropped.

Also applies to: 18-23, 33-48
packages/trpc/src/routers/import.ts (3)
121-131: Guard job removal on delete with error handling (active/locked jobs can throw).

```diff
 const job = await importQueue.getJob(importRecord.jobId);
-if (job) {
-  await job.remove();
-}
+if (job) {
+  try {
+    await job.remove();
+  } catch (e) {
+    // best-effort removal; proceed with deletion
+  }
+}
```

165-171: Retry should set a stable jobId and attempts/backoff like create.

```diff
-const job = await importQueue.add('import', {
-  type: 'import',
-  payload: {
-    importId: importRecord.id,
-  },
-});
+const job = await importQueue.add(
+  'import',
+  { type: 'import', payload: { importId: importRecord.id } },
+  { jobId: importRecord.id, attempts: 3, backoff: { type: 'exponential', delay: 30_000 } },
+);
```

3-6: Unused type import (IImportConfig).

IImportConfig is not referenced. Remove it to satisfy lint.
1-14: Public API surface looks good; minor polish

Consider also re-exporting provider config/types (e.g., IUmamiImportConfig) from providers to keep consumers on a single import path. Optional.
IMPORT_REVIEW.md (1)
320-337: Move this review doc to /docs or PR notes to avoid shipping it in package tarballs

Keeps the repo clean and avoids confusing consumers.
apps/worker/src/jobs/import.ts (4)
1-11: Avoid deep imports; prefer package barrel exports

Importing services via '@openpanel/db/src/services/import.service' couples to internal paths. Re-export these from '@openpanel/db' and import from there to reduce breakage.
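The barrel pattern can be sketched as follows. Module paths are from the comment above; the inline "modules" below are stand-ins so the snippet is self-contained and runnable:

```typescript
// packages/db/src/services/import.service.ts (stand-in for the real module)
const importService = {
  insertImportBatch: (events: unknown[]): number => events.length,
};

// packages/db/src/index.ts, the barrel: `export * from './services/import.service'`
const openpanelDb = {
  ...importService,
};

// Consumers depend only on the package root ('@openpanel/db'), so internal
// file moves no longer break them:
console.log(openpanelDb.insertImportBatch([1, 2, 3])); // prints 3
```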
75-81: Progress totalBatches fluctuates; compute once or report a percentage

As totalEvents grows, totalBatches changes mid-run. Either omit totalBatches here or compute a percentage for the UI.

```diff
-      totalBatches: Math.ceil(totalEvents / BATCH_SIZE),
+      progressPct: totalEvents ? Math.round((processedEvents / totalEvents) * 100) : 0,
```
17-17: Make BATCH_SIZE configurable

Read it from config/env to tune per deployment.

```diff
-const BATCH_SIZE = 5000;
+const BATCH_SIZE = Number(process.env.IMPORT_BATCH_SIZE ?? 5000);
```
200-209: Validate provider-specific config with a schema before instantiation

Avoid passing untyped Prisma JSON into providers. Parse with the provider schema (zod) and fail early.

```diff
 function createProvider(
   record: Prisma.ImportGetPayload<{ include: { project: true } }>,
 ) {
   switch (record.config.provider) {
     case 'umami':
-      return new UmamiProvider(record.projectId, record.config);
+      // Example: zUmamiImportConfig.safeParse(record.config)
+      return new UmamiProvider(
+        record.projectId,
+        (record.config as any) /* TODO: parse/validate */,
+      );
     default:
       throw new Error(`Unknown provider: ${record.config.provider}`);
   }
 }
```

I can wire in zod parsing if you expose the provider config schemas from @openpanel/importer.
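A minimal sketch of the fail-early idea, with a hand-rolled guard standing in for the zod schema mentioned above (the config shape here is an assumption for illustration, not the real zUmamiImportConfig):

```typescript
// Hypothetical config shape; the real one lives in @openpanel/validation.
type UmamiImportConfig = { provider: 'umami'; fileUrl: string };

// Reject malformed Prisma JSON before any provider is constructed.
function parseUmamiConfig(config: unknown): UmamiImportConfig {
  const c = config as Partial<UmamiImportConfig> | null;
  if (!c || c.provider !== 'umami' || typeof c.fileUrl !== 'string') {
    throw new Error('Invalid umami import config');
  }
  return c as UmamiImportConfig;
}

try {
  parseUmamiConfig({ provider: 'umami' }); // missing fileUrl
} catch (e) {
  console.log((e as Error).message); // prints "Invalid umami import config"
}
```

With zod, `zUmamiImportConfig.safeParse(record.config)` would play the same role, with richer error messages.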
packages/db/prisma/migrations/20251018205153_add_import_table/migration.sql (2)
9-16: Constrain status values

Add a CHECK constraint (or enum) to prevent invalid states.

```diff
   "status" TEXT NOT NULL,
@@
 );
+
+-- Restrict status values
+ALTER TABLE "public"."imports"
+  ADD CONSTRAINT "imports_status_check"
+  CHECK ("status" IN ('pending','processing','completed','failed'));
```

18-22: Add indexes for common queries

List/filter views will hit projectId/status/createdAt frequently.

```diff
 -- AddForeignKey
 ALTER TABLE "public"."imports" ADD CONSTRAINT "imports_projectId_fkey" FOREIGN KEY ("projectId") REFERENCES "public"."projects"("id") ON DELETE CASCADE ON UPDATE CASCADE;
+
+-- Indexes for performance
+CREATE INDEX IF NOT EXISTS "idx_imports_project_createdAt"
+  ON "public"."imports" ("projectId", "createdAt" DESC);
+CREATE INDEX IF NOT EXISTS "idx_imports_status_createdAt"
+  ON "public"."imports" ("status", "createdAt" DESC);
```

apps/start/src/routes/_app.$organizationId.$projectId_.settings._tabs.imports.tsx (5)
5-10: Remove unused icon import

CheckCircle is unused.

```diff
-import {
-  CheckCircle,
-  CheckCircleIcon,
-  Download,
-  Loader2Icon,
-  XCircleIcon,
-} from 'lucide-react';
+import { CheckCircleIcon, Download, Loader2Icon, XCircleIcon } from 'lucide-react';
```
83-104: Add onError to mutations for user feedback

Surface errors with toasts; also consider logging.

```diff
 const deleteImport = useMutation(
   trpc.import.delete.mutationOptions({
     onSuccess: () => {
       toast.success('Import deleted', {
         description: 'The import has been successfully deleted.',
       });
       queryClient.invalidateQueries(trpc.import.list.pathFilter());
     },
+    onError: (err) => {
+      toast.error('Delete failed', { description: String(err) });
+    },
   }),
 );
@@
 const retryImport = useMutation(
   trpc.import.retry.mutationOptions({
     onSuccess: () => {
       toast.success('Import retried', {
         description: 'The import has been queued for processing again.',
       });
       queryClient.invalidateQueries(trpc.import.list.pathFilter());
     },
+    onError: (err) => {
+      toast.error('Retry failed', { description: String(err) });
+    },
   }),
 );
```
244-261: Disable action buttons while pending to avoid double submits

Tie the disabled state to the mutations' pending flags.

```diff
 {imp.status === 'failed' && (
   <Button
     variant="outline"
     size="sm"
+    disabled={retryImport.isPending}
     onClick={() => retryImport.mutate({ id: imp.id })}
   >
     Retry
   </Button>
 )}
 <Button
   variant="ghost"
   size="sm"
+  disabled={deleteImport.isPending}
   onClick={() => deleteImport.mutate({ id: imp.id })}
 >
   Delete
 </Button>
```
41-66: Consider serving logos locally or via an asset proxy

External Brandfetch URLs can break and may violate CSP. If feasible, mirror the assets or use a proxy.
141-161: Consider extending IntegrationCardLogoImage to accept a customizable alt prop for improved accessibility

The alt attribute is currently hardcoded to the generic "Integration Logo" at line 66 of apps/start/src/components/integrations/integration-card.tsx. Extending the component to accept an optional alt prop and passing provider.name would provide more descriptive alternative text for each integration, improving accessibility and SEO.
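The suggested fallback can be sketched as a plain helper (component and prop names follow the review; this is not the actual implementation):

```typescript
// Fall back to the current generic label only when no provider name is given.
function logoAlt(providerName?: string): string {
  return providerName ? `${providerName} logo` : 'Integration Logo';
}

// Usage idea: <IntegrationCardLogoImage alt={logoAlt(provider.name)} ... />
console.log(logoAlt('Umami'));   // prints "Umami logo"
console.log(logoAlt(undefined)); // prints "Integration Logo"
```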
444-445: Local file support vs schema
`fileUrl` is `z.string().url()`, but the Umami provider supports local files. Either (a) allow `file://` URLs or absolute paths, or (b) drop local-path support in the provider. Today this schema blocks local imports.

```diff
-  fileUrl: z.string().url(),
+  fileUrl: z.string().refine(
+    (v) => /^https?:\/\//i.test(v) || /^file:\/\//i.test(v) || v.startsWith('/'),
+    'Must be an http(s) URL, file:// URL, or absolute path',
+  ),
```
178-195: Byte counting via body.on('data') competes with the pipeline

Attach a lightweight Transform to count bytes between body and decompress to preserve backpressure and avoid dual consumers.

```diff
-  if (maxBytes) {
-    body.on('data', (chunk: Buffer) => {
-      seenBytes += chunk.length;
-      if (seenBytes > maxBytes) {
-        controller.abort();
-        body.destroy(new Error(`Stream exceeded size limit (${seenBytes} > ${maxBytes})`));
-      }
-    });
-  }
+  const byteCounter = new (await import('node:stream')).Transform({
+    transform(chunk, _enc, cb) {
+      seenBytes += chunk.length;
+      if (maxBytes && seenBytes > maxBytes) {
+        cb(new Error(`Stream exceeded size limit (${seenBytes} > ${maxBytes})`));
+        return;
+      }
+      cb(null, chunk);
+    },
+  });
@@
-  if (decompress) {
-    await pipeline(body, decompress, parser, { signal: controller.signal });
-  } else {
-    await pipeline(body, parser, { signal: controller.signal });
-  }
+  if (decompress) {
+    await pipeline(body, byteCounter, decompress, parser, { signal: controller.signal });
+  } else {
+    await pipeline(body, byteCounter, parser, { signal: controller.signal });
+  }
```

Also applies to: 211-227
91-99: Optional: support local Brotli (.br) like the remote path

Mirror the remote handling and support `.br` locally if needed.

```diff
-  const isGzipped = filePath.endsWith('.gz');
+  const isGzipped = filePath.endsWith('.gz');
+  const isBrotli = filePath.endsWith('.br');
@@
-  if (isGzipped) {
+  if (isGzipped) {
     await pipeline(fileStream, createGunzip(), parser);
-  } else {
+  } else if (isBrotli) {
+    await pipeline(fileStream, createBrotliDecompress(), parser);
+  } else {
     await pipeline(fileStream, parser);
   }
```

packages/db/src/services/import.service.ts (3)
78-92: Consider using CTEs or subqueries for better maintainability.

The `argMin`/`argMax` tuple approach works but is fragile—the 17-element tuple order must match the destructuring on lines 147-173. If fields are added, removed, or reordered, this becomes error-prone and difficult to debug.

A clearer alternative would be to use CTEs with `ROW_NUMBER()` or separate queries for first/last events, making field selection explicit:

```sql
WITH ranked_events AS (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY device_id, session_id, project_id, profile_id ORDER BY created_at ASC) as first_rank,
    ROW_NUMBER() OVER (PARTITION BY device_id, session_id, project_id, profile_id ORDER BY created_at DESC) as last_rank
  FROM ${TABLE_NAMES.events_imports}
  WHERE ${rangeWhere}
    AND name NOT IN ('session_start', 'session_end')
)
SELECT ... FROM ranked_events WHERE first_rank = 1 OR last_rank = 1
```
26-525: Consider adding explicit error handling and logging.

None of the functions include try-catch blocks or error enrichment. While throwing errors to the caller is acceptable, adding contextual error handling would improve debuggability and allow for better error classification (transient vs. permanent failures).

Consider wrapping critical operations:

```typescript
export async function insertImportBatch(
  events: IClickhouseEvent[],
  importId: string,
): Promise<ImportStageResult> {
  try {
    if (events.length === 0) {
      return { importId, totalEvents: 0, insertedEvents: 0 };
    }

    const eventsWithMetadata = events.map((event) => ({
      ...event,
      import_id: importId,
      import_status: 'pending',
      imported_at_meta: new Date(),
    }));

    await ch.insert({
      table: TABLE_NAMES.events_imports,
      values: eventsWithMetadata,
      format: 'JSONEachRow',
    });

    return {
      importId,
      totalEvents: events.length,
      insertedEvents: events.length,
    };
  } catch (error) {
    // Add context and rethrow or log
    throw new Error(
      `Failed to insert import batch for ${importId}: ${error instanceof Error ? error.message : String(error)}`,
    );
  }
}
```

This pattern would help with debugging import failures in production.
278-356: Consider batch migration for very large imports.

The INSERT INTO ... SELECT approach is efficient for most cases, but for imports with millions of events it might:

- Block for extended periods
- Consume significant memory
- Provide no progress feedback during the migration

For very large imports, consider implementing batch migration:

```typescript
export async function migrateImportToProduction(
  importId: string,
  from?: string,
  to?: string,
  batchSize = 100000, // configurable batch size
): Promise<void> {
  let offset = 0;
  let migrated = 0;

  while (true) {
    const migrationQuery = `
      INSERT INTO ${TABLE_NAMES.events} (...)
      SELECT ...
      FROM ${TABLE_NAMES.events_imports}
      WHERE import_id = {importId:String}
        ${from ? 'AND created_at >= {from:String}' : ''}
        ${to ? 'AND created_at < {to:String}' : ''}
      ORDER BY created_at ASC
      LIMIT {batchSize:UInt32} OFFSET {offset:UInt32}
    `;

    const result = await ch.command({
      query: migrationQuery,
      query_params: { importId, from, to, batchSize, offset },
    });

    // If no rows affected, we're done
    if (result.query_id && migrated === 0) break;

    offset += batchSize;
    // Could call updateImportProgress here
  }
}
```

This is optional and depends on expected import sizes.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (2)
- packages/importer/a70ff57d-f632-4292-a98b-658d7734fbec.csv is excluded by !**/*.csv
- pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (35)
- IMPORT_REVIEW.md (1 hunks)
- apps/start/package.json (1 hunks)
- apps/start/src/components/integrations/integration-card.tsx (1 hunks)
- apps/start/src/components/ui/badge.tsx (1 hunks)
- apps/start/src/modals/add-import.tsx (1 hunks)
- apps/start/src/modals/index.tsx (2 hunks)
- apps/start/src/routeTree.gen.ts (11 hunks)
- apps/start/src/routes/_app.$organizationId.$projectId_.settings._tabs.imports.tsx (1 hunks)
- apps/start/src/routes/_app.$organizationId.$projectId_.settings._tabs.tsx (1 hunks)
- apps/worker/package.json (1 hunks)
- apps/worker/src/boot-workers.ts (5 hunks)
- apps/worker/src/index.ts (2 hunks)
- apps/worker/src/jobs/import.ts (1 hunks)
- packages/db/code-migrations/5-add-imports-table.sql (1 hunks)
- packages/db/code-migrations/5-add-imports-table.ts (1 hunks)
- packages/db/prisma/migrations/20251018205153_add_import_table/migration.sql (1 hunks)
- packages/db/prisma/migrations/20251018214030_fix/migration.sql (1 hunks)
- packages/db/prisma/schema.prisma (2 hunks)
- packages/db/src/clickhouse/client.ts (1 hunks)
- packages/db/src/services/event.service.ts (1 hunks)
- packages/db/src/services/import.service.ts (1 hunks)
- packages/db/src/types.ts (2 hunks)
- packages/importer/package.json (1 hunks)
- packages/importer/src/base-provider.ts (1 hunks)
- packages/importer/src/index.ts (1 hunks)
- packages/importer/src/providers/umami.ts (1 hunks)
- packages/importer/src/types.ts (1 hunks)
- packages/importer/src/utils/referrer-parser.ts (1 hunks)
- packages/importer/tsconfig.json (1 hunks)
- packages/importer/vitest.config.ts (1 hunks)
- packages/queue/src/queues.ts (1 hunks)
- packages/trpc/package.json (1 hunks)
- packages/trpc/src/root.ts (2 hunks)
- packages/trpc/src/routers/import.ts (1 hunks)
- packages/validation/src/index.ts (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
apps/start/**/*.{ts,tsx}
📄 CodeRabbit inference engine (apps/start/.cursorrules)
apps/start/**/*.{ts,tsx}: Instrument server functions created with createServerFn by wrapping their implementation in Sentry.startSpan and importing Sentry from '@sentry/tanstackstart-react'
When using Sentry in the TanStack Start React app, import it as: import * as Sentry from '@sentry/tanstackstart-react'
Files:
- apps/start/src/components/integrations/integration-card.tsx
- apps/start/src/modals/index.tsx
- apps/start/src/modals/add-import.tsx
- apps/start/src/routes/_app.$organizationId.$projectId_.settings._tabs.tsx
- apps/start/src/routes/_app.$organizationId.$projectId_.settings._tabs.imports.tsx
- apps/start/src/components/ui/badge.tsx
- apps/start/src/routeTree.gen.ts
🧬 Code graph analysis (17)
packages/db/src/types.ts (1)
- packages/validation/src/index.ts (1)
  - IImportConfig (464-467)

packages/importer/src/utils/referrer-parser.ts (1)
- packages/common/src/string.ts (1)
  - stripTrailingSlash (1-3)

packages/queue/src/queues.ts (1)
- packages/redis/redis.ts (1)
  - getRedisQueue (93-105)

apps/worker/src/jobs/import.ts (3)
- packages/queue/src/queues.ts (1)
  - ImportQueuePayload (163-168)
- packages/importer/src/providers/umami.ts (2)
  - UmamiRawEvent (55-55)
  - UmamiProvider (57-426)
- packages/db/src/services/import.service.ts (8)
  - insertImportBatch (26-53)
  - updateImportProgress (387-400)
  - getImportDateBounds (508-525)
  - reconstructSessions (61-272)
  - migrateImportToProduction (278-356)
  - markImportComplete (361-382)
  - getImportProgress (434-488)
  - markImportFailed (405-429)

apps/start/src/modals/index.tsx (1)
- apps/start/src/modals/add-import.tsx (1)
  - AddImport (29-164)

packages/importer/vitest.config.ts (1)
- vitest.shared.ts (1)
  - getSharedVitestConfig (4-27)

apps/worker/src/boot-workers.ts (2)
- packages/queue/src/queues.ts (1)
  - importQueue (170-176)
- apps/worker/src/jobs/import.ts (1)
  - importJob (19-198)

packages/trpc/src/root.ts (1)
- packages/trpc/src/routers/import.ts (1)
  - importRouter (11-183)

packages/db/code-migrations/5-add-imports-table.ts (2)
- packages/db/code-migrations/helpers.ts (1)
  - getIsCluster (17-24)
- packages/db/src/clickhouse/migration.ts (2)
  - createTable (68-116)
  - runClickhouseMigrationCommands (322-454)

apps/start/src/modals/add-import.tsx (2)
- apps/start/src/integrations/tanstack-query/root-provider.tsx (1)
  - Provider (68-86)
- packages/importer/src/types.ts (1)
  - ImportConfig (7-12)

packages/trpc/src/routers/import.ts (6)
- packages/trpc/src/trpc.ts (2)
  - createTRPCRouter (138-138)
  - protectedProcedure (161-164)
- packages/trpc/src/access.ts (1)
  - getProjectAccessCached (4-4)
- packages/trpc/src/errors.ts (1)
  - TRPCAccessError (3-7)
- packages/db/src/prisma-client.ts (1)
  - db (270-270)
- packages/validation/src/index.ts (1)
  - zCreateImport (469-477)
- packages/queue/src/queues.ts (1)
  - importQueue (170-176)

packages/db/src/services/import.service.ts (3)
- packages/db/src/services/event.service.ts (1)
  - IClickhouseEvent (68-102)
- packages/db/src/clickhouse/client.ts (4)
  - ch (131-141)
  - TABLE_NAMES (48-60)
  - convertClickhouseDateToJs (218-220)
  - formatClickhouseDate (188-199)
- packages/db/src/prisma-client.ts (1)
  - db (270-270)

apps/worker/src/index.ts (1)
- packages/queue/src/queues.ts (1)
  - importQueue (170-176)

apps/start/src/routes/_app.$organizationId.$projectId_.settings._tabs.imports.tsx (2)
- apps/start/src/routes/_app.$organizationId.$projectId_.settings._tabs.tsx (1)
  - Route (12-34)
- apps/start/src/components/integrations/integration-card.tsx (3)
  - IntegrationCard (91-112)
  - IntegrationCardLogoImage (50-69)
  - IntegrationCardFooter (3-15)

packages/importer/src/base-provider.ts (2)
- packages/importer/src/types.ts (3)
  - BaseRawEvent (40-42)
  - ImportJobMetadata (69-73)
  - ErrorContext (45-51)
- packages/db/src/services/event.service.ts (1)
  - IClickhouseEvent (68-102)

packages/importer/src/providers/umami.ts (4)
- packages/validation/src/index.ts (1)
  - IUmamiImportConfig (450-450)
- packages/db/src/services/event.service.ts (2)
  - IClickhouseEvent (68-102)
  - query (701-847)
- packages/common/src/url.ts (2)
  - parsePath (11-49)
  - isSameDomain (51-63)
- packages/importer/src/utils/referrer-parser.ts (2)
  - parseReferrer (49-58)
  - getReferrerWithQuery (60-91)

packages/importer/src/types.ts (2)
- packages/db/src/services/event.service.ts (1)
  - IServiceImportedEventPayload (144-147)
- packages/db/src/services/import.service.ts (2)
  - ImportProgress (16-21)
  - ImportStageResult (10-14)
🪛 Gitleaks (8.28.0)
apps/worker/src/index.ts
[high] 60-60: Identified a pattern that may indicate AWS credentials, risking unauthorized cloud resource access and data breaches on AWS platforms.
(aws-access-token)
🪛 LanguageTool
IMPORT_REVIEW.md
[uncategorized] ~35-~35: If this is a compound adjective that modifies the following noun, use a hyphen.
Context: ... batchNumber: number ) ``` --- ## 🟡 Medium Priority Issues ### 2. **Missing UUID Dependenc...
(EN_COMPOUND_ADJECTIVE_INTERNAL)
[grammar] ~156-~156: Ensure spelling is correct
Context: ...d for EVERY event ``` Improvement: Cleanup periodically (e.g., every 1000 events) ...
(QB_NEW_EN_ORTHOGRAPHY_ERROR_IDS_1)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: build-and-push-api
- GitHub Check: build-and-push-worker
🔇 Additional comments (24)
apps/start/package.json (1)
6-6: Clarify the motivation for renaming "testing" → "testing2".

The script was renamed from "testing" to "testing2", but the name "testing2" is not descriptive and appears to be a placeholder. Additionally, this change seems orthogonal to the PR objective ("new importer").

Please clarify:

- What is the intended purpose of renaming this script?
- Is "testing2" a temporary name, or was a different name intended?
- Was the original "testing" script referenced in CI/CD pipelines, documentation, or developer workflows? If so, those references need to be updated to avoid breaking them.

If this change is unintended, consider reverting it.

apps/start/src/components/integrations/integration-card.tsx (1)
apps/start/src/components/integrations/integration-card.tsx (1)
50-69: LGTM! Consistent API enhancement.

The addition of the optional className prop aligns IntegrationCardLogoImage with the other components in this file (IntegrationCardFooter, IntegrationCardHeader, IntegrationCardHeaderButtons) that already accept className. The prop is properly forwarded to IntegrationCardLogo, which correctly merges it using the cn() utility.

apps/start/src/components/ui/badge.tsx (2)
12-13: LGTM! Success variant follows good practices.

The emerald-700 background with white text provides good contrast and is semantically appropriate for success states.

16-16: LGTM! Info variant is well-implemented.

The blue-500 background with white text provides adequate contrast and follows the established pattern.
packages/importer/tsconfig.json (1)
1-9: LGTM! Clean TypeScript configuration.

The configuration is properly structured for the new importer package, extending the shared base config and setting appropriate compiler options for a source-in-src, output-to-dist layout.
packages/importer/package.json (1)
27-27: `bullmq` dependency placement is correct.

Verification shows `bullmq` is exclusively in `devDependencies` with no runtime imports or usage in the importer source code. The package does not directly enqueue or process jobs; job operations are handled through the `@openpanel/queue` workspace dependency. The current placement is appropriate.
packages/db/src/types.ts (1)
2-2: Alignment with validation types looks good

The Prisma JSON alias cleanly mirrors the validation union. No issues spotted.
Also applies to: 16-16
packages/trpc/package.json (1)
19-19: No circular dependency detected — safe to proceed.

Verification confirms @openpanel/queue does not depend on @openpanel/trpc. The dependency is unidirectional: @openpanel/trpc (line 19) → @openpanel/queue. No cycle risk.
apps/start/src/routes/_app.$organizationId.$projectId_.settings._tabs.tsx (1)
45-45: LGTM! Clean addition of the Imports tab.

The new imports tab follows the existing pattern and integrates well with the settings navigation structure.
apps/worker/package.json (1)
23-23: LGTM! Necessary dependency for import functionality.

The addition of the `@openpanel/importer` workspace dependency enables the worker to process import jobs.
packages/db/src/clickhouse/client.ts (1)
59-59: LGTM! Table name constant added correctly.

The `events_imports` table name addition follows the existing pattern and aligns with the new ClickHouse migration.
apps/worker/src/index.ts (1)
8-8: LGTM! Import queue integration added correctly.

The `importQueue` is properly imported and registered with Bull Board for monitoring.

Also applies to: 42-42
packages/db/src/services/event.service.ts (1)
144-147: LGTM! Type definition is clear and purposeful.

The `IServiceImportedEventPayload` type correctly omits `profile` and `meta` fields for imported events, which makes sense as these would need to be resolved separately during import processing.
packages/db/code-migrations/5-add-imports-table.sql (2)
42-43: Verify the 7-day TTL is intentional for staging data.

The TTL of 7 days on `imported_at_meta` means all staging import data will be automatically deleted after one week. Confirm this aligns with your data retention requirements and import workflow expectations. If imports might take longer than 7 days to validate or if you need to keep historical import records, consider increasing this value or making it configurable.
1-38: LGTM! Well-designed ClickHouse table schema.

The table structure uses appropriate data types, codecs for compression, and a sound partitioning strategy. The use of LowCardinality for categorical fields and specialized codecs (ZSTD, Delta, Gorilla) shows good understanding of ClickHouse optimization patterns.
packages/db/prisma/schema.prisma (1)
470-486: LGTM! Import model is well-structured.

The Import model has appropriate fields for tracking import progress, includes proper relations to Project with cascade delete, and uses sensible defaults. The use of Json for config provides flexibility, while the comment `/// [IPrismaImportConfig]` indicates type documentation exists elsewhere.

Also applies to: 197-197
IMPORT_REVIEW.md (2)
37-59: Remove the action item; importer uses node:crypto, not the uuid package

The script results show that `packages/importer` uses `randomUUID()` imported from `node:crypto` (in `packages/importer/src/providers/umami.ts` and `packages/cli/src/importer/importer.ts`), not from the external `uuid` package. The `uuid` package is used elsewhere in the codebase (specifically in `packages/db/src/services/event.service.ts`), but not in the importer package. The review comment's recommendation to add `uuid` to the importer's dependencies is unnecessary.

Likely an incorrect or invalid review comment.
10-31: Review comment is based on outdated or incorrect code state; actual implementation already uses proper types

The review comment cites code that does not match the current implementation:

- `processBatch` function: does not exist in the codebase (rg search found zero matches)
- `any` type usage: no matches found in `apps/worker/src/jobs/` (contradicts the claim of "extensive use")
- Actual typing: line 46 of `apps/worker/src/jobs/import.ts` already uses `const eventBatch: UmamiRawEvent[] = [];` (properly typed, not `any[]`)
- `BaseImportProvider`: confirmed generic with `<TRawEvent extends BaseRawEvent = BaseRawEvent>`, and `UmamiProvider` correctly extends it as `BaseImportProvider<UmamiRawEvent>`

The current implementation already has the proper type safety that the review comment suggests. The code is not coupled via weak typing but properly typed with `UmamiRawEvent`. No refactoring is needed.

Likely an incorrect or invalid review comment.
apps/start/src/routeTree.gen.ts (1)
57-57: Generated file — no manual edits

Looks consistent with the new Imports tab. No action; keep excluded from lint/format.
Also applies to: 373-378, 511-511, 564-564, 800-801, 1170-1176, 1504-1517
packages/importer/src/types.ts (1)
20-25: Naming duplication exists but no actual import collision risk – review comment is incorrect.

The analysis shows that while `ImportProgress` and `ImportStageResult` are defined in both packages, the importer package does not import these colliding types from @openpanel/db. The importer only imports unrelated types like `IClickhouseEvent` and `IServiceImportedEventPayload`.

Additionally, the `ImportProgress` types serve different purposes: the importer's version tracks batch processing state (totalEvents, processedEvents, currentBatch, totalBatches), while the db version tracks job metadata (importId, totalEvents, insertedEvents, status). This is intentional, not a collision.

Since there are no cross-imports and the types aren't used interchangeably, no "confusion in imports" or "subtle type bugs" will occur. Both packages maintain independent type contracts appropriate to their concerns.
Likely an incorrect or invalid review comment.
packages/db/src/services/import.service.ts (4)
1-21: LGTM!

The imports and type definitions are well-structured. The interfaces clearly define the data contracts for staging results and progress tracking.

26-53: LGTM!

The batch insert function properly handles empty arrays, adds appropriate metadata, and leverages the retry-wrapped ClickHouse insert. The implementation is clean and defensive.
493-503: Verify cleanup scope: should the Postgres import record also be deleted?

The `cleanupFailedImport` function only deletes from ClickHouse's `events_imports` table. The corresponding Postgres `import` record remains with status 'failed'.
If Postgres records should also be cleaned up, add:
```diff
 export async function cleanupFailedImport(importId: string): Promise<void> {
   const deleteQuery = `
     DELETE FROM ${TABLE_NAMES.events_imports}
     WHERE import_id = {importId:String}
     AND import_status = 'failed'
   `;

   await ch.command({
     query: deleteQuery,
     query_params: { importId },
   });
+
+  // Also remove from Postgres if desired
+  await db.import.delete({
+    where: { id: importId, status: 'failed' },
+  });
 }
```
508-525: LGTM!

The `getImportDateBounds` utility correctly handles the nullable case and provides a clean interface for date range queries. The defensive null check on line 524 prevents runtime errors.
Fix color contrast for accessibility.
White text on a yellow-500 background likely fails WCAG AA contrast requirements (4.5:1). Yellow backgrounds conventionally require dark text for proper readability.
Apply this diff to fix the contrast issue:
```diff
-  warning:
-    'border-transparent bg-yellow-500 text-white [a&]:hover:bg-yellow-500/90',
+  warning:
+    'border-transparent bg-yellow-500 text-yellow-950 [a&]:hover:bg-yellow-500/90',
```
Alternatively, use a darker yellow background with white text:
```diff
-  warning:
-    'border-transparent bg-yellow-500 text-white [a&]:hover:bg-yellow-500/90',
+  warning:
+    'border-transparent bg-yellow-600 text-white [a&]:hover:bg-yellow-600/90',
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
🤖 Prompt for AI Agents
In apps/start/src/components/ui/badge.tsx around lines 14-15, the warning
variant uses 'bg-yellow-500 text-white' which likely fails WCAG contrast; change
either the text color to a dark color (e.g., replace text-white with
text-gray-900 or text-black) so dark text sits on yellow-500, or keep white text
and use a darker yellow background (e.g., replace bg-yellow-500 with
bg-yellow-600 or bg-yellow-700) and adjust the hover class accordingly to
maintain accessible contrast.
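For reference, the contrast numbers behind this suggestion can be checked directly. Below is a minimal sketch of the WCAG 2.x relative-luminance and contrast-ratio calculation; the hex values assume Tailwind v3 defaults (yellow-500 = #eab308, yellow-950 = #422006), so verify them against the project's theme before relying on the exact figures.

```typescript
// WCAG 2.x: linearize each sRGB channel, weight into relative luminance,
// then compare two luminances as (lighter + 0.05) / (darker + 0.05).
function channel(c8: number): number {
  const c = c8 / 255;
  return c <= 0.03928 ? c / 12.92 : ((c + 0.055) / 1.055) ** 2.4;
}

function luminance(hex: string): number {
  const n = Number.parseInt(hex.slice(1), 16);
  return (
    0.2126 * channel((n >> 16) & 0xff) +
    0.7152 * channel((n >> 8) & 0xff) +
    0.0722 * channel(n & 0xff)
  );
}

function contrast(a: string, b: string): number {
  const [hi, lo] = [luminance(a), luminance(b)].sort((x, y) => y - x);
  return (hi + 0.05) / (lo + 0.05);
}

// White on yellow-500 is ≈ 1.92:1, well under the 4.5:1 AA threshold;
// yellow-950 on yellow-500 is ≈ 7.6:1 and passes.
console.log(contrast('#eab308', '#ffffff').toFixed(2));
console.log(contrast('#eab308', '#422006').toFixed(2));
```

This supports the first diff (dark text on yellow-500) over guessing at darker backgrounds.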
Avoid leaking secrets in error paths/logs
`error.message` may include upstream details. Ensure the server never echoes API secrets. Consider a generic toast and log the details server-side only.
🤖 Prompt for AI Agents
In apps/start/src/modals/add-import.tsx around lines 55 to 60, the current toast
displays error.message which can leak upstream secrets; replace the client-side
toast with a generic user-facing message (e.g., "Import failed, try again
later") and remove inclusion of error.message, and instead ensure the server
logs the full error details securely (or returns a sanitized error code/message)
so secrets are never echoed to the client; update the toast usage accordingly
and confirm server-side logging/response sanitization is in place.
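One way to realize this suggestion is a small helper that always returns a fixed client-facing string while routing the real error to the server logger. This is a sketch; the `logger` shape below is a stand-in, not the project's actual logger API.

```typescript
// Fixed string shown to users; never interpolate upstream error text here.
const GENERIC_IMPORT_ERROR = 'Import failed. Please try again later.';

interface ServerLogger {
  error: (msg: string, meta?: Record<string, unknown>) => void;
}

function toClientError(err: unknown, logger: ServerLogger): string {
  // Full detail (possibly containing API secrets) stays server-side...
  logger.error('import failed', {
    message: err instanceof Error ? err.message : String(err),
  });
  // ...while the client only ever sees the generic message.
  return GENERIC_IMPORT_ERROR;
}
```

The toast would then render the returned string instead of `error.message`.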
Payload shape mismatches zod schema (will be rejected by TRPC)
- Missing `provider` inside `config`.
- `config.type` should be 'file' or 'api' (not the provider name).
- The Mixpanel path hardcodes an incorrect `type` and omits `provider` in `config`.
Align with zCreateImport provider-specific config shapes.
```diff
-type Provider = 'umami' | 'plausible' | 'mixpanel';
-
-interface ImportConfig {
-  type: Provider;
-  fileUrl?: string;
-  apiKey?: string;
-  apiSecret?: string;
-  projectId?: string;
-}
+type Provider = 'umami' | 'plausible' | 'mixpanel';
+
+interface ImportConfig {
+  fileUrl?: string;
+  apiKey?: string;
+  apiSecret?: string;
+  projectId?: string;
+}
@@
-  const [config, setConfig] = useState<ImportConfig>({
-    type: provider,
-    ...(sourceType === 'file'
-      ? { fileUrl: '' }
-      : { apiKey: '', apiSecret: '', projectId: '' }),
-  });
+  const [config, setConfig] = useState<ImportConfig>(
+    sourceType === 'file'
+      ? { fileUrl: '' }
+      : { apiKey: '', apiSecret: '', projectId: '' },
+  );
@@
-  if (sourceType === 'file') {
-    createImport.mutate({
-      projectId,
-      provider,
-      config: {
-        type: provider as 'umami' | 'plausible',
-        fileUrl: config.fileUrl || '',
-      },
-    });
-  } else {
-    createImport.mutate({
-      projectId,
-      provider: 'mixpanel',
-      config: {
-        type: 'mixpanel',
-        apiKey: config.apiKey || '',
-        apiSecret: config.apiSecret || '',
-        projectId: config.projectId || '',
-      },
-    });
-  }
+  if (sourceType === 'file') {
+    createImport.mutate({
+      projectId,
+      provider, // 'umami' or 'plausible'
+      config: {
+        provider, // required by schema
+        type: 'file', // required by schema
+        fileUrl: config.fileUrl!,
+      },
+    });
+  } else {
+    createImport.mutate({
+      projectId,
+      provider: 'mixpanel',
+      config: {
+        provider: 'mixpanel', // required by schema
+        type: 'api', // required by schema
+        apiKey: config.apiKey!,
+        apiSecret: config.apiSecret!,
+        projectId: config.projectId!,
+      },
+    });
+  }
```
Also applies to: 13-21, 38-43
🤖 Prompt for AI Agents
In apps/start/src/modals/add-import.tsx around lines 62-84 (also fix same
pattern at 13-21 and 38-43): the object passed to createImport.mutate does not
match the zCreateImport schema — config is missing a provider field and
config.type is using provider names instead of the required 'file' | 'api'. Fix
by including provider at the top-level as you already do, and also add provider
inside config; set config.type = 'file' for file-based imports and config.type =
'api' for API-based imports; for file providers put fileUrl in config, for
mixpanel set config.apiKey, config.apiSecret, config.projectId and
config.provider = 'mixpanel' and config.type = 'api' so the payload matches the
schema exactly.
🛠️ Refactor suggestion | 🟠 Major
Remove UmamiRawEvent coupling; use BaseImportProvider generics
This job should be provider-agnostic. Swap UmamiRawEvent for BaseImportProvider.
```diff
-import { UmamiProvider, type UmamiRawEvent } from '@openpanel/importer';
+import { UmamiProvider, type BaseImportProvider } from '@openpanel/importer';
```
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In apps/worker/src/jobs/import.ts around lines 12 to 14, remove the direct
coupling to UmamiRawEvent and instead use the generic BaseImportProvider<T> from
the importer package: drop the UmamiRawEvent import, import BaseImportProvider
(and its type) from '@openpanel/importer', and replace any references to
UmamiRawEvent in the job/payload types with the generic BaseImportProvider<T>
(e.g., ImportQueuePayload<BaseImportProvider<any>> or a concrete T where
appropriate). Ensure all imports and Job generics are updated to use the
BaseImportProvider generic so the job becomes provider-agnostic.
🛠️ Refactor suggestion | 🟠 Major
Make batch buffer provider-agnostic and harden transform stage
- Use unknown[] for the raw batch.
- Guard transformEvent with try/catch to skip bad rows without failing the whole job.
```diff
-  const eventBatch: UmamiRawEvent[] = [];
+  const eventBatch: unknown[] = [];
@@
-  const transformedEvents = eventBatch.map((event) =>
-    providerInstance.transformEvent(event),
-  );
+  const transformedEvents: any[] = [];
+  for (const event of eventBatch) {
+    try {
+      transformedEvents.push(providerInstance.transformEvent(event as any));
+    } catch (e) {
+      jobLogger.warn('Skipping event on transform failure', {
+        provider: providerInstance.provider,
+        reason: e instanceof Error ? e.message : String(e),
+      });
+    }
+  }
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
🤖 Prompt for AI Agents
In apps/worker/src/jobs/import.ts around lines 45 to 69, the batch buffer and
transform stage need hardening: change eventBatch from UmamiRawEvent[] to
unknown[] so the buffer is provider-agnostic, and wrap each
providerInstance.transformEvent call in a try/catch so a thrown error on one row
is caught, logged (jobLogger.warn with the rawEvent and error), and that row is
skipped without failing the whole job; ensure you collect only successful
transformed events (filter out failed/undefined results) before calling
insertImportBatch(importId).
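The per-row try/catch suggested here can be isolated into a small helper, which also makes it easy to unit-test. This is a sketch with stand-in types: `transformEvent` stands in for the provider's real method, and the `warn` callback stands in for `jobLogger.warn`.

```typescript
interface Transformed {
  name: string;
}

// Provider-agnostic: the buffer is unknown[], and one bad row is logged
// and skipped instead of failing the whole batch.
function transformBatch(
  eventBatch: unknown[],
  transformEvent: (raw: unknown) => Transformed,
  warn: (reason: string) => void,
): Transformed[] {
  const out: Transformed[] = [];
  for (const raw of eventBatch) {
    try {
      out.push(transformEvent(raw));
    } catch (e) {
      // Skip the bad row; keep the batch alive.
      warn(e instanceof Error ? e.message : String(e));
    }
  }
  return out;
}
```

Only the successfully transformed events would then be passed to `insertImportBatch`.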
EventProperties allows nested objects, but the ClickHouse properties column is primitives-only
IClickhouseEvent.properties is Record<string, string|number|boolean|undefined|null>. Storing objects (e.g., __query) will break ingestion unless stringified.
- Constrain EventProperties values to primitives.
- Or require JSON.stringify for nested shapes (e.g., `__queryJson: string`).
🤖 Prompt for AI Agents
In packages/importer/src/types.ts around lines 53 to 66, EventProperties
currently allows nested objects which will break ClickHouse ingestion because
IClickhouseEvent.properties must be primitives-only; update the EventProperties
interface so values are limited to string | number | boolean | null | undefined
(remove Record<string, unknown>) or replace nested object fields with explicit
JSON string fields (e.g., __queryJson: string) and rename/remove existing
__query, ensuring all nested shapes are JSON.stringified before being assigned
to EventProperties.
__dirname is unavailable in ESM modules.
Since package.json declares "type": "module", this file runs as ESM. The __dirname global doesn't exist in ESM and will throw a ReferenceError.
Apply this diff to compute the directory path in ESM:
```diff
+import { dirname } from 'node:path';
+import { fileURLToPath } from 'node:url';
+
 import { getSharedVitestConfig } from '../../vitest.shared';

-export default getSharedVitestConfig({ __dirname });
+export default getSharedVitestConfig({
+  __dirname: dirname(fileURLToPath(import.meta.url)),
+});
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
🤖 Prompt for AI Agents
In packages/importer/vitest.config.ts around line 3, the code passes the
non-existent __dirname to getSharedVitestConfig (ESM modules don't have
__dirname); compute the directory from import.meta.url instead and pass that
value: import fileURLToPath from 'url' and path from 'path' (or use their named
functions), derive const __dirname =
path.dirname(fileURLToPath(import.meta.url)), then call getSharedVitestConfig({
__dirname }) so the config receives the correct directory path in ESM.
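The ESM-safe replacement can be expressed as a tiny helper. In the real config the argument would be `import.meta.url`; the test below uses a literal file URL so the sketch stays self-contained and runnable anywhere.

```typescript
import { dirname } from 'node:path';
import { fileURLToPath } from 'node:url';

// ESM equivalent of __dirname: convert the module's file:// URL to a
// filesystem path, then take its directory.
export function dirnameFromUrl(moduleUrl: string): string {
  return dirname(fileURLToPath(moduleUrl));
}

// In vitest.config.ts:
//   export default getSharedVitestConfig({ __dirname: dirnameFromUrl(import.meta.url) });
```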
Sensitive config exposure in list/get — restrict or redact.
`config` likely contains provider credentials. The current list/get endpoints return it to any project member with read access. Redact it or gate it by role.
Option A (redact for read-level):
```diff
   list: protectedProcedure
     .input(z.object({ projectId: z.string() }))
     .query(async ({ input, ctx }) => {
       const access = await getProjectAccessCached({
         projectId: input.projectId,
         userId: ctx.session.userId,
       });
       if (!access) {
         throw TRPCAccessError('You do not have access to this project');
       }
-      return db.import.findMany({
+      const rows = await db.import.findMany({
         where: { projectId: input.projectId },
         orderBy: { createdAt: 'desc' },
       });
+      const canSeeSecrets =
+        typeof access !== 'boolean' && access.level !== 'read';
+      return canSeeSecrets
+        ? rows
+        : rows.map((r) => ({ ...r, config: null }));
     }),
   get: protectedProcedure
     .input(z.object({ id: z.string() }))
     .query(async ({ input, ctx }) => {
       const importRecord = await db.import.findUniqueOrThrow({
         where: { id: input.id },
         include: { project: true },
       });
       const access = await getProjectAccessCached({
         projectId: importRecord.projectId,
         userId: ctx.session.userId,
       });
       if (!access) {
         throw TRPCAccessError('You do not have access to this import');
       }
-      return importRecord;
+      const canSeeSecrets =
+        typeof access !== 'boolean' && access.level !== 'read';
+      return canSeeSecrets ? importRecord : { ...importRecord, config: null };
     }),
```

Option B: Use Prisma select to exclude config in list and only include it for admin/write roles.
Also applies to: 34-57
🤖 Prompt for AI Agents
In packages/trpc/src/routers/import.ts around lines 12 to 32 (and likewise apply
the change to lines 34 to 57), the current list/get resolvers return the full
import records including the sensitive config field to any project member;
change the DB queries to explicitly exclude the config field by using a Prisma
select (e.g., select: { config: false, ...otherFields }) for read-level access,
and only fetch/include config when the caller has a higher role (admin/write)
after re-checking the caller's role from ctx.session or the access object;
implement a conditional: if role allows, run a second query or include config in
select, otherwise omit/redact config from the returned objects and ensure no
accidental passthrough in list results.
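Option A boils down to one predicate plus a map, which is easy to share between `list` and `get`. A sketch with an assumed `access` shape of `boolean | { level: ... }`, mirroring the checks in the diff above; adjust if `getProjectAccessCached` returns something richer.

```typescript
type Access = boolean | { level: 'read' | 'write' | 'admin' };

interface ImportRow {
  id: string;
  config: unknown; // may hold provider credentials
}

// Strip config unless the caller's access level allows seeing secrets.
// A bare boolean access (no level) is treated as read-only.
function redactConfigs(rows: ImportRow[], access: Access): ImportRow[] {
  const canSeeSecrets = typeof access !== 'boolean' && access.level !== 'read';
  return canSeeSecrets ? rows : rows.map((r) => ({ ...r, config: null }));
}
```

Both procedures can then return `redactConfigs(rows, access)` instead of the raw records.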
Queue enqueue not wrapped — record can be left pending with no job.
If importQueue.add fails, DB keeps a dangling 'pending' import. Wrap in try/catch, set failed status, and pass stable jobId/attempts/backoff.
```diff
   // Create import record
   const importRecord = await db.import.create({
     data: {
       projectId: input.projectId,
       config: input.config,
       status: 'pending',
     },
   });
-  // Add job to queue
-  const job = await importQueue.add('import', {
-    type: 'import',
-    payload: {
-      importId: importRecord.id,
-    },
-  });
-
-  // Update import record with job ID
-  await db.import.update({
-    where: { id: importRecord.id },
-    data: { jobId: job.id },
-  });
+  // Add job to queue
+  try {
+    const job = await importQueue.add(
+      'import',
+      { type: 'import', payload: { importId: importRecord.id } },
+      { jobId: importRecord.id, attempts: 3, backoff: { type: 'exponential', delay: 30_000 } },
+    );
+    // Update import record with job ID (idempotent if jobId equals importRecord.id)
+    await db.import.update({
+      where: { id: importRecord.id },
+      data: { jobId: job.id },
+    });
+  } catch (err) {
+    await db.import.update({
+      where: { id: importRecord.id },
+      data: { status: 'failed', errorMessage: err instanceof Error ? err.message : 'Queue enqueue failed' },
+    });
+    throw err;
+  }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
🤖 Prompt for AI Agents
In packages/trpc/src/routers/import.ts around lines 72-99, the code creates an
import DB record then calls importQueue.add without error handling so a failed
enqueue leaves a dangling 'pending' record; to fix, generate a stable jobId
(e.g. UUID) before enqueue, persist that jobId on the import record (or create
the record with that jobId) and then wrap importQueue.add in try/catch — on
success update any job metadata if needed, on failure update the DB record to
status 'failed' and write the error message and retry metadata
(attempts/backoff) to the record; ensure the job payload includes the stable
jobId and desired attempts/backoff so retries are deterministic.
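The create-then-enqueue-with-rollback flow can be sketched with stand-in `db` and `queue` interfaces. The real code uses Prisma and BullMQ; these stubs only mirror the calls used here, so treat this as a shape for the control flow, not the actual client APIs.

```typescript
interface ImportRow {
  id: string;
  status: string;
  errorMessage?: string;
  jobId?: string;
}

interface Db {
  create: () => Promise<ImportRow>;
  update: (id: string, data: Partial<ImportRow>) => Promise<void>;
}

interface Queue {
  add: (jobId: string) => Promise<{ id: string }>;
}

async function createImportWithJob(db: Db, queue: Queue): Promise<ImportRow> {
  const record = await db.create(); // status: 'pending'
  try {
    // Stable jobId (= record id) makes retried enqueues idempotent.
    const job = await queue.add(record.id);
    await db.update(record.id, { jobId: job.id });
  } catch (err) {
    // Roll the record over to 'failed' so no dangling 'pending' row survives.
    await db.update(record.id, {
      status: 'failed',
      errorMessage: err instanceof Error ? err.message : 'Queue enqueue failed',
    });
    throw err;
  }
  return record;
}
```

The key property: every exit path either records a job id or marks the row failed.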
Discriminate by provider to prevent mismatched payloads
provider and config can disagree (e.g., provider='umami' with a Mixpanel config). Make zCreateImport a discriminated union on provider so config shape is tied to the selected provider.
```diff
-export const zCreateImport = z.object({
-  projectId: z.string().min(1),
-  provider: z.enum(['umami', 'plausible', 'mixpanel']),
-  config: z.union([
-    zUmamiImportConfig,
-    zPlausibleImportConfig,
-    zMixpanelImportConfig,
-  ]),
-});
+export const zCreateImport = z.discriminatedUnion('provider', [
+  z.object({
+    projectId: z.string().min(1),
+    provider: z.literal('umami'),
+    config: zUmamiImportConfig,
+  }),
+  z.object({
+    projectId: z.string().min(1),
+    provider: z.literal('plausible'),
+    config: zPlausibleImportConfig,
+  }),
+  z.object({
+    projectId: z.string().min(1),
+    provider: z.literal('mixpanel'),
+    config: zMixpanelImportConfig,
+  }),
+]);
```
🤖 Prompt for AI Agents
In packages/validation/src/index.ts around lines 469 to 477, zCreateImport
currently allows provider and config to disagree; change it to a discriminated
union so each provider literal is paired with the correct config shape. Replace
the single z.object with either z.discriminatedUnion('provider', [...]) or a
z.union of three z.object schemas where provider is a literal ('umami' |
'plausible' | 'mixpanel') and config is the matching zUmamiImportConfig,
zPlausibleImportConfig, or zMixpanelImportConfig respectively; keep the same
exported name and types so callers get compile-time validation that provider and
config match.
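The compile-time effect of the discriminated union can be illustrated without zod: once `provider` is the discriminant, TypeScript narrows `config` to the matching shape, so a 'umami' provider literally cannot carry a Mixpanel config. The config field names below follow the shapes discussed in this review and are assumptions about the real `zUmamiImportConfig` etc.

```typescript
type CreateImport =
  | { projectId: string; provider: 'umami'; config: { type: 'file'; fileUrl: string } }
  | { projectId: string; provider: 'plausible'; config: { type: 'file'; fileUrl: string } }
  | {
      projectId: string;
      provider: 'mixpanel';
      config: { type: 'api'; apiKey: string; apiSecret: string; projectId: string };
    };

// Switching on the discriminant narrows config: inside the 'mixpanel'
// case only the API fields exist; in the default case only fileUrl does.
function describeImport(input: CreateImport): string {
  switch (input.provider) {
    case 'mixpanel':
      return `api import for project ${input.config.projectId}`;
    default:
      return `file import from ${input.config.fileUrl}`;
  }
}
```

The zod `discriminatedUnion` gives the same guarantee at runtime, rejecting mismatched provider/config pairs before they reach the queue.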
Summary by CodeRabbit