@@ -58,6 +58,8 @@ export interface Job {
5858
5959const jobs = new Map < string , Job > ( ) ;
6060const JOB_TTL_MS = 60 * 60 * 1000 ;
61+ const STALE_JOB_MS = 15 * 60 * 1000 ;
62+ const MAX_JOB_LIFETIME_MS = 45 * 60 * 1000 ;
6163
6264// Dynamic concurrency control for video processing.
6365//
@@ -268,15 +270,44 @@ export function cleanupExpiredJobs(): number {
268270 let cleaned = 0 ;
269271
270272 for ( const [ jobId , job ] of jobs ) {
271- if ( now - job . updatedAt > JOB_TTL_MS ) {
273+ const age = now - job . createdAt ;
274+ const staleness = now - job . updatedAt ;
275+
276+ if ( staleness > JOB_TTL_MS ) {
272277 if ( isActivePhase ( job . phase ) ) {
273278 console . warn (
274- `[job-manager] Cleaning up stuck job ${ jobId } (phase=${ job . phase } , age=${ Math . round ( ( now - job . createdAt ) / 60000 ) } m)` ,
279+ `[job-manager] Cleaning up expired job ${ jobId } (phase=${ job . phase } , age=${ Math . round ( age / 60000 ) } m)` ,
275280 ) ;
276281 job . abortController ?. abort ( ) ;
277282 }
278283 deleteJob ( jobId ) ;
279284 cleaned ++ ;
285+ continue ;
286+ }
287+
288+ if ( isActivePhase ( job . phase ) && staleness > STALE_JOB_MS ) {
289+ console . warn (
290+ `[job-manager] Marking stale job ${ jobId } as error (phase=${ job . phase } , no update for ${ Math . round ( staleness / 60000 ) } m)` ,
291+ ) ;
292+ job . abortController ?. abort ( ) ;
293+ job . phase = "error" ;
294+ job . error = `Job stale: no progress update for ${ Math . round ( staleness / 60000 ) } minutes` ;
295+ job . message = "Processing failed (stale)" ;
296+ job . updatedAt = now ;
297+ cleaned ++ ;
298+ continue ;
299+ }
300+
301+ if ( isActivePhase ( job . phase ) && age > MAX_JOB_LIFETIME_MS ) {
302+ console . warn (
303+ `[job-manager] Marking long-running job ${ jobId } as error (phase=${ job . phase } , age=${ Math . round ( age / 60000 ) } m)` ,
304+ ) ;
305+ job . abortController ?. abort ( ) ;
306+ job . phase = "error" ;
307+ job . error = `Job exceeded maximum lifetime of ${ Math . round ( MAX_JOB_LIFETIME_MS / 60000 ) } minutes` ;
308+ job . message = "Processing failed (timeout)" ;
309+ job . updatedAt = now ;
310+ cleaned ++ ;
280311 }
281312 }
282313
@@ -315,14 +346,32 @@ export async function sendWebhook(job: Job): Promise<void> {
315346 }
316347}
317348
318- const cleanupInterval = setInterval (
319- ( ) => {
320- const cleaned = cleanupExpiredJobs ( ) ;
321- if ( cleaned > 0 ) {
322- console . log ( `[job-manager] Cleaned up ${ cleaned } expired jobs` ) ;
349+ export function forceCleanupActiveJobs ( ) : number {
350+ let cleaned = 0 ;
351+ const now = Date . now ( ) ;
352+
353+ for ( const [ jobId , job ] of jobs ) {
354+ if ( isActivePhase ( job . phase ) ) {
355+ console . warn (
356+ `[job-manager] Force-cleaning job ${ jobId } (phase=${ job . phase } , age=${ Math . round ( ( now - job . createdAt ) / 60000 ) } m)` ,
357+ ) ;
358+ job . abortController ?. abort ( ) ;
359+ job . phase = "error" ;
360+ job . error = "Force-cleaned by admin" ;
361+ job . message = "Processing failed (force-cleaned)" ;
362+ job . updatedAt = now ;
363+ cleaned ++ ;
323364 }
324- } ,
325- 5 * 60 * 1000 ,
326- ) ;
365+ }
366+
367+ return cleaned ;
368+ }
369+
370+ const cleanupInterval = setInterval ( ( ) => {
371+ const cleaned = cleanupExpiredJobs ( ) ;
372+ if ( cleaned > 0 ) {
373+ console . log ( `[job-manager] Cleaned up ${ cleaned } expired/stale jobs` ) ;
374+ }
375+ } , 60 * 1000 ) ;
327376
328377cleanupInterval . unref ?.( ) ;
0 commit comments