Consume kinesis logs while task is RUNNING status

pull/218/head
Frostebite 2021-02-07 15:48:22 +00:00
parent aca1ff1974
commit 6aec635bd4
2 changed files with 27 additions and 16 deletions

File diff suppressed because one or more lines are too long

View File

@ -106,21 +106,32 @@ class AWS {
// watching logs // watching logs
const kinesis = new SDK.Kinesis(); const kinesis = new SDK.Kinesis();
const iterator = await kinesis const getTaskStatus = async () => {
.getShardIterator({ const tasks = await ECS.describeTasks({
ShardIteratorType: 'TRIM_HORIZON', cluster: clusterName,
StreamName: taskDefResources.StackResources.find( tasks: [task.tasks[0].taskArn],
(x) => x.LogicalResourceId === 'KinesisStream', }).promise();
).PhysicalResourceId, core.info(`Task status is ${tasks.tasks[0].lastStatus}`);
ShardId: 'example', return tasks.tasks[0].lastStatus;
}) };
.promise();
const records = await kinesis while ((await getTaskStatus()) === 'RUNNING') {
.getRecords({ const iterator = await kinesis
ShardIterator: iterator.ShardIterator, .getShardIterator({
}) ShardIteratorType: 'TRIM_HORIZON',
.promise(); StreamName: taskDefResources.StackResources.find(
core.info(records.Records[0].Data); (x) => x.LogicalResourceId === 'KinesisStream',
).PhysicalResourceId,
ShardId: 'example',
})
.promise();
const records = await kinesis
.getRecords({
ShardIterator: iterator.ShardIterator,
})
.promise();
core.info(records.Records[0].Data);
}
await ECS.waitFor('tasksStopped', { await ECS.waitFor('tasksStopped', {
cluster: clusterName, cluster: clusterName,