new importer #214

Open · lindesvard wants to merge 1 commit into main from feature/import

Conversation

lindesvard (Contributor) commented Oct 19, 2025 · edited by coderabbitai bot

Summary by CodeRabbit

  • New Features
    • Added data import functionality supporting Umami, Plausible, and Mixpanel providers
    • New Imports settings page to view and manage import history
    • Import status monitoring with progress tracking and visual indicators
    • Ability to retry failed imports and delete completed imports

coderabbitai bot (Contributor) commented Oct 19, 2025 · edited

Walkthrough

Implements a comprehensive provider-based import system for external analytics sources, including a new importer package, database schema updates (PostgreSQL and ClickHouse), worker job handler, TRPC API routes, validation schemas, and frontend UI for managing data imports from Umami, Plausible, and Mixpanel.

Changes

  • Import System Core (packages/importer/src/base-provider.ts, packages/importer/src/index.ts): Defines the abstract BaseImportProvider class with lifecycle hooks and the public API exports
  • Umami Provider Implementation (packages/importer/src/providers/umami.ts): Full Umami provider with CSV/gzip parsing, streaming, validation, and event transformation logic
  • Importer Types & Utils (packages/importer/src/types.ts, packages/importer/src/utils/referrer-parser.ts): Exports import-related type contracts and a referrer parsing utility
  • Importer Package Config (packages/importer/package.json, packages/importer/tsconfig.json, packages/importer/vitest.config.ts): Package metadata, TypeScript, and Vitest configuration for the new importer workspace
  • PostgreSQL Schema (packages/db/prisma/migrations/20251018205153_add_import_table/migration.sql, packages/db/prisma/migrations/20251018214030_fix/migration.sql, packages/db/prisma/schema.prisma): Creates the imports table with a project relation; removes the provider, sourceLocation, and sourceType columns in a follow-up; updates the Prisma schema
  • ClickHouse Schema & Service (packages/db/code-migrations/5-add-imports-table.sql, packages/db/code-migrations/5-add-imports-table.ts): Adds the events_imports staging table with a TTL policy via SQL migration script
  • Database Service Layer (packages/db/src/services/import.service.ts): Exports batch insertion, session reconstruction, production migration, progress tracking, and cleanup functions
  • Database Configuration (packages/db/src/clickhouse/client.ts, packages/db/src/types.ts, packages/db/src/services/event.service.ts): Adds the events_imports table name, an import config type alias, and an imported event payload type
  • Queue & Worker Setup (packages/queue/src/queues.ts, apps/worker/src/boot-workers.ts, apps/worker/src/index.ts, apps/worker/package.json): Defines ImportQueuePayload, registers importQueue, creates importWorker with concurrency, and updates dependencies
  • Import Job Handler (apps/worker/src/jobs/import.ts): Implements the end-to-end import pipeline with staging, batch processing, session reconstruction, and production migration
  • Validation & API (packages/validation/src/index.ts, packages/trpc/src/routers/import.ts, packages/trpc/src/root.ts, packages/trpc/package.json): Defines import config schemas and a TRPC router with list/get/create/delete/retry procedures, and registers the router
  • Frontend Settings Page (apps/start/src/routes/_app.$organizationId.$projectId_.settings._tabs.imports.tsx): New ImportsSettings component with a provider grid, import history table, status tracking, and delete/retry mutations
  • Frontend Modal & Navigation (apps/start/src/modals/add-import.tsx, apps/start/src/modals/index.tsx, apps/start/src/routes/_app.$organizationId.$projectId_.settings._tabs.tsx): Adds the AddImport modal with file/API config inputs, registers the modal, and adds an imports tab to settings
  • Route Generation & UI Updates (apps/start/src/routeTree.gen.ts, apps/start/src/components/ui/badge.tsx, apps/start/src/components/integrations/integration-card.tsx): Auto-generates the imports route, adds success/warning/info badge variants, and adds a className prop to IntegrationCardLogoImage
  • Package Updates (apps/start/package.json, apps/worker/package.json, packages/trpc/package.json): Renames a start script, adds the importer dependency to the worker, and adds the queue dependency to trpc
  • Documentation (IMPORT_REVIEW.md): Outlines type safety, timestamp validation, dependency fixes, and architectural improvements

Sequence Diagram(s)

sequenceDiagram
 actor User
 participant UI as Frontend<br/>(AddImport Modal)
 participant TRPC as TRPC Router<br/>(import.create)
 participant DB as PostgreSQL<br/>(imports table)
 participant Queue as Redis Queue<br/>(importQueue)
 participant Worker as Import Worker
 participant Provider as BaseImportProvider<br/>(e.g., Umami)
 participant Staging as ClickHouse<br/>(events_imports)
 participant Prod as ClickHouse<br/>(events)
 User->>UI: Configure import (provider, config)
 UI->>TRPC: import.create({provider, config})
 TRPC->>DB: Create import record (status: pending)
 DB-->>TRPC: Return importId
 TRPC->>Queue: Enqueue job with importId
 TRPC->>DB: Update import with jobId
 TRPC-->>UI: Success toast, refresh list
 
 Note over Worker: Job Processing
 Worker->>DB: Load import record
 Worker->>Provider: Initialize provider (Umami, Plausible, Mixpanel)
 
 rect rgb(200, 220, 240)
 Note over Worker,Staging: Phase 1: Load & Batch
 Worker->>Provider: parseSource() async generator
 Provider-->>Worker: Yields raw events (stream)
 Worker->>Provider: validate(rawEvent)
 Worker->>Provider: transformEvent(rawEvent)
 Worker->>Staging: insertImportBatch (chunk of 5000)
 Staging-->>Worker: Return batch result
 Worker->>DB: updateImportProgress()
 end
 
 rect rgb(220, 240, 220)
 Note over Worker,Prod: Phase 2-4: Monthly Processing
 loop For each month-spanning slice
 Worker->>Staging: reconstructSessions(importId, from, to)
 Staging->>Staging: Build session_start/end events
 Staging-->>Worker: Sessions reconstructed
 Worker->>Staging: migrateImportToProduction(importId, from, to)
 Staging->>Prod: Bulk insert migrated events
 Prod-->>Staging: Events migrated
 Worker->>DB: updateImportProgress()
 end
 end
 
 rect rgb(240, 220, 220)
 Note over Worker,DB: Phase 5: Completion
 Worker->>DB: markImportComplete()
 DB-->>Worker: Status: completed
 Worker->>DB: getImportProgress()
 end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

The changes introduce a multi-layered import system spanning databases (PostgreSQL + ClickHouse), provider abstraction with generics, worker job orchestration, API routes with access control, and frontend UI. While individual cohorts follow consistent patterns (straightforward schema migrations, service functions, TRPC procedures), the heterogeneity of components and the density of new abstractions (the BaseImportProvider lifecycle, session reconstruction logic, batch staging workflow) demand careful cross-layer reasoning and validation of the end-to-end data flow.

Poem

🐰 Hops through imports with delight,
Providers plug in, left and right,
Events cascade in batches true,
Sessions dance, old made new,
From Umami's fields to ClickHouse's keep—
Analytics harvested, data stacked deep! 🌾✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 40.91%, below the required threshold of 80.00%. Run @coderabbitai generate docstrings to improve coverage.
✅ Passed checks (2 passed)
  • Description Check: ✅ Passed. Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check: ✅ Passed. The title "new importer" refers to a real aspect of the change: the PR introduces a new importer system with provider support (Umami, Plausible, Mixpanel) as its core addition. While the changeset also includes supporting infrastructure (database tables, TRPC API endpoints, UI components, job handling), the primary focus is the new importer package and system, which the title accurately captures. The title is not misleading or vague in the way "misc updates" or "stuff" would be; it conveys meaningful information about what is being added.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feature/import

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 28

🧹 Nitpick comments (35)
apps/start/src/modals/index.tsx (1)

8-8: Consider lazy‐loading AddImport to keep initial bundle slim

AddImport brings TRPC/React Query; load it on demand.

Apply:

+import { lazy } from 'react';
-import AddImport from './add-import';
+const AddImport = lazy(() => import('./add-import'));

Ensure ModalProvider or the modal surface renders within a <Suspense> boundary so the lazy chunk has a fallback while it loads.

Also applies to: 41-41
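
For reference, a minimal sketch of that Suspense boundary, assuming a ModalProvider component like the one referenced above (placement is illustrative):

import { Suspense } from 'react';

// Any lazy-loaded modal needs a Suspense ancestor; fallback={null} keeps the UI unchanged while the chunk loads
export function ModalRoot() {
  return (
    <Suspense fallback={null}>
      <ModalProvider />
    </Suspense>
  );
}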

packages/importer/src/base-provider.ts (1)

7-8: Make core hooks async‐friendly and identifiers immutable

Allow providers to perform async transforms/validation; mark identifiers readonly.

+// Utility type to allow sync or async implementations
+export type MaybePromise<T> = T | Promise<T>;
+
 export abstract class BaseImportProvider<
 TRawEvent extends BaseRawEvent = BaseRawEvent,
 > {
- abstract provider: string;
- abstract version: string;
+ abstract readonly provider: string;
+ abstract readonly version: string;
 /**
 * Stream-read and parse source (file/API) → yields raw events
 * This should be implemented as an async generator to handle large files efficiently
 */
 abstract parseSource(): AsyncGenerator<TRawEvent, void, unknown>;
 /**
 * Convert provider format → IClickhouseEvent
 */
- abstract transformEvent(rawEvent: TRawEvent): IClickhouseEvent;
+ abstract transformEvent(rawEvent: TRawEvent): MaybePromise<IClickhouseEvent>;
 /**
 * Validate raw event structure
 */
- abstract validate(rawEvent: TRawEvent): boolean;
+ abstract validate(rawEvent: TRawEvent): MaybePromise<boolean>;

Also applies to: 19-24
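
For illustration, a sketch of a provider written against the relaxed contract; the import paths and casts are placeholders, since the concrete shapes of BaseRawEvent and IClickhouseEvent are not shown here:

import type { IClickhouseEvent } from '@openpanel/db';
import { BaseImportProvider } from '../base-provider';
import type { BaseRawEvent } from '../types';

class ExampleProvider extends BaseImportProvider<BaseRawEvent> {
  readonly provider = 'example';
  readonly version = '1.0.0';

  // A real provider streams rows from a file or API here
  async *parseSource(): AsyncGenerator<BaseRawEvent, void, unknown> {
    yield {} as BaseRawEvent; // illustrative only
  }

  // Async-friendly: e.g. await a geo or referrer lookup before mapping fields
  async transformEvent(rawEvent: BaseRawEvent): Promise<IClickhouseEvent> {
    return rawEvent as unknown as IClickhouseEvent; // illustrative only
  }

  // A sync implementation still satisfies MaybePromise<boolean>
  validate(rawEvent: BaseRawEvent): boolean {
    return rawEvent != null;
  }
}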

packages/importer/src/utils/referrer-parser.ts (3)

5-35: Type the map and normalize hosts; reduce duplicates

Strong types + normalization avoid listing www/m. variants twice and improve coverage.

-import { stripTrailingSlash } from '@openpanel/common';
+import { stripTrailingSlash } from '@openpanel/common';
+type ReferrerType = 'search' | 'social' | 'referral';
+type ReferrerEntry = { type: ReferrerType; name: string };
+const normalizeHost = (h: string) => h.replace(/^(www\.|m\.)/i, '').toLowerCase();
-const referrers: Record<string, { type: string; name: string }> = {
+const referrers: Record<string, ReferrerEntry> = {
 // Search engines
- 'google.com': { type: 'search', name: 'Google' },
- 'www.google.com': { type: 'search', name: 'Google' },
+ 'google.com': { type: 'search', name: 'Google' },
 'bing.com': { type: 'search', name: 'Bing' },
- 'www.bing.com': { type: 'search', name: 'Bing' },
 'yahoo.com': { type: 'search', name: 'Yahoo!' },
- 'www.yahoo.com': { type: 'search', name: 'Yahoo!' },
 'duckduckgo.com': { type: 'search', name: 'DuckDuckGo' },
- 'www.duckduckgo.com': { type: 'search', name: 'DuckDuckGo' },
 'baidu.com': { type: 'search', name: 'Baidu' },
- 'www.baidu.com': { type: 'search', name: 'Baidu' },
 // Social media
- 'facebook.com': { type: 'social', name: 'Facebook' },
- 'www.facebook.com': { type: 'social', name: 'Facebook' },
+ 'facebook.com': { type: 'social', name: 'Facebook' },
 'twitter.com': { type: 'social', name: 'Twitter' },
- 'www.twitter.com': { type: 'social', name: 'Twitter' },
 'x.com': { type: 'social', name: 'X' },
- 'www.x.com': { type: 'social', name: 'X' },
 'linkedin.com': { type: 'social', name: 'LinkedIn' },
- 'www.linkedin.com': { type: 'social', name: 'LinkedIn' },
 'instagram.com': { type: 'social', name: 'Instagram' },
- 'www.instagram.com': { type: 'social', name: 'Instagram' },
 'tiktok.com': { type: 'social', name: 'TikTok' },
- 'www.tiktok.com': { type: 'social', name: 'TikTok' },
 'youtube.com': { type: 'social', name: 'YouTube' },
- 'www.youtube.com': { type: 'social', name: 'YouTube' },
 'reddit.com': { type: 'social', name: 'Reddit' },
- 'www.reddit.com': { type: 'social', name: 'Reddit' },
+ 't.co': { type: 'social', name: 'Twitter' },
+ 'lnkd.in': { type: 'social', name: 'LinkedIn' },
 };
 
 export function parseReferrer(url: string | undefined) {
- const hostname = getHostname(url);
- const match = referrers[hostname] ?? referrers[hostname.replace('www.', '')];
+ const hostname = normalizeHost(getHostname(url));
+ const match = referrers[hostname];

Also applies to: 49-52


49-57: Guard output URL to http(s) only

Avoid echoing non-web schemes (e.g., javascript:) back to callers.

 export function parseReferrer(url: string | undefined) {
 const hostname = getHostname(url);
 const match = referrers[hostname] ?? referrers[hostname.replace('www.', '')];
- return {
+ let safeUrl = '';
+ try {
+ if (url) {
+ const u = new URL(url);
+ if (u.protocol === 'http:' || u.protocol === 'https:') {
+ safeUrl = stripTrailingSlash(url);
+ }
+ }
+ } catch {
+ // ignore
+ }
+
+ return {
 name: match?.name ?? '',
 type: match?.type ?? 'referral',
- url: stripTrailingSlash(url ?? ''),
+ url: safeUrl,
 };
 }

60-91: Minor: normalize source before lookup and keep types consistent

Lower-case once; reuse normalizeHost for direct key match; return typed result.

-export function getReferrerWithQuery(
- query: Record<string, string> | undefined,
-) {
+export function getReferrerWithQuery(
+ query: Record<string, string> | undefined,
+): { name: string; type: ReferrerType; url: string } | null {
 if (!query) {
 return null;
 }
- const source = query.utm_source ?? query.ref ?? query.utm_referrer ?? '';
+ const source = (query.utm_source ?? query.ref ?? query.utm_referrer ?? '').trim();
 if (source === '') {
 return null;
 }
- const match =
- Object.values(referrers).find(
- (referrer) => referrer.name.toLowerCase() === source.toLowerCase(),
- ) || referrers[source];
+ const lower = source.toLowerCase();
+ const match =
+ Object.values(referrers).find((r) => r.name.toLowerCase() === lower) ||
+ referrers[normalizeHost(lower)];
 if (match) {
 return {
 name: match.name,
 type: match.type,
 url: '',
 };
 }
packages/db/prisma/migrations/20251018214030_fix/migration.sql (1)

1-13: Consider more descriptive migration naming.

The migration name "fix" is not descriptive. Consider renaming to something like "remove_redundant_import_columns" or "migrate_import_config_to_json" to better indicate the purpose of this schema change.

packages/db/code-migrations/5-add-imports-table.sql (1)

32-32: Consider using an Enum for import_status.

The import_status field uses LowCardinality(String) with a default of 'pending'. For better type safety and query performance, consider using an Enum type instead:

`import_status` LowCardinality(Enum8('pending' = 1, 'processing' = 2, 'completed' = 3, 'failed' = 4)) DEFAULT 'pending'

This ensures only valid status values can be stored and provides better compression.

packages/db/prisma/schema.prisma (1)

470-486: Consider using an enum for the status field.

The status field is defined as a String with a comment indicating it should be one of 'pending' | 'processing' | 'completed' | 'failed'. For better type safety and database constraints, define a Prisma enum:

enum ImportStatus {
 pending
 processing
 completed
 failed
}
model Import {
 // ...
 status ImportStatus // instead of String
 // ...
}

This provides compile-time type checking and database-level validation.

packages/trpc/src/root.ts (1)

7-7: Router key named "import" (reserved keyword) — consider renaming for DX.

Valid in JS/TS, but can confuse tooling/usages (e.g., appRouter.import vs dynamic import()). Consider renaming to imports/importer for clarity.

Also applies to: 44-44

packages/queue/src/queues.ts (1)

163-176: Harden import jobs: add attempts and exponential backoff; optionally expose QueueEvents.

Imports call external providers; transient failures are expected. Provide sensible retry defaults and a metrics hook. Note that BullMQ has no per-job timeout option, so any wall-clock limit must be enforced inside the job handler itself.

Apply:

 export const importQueue = new Queue<ImportQueuePayload>('import', {
 connection: getRedisQueue(),
 defaultJobOptions: {
 removeOnComplete: 10,
 removeOnFail: 50,
+ attempts: 3,
+ backoff: { type: 'exponential', delay: 30_000 },
 },
 });
+
+// Optional: enable events for monitoring dashboards/Bull Board progress
+// (QueueEvents comes from 'bullmq', alongside Queue)
+export const importQueueEvents = new QueueEvents('import', {
+ connection: getRedisQueue(),
+});
apps/worker/src/boot-workers.ts (3)

138-142: Also drain import queue before shutdown to avoid cutting imports mid-run.

- await waitForQueueToEmpty(cronQueue);
+ await waitForQueueToEmpty(cronQueue);
+ await waitForQueueToEmpty(importQueue, 10 * 60_000); // 10 min cap; tune as needed

155-167: Dev path skips graceful shutdown — can leave jobs in inconsistent state.

Unify behavior and call the same exit handler in dev.

- process.on(evt, (code) => {
- if (process.env.NODE_ENV === 'production') {
- exitHandler(evt, code);
- } else {
- logger.info('Shutting down for development', {
- event: evt,
- code,
- });
- process.exit(0);
- }
- });
+ process.on(evt, (code) => {
+ exitHandler(evt, code ?? 0);
+ });

4-13: Remove unused imports and dead/commented code for clarity.

eventsGroupQueue, GroupWorker, eventsJob, incomingEventPure are no longer used; commented worker block can be dropped.

Also applies to: 18-23, 33-48

packages/trpc/src/routers/import.ts (3)

121-131: Guard job removal on delete with error handling (active/locked jobs can throw).

 const job = await importQueue.getJob(importRecord.jobId);
- if (job) {
- await job.remove();
- }
+ if (job) {
+ try {
+ await job.remove();
+ } catch (e) {
+ // best-effort removal; proceed with deletion
+ }
+ }

165-171: Retry should set stable jobId and attempts/backoff like create.

- const job = await importQueue.add('import', {
- type: 'import',
- payload: {
- importId: importRecord.id,
- },
- });
+ const job = await importQueue.add(
+ 'import',
+ { type: 'import', payload: { importId: importRecord.id } },
+ { jobId: importRecord.id, attempts: 3, backoff: { type: 'exponential', delay: 30_000 } },
+ );

3-6: Unused type import (IImportConfig).

IImportConfig is not referenced. Remove to satisfy lint.

packages/importer/src/index.ts (1)

1-14: Public API surface looks good; minor polish

Consider also re-exporting provider config/types (e.g., IUmamiImportConfig) from providers to keep consumers on a single import path. Optional.
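A minimal sketch of that single import path, assuming UmamiRawEvent is exported from providers/umami and IUmamiImportConfig from @openpanel/validation:

// packages/importer/src/index.ts: keep consumers on one import path
export { BaseImportProvider } from './base-provider';
export { UmamiProvider } from './providers/umami';
export type { UmamiRawEvent } from './providers/umami';
export type { ImportConfig } from './types';
export type { IUmamiImportConfig } from '@openpanel/validation';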

IMPORT_REVIEW.md (1)

320-337: Move this review doc to /docs or PR notes to avoid shipping in package tarballs

Keeps repo clean and avoids confusing consumers.

apps/worker/src/jobs/import.ts (4)

1-11: Avoid deep imports; prefer package barrel exports

Importing services via '@openpanel/db/src/services/import.service' couples to internal paths. Re-export these from '@openpanel/db' and import from there to reduce breakage.
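
A sketch of the barrel approach, assuming packages/db/src/index.ts is the package entry point:

// packages/db/src/index.ts
export * from './services/import.service';

// apps/worker/src/jobs/import.ts then imports from the package root instead of the deep path:
import { insertImportBatch, reconstructSessions } from '@openpanel/db';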


75-81: Progress totalBatches fluctuates; compute once or report pct

As totalEvents grows, totalBatches changes mid-run. Either omit totalBatches here or compute a percentage for UI.

- totalBatches: Math.ceil(totalEvents / BATCH_SIZE),
+ progressPct: totalEvents ? Math.round((processedEvents / totalEvents) * 100) : 0,

17-17: Make BATCH_SIZE configurable

Read from config/env to tune per deployment.

-const BATCH_SIZE = 5000;
+const BATCH_SIZE = Number(process.env.IMPORT_BATCH_SIZE ?? 5000);

200-209: Validate provider-specific config with a schema before instantiation

Avoid passing untyped Prisma JSON into providers. Parse with provider schema (zod) and fail early.

 function createProvider(
 record: Prisma.ImportGetPayload<{ include: { project: true } }>,
 ) {
 switch (record.config.provider) {
 case 'umami':
- return new UmamiProvider(record.projectId, record.config);
+ // Example: zUmamiImportConfig.safeParse(record.config)
+ return new UmamiProvider(
+ record.projectId,
+ (record.config as any) /* TODO: parse/validate */,
+ );
 default:
 throw new Error(`Unknown provider: ${record.config.provider}`);
 }
 }

I can wire in zod parsing if you expose the provider config schemas from @openpanel/importer.
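
For example, a sketch of that wiring, assuming a zUmamiImportConfig zod schema is exported from @openpanel/validation alongside IUmamiImportConfig:

import { zUmamiImportConfig } from '@openpanel/validation'; // hypothetical export

function createProvider(
  record: Prisma.ImportGetPayload<{ include: { project: true } }>,
) {
  switch (record.config.provider) {
    case 'umami': {
      // Fail early with a readable error instead of passing untyped JSON through
      const parsed = zUmamiImportConfig.safeParse(record.config);
      if (!parsed.success) {
        throw new Error(`Invalid umami import config: ${parsed.error.message}`);
      }
      return new UmamiProvider(record.projectId, parsed.data);
    }
    default:
      throw new Error(`Unknown provider: ${record.config.provider}`);
  }
}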

packages/db/prisma/migrations/20251018205153_add_import_table/migration.sql (2)

9-16: Constrain status values

Add a CHECK constraint (or enum) to prevent invalid states.

 "status" TEXT NOT NULL,
@@
 ) ;
+
+-- Restrict status values
+ALTER TABLE "public"."imports"
+ ADD CONSTRAINT "imports_status_check"
+ CHECK ("status" IN ('pending','processing','completed','failed'));

18-22: Add indexes for common queries

List/filters will hit projectId/status/createdAt frequently.

 -- AddForeignKey
 ALTER TABLE "public"."imports" ADD CONSTRAINT "imports_projectId_fkey" FOREIGN KEY ("projectId") REFERENCES "public"."projects"("id") ON DELETE CASCADE ON UPDATE CASCADE;
+
+-- Indexes for performance
+CREATE INDEX IF NOT EXISTS "idx_imports_project_createdAt"
+ ON "public"."imports" ("projectId", "createdAt" DESC);
+CREATE INDEX IF NOT EXISTS "idx_imports_status_createdAt"
+ ON "public"."imports" ("status", "createdAt" DESC);
apps/start/src/routes/_app.$organizationId.$projectId_.settings._tabs.imports.tsx (5)

5-10: Remove unused icon import

CheckCircle is unused.

-import {
- CheckCircle,
- CheckCircleIcon,
- Download,
- Loader2Icon,
- XCircleIcon,
-} from 'lucide-react';
+import { CheckCircleIcon, Download, Loader2Icon, XCircleIcon } from 'lucide-react';

83-104: Add onError to mutations for user feedback

Surface errors with toasts; also consider logging.

 const deleteImport = useMutation(
 trpc.import.delete.mutationOptions({
 onSuccess: () => {
 toast.success('Import deleted', {
 description: 'The import has been successfully deleted.',
 });
 queryClient.invalidateQueries(trpc.import.list.pathFilter());
 },
+ onError: (err) => {
+ toast.error('Delete failed', { description: String(err) });
+ },
 }),
 );
@@
 const retryImport = useMutation(
 trpc.import.retry.mutationOptions({
 onSuccess: () => {
 toast.success('Import retried', {
 description: 'The import has been queued for processing again.',
 });
 queryClient.invalidateQueries(trpc.import.list.pathFilter());
 },
+ onError: (err) => {
+ toast.error('Retry failed', { description: String(err) });
+ },
 }),
 );

244-261: Disable action buttons while pending to avoid double submits

Tie disabled state to mutation pending flags.

- {imp.status === 'failed' && (
+ {imp.status === 'failed' && (
 <Button
 variant="outline"
 size="sm"
- onClick={() => retryImport.mutate({ id: imp.id })}
+ disabled={retryImport.isPending}
+ onClick={() => retryImport.mutate({ id: imp.id })}
 >
 Retry
 </Button>
 )}
 <Button
 variant="ghost"
 size="sm"
- onClick={() => deleteImport.mutate({ id: imp.id })}
+ disabled={deleteImport.isPending}
+ onClick={() => deleteImport.mutate({ id: imp.id })}
 >
 Delete
 </Button>

41-66: Consider serving logos locally or via asset proxy

External Brandfetch URLs can break and may violate CSP. If feasible, mirror assets or use a proxy.
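
One option, sketched with hypothetical paths: bundle the logos as local assets and reference them from the provider list.

// Logos served from the app's own public assets (paths are hypothetical)
const providers = [
  { id: 'umami', name: 'Umami', logo: '/integrations/umami.svg' },
  { id: 'plausible', name: 'Plausible', logo: '/integrations/plausible.svg' },
  { id: 'mixpanel', name: 'Mixpanel', logo: '/integrations/mixpanel.svg' },
];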


141-161: Consider extending IntegrationCardLogoImage to accept a customizable alt prop for improved accessibility

The alt attribute is currently hardcoded to the generic "Integration Logo" at line 66 of apps/start/src/components/integrations/integration-card.tsx. Extending the component to accept an optional alt prop and passing provider.name would provide more descriptive alternative text for each integration, improving accessibility and SEO.
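
A sketch of the extended component and call site; the internals are assumed from the forwarding behavior described elsewhere in this review:

export function IntegrationCardLogoImage({
  src,
  className,
  alt = 'Integration Logo', // keeps the current default for existing call sites
}: {
  src: string;
  className?: string;
  alt?: string;
}) {
  return (
    <IntegrationCardLogo className={className}>
      <img src={src} alt={alt} />
    </IntegrationCardLogo>
  );
}

// Call site in the imports settings page:
// <IntegrationCardLogoImage src={provider.logo} alt={`${provider.name} logo`} />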

packages/validation/src/index.ts (1)

444-445: Local file support vs schema

fileUrl is z.string().url(), but the Umami provider supports local files. Either (a) allow file:/// or absolute paths, or (b) drop local-path support in the provider. Today this schema blocks local imports.

- fileUrl: z.string().url(),
+ fileUrl: z.string().refine(
+ (v) => /^https?:\/\//i.test(v) || /^file:\/\//i.test(v) || v.startsWith('/'),
+ 'Must be an http(s) URL, file:// URL, or absolute path',
+ ),
packages/importer/src/providers/umami.ts (2)

178-195: Byte counting via body.on('data') competes with pipeline

Attach a lightweight Transform to count bytes between body and decompress to preserve backpressure and avoid dual consumers.

- if (maxBytes) {
- body.on('data', (chunk: Buffer) => {
- seenBytes += chunk.length;
- if (seenBytes > maxBytes) {
- controller.abort();
- body.destroy(new Error(`Stream exceeded size limit (${seenBytes} > ${maxBytes})`));
- }
- });
- }
+ const byteCounter = new (await import('node:stream')).Transform({
+ transform(chunk, _enc, cb) {
+ seenBytes += chunk.length;
+ if (maxBytes && seenBytes > maxBytes) {
+ cb(new Error(`Stream exceeded size limit (${seenBytes} > ${maxBytes})`));
+ return;
+ }
+ cb(null, chunk);
+ },
+ });
@@
- if (decompress) {
- await pipeline(body, decompress, parser, { signal: controller.signal });
- } else {
- await pipeline(body, parser, { signal: controller.signal });
- }
+ if (decompress) {
+ await pipeline(body, byteCounter, decompress, parser, { signal: controller.signal });
+ } else {
+ await pipeline(body, byteCounter, parser, { signal: controller.signal });
+ }

Also applies to: 211-227


91-99: Optional: support local Brotli (.br) like remote path

Mirror the remote handling and support .br locally if needed (createBrotliDecompress comes from node:zlib).

- const isGzipped = filePath.endsWith('.gz');
+ const isGzipped = filePath.endsWith('.gz');
+ const isBrotli = filePath.endsWith('.br');
@@
- if (isGzipped) {
+ if (isGzipped) {
 await pipeline(fileStream, createGunzip(), parser);
- } else {
+ } else if (isBrotli) {
+ await pipeline(fileStream, createBrotliDecompress(), parser);
+ } else {
 await pipeline(fileStream, parser);
 }
packages/db/src/services/import.service.ts (3)

78-92: Consider using CTEs or subqueries for better maintainability.

The argMin/argMax tuple approach works but is fragile—the 17-element tuple order must match the destructuring on lines 147-173. If fields are added, removed, or reordered, this becomes error-prone and difficult to debug.

A clearer alternative would be to use CTEs with ROW_NUMBER() or separate queries for first/last events, making field selection explicit:

WITH ranked_events AS (
 SELECT *,
 ROW_NUMBER() OVER (PARTITION BY device_id, session_id, project_id, profile_id ORDER BY created_at ASC) as first_rank,
 ROW_NUMBER() OVER (PARTITION BY device_id, session_id, project_id, profile_id ORDER BY created_at DESC) as last_rank
 FROM ${TABLE_NAMES.events_imports}
 WHERE ${rangeWhere} AND name NOT IN ('session_start', 'session_end')
)
SELECT ...
FROM ranked_events
WHERE first_rank = 1 OR last_rank = 1

26-525: Consider adding explicit error handling and logging.

None of the functions include try-catch blocks or error enrichment. While throwing errors to the caller is acceptable, adding contextual error handling would improve debuggability and allow for better error classification (transient vs. permanent failures).

Consider wrapping critical operations:

export async function insertImportBatch(
 events: IClickhouseEvent[],
 importId: string,
): Promise<ImportStageResult> {
 try {
 if (events.length === 0) {
 return { importId, totalEvents: 0, insertedEvents: 0 };
 }
 
 const eventsWithMetadata = events.map((event) => ({
 ...event,
 import_id: importId,
 import_status: 'pending',
 imported_at_meta: new Date(),
 }));
 await ch.insert({
 table: TABLE_NAMES.events_imports,
 values: eventsWithMetadata,
 format: 'JSONEachRow',
 });
 return {
 importId,
 totalEvents: events.length,
 insertedEvents: events.length,
 };
 } catch (error) {
 // Add context and rethrow or log
 throw new Error(`Failed to insert import batch for ${importId}: ${error instanceof Error ? error.message : String(error)}`);
 }
}

This pattern would help with debugging import failures in production.


278-356: Consider batch migration for very large imports.

The INSERT INTO ... SELECT approach is efficient for most cases, but for imports with millions of events, it might:

  • Block for extended periods
  • Consume significant memory
  • Provide no progress feedback during the migration

For very large imports, consider implementing batch migration:

export async function migrateImportToProduction(
 importId: string,
 from?: string,
 to?: string,
 batchSize = 100000, // configurable batch size
): Promise<void> {
 // Count matching rows up front so the loop has a clear termination condition
 const countResult = await ch.query({
 query: `
 SELECT count() AS total
 FROM ${TABLE_NAMES.events_imports}
 WHERE import_id = {importId:String}
 ${from ? 'AND created_at >= {from:String}' : ''}
 ${to ? 'AND created_at < {to:String}' : ''}
 `,
 query_params: { importId, from, to },
 format: 'JSONEachRow',
 });
 const [row] = (await countResult.json()) as Array<{ total: string }>;
 const total = Number(row?.total ?? 0);
 
 for (let offset = 0; offset < total; offset += batchSize) {
 const migrationQuery = `
 INSERT INTO ${TABLE_NAMES.events} (...)
 SELECT ...
 FROM ${TABLE_NAMES.events_imports}
 WHERE import_id = {importId:String}
 ${from ? 'AND created_at >= {from:String}' : ''}
 ${to ? 'AND created_at < {to:String}' : ''}
 ORDER BY created_at ASC
 LIMIT {batchSize:UInt32} OFFSET {offset:UInt32}
 `;
 
 await ch.command({
 query: migrationQuery,
 query_params: { importId, from, to, batchSize, offset },
 });
 // Could call updateImportProgress here
 }
}

This is optional and depends on expected import sizes.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d8a297e and d71712e.

⛔ Files ignored due to path filters (2)
  • packages/importer/a70ff57d-f632-4292-a98b-658d7734fbec.csv is excluded by !**/*.csv
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (35)
  • IMPORT_REVIEW.md (1 hunks)
  • apps/start/package.json (1 hunks)
  • apps/start/src/components/integrations/integration-card.tsx (1 hunks)
  • apps/start/src/components/ui/badge.tsx (1 hunks)
  • apps/start/src/modals/add-import.tsx (1 hunks)
  • apps/start/src/modals/index.tsx (2 hunks)
  • apps/start/src/routeTree.gen.ts (11 hunks)
  • apps/start/src/routes/_app.$organizationId.$projectId_.settings._tabs.imports.tsx (1 hunks)
  • apps/start/src/routes/_app.$organizationId.$projectId_.settings._tabs.tsx (1 hunks)
  • apps/worker/package.json (1 hunks)
  • apps/worker/src/boot-workers.ts (5 hunks)
  • apps/worker/src/index.ts (2 hunks)
  • apps/worker/src/jobs/import.ts (1 hunks)
  • packages/db/code-migrations/5-add-imports-table.sql (1 hunks)
  • packages/db/code-migrations/5-add-imports-table.ts (1 hunks)
  • packages/db/prisma/migrations/20251018205153_add_import_table/migration.sql (1 hunks)
  • packages/db/prisma/migrations/20251018214030_fix/migration.sql (1 hunks)
  • packages/db/prisma/schema.prisma (2 hunks)
  • packages/db/src/clickhouse/client.ts (1 hunks)
  • packages/db/src/services/event.service.ts (1 hunks)
  • packages/db/src/services/import.service.ts (1 hunks)
  • packages/db/src/types.ts (2 hunks)
  • packages/importer/package.json (1 hunks)
  • packages/importer/src/base-provider.ts (1 hunks)
  • packages/importer/src/index.ts (1 hunks)
  • packages/importer/src/providers/umami.ts (1 hunks)
  • packages/importer/src/types.ts (1 hunks)
  • packages/importer/src/utils/referrer-parser.ts (1 hunks)
  • packages/importer/tsconfig.json (1 hunks)
  • packages/importer/vitest.config.ts (1 hunks)
  • packages/queue/src/queues.ts (1 hunks)
  • packages/trpc/package.json (1 hunks)
  • packages/trpc/src/root.ts (2 hunks)
  • packages/trpc/src/routers/import.ts (1 hunks)
  • packages/validation/src/index.ts (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
apps/start/**/*.{ts,tsx}

📄 CodeRabbit inference engine (apps/start/.cursorrules)

apps/start/**/*.{ts,tsx}: Instrument server functions created with createServerFn by wrapping their implementation in Sentry.startSpan and importing Sentry from '@sentry/tanstackstart-react'
When using Sentry in the TanStack Start React app, import it as: import * as Sentry from '@sentry/tanstackstart-react'

Files:

  • apps/start/src/components/integrations/integration-card.tsx
  • apps/start/src/modals/index.tsx
  • apps/start/src/modals/add-import.tsx
  • apps/start/src/routes/_app.$organizationId.$projectId_.settings._tabs.tsx
  • apps/start/src/routes/_app.$organizationId.$projectId_.settings._tabs.imports.tsx
  • apps/start/src/components/ui/badge.tsx
  • apps/start/src/routeTree.gen.ts
🧬 Code graph analysis (17)
packages/db/src/types.ts (1)
packages/validation/src/index.ts (1)
  • IImportConfig (464-467)
packages/importer/src/utils/referrer-parser.ts (1)
packages/common/src/string.ts (1)
  • stripTrailingSlash (1-3)
packages/queue/src/queues.ts (1)
packages/redis/redis.ts (1)
  • getRedisQueue (93-105)
apps/worker/src/jobs/import.ts (3)
packages/queue/src/queues.ts (1)
  • ImportQueuePayload (163-168)
packages/importer/src/providers/umami.ts (2)
  • UmamiRawEvent (55-55)
  • UmamiProvider (57-426)
packages/db/src/services/import.service.ts (8)
  • insertImportBatch (26-53)
  • updateImportProgress (387-400)
  • getImportDateBounds (508-525)
  • reconstructSessions (61-272)
  • migrateImportToProduction (278-356)
  • markImportComplete (361-382)
  • getImportProgress (434-488)
  • markImportFailed (405-429)
apps/start/src/modals/index.tsx (1)
apps/start/src/modals/add-import.tsx (1)
  • AddImport (29-164)
packages/importer/vitest.config.ts (1)
vitest.shared.ts (1)
  • getSharedVitestConfig (4-27)
apps/worker/src/boot-workers.ts (2)
packages/queue/src/queues.ts (1)
  • importQueue (170-176)
apps/worker/src/jobs/import.ts (1)
  • importJob (19-198)
packages/trpc/src/root.ts (1)
packages/trpc/src/routers/import.ts (1)
  • importRouter (11-183)
packages/db/code-migrations/5-add-imports-table.ts (2)
packages/db/code-migrations/helpers.ts (1)
  • getIsCluster (17-24)
packages/db/src/clickhouse/migration.ts (2)
  • createTable (68-116)
  • runClickhouseMigrationCommands (322-454)
apps/start/src/modals/add-import.tsx (2)
apps/start/src/integrations/tanstack-query/root-provider.tsx (1)
  • Provider (68-86)
packages/importer/src/types.ts (1)
  • ImportConfig (7-12)
packages/trpc/src/routers/import.ts (6)
packages/trpc/src/trpc.ts (2)
  • createTRPCRouter (138-138)
  • protectedProcedure (161-164)
packages/trpc/src/access.ts (1)
  • getProjectAccessCached (4-4)
packages/trpc/src/errors.ts (1)
  • TRPCAccessError (3-7)
packages/db/src/prisma-client.ts (1)
  • db (270-270)
packages/validation/src/index.ts (1)
  • zCreateImport (469-477)
packages/queue/src/queues.ts (1)
  • importQueue (170-176)
packages/db/src/services/import.service.ts (3)
packages/db/src/services/event.service.ts (1)
  • IClickhouseEvent (68-102)
packages/db/src/clickhouse/client.ts (4)
  • ch (131-141)
  • TABLE_NAMES (48-60)
  • convertClickhouseDateToJs (218-220)
  • formatClickhouseDate (188-199)
packages/db/src/prisma-client.ts (1)
  • db (270-270)
apps/worker/src/index.ts (1)
packages/queue/src/queues.ts (1)
  • importQueue (170-176)
apps/start/src/routes/_app.$organizationId.$projectId_.settings._tabs.imports.tsx (2)
apps/start/src/routes/_app.$organizationId.$projectId_.settings._tabs.tsx (1)
  • Route (12-34)
apps/start/src/components/integrations/integration-card.tsx (3)
  • IntegrationCard (91-112)
  • IntegrationCardLogoImage (50-69)
  • IntegrationCardFooter (3-15)
packages/importer/src/base-provider.ts (2)
packages/importer/src/types.ts (3)
  • BaseRawEvent (40-42)
  • ImportJobMetadata (69-73)
  • ErrorContext (45-51)
packages/db/src/services/event.service.ts (1)
  • IClickhouseEvent (68-102)
packages/importer/src/providers/umami.ts (4)
packages/validation/src/index.ts (1)
  • IUmamiImportConfig (450-450)
packages/db/src/services/event.service.ts (2)
  • IClickhouseEvent (68-102)
  • query (701-847)
packages/common/src/url.ts (2)
  • parsePath (11-49)
  • isSameDomain (51-63)
packages/importer/src/utils/referrer-parser.ts (2)
  • parseReferrer (49-58)
  • getReferrerWithQuery (60-91)
packages/importer/src/types.ts (2)
packages/db/src/services/event.service.ts (1)
  • IServiceImportedEventPayload (144-147)
packages/db/src/services/import.service.ts (2)
  • ImportProgress (16-21)
  • ImportStageResult (10-14)
🪛 Gitleaks (8.28.0)
apps/worker/src/index.ts

[high] 60-60: Identified a pattern that may indicate AWS credentials, risking unauthorized cloud resource access and data breaches on AWS platforms.

(aws-access-token)

🪛 LanguageTool
IMPORT_REVIEW.md

[uncategorized] ~35-~35: If this is a compound adjective that modifies the following noun, use a hyphen.
Context: ... batchNumber: number ) ``` --- ## 🟡 Medium Priority Issues ### 2. **Missing UUID Dependenc...

(EN_COMPOUND_ADJECTIVE_INTERNAL)


[grammar] ~156-~156: Ensure spelling is correct
Context: ...d for EVERY event ``` Improvement: Cleanup periodically (e.g., every 1000 events) ...

(QB_NEW_EN_ORTHOGRAPHY_ERROR_IDS_1)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: build-and-push-api
  • GitHub Check: build-and-push-worker
🔇 Additional comments (24)
apps/start/package.json (1)

6-6: Clarify the motivation for renaming "testing" → "testing2".

The script was renamed from "testing" to "testing2", but the name "testing2" is not descriptive and appears to be a placeholder. Additionally, this change seems orthogonal to the PR objective ("new importer").

Please clarify:

  1. What is the intended purpose of renaming this script?
  2. Is "testing2" a temporary name, or was a different name intended?
  3. Was the original "testing" script referenced in CI/CD pipelines, documentation, or developer workflows? If so, those references need to be updated to avoid breaking them.

If this change is unintended, consider reverting it.

apps/start/src/components/integrations/integration-card.tsx (1)

50-69: LGTM! Consistent API enhancement.

The addition of the optional className prop aligns IntegrationCardLogoImage with the other components in this file (IntegrationCardFooter, IntegrationCardHeader, IntegrationCardHeaderButtons) that already accept className. The prop is properly forwarded to IntegrationCardLogo, which correctly merges it using the cn() utility.

apps/start/src/components/ui/badge.tsx (2)

12-13: LGTM! Success variant follows good practices.

The emerald-700 background with white text provides good contrast and is semantically appropriate for success states.


16-16: LGTM! Info variant is well-implemented.

The blue-500 background with white text provides adequate contrast and follows the established pattern.

packages/importer/tsconfig.json (1)

1-9: LGTM! Clean TypeScript configuration.

The configuration is properly structured for the new importer package, extending the shared base config and setting appropriate compiler options for a source-in-src, output-to-dist layout.

packages/importer/package.json (1)

27-27: bullmq dependency placement is correct.

Verification shows bullmq is exclusively in devDependencies with no runtime imports or usage in the importer source code. The package does not directly enqueue or process jobs—job operations are handled through the @openpanel/queue workspace dependency. The current placement is appropriate.

packages/db/src/types.ts (1)

2-2: Alignment with validation types looks good

The Prisma JSON alias cleanly mirrors the validation union. No issues spotted.

Also applies to: 16-16

packages/trpc/package.json (1)

19-19: No circular dependency detected—safe to proceed.

Verification confirms @openpanel/queue does not depend on @openpanel/trpc. The dependency is unidirectional: @openpanel/trpc (line 19) → @openpanel/queue. No cycle risk.

apps/start/src/routes/_app.$organizationId.$projectId_.settings._tabs.tsx (1)

45-45: LGTM! Clean addition of the Imports tab.

The new imports tab follows the existing pattern and integrates well with the settings navigation structure.

apps/worker/package.json (1)

23-23: LGTM! Necessary dependency for import functionality.

The addition of the @openpanel/importer workspace dependency enables the worker to process import jobs.

packages/db/src/clickhouse/client.ts (1)

59-59: LGTM! Table name constant added correctly.

The events_imports table name addition follows the existing pattern and aligns with the new ClickHouse migration.

apps/worker/src/index.ts (1)

8-8: LGTM! Import queue integration added correctly.

The importQueue is properly imported and registered with Bull Board for monitoring.

Also applies to: 42-42

packages/db/src/services/event.service.ts (1)

144-147: LGTM! Type definition is clear and purposeful.

The IServiceImportedEventPayload type correctly omits profile and meta fields for imported events, which makes sense as these would need to be resolved separately during import processing.

packages/db/code-migrations/5-add-imports-table.sql (2)

42-43: Verify the 7-day TTL is intentional for staging data.

The TTL of 7 days on imported_at_meta means all staging import data will be automatically deleted after one week. Confirm this aligns with your data retention requirements and import workflow expectations. If imports might take longer than 7 days to validate or if you need to keep historical import records, consider increasing this value or making it configurable.
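
If a longer window is needed, the TTL can be adjusted in place; a sketch (the 30-day interval is illustrative):

-- Extend staging retention without recreating the table
ALTER TABLE events_imports MODIFY TTL imported_at_meta + INTERVAL 30 DAY;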


1-38: LGTM! Well-designed ClickHouse table schema.

The table structure uses appropriate data types, codecs for compression, and partitioning strategy. The use of LowCardinality for categorical fields and specialized codecs (ZSTD, Delta, Gorilla) shows good understanding of ClickHouse optimization patterns.

packages/db/prisma/schema.prisma (1)

470-486: LGTM! Import model is well-structured.

The Import model has appropriate fields for tracking import progress, includes proper relations to Project with cascade delete, and uses sensible defaults. The use of Json for config provides flexibility while the comment /// [IPrismaImportConfig] indicates type documentation exists elsewhere.

Also applies to: 197-197

IMPORT_REVIEW.md (2)

37-59: Remove the action item; importer uses node:crypto, not the uuid package

The script results show that packages/importer uses randomUUID() imported from node:crypto (in packages/importer/src/providers/umami.ts and packages/cli/src/importer/importer.ts), not from the external uuid package. The uuid package is used elsewhere in the codebase (specifically in packages/db/src/services/event.service.ts), but not in the importer package. The review comment's recommendation to add uuid to the importer's dependencies is unnecessary.

Likely an incorrect or invalid review comment.


10-31: Review comment is based on outdated or incorrect code state; actual implementation already uses proper types

The review comment cites code that does not match the current implementation:

  • processBatch function: Does not exist in the codebase (rg search found zero matches)
  • any type usage: No matches found in apps/worker/src/jobs/ (contradicts claim of "extensive use")
  • Actual typing: Line 46 of apps/worker/src/jobs/import.ts already uses const eventBatch: UmamiRawEvent[] = []; (properly typed, not any[])
  • BaseImportProvider: Confirmed generic with <TRawEvent extends BaseRawEvent = BaseRawEvent>, and UmamiProvider correctly extends it as BaseImportProvider<UmamiRawEvent>

The current implementation already has the proper type safety that the review comment suggests. The code is not coupled via weak typing but properly typed with UmamiRawEvent. No refactoring is needed.

Likely an incorrect or invalid review comment.

apps/start/src/routeTree.gen.ts (1)

57-57: Generated file — no manual edits

Looks consistent with the new Imports tab. No action; keep excluded from lint/format.

Also applies to: 373-378, 511-511, 564-564, 800-801, 1170-1176, 1504-1517

packages/importer/src/types.ts (1)

20-25: Naming duplication exists but no actual import collision risk – review comment is incorrect.

The analysis shows that while ImportProgress and ImportStageResult are defined in both packages, the importer package does not import these collision types from @openpanel/db. The importer only imports unrelated types like IClickhouseEvent and IServiceImportedEventPayload.

Additionally, the ImportProgress types serve different purposes: the importer's version tracks batch processing state (totalEvents, processedEvents, currentBatch, totalBatches), while the db version tracks job metadata (importId, totalEvents, insertedEvents, status). This is intentional, not a collision.

Since there are no cross-imports and the types aren't used interchangeably, no "confusion in imports" or "subtle type bugs" will occur. Both packages maintain independent type contracts appropriate to their concerns.

Likely an incorrect or invalid review comment.

packages/db/src/services/import.service.ts (4)

1-21: LGTM!

The imports and type definitions are well-structured. The interfaces clearly define the data contracts for staging results and progress tracking.


26-53: LGTM!

The batch insert function properly handles empty arrays, adds appropriate metadata, and leverages the retry-wrapped ClickHouse insert. The implementation is clean and defensive.


493-503: Verify cleanup scope: should Postgres import record also be deleted?

The cleanupFailedImport function only deletes from ClickHouse's events_imports table. The corresponding Postgres import record remains with status 'failed'.

Is this intentional (to preserve audit history), or should the Postgres record also be deleted for consistency?

If Postgres records should also be cleaned up, add:

 export async function cleanupFailedImport(importId: string): Promise<void> {
 const deleteQuery = `
 DELETE FROM ${TABLE_NAMES.events_imports}
 WHERE import_id = {importId:String} AND import_status = 'failed'
 `;
 await ch.command({
 query: deleteQuery,
 query_params: { importId },
 });
+ 
+ // Also remove from Postgres if desired
+ await db.import.delete({
+ where: { id: importId, status: 'failed' },
+ });
 }

508-525: LGTM!

The getImportDateBounds utility correctly handles the nullable case and provides a clean interface for date range queries. Defensive null check on line 524 prevents runtime errors.

Comment on lines +14 to +15
warning:
'border-transparent bg-yellow-500 text-white [a&]:hover:bg-yellow-500/90',
coderabbitai bot (Contributor) commented Oct 19, 2025

⚠️ Potential issue | 🟠 Major

Fix color contrast for accessibility.

White text on a yellow-500 background likely fails WCAG AA contrast requirements (4.5:1). Yellow backgrounds conventionally require dark text for proper readability.

Apply this diff to fix the contrast issue:

- warning:
- 'border-transparent bg-yellow-500 text-white [a&]:hover:bg-yellow-500/90',
+ warning:
+ 'border-transparent bg-yellow-500 text-yellow-950 [a&]:hover:bg-yellow-500/90',

Alternatively, use a darker yellow background with white text:

- warning:
- 'border-transparent bg-yellow-500 text-white [a&]:hover:bg-yellow-500/90',
+ warning:
+ 'border-transparent bg-yellow-600 text-white [a&]:hover:bg-yellow-600/90',
🤖 Prompt for AI Agents
In apps/start/src/components/ui/badge.tsx around lines 14-15, the warning
variant uses 'bg-yellow-500 text-white' which likely fails WCAG contrast; change
either the text color to a dark color (e.g., replace text-white with
text-gray-900 or text-black) so dark text sits on yellow-500, or keep white text
and use a darker yellow background (e.g., replace bg-yellow-500 with
bg-yellow-600 or bg-yellow-700) and adjust the hover class accordingly to
maintain accessible contrast.

Comment on lines +55 to +60
toast.error('Import failed', {
description: error.message,
});
},
}),
);
coderabbitai bot (Contributor) commented Oct 19, 2025

⚠️ Potential issue | 🟡 Minor

Avoid leaking secrets in error paths/logs

error.message may include upstream details. Ensure server never echoes API secrets. Consider a generic toast and log details server-side only.
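
A sketch of the sanitized handler inside the existing mutation options (message text is illustrative):

onError: () => {
  // Keep the client-facing message generic; the full error belongs in server-side logs
  toast.error('Import failed', {
    description: 'Something went wrong while creating the import. Please try again.',
  });
},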

🤖 Prompt for AI Agents
In apps/start/src/modals/add-import.tsx around lines 55 to 60, the current toast
displays error.message which can leak upstream secrets; replace the client-side
toast with a generic user-facing message (e.g., "Import failed, try again
later") and remove inclusion of error.message, and instead ensure the server
logs the full error details securely (or returns a sanitized error code/message)
so secrets are never echoed to the client; update the toast usage accordingly
and confirm server-side logging/response sanitization is in place.

Comment on lines +62 to +84
const handleSubmit = () => {
if (sourceType === 'file') {
createImport.mutate({
projectId,
provider,
config: {
type: provider as 'umami' | 'plausible',
fileUrl: config.fileUrl || '',
},
});
} else {
createImport.mutate({
projectId,
provider: 'mixpanel',
config: {
type: 'mixpanel',
apiKey: config.apiKey || '',
apiSecret: config.apiSecret || '',
projectId: config.projectId || '',
},
});
}
};
coderabbitai bot (Contributor) commented Oct 19, 2025

⚠️ Potential issue | 🔴 Critical

Payload shape mismatches zod schema (will be rejected by TRPC)

  • Missing provider inside config.
  • config.type should be 'file' or 'api' (not provider name).
  • Mixpanel path hardcodes incorrect type and omits provider in config.

Align with zCreateImport provider-specific config shapes.

- type Provider = 'umami' | 'plausible' | 'mixpanel';
-
-interface ImportConfig {
- type: Provider;
- fileUrl?: string;
- apiKey?: string;
- apiSecret?: string;
- projectId?: string;
-}
+type Provider = 'umami' | 'plausible' | 'mixpanel';
+
+interface ImportConfig {
+ fileUrl?: string;
+ apiKey?: string;
+ apiSecret?: string;
+ projectId?: string;
+}
@@
- const [config, setConfig] = useState<ImportConfig>({
- type: provider,
- ...(sourceType === 'file'
- ? { fileUrl: '' }
- : { apiKey: '', apiSecret: '', projectId: '' }),
- });
+ const [config, setConfig] = useState<ImportConfig>(
+ sourceType === 'file'
+ ? { fileUrl: '' }
+ : { apiKey: '', apiSecret: '', projectId: '' },
+ );
@@
- if (sourceType === 'file') {
- createImport.mutate({
- projectId,
- provider,
- config: {
- type: provider as 'umami' | 'plausible',
- fileUrl: config.fileUrl || '',
- },
- });
- } else {
- createImport.mutate({
- projectId,
- provider: 'mixpanel',
- config: {
- type: 'mixpanel',
- apiKey: config.apiKey || '',
- apiSecret: config.apiSecret || '',
- projectId: config.projectId || '',
- },
- });
- }
+ if (sourceType === 'file') {
+ createImport.mutate({
+ projectId,
+ provider, // 'umami' or 'plausible'
+ config: {
+ provider, // required by schema
+ type: 'file', // required by schema
+ fileUrl: config.fileUrl!,
+ },
+ });
+ } else {
+ createImport.mutate({
+ projectId,
+ provider: 'mixpanel',
+ config: {
+ provider: 'mixpanel', // required by schema
+ type: 'api', // required by schema
+ apiKey: config.apiKey!,
+ apiSecret: config.apiSecret!,
+ projectId: config.projectId!,
+ },
+ });
+ }

Also applies to: 13-21, 38-43

🤖 Prompt for AI Agents
In apps/start/src/modals/add-import.tsx around lines 62-84 (also fix same
pattern at 13-21 and 38-43): the object passed to createImport.mutate does not
match the zCreateImport schema — config is missing a provider field and
config.type is using provider names instead of the required 'file' | 'api'. Fix
by including provider at the top-level as you already do, and also add provider
inside config; set config.type = 'file' for file-based imports and config.type =
'api' for API-based imports; for file providers put fileUrl in config, for
mixpanel set config.apiKey, config.apiSecret, config.projectId and
config.provider = 'mixpanel' and config.type = 'api' so the payload matches the
schema exactly.

Comment on lines +12 to +14
import { UmamiProvider, type UmamiRawEvent } from '@openpanel/importer';
import type { ImportQueuePayload } from '@openpanel/queue';
import type { Job } from 'bullmq';
coderabbitai bot (Contributor) commented Oct 19, 2025

🛠️ Refactor suggestion | 🟠 Major

Remove UmamiRawEvent coupling; use BaseImportProvider generics

This job should be provider-agnostic. Swap UmamiRawEvent for BaseImportProvider.

-import { UmamiProvider, type UmamiRawEvent } from '@openpanel/importer';
+import { UmamiProvider, type BaseImportProvider } from '@openpanel/importer';

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In apps/worker/src/jobs/import.ts around lines 12 to 14, remove the direct
coupling to UmamiRawEvent and instead use the generic BaseImportProvider<T> from
the importer package: drop the UmamiRawEvent import, import BaseImportProvider
(and its type) from '@openpanel/importer', and replace any references to
UmamiRawEvent in the job/payload types with the generic BaseImportProvider<T>
(e.g., ImportQueuePayload<BaseImportProvider<any>> or a concrete T where
appropriate). Ensure all imports and Job generics are updated to use the
BaseImportProvider generic so the job becomes provider-agnostic.

Comment on lines +45 to +69
// Phase 1: Load & Transform - Process events in batches
const eventBatch: UmamiRawEvent[] = [];

for await (const rawEvent of providerInstance.parseSource()) {
// Validate event
if (!providerInstance.validate(rawEvent)) {
jobLogger.warn('Skipping invalid event', { rawEvent });
continue;
}

eventBatch.push(rawEvent);
totalEvents++;

// Process batch when it reaches the batch size
if (eventBatch.length >= BATCH_SIZE) {
jobLogger.info('Processing batch', { batchSize: eventBatch.length });

// Transform events
const transformedEvents = eventBatch.map((event) =>
providerInstance.transformEvent(event),
);

// Phase 2: Insert into staging table
await insertImportBatch(transformedEvents, importId);

coderabbitai bot (Contributor) commented Oct 19, 2025

🛠️ Refactor suggestion | 🟠 Major

Make batch buffer provider-agnostic and harden transform stage

  • Use unknown[] for the raw batch.
  • Guard transformEvent with try/catch to skip bad rows without failing the whole job.
- const eventBatch: UmamiRawEvent[] = [];
+ const eventBatch: unknown[] = [];
@@
- const transformedEvents = eventBatch.map((event) =>
- providerInstance.transformEvent(event),
- );
+ const transformedEvents: any[] = [];
+ for (const event of eventBatch) {
+ try {
+ transformedEvents.push(providerInstance.transformEvent(event as any));
+ } catch (e) {
+ jobLogger.warn('Skipping event on transform failure', {
+ provider: providerInstance.provider,
+ reason: e instanceof Error ? e.message : String(e),
+ });
+ }
+ }
📝 Committable suggestion

!!️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Phase 1: Load & Transform - Process events in batches
const eventBatch: UmamiRawEvent[] = [];

for await (const rawEvent of providerInstance.parseSource()) {
  // Validate event
  if (!providerInstance.validate(rawEvent)) {
    jobLogger.warn('Skipping invalid event', { rawEvent });
    continue;
  }

  eventBatch.push(rawEvent);
  totalEvents++;

  // Process batch when it reaches the batch size
  if (eventBatch.length >= BATCH_SIZE) {
    jobLogger.info('Processing batch', { batchSize: eventBatch.length });

    // Transform events
    const transformedEvents = eventBatch.map((event) =>
      providerInstance.transformEvent(event),
    );

    // Phase 2: Insert into staging table
    await insertImportBatch(transformedEvents, importId);

// Phase 1: Load & Transform - Process events in batches
const eventBatch: unknown[] = [];

for await (const rawEvent of providerInstance.parseSource()) {
  // Validate event
  if (!providerInstance.validate(rawEvent)) {
    jobLogger.warn('Skipping invalid event', { rawEvent });
    continue;
  }

  eventBatch.push(rawEvent);
  totalEvents++;

  // Process batch when it reaches the batch size
  if (eventBatch.length >= BATCH_SIZE) {
    jobLogger.info('Processing batch', { batchSize: eventBatch.length });

    // Transform events
    const transformedEvents: any[] = [];
    for (const event of eventBatch) {
      try {
        transformedEvents.push(providerInstance.transformEvent(event as any));
      } catch (e) {
        jobLogger.warn('Skipping event on transform failure', {
          provider: providerInstance.provider,
          reason: e instanceof Error ? e.message : String(e),
        });
      }
    }

    // Phase 2: Insert into staging table
    await insertImportBatch(transformedEvents, importId);
🤖 Prompt for AI Agents
In apps/worker/src/jobs/import.ts around lines 45 to 69, the batch buffer and
transform stage need hardening: change eventBatch from UmamiRawEvent[] to
unknown[] so the buffer is provider-agnostic, and wrap each
providerInstance.transformEvent call in a try/catch so a thrown error on one row
is caught, logged (jobLogger.warn with the rawEvent and error), and that row is
skipped without failing the whole job; ensure you collect only successful
transformed events (filter out failed/undefined results) before calling
insertImportBatch(transformedEvents, importId).

Comment on lines +53 to +66
// Properties type for events - more specific than Record<string, any>
export interface EventProperties {
  [key: string]:
    | string
    | number
    | boolean
    | null
    | undefined
    | Record<string, unknown>;
  __query?: Record<string, unknown>;
  __title?: string;
  __screen?: string;
  __language?: string;
}
Contributor

@coderabbitai coderabbitai bot Oct 19, 2025


⚠️ Potential issue | 🟠 Major

EventProperties allows nested objects, but ClickHouse properties is primitives-only

IClickhouseEvent.properties is Record<string, string|number|boolean|undefined|null>. Storing objects (e.g., __query) will break ingestion unless stringified.

  • Constrain EventProperties values to primitives.
  • Or require JSON.stringify for nested shapes (e.g., __queryJson: string).
🤖 Prompt for AI Agents
In packages/importer/src/types.ts around lines 53 to 66, EventProperties
currently allows nested objects which will break ClickHouse ingestion because
IClickhouseEvent.properties must be primitives-only; update the EventProperties
interface so values are limited to string | number | boolean | null | undefined
(remove Record<string, unknown>) or replace nested object fields with explicit
JSON string fields (e.g., __queryJson: string) and rename/remove existing
__query, ensuring all nested shapes are JSON.stringified before being assigned
to EventProperties.
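
A minimal sketch of the primitives-only approach, assuming nested shapes are serialized before assignment (toClickhouseProperties is an illustrative name, not part of this PR):

// Primitives-only bag, matching IClickhouseEvent.properties.
type PrimitiveProperties = Record<string, string | number | boolean | null | undefined>;

// Illustrative helper: JSON.stringify any nested value before ingestion.
function toClickhouseProperties(input: Record<string, unknown>): PrimitiveProperties {
  const out: PrimitiveProperties = {};
  for (const [key, value] of Object.entries(input)) {
    if (value !== null && typeof value === 'object') {
      out[`${key}Json`] = JSON.stringify(value); // e.g. __query -> __queryJson
    } else {
      out[key] = value as string | number | boolean | null | undefined;
    }
  }
  return out;
}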

@@ -0,0 +1,3 @@
import { getSharedVitestConfig } from '../../vitest.shared';

export default getSharedVitestConfig({ __dirname });
Contributor

@coderabbitai coderabbitai bot Oct 19, 2025


⚠️ Potential issue | 🔴 Critical

__dirname is unavailable in ESM modules.

Since package.json declares "type": "module", this file runs as ESM. The __dirname global doesn't exist in ESM and will throw a ReferenceError.

Apply this diff to compute the directory path in ESM:

+import { dirname } from 'node:path';
+import { fileURLToPath } from 'node:url';
+
 import { getSharedVitestConfig } from '../../vitest.shared';
-export default getSharedVitestConfig({ __dirname });
+export default getSharedVitestConfig({ 
+ __dirname: dirname(fileURLToPath(import.meta.url)) 
+});
📝 Committable suggestion

⚠️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
export default getSharedVitestConfig({ __dirname });

import { dirname } from 'node:path';
import { fileURLToPath } from 'node:url';

import { getSharedVitestConfig } from '../../vitest.shared';

export default getSharedVitestConfig({
  __dirname: dirname(fileURLToPath(import.meta.url)),
});
🤖 Prompt for AI Agents
In packages/importer/vitest.config.ts around line 3, the code passes the
non-existent __dirname to getSharedVitestConfig (ESM modules don't have
__dirname); compute the directory from import.meta.url instead and pass that
value: import { fileURLToPath } from 'node:url' and { dirname } from 'node:path',
derive const __dirname =
dirname(fileURLToPath(import.meta.url)), then call getSharedVitestConfig({
__dirname }) so the config receives the correct directory path in ESM.

Comment on lines +12 to +32
list: protectedProcedure
  .input(z.object({ projectId: z.string() }))
  .query(async ({ input, ctx }) => {
    const access = await getProjectAccessCached({
      projectId: input.projectId,
      userId: ctx.session.userId,
    });

    if (!access) {
      throw TRPCAccessError('You do not have access to this project');
    }

    return db.import.findMany({
      where: {
        projectId: input.projectId,
      },
      orderBy: {
        createdAt: 'desc',
      },
    });
  }),
Contributor

@coderabbitai coderabbitai bot Oct 19, 2025


⚠️ Potential issue | 🟠 Major

Sensitive config exposure in list/get — restrict or redact.

config likely contains provider credentials. Current list/get return it to any project member with read access. Redact or gate by role.

Option A (redact for read-level):

 list: protectedProcedure
 .input(z.object({ projectId: z.string() }))
 .query(async ({ input, ctx }) => {
 const access = await getProjectAccessCached({
 projectId: input.projectId,
 userId: ctx.session.userId,
 });
 if (!access) {
 throw TRPCAccessError('You do not have access to this project');
 }
- return db.import.findMany({
+ const rows = await db.import.findMany({
 where: { projectId: input.projectId },
 orderBy: { createdAt: 'desc' },
 });
+ const canSeeSecrets =
+ typeof access !== 'boolean' && access.level !== 'read';
+ return canSeeSecrets
+ ? rows
+ : rows.map((r) => ({ ...r, config: null }));
 }),
 get: protectedProcedure
 .input(z.object({ id: z.string() }))
 .query(async ({ input, ctx }) => {
 const importRecord = await db.import.findUniqueOrThrow({
 where: { id: input.id },
 include: { project: true },
 });
 const access = await getProjectAccessCached({
 projectId: importRecord.projectId,
 userId: ctx.session.userId,
 });
 if (!access) {
 throw TRPCAccessError('You do not have access to this import');
 }
- return importRecord;
+ const canSeeSecrets =
+ typeof access !== 'boolean' && access.level !== 'read';
+ return canSeeSecrets ? importRecord : { ...importRecord, config: null };
 }),

Option B: Use Prisma select to exclude config in list and only include it for admin/write roles.

Also applies to: 34-57
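
A rough sketch of Option B for the list query — hedged: the field list beside config is illustrative, and the access-level check mirrors the one in Option A:

const canSeeSecrets = typeof access !== 'boolean' && access.level !== 'read';

const rows = await db.import.findMany({
  where: { projectId: input.projectId },
  orderBy: { createdAt: 'desc' },
  select: {
    id: true,
    projectId: true,
    status: true,
    jobId: true,
    createdAt: true,
    // Only expose provider credentials to write/admin roles.
    config: canSeeSecrets,
  },
});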

🤖 Prompt for AI Agents
In packages/trpc/src/routers/import.ts around lines 12 to 32 (and likewise apply
the change to lines 34 to 57), the current list/get resolvers return the full
import records including the sensitive config field to any project member;
change the DB queries to exclude the config field for read-level access — note
that a Prisma select cannot mix config: false with other selected fields, so
either enumerate only the non-sensitive fields in a select or use Prisma's omit
feature (e.g., omit: { config: true }) where available,
and only fetch/include config when the caller has a higher role (admin/write)
after re-checking the caller's role from ctx.session or the access object;
implement a conditional: if role allows, run a second query or include config in
select, otherwise omit/redact config from the returned objects and ensure no
accidental passthrough in list results.

Comment on lines +72 to +99
      // Create import record
      const importRecord = await db.import.create({
        data: {
          projectId: input.projectId,
          config: input.config,
          status: 'pending',
        },
      });

      // Add job to queue
      const job = await importQueue.add('import', {
        type: 'import',
        payload: {
          importId: importRecord.id,
        },
      });

      // Update import record with job ID
      await db.import.update({
        where: { id: importRecord.id },
        data: { jobId: job.id },
      });

      return {
        ...importRecord,
        jobId: job.id,
      };
    }),
Contributor

@coderabbitai coderabbitai bot Oct 19, 2025


⚠️ Potential issue | 🟠 Major

Queue enqueue not wrapped — record can be left pending with no job.

If importQueue.add fails, DB keeps a dangling 'pending' import. Wrap in try/catch, set failed status, and pass stable jobId/attempts/backoff.

 // Create import record
 const importRecord = await db.import.create({
 data: {
 projectId: input.projectId,
 config: input.config,
 status: 'pending',
 },
 });
- // Add job to queue
- const job = await importQueue.add('import', {
- type: 'import',
- payload: {
- importId: importRecord.id,
- },
- });
-
- // Update import record with job ID
- await db.import.update({
- where: { id: importRecord.id },
- data: { jobId: job.id },
- });
+ // Add job to queue
+ try {
+ const job = await importQueue.add(
+ 'import',
+ { type: 'import', payload: { importId: importRecord.id } },
+ { jobId: importRecord.id, attempts: 3, backoff: { type: 'exponential', delay: 30_000 } },
+ );
+ // Update import record with job ID (idempotent if jobId equals importRecord.id)
+ await db.import.update({
+ where: { id: importRecord.id },
+ data: { jobId: job.id },
+ });
+ } catch (err) {
+ await db.import.update({
+ where: { id: importRecord.id },
+ data: { status: 'failed', errorMessage: err instanceof Error ? err.message : 'Queue enqueue failed' },
+ });
+ throw err;
+ }
📝 Committable suggestion

⚠️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
      // Create import record
      const importRecord = await db.import.create({
        data: {
          projectId: input.projectId,
          config: input.config,
          status: 'pending',
        },
      });

      // Add job to queue
      const job = await importQueue.add('import', {
        type: 'import',
        payload: {
          importId: importRecord.id,
        },
      });

      // Update import record with job ID
      await db.import.update({
        where: { id: importRecord.id },
        data: { jobId: job.id },
      });

      return {
        ...importRecord,
        jobId: job.id,
      };
    }),

      // Create import record
      const importRecord = await db.import.create({
        data: {
          projectId: input.projectId,
          config: input.config,
          status: 'pending',
        },
      });

      // Add job to queue
      try {
        const job = await importQueue.add(
          'import',
          { type: 'import', payload: { importId: importRecord.id } },
          { jobId: importRecord.id, attempts: 3, backoff: { type: 'exponential', delay: 30_000 } },
        );

        // Update import record with job ID (idempotent if jobId equals importRecord.id)
        await db.import.update({
          where: { id: importRecord.id },
          data: { jobId: job.id },
        });
      } catch (err) {
        await db.import.update({
          where: { id: importRecord.id },
          data: { status: 'failed', errorMessage: err instanceof Error ? err.message : 'Queue enqueue failed' },
        });
        throw err;
      }

      return {
        ...importRecord,
        // job is scoped to the try block; its id equals importRecord.id (set via the jobId option above)
        jobId: importRecord.id,
      };
    }),
🤖 Prompt for AI Agents
In packages/trpc/src/routers/import.ts around lines 72-99, the code creates an
import DB record then calls importQueue.add without error handling so a failed
enqueue leaves a dangling 'pending' record; to fix, generate a stable jobId
(e.g. UUID) before enqueue, persist that jobId on the import record (or create
the record with that jobId) and then wrap importQueue.add in try/catch — on
success update any job metadata if needed, on failure update the DB record to
status 'failed' and write the error message and retry metadata
(attempts/backoff) to the record; ensure the job payload includes the stable
jobId and desired attempts/backoff so retries are deterministic.

Comment on lines +469 to +477
export const zCreateImport = z.object({
  projectId: z.string().min(1),
  provider: z.enum(['umami', 'plausible', 'mixpanel']),
  config: z.union([
    zUmamiImportConfig,
    zPlausibleImportConfig,
    zMixpanelImportConfig,
  ]),
});
Contributor

@coderabbitai coderabbitai bot Oct 19, 2025


⚠️ Potential issue | 🟠 Major

Discriminate by provider to prevent mismatched payloads

provider and config can disagree (e.g., provider='umami' with a Mixpanel config). Make zCreateImport a discriminated union on provider so config shape is tied to the selected provider.

-export const zCreateImport = z.object({
- projectId: z.string().min(1),
- provider: z.enum(['umami', 'plausible', 'mixpanel']),
- config: z.union([
- zUmamiImportConfig,
- zPlausibleImportConfig,
- zMixpanelImportConfig,
- ]),
-});
+export const zCreateImport = z.discriminatedUnion('provider', [
+ z.object({
+ projectId: z.string().min(1),
+ provider: z.literal('umami'),
+ config: zUmamiImportConfig,
+ }),
+ z.object({
+ projectId: z.string().min(1),
+ provider: z.literal('plausible'),
+ config: zPlausibleImportConfig,
+ }),
+ z.object({
+ projectId: z.string().min(1),
+ provider: z.literal('mixpanel'),
+ config: zMixpanelImportConfig,
+ }),
+]);
🤖 Prompt for AI Agents
In packages/validation/src/index.ts around lines 469 to 477, zCreateImport
currently allows provider and config to disagree; change it to a discriminated
union so each provider literal is paired with the correct config shape. Replace
the single z.object with either z.discriminatedUnion('provider', [...]) or a
z.union of three z.object schemas where provider is a literal ('umami' |
'plausible' | 'mixpanel') and config is the matching zUmamiImportConfig,
zPlausibleImportConfig, or zMixpanelImportConfig respectively; keep the same
exported name and types so callers get compile-time validation that provider and
config match.
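
As a quick sanity check of the discriminated-union behavior, a mismatched provider/config pair should now fail at parse time (illustrative values only):

// provider 'umami' now selects the branch whose config must match zUmamiImportConfig,
// so a Mixpanel-shaped config is rejected.
const result = zCreateImport.safeParse({
  projectId: 'proj_123',
  provider: 'umami',
  config: { provider: 'mixpanel', type: 'api', apiKey: '...', apiSecret: '...', projectId: '1' },
});
console.log(result.success); // false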
