Interrupt k8s logs when logs found

pull/531/head
Frostebite 2023-10-02 23:58:34 +01:00
parent 9cb82940fb
commit 4b072164b5
3 changed files with 22 additions and 39 deletions

dist/index.js (generated, vendored), 23 changed lines

@@ -3954,7 +3954,6 @@ class KubernetesTaskRunner {
  extraFlags += (await kubernetes_pods_1.default.IsPodRunning(podName, namespace, kubeClient))
  ? ` -f -c ${containerName}`
  : ` --previous`;
- let logs;
  const callback = (outputChunk) => {
  output += outputChunk;
  // split output chunk and handle per line
@@ -3968,15 +3967,21 @@ class KubernetesTaskRunner {
  cloud_runner_logger_1.default.log(`Loghash found`);
  }
  if (chunk.includes(`LOGS:`)) {
+ cloud_runner_logger_1.default.log(`LOGS: found`);
  // remove "LOGS: " and decode base64 remaining
  const unpacked = Buffer.from(chunk.split(`LOGS: `)[1], 'base64').toString('ascii');
  const result = remote_client_logger_1.RemoteClientLogger.HandleLogFull(unpacked);
  cloud_runner_logger_1.default.log(`Logs found HandleLogChunkLineResult:${result}`);
+ if (result) {
+ follow_log_stream_service_1.FollowLogStreamService.DidReceiveEndOfTransmission = true;
+ }
+ return;
  }
+ ({ shouldReadLogs, shouldCleanup, output } = follow_log_stream_service_1.FollowLogStreamService.handleIteration(chunk, shouldReadLogs, shouldCleanup, output));
  }
  };
  try {
- logs = await cloud_runner_system_1.CloudRunnerSystem.Run(`kubectl logs ${podName}${extraFlags}`, false, true, callback);
+ await cloud_runner_system_1.CloudRunnerSystem.Run(`kubectl logs ${podName}${extraFlags}`, false, true, callback);
  }
  catch (error) {
  await new Promise((resolve) => setTimeout(resolve, 3000));
@@ -3991,20 +3996,6 @@ class KubernetesTaskRunner {
  }
  throw error;
  }
- const splitLogs = logs.split(`\n`);
- for (const chunk of splitLogs) {
- const message = cloud_runner_1.default.buildParameters.cloudRunnerDebug ? chunk : chunk.split(`Z `)[1];
- // if line contains "LOGS: " then stop
- if (message.includes(`LOGS:`)) {
- cloud_runner_logger_1.default.log(`LOGS: found`);
- continue;
- }
- ({ shouldReadLogs, shouldCleanup, output } = follow_log_stream_service_1.FollowLogStreamService.handleIteration(message, shouldReadLogs, shouldCleanup, output));
- const result = remote_client_logger_1.RemoteClientLogger.HandleLog(message);
- if (result) {
- follow_log_stream_service_1.FollowLogStreamService.DidReceiveEndOfTransmission = true;
- }
- }
  if (follow_log_stream_service_1.FollowLogStreamService.DidReceiveEndOfTransmission) {
  cloud_runner_logger_1.default.log('end of log stream');
  break;
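
For reference, a minimal sketch of the `LOGS:` unpacking the new callback performs, runnable on its own in Node; the helper name unpackLogsLine and the sample payload are illustrative, not part of the change:

// Minimal sketch (illustrative): a line of the form `LOGS: <base64>` carries a
// full log payload; everything after the marker is base64-decoded before being
// handed to the log handler, as in the callback above.
function unpackLogsLine(chunk: string): string | undefined {
  if (!chunk.includes(`LOGS:`)) {
    return undefined;
  }
  // remove "LOGS: " and decode the base64 remainder
  return Buffer.from(chunk.split(`LOGS: `)[1], 'base64').toString('ascii');
}

// Example with a made-up payload:
const sample = `LOGS: ${Buffer.from('build finished').toString('base64')}`;
console.log(unpackLogsLine(sample)); // -> "build finished"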

dist/index.js.map (generated, vendored), 2 changed lines
File diff suppressed because one or more lines are too long

@@ -32,7 +32,6 @@ class KubernetesTaskRunner {
  ? ` -f -c ${containerName}`
  : ` --previous`;
- let logs;
  const callback = (outputChunk: string) => {
  output += outputChunk;
@@ -47,15 +46,28 @@ class KubernetesTaskRunner {
  CloudRunnerLogger.log(`Loghash found`);
  }
  if (chunk.includes(`LOGS:`)) {
+ CloudRunnerLogger.log(`LOGS: found`);
  // remove "LOGS: " and decode base64 remaining
  const unpacked = Buffer.from(chunk.split(`LOGS: `)[1], 'base64').toString('ascii');
  const result = RemoteClientLogger.HandleLogFull(unpacked);
  CloudRunnerLogger.log(`Logs found HandleLogChunkLineResult:${result}`);
+ if (result) {
+ FollowLogStreamService.DidReceiveEndOfTransmission = true;
+ }
+ return;
  }
+ ({ shouldReadLogs, shouldCleanup, output } = FollowLogStreamService.handleIteration(
+ chunk,
+ shouldReadLogs,
+ shouldCleanup,
+ output,
+ ));
  }
  };
  try {
- logs = await CloudRunnerSystem.Run(`kubectl logs ${podName}${extraFlags}`, false, true, callback);
+ await CloudRunnerSystem.Run(`kubectl logs ${podName}${extraFlags}`, false, true, callback);
  } catch (error: any) {
  await new Promise((resolve) => setTimeout(resolve, 3000));
  const continueStreaming = await KubernetesPods.IsPodRunning(podName, namespace, kubeClient);
@@ -70,26 +82,6 @@ class KubernetesTaskRunner {
  }
  throw error;
  }
- const splitLogs = logs.split(`\n`);
- for (const chunk of splitLogs) {
- const message = CloudRunner.buildParameters.cloudRunnerDebug ? chunk : chunk.split(`Z `)[1];
- // if line contains "LOGS: " then stop
- if (message.includes(`LOGS:`)) {
- CloudRunnerLogger.log(`LOGS: found`);
- continue;
- }
- ({ shouldReadLogs, shouldCleanup, output } = FollowLogStreamService.handleIteration(
- message,
- shouldReadLogs,
- shouldCleanup,
- output,
- ));
- const result = RemoteClientLogger.HandleLog(message);
- if (result) {
- FollowLogStreamService.DidReceiveEndOfTransmission = true;
- }
- }
  if (FollowLogStreamService.DidReceiveEndOfTransmission) {
  CloudRunnerLogger.log('end of log stream');
  break;
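
Taken together, the change moves end-of-stream detection into the streaming callback, so the runner no longer re-parses the collected kubectl output after the command returns. A rough sketch of that control flow, using stand-in names (runWithCallback, handleFullLogs, followPodLogs, endOfTransmission) rather than the project's CloudRunnerSystem, RemoteClientLogger and FollowLogStreamService APIs:

// Rough sketch of the new control flow; stand-in names, not the project's API.

// Stand-in for RemoteClientLogger.HandleLogFull: reports whether the decoded
// payload marks the end of the remote job's output.
function handleFullLogs(unpacked: string): boolean {
  console.log(unpacked);
  return unpacked.includes('end of transmission');
}

// Stand-in for CloudRunnerSystem.Run: feeds canned chunks to the callback
// instead of actually spawning `kubectl logs`.
async function runWithCallback(command: string, onChunk: (chunk: string) => void): Promise<void> {
  console.log(`would run: ${command}`);
  const payload = Buffer.from('end of transmission').toString('base64');
  onChunk(`plain line\nLOGS: ${payload}\n`);
}

let endOfTransmission = false;

async function followPodLogs(podName: string, extraFlags: string): Promise<void> {
  const onChunk = (outputChunk: string) => {
    for (const line of outputChunk.split('\n')) {
      if (line.includes(`LOGS:`)) {
        // remove "LOGS: " and decode the base64 remainder, as in the diff
        const unpacked = Buffer.from(line.split(`LOGS: `)[1], 'base64').toString('ascii');
        if (handleFullLogs(unpacked)) {
          endOfTransmission = true; // lets the outer follow loop stop
        }
        return; // interrupt per-line handling once the LOGS payload is seen
      }
      // other lines would go through the normal per-line handling here
    }
  };

  while (true) {
    await runWithCallback(`kubectl logs ${podName}${extraFlags}`, onChunk);
    if (endOfTransmission) {
      console.log('end of log stream');
      break;
    }
    // otherwise wait briefly before following again (the runner retries similarly)
    await new Promise((resolve) => setTimeout(resolve, 3000));
  }
}

followPodLogs('example-pod', ' -f -c main').catch(console.error);

The key point is that the callback returns early once the LOGS payload is seen and flips a flag that the outer follow loop checks, which is what lets the runner interrupt its `kubectl logs` handling as soon as the full logs arrive.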