fix(aws): increase backoff and handle throttling in DescribeTasks/GetRecords
parent f7725a72d6
commit af988e6d2a
@@ -1992,8 +1992,8 @@ class AWSTaskRunner {
         }
     }
     static async describeTasks(clusterName, taskArn) {
-        const maxAttempts = 6;
-        let delayMs = 500;
+        const maxAttempts = 10;
+        let delayMs = 1000;
         for (let attempt = 1; attempt <= maxAttempts; attempt++) {
             try {
                 const tasks = await AWSTaskRunner.ECS.send(new client_ecs_1.DescribeTasksCommand({ cluster: clusterName, tasks: [taskArn] }));
@@ -2007,6 +2007,7 @@ class AWSTaskRunner {
                 if (!isThrottle || attempt === maxAttempts) {
                     throw error;
                 }
+                cloud_runner_logger_1.default.log(`AWS throttled DescribeTasks (attempt ${attempt}/${maxAttempts}), backing off ${delayMs}ms`);
                 await new Promise((r) => setTimeout(r, delayMs));
                 delayMs *= 2;
             }
@@ -2033,7 +2034,19 @@ class AWSTaskRunner {
         return { output, shouldCleanup };
     }
     static async handleLogStreamIteration(iterator, shouldReadLogs, output, shouldCleanup) {
-        const records = await AWSTaskRunner.Kinesis.send(new client_kinesis_1.GetRecordsCommand({ ShardIterator: iterator }));
+        let records;
+        try {
+            records = await AWSTaskRunner.Kinesis.send(new client_kinesis_1.GetRecordsCommand({ ShardIterator: iterator }));
+        }
+        catch (error) {
+            const isThrottle = error?.name === 'ThrottlingException' || /rate exceeded/i.test(String(error?.message));
+            if (isThrottle) {
+                cloud_runner_logger_1.default.log(`AWS throttled GetRecords, backing off 1000ms`);
+                await new Promise((r) => setTimeout(r, 1000));
+                return { iterator, shouldReadLogs, output, shouldCleanup };
+            }
+            throw error;
+        }
         iterator = records.NextShardIterator || '';
         ({ shouldReadLogs, output, shouldCleanup } = AWSTaskRunner.logRecords(records, iterator, shouldReadLogs, output, shouldCleanup));
         return { iterator, shouldReadLogs, output, shouldCleanup };
File diff suppressed because one or more lines are too long
@@ -132,8 +132,8 @@ class AWSTaskRunner {
   }
 
   static async describeTasks(clusterName: string, taskArn: string) {
-    const maxAttempts = 6;
-    let delayMs = 500;
+    const maxAttempts = 10;
+    let delayMs = 1000;
     for (let attempt = 1; attempt <= maxAttempts; attempt++) {
       try {
         const tasks = await AWSTaskRunner.ECS.send(
@@ -148,6 +148,7 @@ class AWSTaskRunner {
         if (!isThrottle || attempt === maxAttempts) {
           throw error;
         }
+        CloudRunnerLogger.log(`AWS throttled DescribeTasks (attempt ${attempt}/${maxAttempts}), backing off ${delayMs}ms`);
         await new Promise((r) => setTimeout(r, delayMs));
         delayMs *= 2;
       }
@@ -188,7 +189,18 @@ class AWSTaskRunner {
     output: string,
     shouldCleanup: boolean,
   ) {
-    const records = await AWSTaskRunner.Kinesis.send(new GetRecordsCommand({ ShardIterator: iterator }));
+    let records: any;
+    try {
+      records = await AWSTaskRunner.Kinesis.send(new GetRecordsCommand({ ShardIterator: iterator }));
+    } catch (error: any) {
+      const isThrottle = error?.name === 'ThrottlingException' || /rate exceeded/i.test(String(error?.message));
+      if (isThrottle) {
+        CloudRunnerLogger.log(`AWS throttled GetRecords, backing off 1000ms`);
+        await new Promise((r) => setTimeout(r, 1000));
+        return { iterator, shouldReadLogs, output, shouldCleanup };
+      }
+      throw error;
+    }
     iterator = records.NextShardIterator || '';
     ({ shouldReadLogs, output, shouldCleanup } = AWSTaskRunner.logRecords(
       records,
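
For illustration, a minimal sketch of the retry pattern this commit applies to DescribeTasks: throttling is detected by error name ('ThrottlingException') or a "rate exceeded" message, and the call is retried up to 10 times with an exponential backoff starting at 1000 ms. The helper name withThrottleRetry and the isThrottled predicate below are illustrative only and do not exist in the repository; only the constants and the detection logic come from the diff above.

// Sketch only: generic helper mirroring the DescribeTasks retry shape in this commit
// (10 attempts, 1000 ms initial delay, doubling on each throttled attempt).
// `withThrottleRetry` and `isThrottled` are illustrative names, not repository code.
function isThrottled(error: unknown): boolean {
  const err = error as { name?: string; message?: string };
  return err?.name === 'ThrottlingException' || /rate exceeded/i.test(String(err?.message));
}

async function withThrottleRetry<T>(operation: () => Promise<T>, maxAttempts = 10, initialDelayMs = 1000): Promise<T> {
  let delayMs = initialDelayMs;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      // Non-throttling errors, or the final attempt, are re-thrown immediately.
      if (!isThrottled(error) || attempt === maxAttempts) {
        throw error;
      }
      // Exponential backoff: 1000 ms, 2000 ms, 4000 ms, ...
      await new Promise((resolve) => setTimeout(resolve, delayMs));
      delayMs *= 2;
    }
  }
  throw new Error('unreachable: loop either returns or throws');
}

With these constants, the worst-case backoff window grows from roughly 15.5 s under the old values (6 attempts, 500 ms initial delay) to roughly 8.5 minutes under the new ones (10 attempts, 1000 ms initial delay), which is the practical effect of the change.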
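
The GetRecords path takes a different shape: rather than retrying in place, handleLogStreamIteration sleeps 1000 ms on a throttle and returns its inputs unchanged, so the caller's polling loop retries with the same shard iterator on its next pass. The sketch below shows that interaction under assumptions: the state type, the iterateOnce name, and the send stub are illustrative stand-ins for the Kinesis client, not repository code.

// Sketch only: state-preserving retry as used for GetRecords in this commit.
type IterationState = { iterator: string; shouldReadLogs: boolean; output: string; shouldCleanup: boolean };
type GetRecordsResult = { NextShardIterator?: string; Records: { Data?: Uint8Array }[] };

async function iterateOnce(
  state: IterationState,
  send: (shardIterator: string) => Promise<GetRecordsResult>,
): Promise<IterationState> {
  let records: GetRecordsResult;
  try {
    records = await send(state.iterator);
  } catch (error) {
    const err = error as { name?: string; message?: string };
    const isThrottle = err?.name === 'ThrottlingException' || /rate exceeded/i.test(String(err?.message));
    if (isThrottle) {
      // Back off briefly and hand the unchanged state (same iterator) back to the caller,
      // whose polling loop simply calls iterateOnce again.
      await new Promise((resolve) => setTimeout(resolve, 1000));
      return state;
    }
    throw error;
  }
  // On success, advance the shard iterator; record processing would update the rest.
  return { ...state, iterator: records.NextShardIterator || '' };
}

The throttled call does not advance the shard iterator, so returning the unchanged state simply retries the same position on the next pass instead of dropping log records.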