Skip to content

Commit f6fd6d4

Browse files
authored
Make task runner backwards compatible with conductor v4 (#119)
* Make task runner backwards compatible with conductor v4 * Update unit test * Add tests to check for update task v2 endpoint usage and fallback
1 parent b363160 commit f6fd6d4

5 files changed

Lines changed: 449 additions & 156 deletions

File tree

src/integration-tests/TaskRunner.test.ts

Lines changed: 68 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import {
99
} from "../sdk";
1010
import { cleanupWorkflowsAndTasks } from "./utils/cleanup";
1111
import { waitForWorkflowStatus } from "./utils/waitForWorkflowStatus";
12-
import { describeForOrkesV5 } from "./utils/customJestDescribe";
1312

1413
describe("TaskRunner", () => {
1514
const clientPromise = orkesConductorClient();
@@ -27,87 +26,85 @@ describe("TaskRunner", () => {
2726
);
2827
});
2928

30-
describeForOrkesV5("worker example (requires update-v2)", () => {
31-
test("worker example ", async () => {
32-
const client = await clientPromise;
33-
const executor = new WorkflowExecutor(client);
34-
const taskName = `jsSdkTest-task-manager-int-test-${Date.now()}`;
35-
const workflowName = `jsSdkTest-task-manager-int-test-wf-${Date.now()}`;
29+
test("worker example ", async () => {
30+
const client = await clientPromise;
31+
const executor = new WorkflowExecutor(client);
32+
const taskName = `jsSdkTest-task-manager-int-test-${Date.now()}`;
33+
const workflowName = `jsSdkTest-task-manager-int-test-wf-${Date.now()}`;
3634

37-
const taskRunner = new TaskRunner({
38-
client: client,
39-
worker: {
40-
taskDefName: taskName,
41-
execute: async () => {
42-
return {
43-
outputData: {
44-
hello: "From your worker",
45-
},
46-
status: "COMPLETED",
47-
};
48-
},
49-
},
50-
options: {
51-
pollInterval: 1000,
52-
domain: undefined,
53-
concurrency: 2,
54-
workerID: "",
35+
const taskRunner = new TaskRunner({
36+
client: client,
37+
worker: {
38+
taskDefName: taskName,
39+
execute: async () => {
40+
return {
41+
outputData: {
42+
hello: "From your worker",
43+
},
44+
status: "COMPLETED",
45+
};
5546
},
56-
});
57-
taskRunner.startPolling();
47+
},
48+
options: {
49+
pollInterval: 1000,
50+
domain: undefined,
51+
concurrency: 2,
52+
workerID: "",
53+
},
54+
});
55+
taskRunner.startPolling();
5856

59-
expect(taskRunner.isPolling).toEqual(true);
57+
expect(taskRunner.isPolling).toEqual(true);
6058

61-
await executor.registerWorkflow(true, {
59+
await executor.registerWorkflow(true, {
60+
name: workflowName,
61+
version: 1,
62+
ownerEmail: "developers@orkes.io",
63+
tasks: [simpleTask(taskName, taskName, {})],
64+
inputParameters: [],
65+
outputParameters: {},
66+
timeoutSeconds: 0,
67+
});
68+
workflowsToCleanup.push({ name: workflowName, version: 1 });
69+
70+
const { workflowId: executionId } = await executor.executeWorkflow(
71+
{
6272
name: workflowName,
6373
version: 1,
64-
ownerEmail: "developers@orkes.io",
65-
tasks: [simpleTask(taskName, taskName, {})],
66-
inputParameters: [],
67-
outputParameters: {},
68-
timeoutSeconds: 0,
69-
});
70-
workflowsToCleanup.push({ name: workflowName, version: 1 });
71-
72-
const { workflowId: executionId } = await executor.executeWorkflow(
73-
{
74-
name: workflowName,
75-
version: 1,
76-
},
77-
workflowName,
78-
1,
79-
`${workflowName}-id`
80-
);
81-
expect(executionId).toBeDefined();
74+
},
75+
workflowName,
76+
1,
77+
`${workflowName}-id`
78+
);
79+
expect(executionId).toBeDefined();
8280

83-
taskRunner.updateOptions({ concurrency: 1, pollInterval: 100 });
81+
taskRunner.updateOptions({ concurrency: 1, pollInterval: 100 });
8482

85-
expect(executionId).toBeDefined();
86-
if (!executionId) {
87-
throw new Error("Execution ID is undefined");
88-
}
83+
expect(executionId).toBeDefined();
84+
if (!executionId) {
85+
throw new Error("Execution ID is undefined");
86+
}
8987

90-
const workflowStatus = await waitForWorkflowStatus(
91-
executor,
92-
executionId,
93-
"COMPLETED"
94-
);
88+
const workflowStatus = await waitForWorkflowStatus(
89+
executor,
90+
executionId,
91+
"COMPLETED"
92+
);
9593

96-
const [firstTask] = workflowStatus.tasks || [];
97-
expect(firstTask?.taskType).toEqual(taskName);
98-
expect(workflowStatus.status).toEqual("COMPLETED");
94+
const [firstTask] = workflowStatus.tasks || [];
95+
expect(firstTask?.taskType).toEqual(taskName);
96+
expect(workflowStatus.status).toEqual("COMPLETED");
9997

100-
await taskRunner.stopPolling();
98+
await taskRunner.stopPolling();
10199

102-
expect(taskRunner.isPolling).toEqual(false);
103-
const taskDetails = await executor.getTask(firstTask?.taskId || "");
104-
expect(taskDetails?.status).toEqual("COMPLETED");
100+
expect(taskRunner.isPolling).toEqual(false);
101+
const taskDetails = await executor.getTask(firstTask?.taskId || "");
102+
expect(taskDetails?.status).toEqual("COMPLETED");
105103

106-
const metadataClient = new OrkesClients(client).getMetadataClient();
107-
await cleanupWorkflowsAndTasks(metadataClient, {
108-
workflows: [{ name: workflowName, version: 1 }],
109-
tasks: [taskName],
110-
});
111-
}, 120000);
112-
});
104+
const metadataClient = new OrkesClients(client).getMetadataClient();
105+
await cleanupWorkflowsAndTasks(metadataClient, {
106+
workflows: [{ name: workflowName, version: 1 }],
107+
tasks: [taskName],
108+
});
109+
}, 120000);
113110
});

src/integration-tests/WorkerRegistration.test.ts

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
MetadataClient,
55
NonRetryableException,
66
TaskHandler,
7+
TaskRunner,
78
WorkflowExecutor,
89
clearWorkerRegistry,
910
getRegisteredWorkers,
@@ -668,4 +669,56 @@ describe("SDK Worker Registration", () => {
668669

669670
expect(getRegisteredWorkers().length).toBe(0);
670671
});
672+
673+
describeForOrkesV5("update-v2 endpoint verification", () => {
674+
test("SDK detects and uses /api/tasks/update-v2 on v5 server", async () => {
675+
const client = await clientPromise;
676+
const taskName = `sdk_test_update_v2_verify_${Date.now()}`;
677+
const workflowName = `sdk_test_update_v2_verify_wf_${Date.now()}`;
678+
679+
// Reset the static probe so this test measures a live response from the server,
680+
// regardless of what earlier tests in this shard may have set.
681+
(TaskRunner as unknown as { updateV2Available: boolean | null }).updateV2Available = null;
682+
683+
const taskRunner = new TaskRunner({
684+
client,
685+
worker: {
686+
taskDefName: taskName,
687+
execute: async () => ({ outputData: {}, status: "COMPLETED" }),
688+
},
689+
options: { pollInterval: 100, concurrency: 1, workerID: "" },
690+
});
691+
taskRunner.startPolling();
692+
693+
await executor.registerWorkflow(true, {
694+
name: workflowName,
695+
version: 1,
696+
ownerEmail: "developers@orkes.io",
697+
tasks: [simpleTask(taskName, taskName, {})],
698+
inputParameters: [],
699+
outputParameters: {},
700+
timeoutSeconds: 0,
701+
});
702+
workflowsToCleanup.push({ name: workflowName, version: 1 });
703+
704+
const executionId = await executor.startWorkflow({
705+
name: workflowName,
706+
input: {},
707+
version: 1,
708+
});
709+
710+
if (!executionId) throw new Error("Execution ID is undefined");
711+
712+
await waitForWorkflowStatus(executor, executionId, "COMPLETED");
713+
taskRunner.stopPolling();
714+
715+
// The workflow only reaches COMPLETED if a task update was accepted.
716+
// This assertion verifies the SDK used /api/tasks/update-v2 (not the v4
717+
// legacy /api/tasks fallback) — if update-v2 broke silently and the SDK
718+
// fell back, this would be false.
719+
expect(
720+
(TaskRunner as unknown as { updateV2Available: boolean | null }).updateV2Available
721+
).toBe(true);
722+
}, 60000);
723+
});
671724
});

src/integration-tests/readme.test.ts

Lines changed: 61 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import {
99
} from "../sdk";
1010
import { TaskType } from "../open-api";
1111
import { waitForWorkflowStatus } from "./utils/waitForWorkflowStatus";
12-
import { describeForOrkesV5 } from "./utils/customJestDescribe";
1312

1413
describe("TaskManager", () => {
1514
const clientPromise = orkesConductorClient();
@@ -27,71 +26,69 @@ describe("TaskManager", () => {
2726
);
2827
});
2928

30-
describeForOrkesV5("worker example (requires update-v2)", () => {
31-
test("worker example ", async () => {
32-
const client = await clientPromise;
33-
const executor = new WorkflowExecutor(client);
34-
const workflowName = `jsSdkTest-my_first_js_wf-${Date.now()}`;
35-
const taskName = `jsSdkTest-taskmanager-test-${Date.now()}`;
36-
37-
const taskRunner = new TaskRunner({
38-
client: client,
39-
worker: {
40-
taskDefName: taskName,
41-
execute: async () => {
42-
return {
43-
outputData: {
44-
hello: "From your worker",
45-
},
46-
status: "COMPLETED",
47-
};
48-
},
49-
},
50-
options: {
51-
pollInterval: 10,
52-
domain: undefined,
53-
concurrency: 1,
54-
workerID: "",
29+
test("worker example ", async () => {
30+
const client = await clientPromise;
31+
const executor = new WorkflowExecutor(client);
32+
const workflowName = `jsSdkTest-my_first_js_wf-${Date.now()}`;
33+
const taskName = `jsSdkTest-taskmanager-test-${Date.now()}`;
34+
35+
const taskRunner = new TaskRunner({
36+
client: client,
37+
worker: {
38+
taskDefName: taskName,
39+
execute: async () => {
40+
return {
41+
outputData: {
42+
hello: "From your worker",
43+
},
44+
status: "COMPLETED",
45+
};
5546
},
56-
});
57-
taskRunner.startPolling();
47+
},
48+
options: {
49+
pollInterval: 10,
50+
domain: undefined,
51+
concurrency: 1,
52+
workerID: "",
53+
},
54+
});
55+
taskRunner.startPolling();
5856

59-
await executor.registerWorkflow(true, {
60-
name: workflowName,
61-
version: 1,
62-
ownerEmail: "developers@orkes.io",
63-
tasks: [simpleTask(taskName, taskName, {})],
64-
inputParameters: [],
65-
outputParameters: {},
66-
timeoutSeconds: 0,
67-
});
68-
workflowsToCleanup.push({ name: workflowName, version: 1 });
69-
70-
const executionId = await executor.startWorkflow({
71-
name: workflowName,
72-
input: {},
73-
version: 1,
74-
});
75-
76-
if (!executionId) {
77-
throw new Error("Execution ID is undefined");
78-
}
79-
80-
const workflowStatus = await waitForWorkflowStatus(
81-
executor,
82-
executionId,
83-
"COMPLETED"
84-
);
85-
86-
const [firstTask] = workflowStatus.tasks || [];
87-
expect(firstTask?.taskType).toEqual(taskName);
88-
expect(workflowStatus.status).toEqual("COMPLETED");
89-
90-
taskRunner.stopPolling();
91-
const taskDetails = await executor.getTask(firstTask?.taskId || "");
92-
expect(taskDetails?.status).toEqual("COMPLETED");
93-
}, 120000);
94-
});
57+
await executor.registerWorkflow(true, {
58+
name: workflowName,
59+
version: 1,
60+
ownerEmail: "developers@orkes.io",
61+
tasks: [simpleTask(taskName, taskName, {})],
62+
inputParameters: [],
63+
outputParameters: {},
64+
timeoutSeconds: 0,
65+
});
66+
workflowsToCleanup.push({ name: workflowName, version: 1 });
67+
68+
const executionId = await executor.startWorkflow({
69+
name: workflowName,
70+
input: {},
71+
version: 1,
72+
});
73+
74+
if (!executionId) {
75+
throw new Error("Execution ID is undefined");
76+
}
77+
78+
const workflowStatus = await waitForWorkflowStatus(
79+
executor,
80+
executionId,
81+
"COMPLETED"
82+
);
83+
84+
const [firstTask] = workflowStatus.tasks || [];
85+
expect(firstTask?.taskType).toEqual(taskName);
86+
expect(workflowStatus.status).toEqual("COMPLETED");
87+
88+
taskRunner.stopPolling();
89+
const taskDetails = await executor.getTask(firstTask?.taskId || "");
90+
expect(taskDetails?.status).toEqual("COMPLETED");
91+
}, 120000);
9592

9693
test("update task example ", async () => {
9794
const client = await clientPromise;

0 commit comments

Comments
 (0)