From 59efc85ad1f04e9b7f49717130c0c59e2d696004 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Tue, 16 Jun 2026 14:21:42 -0700 Subject: [PATCH] =?UTF-8?q?improvement(mothership):=20user=5Ftable=20speed?= =?UTF-8?q?=20parity=20=E2=80=94=20limit=20bounds,=20keyset=20paging,=20as?= =?UTF-8?q?ync=20import/delete=20jobs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../lib/copilot/generated/tool-catalog-v1.ts | 22 +- .../lib/copilot/generated/tool-schemas-v1.ts | 218 +++++----- .../tools/server/table/user-table.test.ts | 349 +++++++++++++++- .../copilot/tools/server/table/user-table.ts | 385 +++++++++++++++--- apps/sim/lib/table/import-runner.test.ts | 117 ++++++ apps/sim/lib/table/import-runner.ts | 18 +- 6 files changed, 951 insertions(+), 158 deletions(-) create mode 100644 apps/sim/lib/table/import-runner.test.ts diff --git a/apps/sim/lib/copilot/generated/tool-catalog-v1.ts b/apps/sim/lib/copilot/generated/tool-catalog-v1.ts index ae4296db33..b1b1f2fb8e 100644 --- a/apps/sim/lib/copilot/generated/tool-catalog-v1.ts +++ b/apps/sim/lib/copilot/generated/tool-catalog-v1.ts @@ -3870,6 +3870,22 @@ export const UserTable: ToolCatalogEntry = { type: 'object', description: 'Arguments for the operation', properties: { + after: { + type: 'object', + description: + "Keyset cursor for query_rows: pass the nextCursor object ({ orderKey, id }) from the previous page's response to fetch the next page on the default row order. Cannot be combined with sort (supplying both is rejected). When set, offset is ignored.", + properties: { + id: { + type: 'string', + description: 'id of the last row of the previous page (from nextCursor).', + }, + orderKey: { + type: 'string', + description: 'orderKey of the last row of the previous page (from nextCursor).', + }, + }, + required: ['orderKey', 'id'], + }, autoRun: { type: 'boolean', description: @@ -3958,7 +3974,8 @@ export const UserTable: ToolCatalogEntry = { }, limit: { type: 'number', - description: 'Maximum rows to return or affect (optional, default 100)', + description: + 'Maximum rows to return or affect (optional; default 100, max 1000). For delete_rows_by_filter, omitting it lets matches above 1000 run as a background job.', }, mapping: { type: 'object', @@ -4011,7 +4028,8 @@ export const UserTable: ToolCatalogEntry = { }, offset: { type: 'number', - description: 'Number of rows to skip (optional for query_rows, default 0)', + description: + 'Number of rows to skip (optional for query_rows, default 0). For paging past more than a few pages, prefer the after cursor — offset re-scans every prior row.', }, outputColumnNames: { type: 'object', diff --git a/apps/sim/lib/copilot/generated/tool-schemas-v1.ts b/apps/sim/lib/copilot/generated/tool-schemas-v1.ts index 5ed8d9224d..9dc84e33f4 100644 --- a/apps/sim/lib/copilot/generated/tool-schemas-v1.ts +++ b/apps/sim/lib/copilot/generated/tool-schemas-v1.ts @@ -10,7 +10,7 @@ export interface ToolRuntimeSchemaEntry { } export const TOOL_RUNTIME_SCHEMAS: Record = { - agent: { + ['agent']: { parameters: { properties: { request: { @@ -23,7 +23,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - auth: { + ['auth']: { parameters: { properties: { request: { @@ -36,7 +36,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - check_deployment_status: { + ['check_deployment_status']: { parameters: { type: 'object', properties: { @@ -48,7 +48,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - complete_scheduled_task: { + ['complete_scheduled_task']: { parameters: { type: 'object', properties: { @@ -61,7 +61,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - crawl_website: { + ['crawl_website']: { parameters: { type: 'object', properties: { @@ -96,7 +96,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - create_file: { + ['create_file']: { parameters: { type: 'object', properties: { @@ -162,7 +162,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { required: ['success', 'message'], }, }, - create_file_folder: { + ['create_file_folder']: { parameters: { type: 'object', properties: { @@ -180,7 +180,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - create_folder: { + ['create_folder']: { parameters: { type: 'object', properties: { @@ -201,7 +201,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - create_workflow: { + ['create_workflow']: { parameters: { type: 'object', properties: { @@ -226,7 +226,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - create_workspace_mcp_server: { + ['create_workspace_mcp_server']: { parameters: { type: 'object', properties: { @@ -259,7 +259,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - delete_file: { + ['delete_file']: { parameters: { type: 'object', properties: { @@ -289,7 +289,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { required: ['success', 'message'], }, }, - delete_file_folder: { + ['delete_file_folder']: { parameters: { type: 'object', properties: { @@ -305,7 +305,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - delete_folder: { + ['delete_folder']: { parameters: { type: 'object', properties: { @@ -321,7 +321,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - delete_workflow: { + ['delete_workflow']: { parameters: { type: 'object', properties: { @@ -337,7 +337,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - delete_workspace_mcp_server: { + ['delete_workspace_mcp_server']: { parameters: { type: 'object', properties: { @@ -350,7 +350,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - deploy: { + ['deploy']: { parameters: { properties: { request: { @@ -364,7 +364,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - deploy_api: { + ['deploy_api']: { parameters: { type: 'object', properties: { @@ -448,7 +448,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { ], }, }, - deploy_chat: { + ['deploy_chat']: { parameters: { type: 'object', properties: { @@ -607,7 +607,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { ], }, }, - deploy_mcp: { + ['deploy_mcp']: { parameters: { type: 'object', properties: { @@ -723,7 +723,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { required: ['deploymentType', 'deploymentStatus'], }, }, - diff_workflows: { + ['diff_workflows']: { parameters: { type: 'object', properties: { @@ -747,7 +747,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - download_to_workspace_file: { + ['download_to_workspace_file']: { parameters: { type: 'object', properties: { @@ -796,7 +796,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - edit_content: { + ['edit_content']: { parameters: { type: 'object', properties: { @@ -828,7 +828,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { required: ['success', 'message'], }, }, - edit_workflow: { + ['edit_workflow']: { parameters: { type: 'object', properties: { @@ -867,7 +867,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - enrichment_run: { + ['enrichment_run']: { parameters: { type: 'object', properties: { @@ -911,7 +911,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { required: ['matched', 'result'], }, }, - ffmpeg: { + ['ffmpeg']: { parameters: { type: 'object', properties: { @@ -1092,7 +1092,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - file: { + ['file']: { parameters: { properties: { prompt: { @@ -1105,7 +1105,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - function_execute: { + ['function_execute']: { parameters: { type: 'object', properties: { @@ -1243,7 +1243,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - generate_api_key: { + ['generate_api_key']: { parameters: { type: 'object', properties: { @@ -1261,7 +1261,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - generate_audio: { + ['generate_audio']: { parameters: { type: 'object', properties: { @@ -1413,7 +1413,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - generate_image: { + ['generate_image']: { parameters: { type: 'object', properties: { @@ -1541,7 +1541,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - generate_video: { + ['generate_video']: { parameters: { type: 'object', properties: { @@ -1708,7 +1708,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - get_block_outputs: { + ['get_block_outputs']: { parameters: { type: 'object', properties: { @@ -1729,7 +1729,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - get_block_upstream_references: { + ['get_block_upstream_references']: { parameters: { type: 'object', properties: { @@ -1751,7 +1751,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - get_deployed_workflow_state: { + ['get_deployed_workflow_state']: { parameters: { type: 'object', properties: { @@ -1764,7 +1764,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - get_deployment_log: { + ['get_deployment_log']: { parameters: { type: 'object', properties: { @@ -1777,7 +1777,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - get_page_contents: { + ['get_page_contents']: { parameters: { type: 'object', properties: { @@ -1805,14 +1805,14 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - get_platform_actions: { + ['get_platform_actions']: { parameters: { type: 'object', properties: {}, }, resultSchema: undefined, }, - get_scheduled_task_logs: { + ['get_scheduled_task_logs']: { parameters: { type: 'object', properties: { @@ -1837,7 +1837,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - get_workflow_data: { + ['get_workflow_data']: { parameters: { type: 'object', properties: { @@ -1856,7 +1856,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - get_workflow_run_options: { + ['get_workflow_run_options']: { parameters: { type: 'object', properties: { @@ -1869,7 +1869,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - glob: { + ['glob']: { parameters: { type: 'object', properties: { @@ -1888,7 +1888,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - grep: { + ['grep']: { parameters: { type: 'object', properties: { @@ -1936,7 +1936,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - knowledge: { + ['knowledge']: { parameters: { properties: { request: { @@ -1949,7 +1949,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - knowledge_base: { + ['knowledge_base']: { parameters: { type: 'object', properties: { @@ -2142,7 +2142,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { required: ['success', 'message'], }, }, - list_file_folders: { + ['list_file_folders']: { parameters: { type: 'object', properties: { @@ -2154,7 +2154,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - list_folders: { + ['list_folders']: { parameters: { type: 'object', properties: { @@ -2166,7 +2166,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - list_integration_tools: { + ['list_integration_tools']: { parameters: { properties: { integration: { @@ -2180,14 +2180,14 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - list_user_workspaces: { + ['list_user_workspaces']: { parameters: { type: 'object', properties: {}, }, resultSchema: undefined, }, - list_workspace_mcp_servers: { + ['list_workspace_mcp_servers']: { parameters: { type: 'object', properties: { @@ -2200,7 +2200,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - load_deployment: { + ['load_deployment']: { parameters: { type: 'object', properties: { @@ -2219,7 +2219,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - load_integration_tool: { + ['load_integration_tool']: { parameters: { properties: { tool_ids: { @@ -2236,7 +2236,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - manage_credential: { + ['manage_credential']: { parameters: { type: 'object', properties: { @@ -2265,7 +2265,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - manage_custom_tool: { + ['manage_custom_tool']: { parameters: { type: 'object', properties: { @@ -2345,7 +2345,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - manage_mcp_tool: { + ['manage_mcp_tool']: { parameters: { type: 'object', properties: { @@ -2397,7 +2397,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - manage_scheduled_task: { + ['manage_scheduled_task']: { parameters: { type: 'object', properties: { @@ -2472,7 +2472,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - manage_skill: { + ['manage_skill']: { parameters: { type: 'object', properties: { @@ -2505,7 +2505,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - materialize_file: { + ['materialize_file']: { parameters: { type: 'object', properties: { @@ -2529,7 +2529,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - media: { + ['media']: { parameters: { properties: { prompt: { @@ -2542,7 +2542,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - move_file: { + ['move_file']: { parameters: { type: 'object', properties: { @@ -2563,7 +2563,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - move_file_folder: { + ['move_file_folder']: { parameters: { type: 'object', properties: { @@ -2581,7 +2581,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - move_folder: { + ['move_folder']: { parameters: { type: 'object', properties: { @@ -2599,7 +2599,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - move_workflow: { + ['move_workflow']: { parameters: { type: 'object', properties: { @@ -2619,7 +2619,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - oauth_get_auth_link: { + ['oauth_get_auth_link']: { parameters: { type: 'object', properties: { @@ -2633,7 +2633,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - oauth_request_access: { + ['oauth_request_access']: { parameters: { type: 'object', properties: { @@ -2647,7 +2647,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - open_resource: { + ['open_resource']: { parameters: { type: 'object', properties: { @@ -2681,7 +2681,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - promote_to_live: { + ['promote_to_live']: { parameters: { type: 'object', properties: { @@ -2700,7 +2700,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - query_logs: { + ['query_logs']: { parameters: { type: 'object', properties: { @@ -2811,7 +2811,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - read: { + ['read']: { parameters: { type: 'object', properties: { @@ -2838,7 +2838,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - redeploy: { + ['redeploy']: { parameters: { type: 'object', properties: { @@ -2917,7 +2917,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { ], }, }, - rename_file: { + ['rename_file']: { parameters: { type: 'object', properties: { @@ -2953,7 +2953,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { required: ['success', 'message'], }, }, - rename_file_folder: { + ['rename_file_folder']: { parameters: { type: 'object', properties: { @@ -2970,7 +2970,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - rename_workflow: { + ['rename_workflow']: { parameters: { type: 'object', properties: { @@ -2987,7 +2987,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - research: { + ['research']: { parameters: { properties: { topic: { @@ -3000,7 +3000,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - respond: { + ['respond']: { parameters: { additionalProperties: true, properties: { @@ -3023,7 +3023,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - restore_resource: { + ['restore_resource']: { parameters: { type: 'object', properties: { @@ -3041,7 +3041,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - run: { + ['run']: { parameters: { properties: { context: { @@ -3058,7 +3058,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - run_block: { + ['run_block']: { parameters: { type: 'object', properties: { @@ -3090,7 +3090,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - run_from_block: { + ['run_from_block']: { parameters: { type: 'object', properties: { @@ -3122,7 +3122,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - run_workflow: { + ['run_workflow']: { parameters: { type: 'object', properties: { @@ -3160,7 +3160,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - run_workflow_until_block: { + ['run_workflow_until_block']: { parameters: { type: 'object', properties: { @@ -3203,7 +3203,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - scheduled_task: { + ['scheduled_task']: { parameters: { properties: { request: { @@ -3216,7 +3216,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - scrape_page: { + ['scrape_page']: { parameters: { type: 'object', properties: { @@ -3237,7 +3237,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - search_documentation: { + ['search_documentation']: { parameters: { type: 'object', properties: { @@ -3254,7 +3254,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - search_library_docs: { + ['search_library_docs']: { parameters: { type: 'object', properties: { @@ -3275,7 +3275,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - search_online: { + ['search_online']: { parameters: { type: 'object', properties: { @@ -3315,7 +3315,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - search_patterns: { + ['search_patterns']: { parameters: { type: 'object', properties: { @@ -3337,7 +3337,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - set_block_enabled: { + ['set_block_enabled']: { parameters: { type: 'object', properties: { @@ -3359,7 +3359,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - set_environment_variables: { + ['set_environment_variables']: { parameters: { type: 'object', properties: { @@ -3393,7 +3393,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - set_global_workflow_variables: { + ['set_global_workflow_variables']: { parameters: { type: 'object', properties: { @@ -3434,7 +3434,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - superagent: { + ['superagent']: { parameters: { properties: { task: { @@ -3448,7 +3448,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - table: { + ['table']: { parameters: { properties: { request: { @@ -3461,7 +3461,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - update_deployment_version: { + ['update_deployment_version']: { parameters: { type: 'object', properties: { @@ -3490,7 +3490,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - update_scheduled_task_history: { + ['update_scheduled_task_history']: { parameters: { type: 'object', properties: { @@ -3508,7 +3508,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - update_workspace_mcp_server: { + ['update_workspace_mcp_server']: { parameters: { type: 'object', properties: { @@ -3533,7 +3533,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - user_memory: { + ['user_memory']: { parameters: { type: 'object', properties: { @@ -3582,7 +3582,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - user_table: { + ['user_table']: { parameters: { type: 'object', properties: { @@ -3590,6 +3590,22 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { type: 'object', description: 'Arguments for the operation', properties: { + after: { + type: 'object', + description: + "Keyset cursor for query_rows: pass the nextCursor object ({ orderKey, id }) from the previous page's response to fetch the next page on the default row order. Cannot be combined with sort (supplying both is rejected). When set, offset is ignored.", + properties: { + id: { + type: 'string', + description: 'id of the last row of the previous page (from nextCursor).', + }, + orderKey: { + type: 'string', + description: 'orderKey of the last row of the previous page (from nextCursor).', + }, + }, + required: ['orderKey', 'id'], + }, autoRun: { type: 'boolean', description: @@ -3686,7 +3702,8 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, limit: { type: 'number', - description: 'Maximum rows to return or affect (optional, default 100)', + description: + 'Maximum rows to return or affect (optional; default 100, max 1000). For delete_rows_by_filter, omitting it lets matches above 1000 run as a background job.', }, mapping: { type: 'object', @@ -3745,7 +3762,8 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, offset: { type: 'number', - description: 'Number of rows to skip (optional for query_rows, default 0)', + description: + 'Number of rows to skip (optional for query_rows, default 0). For paging past more than a few pages, prefer the after cursor — offset re-scans every prior row.', }, outputColumnNames: { type: 'object', @@ -3944,7 +3962,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { required: ['success', 'message'], }, }, - workflow: { + ['workflow']: { parameters: { properties: { prompt: { @@ -3957,7 +3975,7 @@ export const TOOL_RUNTIME_SCHEMAS: Record = { }, resultSchema: undefined, }, - workspace_file: { + ['workspace_file']: { parameters: { type: 'object', properties: { diff --git a/apps/sim/lib/copilot/tools/server/table/user-table.test.ts b/apps/sim/lib/copilot/tools/server/table/user-table.test.ts index 7d90e9c00a..14ca72a195 100644 --- a/apps/sim/lib/copilot/tools/server/table/user-table.test.ts +++ b/apps/sim/lib/copilot/tools/server/table/user-table.test.ts @@ -15,6 +15,12 @@ const { mockCreateTable, mockDeleteTable, mockGetWorkspaceTableLimits, + mockMarkTableJobRunning, + mockReleaseJobClaim, + mockQueryRows, + mockDeleteRowsByFilter, + mockRunTableImport, + mockRunTableDelete, fakeEnrichment, } = vi.hoisted(() => ({ mockResolveWorkspaceFileReference: vi.fn(), @@ -26,6 +32,12 @@ const { mockCreateTable: vi.fn(), mockDeleteTable: vi.fn(), mockGetWorkspaceTableLimits: vi.fn(), + mockMarkTableJobRunning: vi.fn(), + mockReleaseJobClaim: vi.fn(), + mockQueryRows: vi.fn(), + mockDeleteRowsByFilter: vi.fn(), + mockRunTableImport: vi.fn(), + mockRunTableDelete: vi.fn(), fakeEnrichment: { id: 'work-email', name: 'Work Email', @@ -83,16 +95,30 @@ vi.mock('@/lib/table/rows/service', () => ({ batchInsertRows: mockBatchInsertRows, batchUpdateRows: vi.fn(), deleteRow: vi.fn(), - deleteRowsByFilter: vi.fn(), + deleteRowsByFilter: mockDeleteRowsByFilter, deleteRowsByIds: vi.fn(), getRowById: vi.fn(), insertRow: vi.fn(), - queryRows: vi.fn(), + queryRows: mockQueryRows, replaceTableRows: mockReplaceTableRows, updateRow: vi.fn(), updateRowsByFilter: vi.fn(), })) +vi.mock('@/lib/table/jobs/service', () => ({ + markTableJobRunning: mockMarkTableJobRunning, + releaseJobClaim: mockReleaseJobClaim, +})) + +vi.mock('@/lib/table/import-runner', () => ({ + runTableImport: mockRunTableImport, +})) + +vi.mock('@/lib/table/delete-runner', () => ({ + markTableDeleteFailed: vi.fn(), + runTableDelete: mockRunTableDelete, +})) + vi.mock('@/lib/table/billing', () => ({ getWorkspaceTableLimits: mockGetWorkspaceTableLimits, })) @@ -122,15 +148,25 @@ function buildTable(overrides: Partial = {}): TableDefinition { } } +/** Lets a runDetached microtask chain run before asserting on the work it dispatched. */ +async function flushDetached(): Promise { + await Promise.resolve() + await Promise.resolve() +} + describe('userTableServerTool.import_file', () => { beforeEach(() => { vi.clearAllMocks() mockResolveWorkspaceFileReference.mockResolvedValue({ name: 'people.csv', type: 'text/csv', + key: 'workspace/workspace-1/people.csv', + size: 100, }) mockDownloadWorkspaceFile.mockResolvedValue(Buffer.from('name,age\nAlice,30\nBob,40')) mockGetTableById.mockResolvedValue(buildTable()) + mockMarkTableJobRunning.mockResolvedValue(true) + mockReleaseJobClaim.mockResolvedValue(undefined) mockBatchInsertRows.mockImplementation(async (data: { rows: unknown[] }) => data.rows.map((_, i) => ({ id: `row_${i}` })) ) @@ -253,12 +289,95 @@ describe('userTableServerTool.import_file', () => { expect(result.message).toMatch(/missing required columns/i) expect(mockBatchInsertRows).not.toHaveBeenCalled() }) + + it('claims and releases the table job slot around an inline import', async () => { + const result = await userTableServerTool.execute( + { operation: 'import_file', args: { tableId: 'tbl_1', fileId: 'file-1' } }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + + expect(result.success).toBe(true) + expect(mockMarkTableJobRunning).toHaveBeenCalledWith('tbl_1', expect.any(String), 'import') + expect(mockReleaseJobClaim).toHaveBeenCalledWith( + 'tbl_1', + mockMarkTableJobRunning.mock.calls[0][1] + ) + }) + + it('rejects an inline import while another job holds the table slot', async () => { + mockMarkTableJobRunning.mockResolvedValueOnce(false) + const result = await userTableServerTool.execute( + { operation: 'import_file', args: { tableId: 'tbl_1', fileId: 'file-1' } }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + + expect(result.success).toBe(false) + expect(result.message).toMatch(/job is already in progress/i) + expect(mockBatchInsertRows).not.toHaveBeenCalled() + expect(mockReleaseJobClaim).not.toHaveBeenCalled() + }) + + it('dispatches a background import for large CSV files', async () => { + mockResolveWorkspaceFileReference.mockResolvedValueOnce({ + name: 'big.csv', + type: 'text/csv', + key: 'workspace/workspace-1/big.csv', + size: 9 * 1024 * 1024, + }) + + const result = await userTableServerTool.execute( + { operation: 'import_file', args: { tableId: 'tbl_1', fileId: 'file-1', mode: 'replace' } }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + await flushDetached() + + expect(result.success).toBe(true) + expect(result.data?.jobId).toBeDefined() + expect(result.message).toMatch(/background/i) + expect(mockMarkTableJobRunning).toHaveBeenCalledWith('tbl_1', expect.any(String), 'import') + expect(mockBatchInsertRows).not.toHaveBeenCalled() + expect(mockReplaceTableRows).not.toHaveBeenCalled() + expect(mockDownloadWorkspaceFile).not.toHaveBeenCalled() + expect(mockRunTableImport).toHaveBeenCalledTimes(1) + expect(mockRunTableImport.mock.calls[0][0]).toMatchObject({ + tableId: 'tbl_1', + workspaceId: 'workspace-1', + fileKey: 'workspace/workspace-1/big.csv', + mode: 'replace', + deleteSourceFile: false, + }) + }) + + it('rejects a background import while another job holds the table slot', async () => { + mockResolveWorkspaceFileReference.mockResolvedValueOnce({ + name: 'big.csv', + type: 'text/csv', + key: 'workspace/workspace-1/big.csv', + size: 9 * 1024 * 1024, + }) + mockMarkTableJobRunning.mockResolvedValueOnce(false) + + const result = await userTableServerTool.execute( + { operation: 'import_file', args: { tableId: 'tbl_1', fileId: 'file-1' } }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + await flushDetached() + + expect(result.success).toBe(false) + expect(result.message).toMatch(/job is already in progress/i) + expect(mockRunTableImport).not.toHaveBeenCalled() + }) }) describe('userTableServerTool.create_from_file', () => { beforeEach(() => { vi.clearAllMocks() - mockResolveWorkspaceFileReference.mockResolvedValue({ name: 'people.csv', type: 'text/csv' }) + mockResolveWorkspaceFileReference.mockResolvedValue({ + name: 'people.csv', + type: 'text/csv', + key: 'workspace/workspace-1/people.csv', + size: 100, + }) mockDownloadWorkspaceFile.mockResolvedValue(Buffer.from('name,age\nAlice,30\nBob,40')) mockGetWorkspaceTableLimits.mockResolvedValue({ maxRowsPerTable: 1000, maxTables: 3 }) mockCreateTable.mockResolvedValue(buildTable({ id: 'tbl_new', name: 'people' })) @@ -313,6 +432,40 @@ describe('userTableServerTool.create_from_file', () => { expect(result.message).toMatch(/rolled back/i) expect(result.message).toMatch(/must be unique/i) }) + + it('creates a placeholder table and dispatches a background import for large CSV files', async () => { + mockResolveWorkspaceFileReference.mockResolvedValueOnce({ + name: 'big.csv', + type: 'text/csv', + key: 'workspace/workspace-1/big.csv', + size: 9 * 1024 * 1024, + }) + + const result = await userTableServerTool.execute( + { operation: 'create_from_file', args: { fileId: 'file-1' } }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + await flushDetached() + + expect(result.success).toBe(true) + expect(result.data?.tableId).toBe('tbl_new') + expect(result.data?.jobId).toBeDefined() + expect(mockDownloadWorkspaceFile).not.toHaveBeenCalled() + expect(mockBatchInsertRows).not.toHaveBeenCalled() + const createArgs = mockCreateTable.mock.calls[0][0] as Record + expect(createArgs).toMatchObject({ + jobStatus: 'running', + jobType: 'import', + jobId: result.data?.jobId, + }) + expect(mockRunTableImport).toHaveBeenCalledTimes(1) + expect(mockRunTableImport.mock.calls[0][0]).toMatchObject({ + tableId: 'tbl_new', + mode: 'create', + fileKey: 'workspace/workspace-1/big.csv', + deleteSourceFile: false, + }) + }) }) describe('userTableServerTool.create', () => { @@ -506,3 +659,193 @@ describe('userTableServerTool.add_enrichment', () => { expect(mockAddWorkflowGroup).not.toHaveBeenCalled() }) }) + +describe('userTableServerTool.query_rows', () => { + const queryRow = (i: number) => ({ + id: `row_${i}`, + data: { name: `r${i}` }, + executions: {}, + position: i, + orderKey: `a${i}`, + createdAt: new Date('2024-01-01'), + updatedAt: new Date('2024-01-01'), + }) + + beforeEach(() => { + vi.clearAllMocks() + mockGetTableById.mockResolvedValue(buildTable()) + mockQueryRows.mockResolvedValue({ + rows: [queryRow(1), queryRow(2)], + rowCount: 2, + totalCount: 10, + limit: 2, + offset: 0, + }) + }) + + it('rejects limits above MAX_QUERY_LIMIT', async () => { + const result = await userTableServerTool.execute( + { operation: 'query_rows', args: { tableId: 'tbl_1', limit: 100000 } }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + + expect(result.success).toBe(false) + expect(result.message).toBe('Limit cannot exceed 1000') + expect(mockQueryRows).not.toHaveBeenCalled() + }) + + it('queries without execution metadata and returns nextCursor on a full page', async () => { + const result = await userTableServerTool.execute( + { operation: 'query_rows', args: { tableId: 'tbl_1', limit: 2 } }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + + expect(result.success).toBe(true) + const options = mockQueryRows.mock.calls[0][1] as Record + expect(options.withExecutions).toBe(false) + expect(result.data?.nextCursor).toEqual({ orderKey: 'a2', id: 'row_2' }) + }) + + it('omits nextCursor when the page is not full', async () => { + mockQueryRows.mockResolvedValueOnce({ + rows: [queryRow(1)], + rowCount: 1, + totalCount: 1, + limit: 100, + offset: 0, + }) + const result = await userTableServerTool.execute( + { operation: 'query_rows', args: { tableId: 'tbl_1' } }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + + expect(result.success).toBe(true) + expect(result.data?.nextCursor).toBeUndefined() + }) + + it('passes the after cursor through and rejects after combined with sort', async () => { + const after = { orderKey: 'a2', id: 'row_2' } + const ok = await userTableServerTool.execute( + { operation: 'query_rows', args: { tableId: 'tbl_1', after } }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + expect(ok.success).toBe(true) + expect((mockQueryRows.mock.calls[0][1] as Record).after).toEqual(after) + + const bad = await userTableServerTool.execute( + { operation: 'query_rows', args: { tableId: 'tbl_1', after, sort: { name: 'asc' } } }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + expect(bad.success).toBe(false) + expect(bad.message).toMatch(/cannot be combined with sort/) + }) +}) + +describe('userTableServerTool.delete_rows_by_filter', () => { + beforeEach(() => { + vi.clearAllMocks() + mockGetTableById.mockResolvedValue(buildTable({ rowCount: 50000, maxRows: 100000 })) + mockMarkTableJobRunning.mockResolvedValue(true) + mockDeleteRowsByFilter.mockResolvedValue({ affectedCount: 5, affectedRowIds: ['r1'] }) + mockQueryRows.mockResolvedValue({ + rows: [], + rowCount: 0, + totalCount: 5, + limit: 1, + offset: 0, + }) + }) + + it('rejects limits above MAX_BULK_OPERATION_SIZE', async () => { + const result = await userTableServerTool.execute( + { + operation: 'delete_rows_by_filter', + args: { tableId: 'tbl_1', filter: { name: 'x' }, limit: 5000 }, + }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + + expect(result.success).toBe(false) + expect(result.message).toBe('Limit cannot exceed 1000') + expect(mockDeleteRowsByFilter).not.toHaveBeenCalled() + }) + + it('deletes inline when the unbounded match count is within the cap', async () => { + const result = await userTableServerTool.execute( + { operation: 'delete_rows_by_filter', args: { tableId: 'tbl_1', filter: { name: 'x' } } }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + + expect(result.success).toBe(true) + expect(result.data?.affectedCount).toBe(5) + expect(mockDeleteRowsByFilter).toHaveBeenCalledTimes(1) + expect(mockMarkTableJobRunning).not.toHaveBeenCalled() + }) + + it('dispatches a background delete when the unbounded match count exceeds the cap', async () => { + mockQueryRows.mockResolvedValueOnce({ + rows: [], + rowCount: 0, + totalCount: 20000, + limit: 1, + offset: 0, + }) + + const result = await userTableServerTool.execute( + { operation: 'delete_rows_by_filter', args: { tableId: 'tbl_1', filter: { name: 'x' } } }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + await flushDetached() + + expect(result.success).toBe(true) + expect(result.data?.jobId).toBeDefined() + expect(result.data?.doomedCount).toBe(20000) + expect(mockDeleteRowsByFilter).not.toHaveBeenCalled() + const [tableId, jobId, type, payload] = mockMarkTableJobRunning.mock.calls[0] + expect(tableId).toBe('tbl_1') + expect(type).toBe('delete') + expect(payload).toMatchObject({ doomedCount: 20000, cutoff: expect.any(String) }) + expect(mockRunTableDelete).toHaveBeenCalledTimes(1) + expect(mockRunTableDelete.mock.calls[0][0]).toMatchObject({ + jobId, + tableId: 'tbl_1', + workspaceId: 'workspace-1', + cutoff: expect.any(Date), + }) + }) + + it('rejects a background delete while another job holds the table slot', async () => { + mockQueryRows.mockResolvedValueOnce({ + rows: [], + rowCount: 0, + totalCount: 20000, + limit: 1, + offset: 0, + }) + mockMarkTableJobRunning.mockResolvedValueOnce(false) + + const result = await userTableServerTool.execute( + { operation: 'delete_rows_by_filter', args: { tableId: 'tbl_1', filter: { name: 'x' } } }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + + expect(result.success).toBe(false) + expect(result.message).toMatch(/job is already in progress/i) + expect(mockDeleteRowsByFilter).not.toHaveBeenCalled() + expect(mockRunTableDelete).not.toHaveBeenCalled() + }) + + it('deletes inline with an explicit limit without counting first', async () => { + const result = await userTableServerTool.execute( + { + operation: 'delete_rows_by_filter', + args: { tableId: 'tbl_1', filter: { name: 'x' }, limit: 100 }, + }, + { userId: 'user-1', workspaceId: 'workspace-1' } + ) + + expect(result.success).toBe(true) + expect(mockQueryRows).not.toHaveBeenCalled() + expect(mockDeleteRowsByFilter).toHaveBeenCalledTimes(1) + }) +}) diff --git a/apps/sim/lib/copilot/tools/server/table/user-table.ts b/apps/sim/lib/copilot/tools/server/table/user-table.ts index 7614dc63ae..e0629d2a28 100644 --- a/apps/sim/lib/copilot/tools/server/table/user-table.ts +++ b/apps/sim/lib/copilot/tools/server/table/user-table.ts @@ -7,9 +7,12 @@ import { type BaseServerTool, type ServerToolContext, } from '@/lib/copilot/tools/server/base-tool' +import { isTriggerDevEnabled } from '@/lib/core/config/env-flags' +import { runDetached } from '@/lib/core/utils/background' import { buildAutoMapping, COLUMN_TYPES, + CSV_ASYNC_IMPORT_THRESHOLD_BYTES, CSV_MAX_BATCH_SIZE, type CsvHeaderMapping, CsvImportValidationError, @@ -17,6 +20,8 @@ import { getWorkspaceTableLimits, inferSchemaFromCsv, parseFileRows, + sanitizeName, + TABLE_LIMITS, validateMapping, } from '@/lib/table' import { @@ -36,6 +41,9 @@ import { updateColumnConstraints, updateColumnType, } from '@/lib/table/columns/service' +import { markTableDeleteFailed, runTableDelete } from '@/lib/table/delete-runner' +import { runTableImport, type TableImportPayload } from '@/lib/table/import-runner' +import { markTableJobRunning, releaseJobClaim } from '@/lib/table/jobs/service' import { batchInsertRows, batchUpdateRows, @@ -52,8 +60,11 @@ import { import { createTable, deleteTable, getTableById, renameTable } from '@/lib/table/service' import type { ColumnDefinition, + Filter, RowData, TableDefinition, + TableDeleteJobPayload, + TableRowsCursor, WorkflowGroup, WorkflowGroupDependencies, WorkflowGroupDeploymentMode, @@ -93,18 +104,87 @@ type UserTableResult = { const MAX_BATCH_SIZE = CSV_MAX_BATCH_SIZE -async function resolveWorkspaceFile( - fileReference: string, - workspaceId: string -): Promise<{ buffer: Buffer; name: string; type: string }> { +async function resolveWorkspaceFileRecordOrThrow(fileReference: string, workspaceId: string) { const record = await resolveWorkspaceFileReference(workspaceId, fileReference) if (!record) { throw new Error( `File not found: "${fileReference}". Use glob("files/**") and read the canonical file path metadata to find workspace files.` ) } - const buffer = await fetchWorkspaceFileBuffer(record) - return { buffer, name: record.name, type: record.type } + return record +} + +/** + * Whether a workspace file should import as a background job instead of inline: + * CSV/TSV at or above the same byte threshold the UI uses. Other formats + * (xlsx/json) aren't supported by the streaming import worker and stay inline. + */ +function shouldImportInBackground(record: { name: string; size: number }): boolean { + const ext = record.name.split('.').pop()?.toLowerCase() + return (ext === 'csv' || ext === 'tsv') && record.size >= CSV_ASYNC_IMPORT_THRESHOLD_BYTES +} + +/** + * Dispatches a background import for an already-claimed job slot, mirroring the + * import-async routes: trigger.dev when enabled (survives deploys, retries), + * detached in-process worker otherwise. A failed dispatch releases the claim so + * a ghost `running` job can't hold the table's one-write-job slot. + */ +async function dispatchImportJob(payload: TableImportPayload): Promise { + if (isTriggerDevEnabled) { + try { + const [{ tableImportTask }, { tasks }] = await Promise.all([ + import('@/background/table-import'), + import('@trigger.dev/sdk'), + ]) + await tasks.trigger('table-import', payload, { + tags: [`tableId:${payload.tableId}`, `jobId:${payload.importId}`], + }) + } catch (error) { + await releaseJobClaim(payload.tableId, payload.importId).catch(() => {}) + throw error + } + } else { + runDetached('table-import', () => runTableImport(payload)) + } +} + +/** + * Dispatches a background filter-delete for an already-claimed job slot, + * mirroring the delete-async route. Same release-on-failed-dispatch guard as + * {@link dispatchImportJob}. + */ +async function dispatchDeleteJob(params: { + jobId: string + tableId: string + workspaceId: string + filter: Filter + cutoff: Date +}): Promise { + const { jobId, tableId, workspaceId, filter, cutoff } = params + if (isTriggerDevEnabled) { + try { + const [{ tableDeleteTask }, { tasks }] = await Promise.all([ + import('@/background/table-delete'), + import('@trigger.dev/sdk'), + ]) + await tasks.trigger( + 'table-delete', + { jobId, tableId, workspaceId, filter, cutoff: cutoff.toISOString() }, + { tags: [`tableId:${tableId}`, `jobId:${jobId}`] } + ) + } catch (error) { + await releaseJobClaim(tableId, jobId).catch(() => {}) + throw error + } + } else { + runDetached('table-delete', () => + runTableDelete({ jobId, tableId, workspaceId, filter, cutoff }).catch(async (error) => { + await markTableDeleteFailed(tableId, jobId, error) + throw error + }) + ) + } } /** @@ -159,6 +239,21 @@ function parseDeploymentMode(value: unknown): WorkflowGroupDeploymentMode | unde return value === 'live' || value === 'deployed' ? value : undefined } +/** + * Validates an optional row limit against the same bounds the HTTP contracts + * enforce. Returns an error message, or `null` when the limit is acceptable. + */ +function limitError(limit: unknown, max: number): string | null { + if (limit === undefined) return null + if (typeof limit !== 'number' || !Number.isInteger(limit) || limit < 1) { + return 'Limit must be an integer of at least 1' + } + if (limit > max) { + return `Limit cannot exceed ${max}` + } + return null +} + async function batchInsertAll( tableId: string, rows: RowData[], @@ -444,6 +539,26 @@ export const userTableServerTool: BaseServerTool return { success: false, message: 'Workspace ID is required' } } + const queryLimitError = limitError(args.limit, TABLE_LIMITS.MAX_QUERY_LIMIT) + if (queryLimitError) { + return { success: false, message: queryLimitError } + } + const after = args.after as TableRowsCursor | undefined + if (after && (typeof after.orderKey !== 'string' || typeof after.id !== 'string')) { + return { + success: false, + message: + 'after must be the nextCursor object ({ orderKey, id }) returned by a previous query_rows page', + } + } + if (after && args.sort) { + return { + success: false, + message: + 'after cursor cannot be combined with sort — cursors paginate the default order', + } + } + const table = await getTableById(args.tableId) if (!table || table.workspaceId !== workspaceId) { return { success: false, message: `Table not found: ${args.tableId}` } @@ -459,16 +574,28 @@ export const userTableServerTool: BaseServerTool sort: args.sort ? sortNamesToIds(args.sort, idByName) : undefined, limit: args.limit, offset: args.offset, + after, + withExecutions: false, }, requestId ) + // Keyset continuation for the default order: hand back the last row's + // cursor when the page filled, so the next call seeks past it instead + // of paying OFFSET's scan-and-discard. + const lastRow = result.rows[result.rows.length - 1] + const nextCursor = + !args.sort && result.rows.length === result.limit && lastRow?.orderKey + ? { orderKey: lastRow.orderKey, id: lastRow.id } + : undefined + return { success: true, message: `Returned ${result.rows.length} of ${result.totalCount} rows`, data: { ...result, rows: result.rows.map((r) => ({ ...r, data: rowDataIdToName(r.data, nameById) })), + ...(nextCursor ? { nextCursor } : {}), }, } } @@ -559,6 +686,10 @@ export const userTableServerTool: BaseServerTool if (!workspaceId) { return { success: false, message: 'Workspace ID is required' } } + const updateLimitError = limitError(args.limit, TABLE_LIMITS.MAX_BULK_OPERATION_SIZE) + if (updateLimitError) { + return { success: false, message: updateLimitError } + } const table = await getTableById(args.tableId) if (!table || table.workspaceId !== workspaceId) { @@ -596,6 +727,10 @@ export const userTableServerTool: BaseServerTool if (!workspaceId) { return { success: false, message: 'Workspace ID is required' } } + const deleteLimitError = limitError(args.limit, TABLE_LIMITS.MAX_BULK_OPERATION_SIZE) + if (deleteLimitError) { + return { success: false, message: deleteLimitError } + } const table = await getTableById(args.tableId) if (!table || table.workspaceId !== workspaceId) { @@ -603,12 +738,54 @@ export const userTableServerTool: BaseServerTool } const requestId = generateId().slice(0, 8) - assertNotAborted() const idByName = buildIdByName(table.schema) + const idFilter = filterNamesToIds(args.filter, idByName) + + // Unbounded "delete everything matching": measure the blast radius + // first, and hand anything past the inline cap to the background + // delete worker (same path as the UI's select-all delete) instead of + // loading every matching row id into this request. + if (args.limit === undefined) { + const { totalCount } = await queryRows( + table, + { filter: idFilter, limit: 1, withExecutions: false }, + requestId + ) + const matchCount = totalCount ?? 0 + if (matchCount > TABLE_LIMITS.MAX_BULK_OPERATION_SIZE) { + const doomedCount = Math.min(matchCount, table.rowCount) + const cutoff = new Date() + const jobId = generateId() + const payload: TableDeleteJobPayload = { + filter: idFilter, + cutoff: cutoff.toISOString(), + doomedCount, + } + assertNotAborted() + const claimed = await markTableJobRunning(table.id, jobId, 'delete', payload) + if (!claimed) { + return { success: false, message: 'A job is already in progress for this table' } + } + await dispatchDeleteJob({ + jobId, + tableId: table.id, + workspaceId, + filter: idFilter, + cutoff, + }) + return { + success: true, + message: `Started background delete of ${doomedCount} matching rows (job ${jobId}). The rows are hidden from reads immediately — query_rows already reflects the post-delete view.`, + data: { jobId, doomedCount }, + } + } + } + + assertNotAborted() const result = await deleteRowsByFilter( table, { - filter: filterNamesToIds(args.filter, idByName), + filter: idFilter, limit: args.limit, }, requestId @@ -741,7 +918,71 @@ export const userTableServerTool: BaseServerTool return { success: false, message: 'Workspace ID is required' } } - const file = await resolveWorkspaceFile(fileReference, workspaceId) + const record = await resolveWorkspaceFileRecordOrThrow(fileReference, workspaceId) + + // Large CSV/TSV: create a placeholder table whose creation claims the + // job slot, then let the streaming import worker infer the schema and + // populate rows in the background (mirrors POST /api/table/import-async). + if (shouldImportInBackground(record)) { + const planLimits = await getWorkspaceTableLimits(workspaceId) + const tableName = + args.name || + sanitizeName(record.name.replace(/\.[^.]+$/, ''), 'imported_table').slice( + 0, + TABLE_LIMITS.MAX_TABLE_NAME_LENGTH + ) + const requestId = generateId().slice(0, 8) + const importId = generateId() + assertNotAborted() + const table = await createTable( + { + name: tableName, + description: args.description || `Imported from ${record.name}`, + schema: { columns: [{ name: 'column_1', type: 'string' }] }, + workspaceId, + userId: context.userId, + maxRows: planLimits.maxRowsPerTable, + maxTables: planLimits.maxTables, + jobStatus: 'running', + jobType: 'import', + jobId: importId, + }, + requestId + ) + try { + await dispatchImportJob({ + importId, + tableId: table.id, + workspaceId, + userId: context.userId, + fileKey: record.key, + fileName: record.name, + delimiter: record.name.toLowerCase().endsWith('.tsv') ? '\t' : ',', + mode: 'create', + deleteSourceFile: false, + }) + } catch (dispatchError) { + // The user never saw the placeholder — archive it back out. + await deleteTable(table.id, generateId().slice(0, 8)).catch(() => {}) + throw dispatchError + } + return { + success: true, + message: `Created table "${table.name}" (${table.id}); importing rows from "${record.name}" in the background (job ${importId}). Columns and rows appear as the import progresses — query_rows to check what has landed.`, + data: { + tableId: table.id, + tableName: table.name, + jobId: importId, + sourceFile: record.name, + }, + } + } + + const file = { + buffer: await fetchWorkspaceFileBuffer(record), + name: record.name, + type: record.type, + } const { headers, rows } = await parseFileRows(file.buffer, file.name, file.type) if (rows.length === 0) { return { success: false, message: 'File contains no data rows' } @@ -866,7 +1107,42 @@ export const userTableServerTool: BaseServerTool return { success: false, message: `Table is archived: ${tableId}` } } - const file = await resolveWorkspaceFile(fileReference, workspaceId) + const record = await resolveWorkspaceFileRecordOrThrow(fileReference, workspaceId) + + // Large CSV/TSV: claim the table's one-write-job slot and hand the + // file to the streaming import worker (mirrors + // POST /api/table/[tableId]/import-async). + if (shouldImportInBackground(record)) { + const importId = generateId() + assertNotAborted() + const claimed = await markTableJobRunning(table.id, importId, 'import') + if (!claimed) { + return { success: false, message: 'A job is already in progress for this table' } + } + await dispatchImportJob({ + importId, + tableId: table.id, + workspaceId, + userId: context.userId, + fileKey: record.key, + fileName: record.name, + delimiter: record.name.toLowerCase().endsWith('.tsv') ? '\t' : ',', + mode, + mapping: rawMapping, + deleteSourceFile: false, + }) + return { + success: true, + message: `Started background ${mode} import of "${record.name}" into "${table.name}" (job ${importId}). Rows appear as the import progresses — query_rows to check what has landed.`, + data: { tableId: table.id, jobId: importId, mode }, + } + } + + const file = { + buffer: await fetchWorkspaceFileBuffer(record), + name: record.name, + type: record.type, + } const { headers, rows } = await parseFileRows(file.buffer, file.name, file.type) if (rows.length === 0) { return { success: false, message: 'File contains no data rows' } @@ -897,64 +1173,75 @@ export const userTableServerTool: BaseServerTool const coerced = coerceRowsForTable(rows, table.schema, validation.effectiveMap) - if (mode === 'replace') { - assertNotAborted() - const requestId = generateId().slice(0, 8) - const result = await replaceTableRows( - { tableId: table.id, rows: coerced, workspaceId, userId: context.userId }, - table, - requestId - ) + // Inline imports still claim the table's one-write-job slot so they + // can't interleave with a running background import/delete. + const inlineImportId = generateId() + assertNotAborted() + const inlineClaimed = await markTableJobRunning(table.id, inlineImportId, 'import') + if (!inlineClaimed) { + return { success: false, message: 'A job is already in progress for this table' } + } + try { + if (mode === 'replace') { + const requestId = generateId().slice(0, 8) + const result = await replaceTableRows( + { tableId: table.id, rows: coerced, workspaceId, userId: context.userId }, + table, + requestId + ) + + logger.info('Rows replaced from file', { + tableId: table.id, + fileName: file.name, + mode, + matchedColumns: validation.mappedHeaders.length, + deleted: result.deletedCount, + inserted: result.insertedCount, + userId: context.userId, + }) + + return { + success: true, + message: `Replaced rows in "${table.name}" from "${file.name}": deleted ${result.deletedCount}, inserted ${result.insertedCount}`, + data: { + tableId: table.id, + tableName: table.name, + mode, + matchedColumns: validation.mappedHeaders, + skippedColumns: validation.skippedHeaders, + deletedCount: result.deletedCount, + insertedCount: result.insertedCount, + sourceFile: file.name, + }, + } + } + + const inserted = await batchInsertAll(table.id, coerced, table, workspaceId, context) - logger.info('Rows replaced from file', { + logger.info('Rows imported from file', { tableId: table.id, fileName: file.name, mode, matchedColumns: validation.mappedHeaders.length, - deleted: result.deletedCount, - inserted: result.insertedCount, + rows: inserted, userId: context.userId, }) return { success: true, - message: `Replaced rows in "${table.name}" from "${file.name}": deleted ${result.deletedCount}, inserted ${result.insertedCount}`, + message: `Imported ${inserted} rows into "${table.name}" from "${file.name}" (${validation.mappedHeaders.length} columns matched)`, data: { tableId: table.id, tableName: table.name, mode, matchedColumns: validation.mappedHeaders, skippedColumns: validation.skippedHeaders, - deletedCount: result.deletedCount, - insertedCount: result.insertedCount, + rowCount: inserted, sourceFile: file.name, }, } - } - - const inserted = await batchInsertAll(table.id, coerced, table, workspaceId, context) - - logger.info('Rows imported from file', { - tableId: table.id, - fileName: file.name, - mode, - matchedColumns: validation.mappedHeaders.length, - rows: inserted, - userId: context.userId, - }) - - return { - success: true, - message: `Imported ${inserted} rows into "${table.name}" from "${file.name}" (${validation.mappedHeaders.length} columns matched)`, - data: { - tableId: table.id, - tableName: table.name, - mode, - matchedColumns: validation.mappedHeaders, - skippedColumns: validation.skippedHeaders, - rowCount: inserted, - sourceFile: file.name, - }, + } finally { + await releaseJobClaim(table.id, inlineImportId).catch(() => {}) } } diff --git a/apps/sim/lib/table/import-runner.test.ts b/apps/sim/lib/table/import-runner.test.ts new file mode 100644 index 0000000000..29b076a180 --- /dev/null +++ b/apps/sim/lib/table/import-runner.test.ts @@ -0,0 +1,117 @@ +/** + * @vitest-environment node + */ +import { Readable } from 'node:stream' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const { + mockGetTableById, + mockBulkInsertImportBatch, + mockUpdateJobProgress, + mockMarkJobReady, + mockMarkJobFailed, + mockNextImportStartPosition, + mockNextImportStartOrderKey, + mockAppendTableEvent, + mockDeleteFile, + mockDownloadFileStream, + mockHeadObject, +} = vi.hoisted(() => ({ + mockGetTableById: vi.fn(), + mockBulkInsertImportBatch: vi.fn(), + mockUpdateJobProgress: vi.fn(), + mockMarkJobReady: vi.fn(), + mockMarkJobFailed: vi.fn(), + mockNextImportStartPosition: vi.fn(), + mockNextImportStartOrderKey: vi.fn(), + mockAppendTableEvent: vi.fn(), + mockDeleteFile: vi.fn(), + mockDownloadFileStream: vi.fn(), + mockHeadObject: vi.fn(), +})) + +vi.mock('@/lib/table/service', () => ({ + getTableById: mockGetTableById, +})) +vi.mock('@/lib/table/import-data', () => ({ + addImportColumns: vi.fn(), + bulkInsertImportBatch: mockBulkInsertImportBatch, + deleteAllTableRows: vi.fn(), + setTableSchemaForImport: vi.fn(), +})) +vi.mock('@/lib/table/jobs/service', () => ({ + markJobFailed: mockMarkJobFailed, + markJobReady: mockMarkJobReady, + updateJobProgress: mockUpdateJobProgress, +})) +vi.mock('@/lib/table/rows/ordering', () => ({ + nextImportStartOrderKey: mockNextImportStartOrderKey, + nextImportStartPosition: mockNextImportStartPosition, +})) +vi.mock('@/lib/table/events', () => ({ appendTableEvent: mockAppendTableEvent })) +vi.mock('@/lib/posthog/server', () => ({ captureServerEvent: vi.fn() })) +vi.mock('@/lib/uploads/core/storage-service', () => ({ + deleteFile: mockDeleteFile, + downloadFileStream: mockDownloadFileStream, + headObject: mockHeadObject, +})) +vi.mock('@/app/api/table/utils', () => ({ + normalizeColumn: (col: unknown) => col, +})) + +import { runTableImport, type TableImportPayload } from '@/lib/table/import-runner' + +const table = { + id: 'tbl_1', + name: 'People', + workspaceId: 'ws_1', + rowCount: 0, + maxRows: 1000, + schema: { columns: [{ id: 'col_name', name: 'name', type: 'string' }] }, +} + +function buildPayload(overrides: Partial = {}): TableImportPayload { + return { + importId: 'job_1', + tableId: 'tbl_1', + workspaceId: 'ws_1', + userId: 'user_1', + fileKey: 'workspace/ws_1/people.csv', + fileName: 'people.csv', + delimiter: ',', + mode: 'append', + ...overrides, + } +} + +describe('runTableImport source-file cleanup', () => { + beforeEach(() => { + vi.clearAllMocks() + mockGetTableById.mockResolvedValue(table) + mockHeadObject.mockResolvedValue({ size: 20 }) + mockDownloadFileStream.mockResolvedValue(Readable.from('name\nAlice\nBob\n')) + mockNextImportStartPosition.mockResolvedValue(0) + mockNextImportStartOrderKey.mockResolvedValue(null) + mockUpdateJobProgress.mockResolvedValue(true) + mockBulkInsertImportBatch.mockResolvedValue({ inserted: 2, lastOrderKey: 'a1' }) + mockMarkJobReady.mockResolvedValue(true) + mockDeleteFile.mockResolvedValue(undefined) + }) + + it('deletes the single-use source object by default', async () => { + await runTableImport(buildPayload()) + + expect(mockMarkJobReady).toHaveBeenCalled() + expect(mockDeleteFile).toHaveBeenCalledWith({ + key: 'workspace/ws_1/people.csv', + context: 'workspace', + }) + }) + + it('keeps a persistent workspace file when deleteSourceFile is false', async () => { + await runTableImport(buildPayload({ deleteSourceFile: false })) + + expect(mockMarkJobReady).toHaveBeenCalled() + expect(mockDeleteFile).not.toHaveBeenCalled() + }) +}) diff --git a/apps/sim/lib/table/import-runner.ts b/apps/sim/lib/table/import-runner.ts index 5391d22a23..f9d2b3f503 100644 --- a/apps/sim/lib/table/import-runner.ts +++ b/apps/sim/lib/table/import-runner.ts @@ -60,6 +60,13 @@ export interface TableImportPayload { mapping?: CsvHeaderMapping /** (append/replace) CSV headers to auto-create as new columns (types inferred from the sample). */ createColumns?: string[] + /** + * Whether the source object is deleted once the import is terminal. Defaults + * to true (the UI routes upload a single-use temp object per import); pass + * false when importing a persistent workspace file (Mothership) that must + * survive the import. + */ + deleteSourceFile?: boolean } /** @@ -350,9 +357,12 @@ export async function runTableImport(payload: TableImportPayload): Promise // Release the storage stream so its HTTP connection doesn't leak on failure. source?.destroy() // The uploaded source file is single-use (a fresh upload per import) — delete it once the - // import is terminal so the workspace bucket doesn't accumulate. Best-effort. - await deleteFile({ key: fileKey, context: 'workspace' }).catch((err) => { - logger.warn(`[${requestId}] Failed to delete imported file`, { fileKey, err }) - }) + // import is terminal so the workspace bucket doesn't accumulate. Best-effort. Skipped for + // persistent workspace files (deleteSourceFile: false). + if (payload.deleteSourceFile !== false) { + await deleteFile({ key: fileKey, context: 'workspace' }).catch((err) => { + logger.warn(`[${requestId}] Failed to delete imported file`, { fileKey, err }) + }) + } } }