fix k8s isodate

pull/524/head
Frostebite 2023-03-24 23:43:39 +00:00
parent ae0804b5c7
commit b22874e411
4 changed files with 152 additions and 302 deletions

209
dist/index.js generated vendored
View File

@ -3046,8 +3046,6 @@ const kubernetes_job_spec_factory_1 = __importDefault(__nccwpck_require__(3610))
const kubernetes_service_account_1 = __importDefault(__nccwpck_require__(47319));
const cloud_runner_logger_1 = __importDefault(__nccwpck_require__(22855));
const cloud_runner_1 = __importDefault(__nccwpck_require__(79144));
const kubernetes_pods_1 = __importDefault(__nccwpck_require__(90740));
const follow_log_stream_service_1 = __nccwpck_require__(64121);
class Kubernetes {
// eslint-disable-next-line no-unused-vars
constructor(buildParameters) {
@ -3136,73 +3134,18 @@ class Kubernetes {
this.containerName = `main`;
await kubernetes_secret_1.default.createSecret(secrets, this.secretName, this.namespace, this.kubeClient);
let output = '';
// eslint-disable-next-line no-constant-condition
while (true) {
try {
let existsAlready = false;
try {
(await this.kubeClient.readNamespacedPodStatus(this.podName, this.namespace)).body.status;
existsAlready = true;
}
catch {
// empty
}
if (!existsAlready) {
cloud_runner_logger_1.default.log('Job does not exist');
await this.createJob(commands, image, mountdir, workingdir, environment, secrets);
cloud_runner_logger_1.default.log('Watching pod until running');
await kubernetes_task_runner_1.default.watchUntilPodRunning(this.kubeClient, this.podName, this.namespace);
}
cloud_runner_logger_1.default.log('Pod running, streaming logs');
const running = await kubernetes_pods_1.default.IsPodRunning(this.podName, this.namespace, this.kubeClient);
output += await kubernetes_task_runner_1.default.runTask(this.kubeConfig, this.kubeClient, this.jobName, this.podName, kubernetes_job_spec_factory_1.default.MainContainerName, this.namespace, running);
let podStatus = await kubernetes_pods_1.default.GetPodStatus(this.podName, this.namespace, this.kubeClient);
if (!running) {
if (!follow_log_stream_service_1.FollowLogStreamService.DidReceiveEndOfTransmission && podStatus === `Succeeded`) {
output += await kubernetes_task_runner_1.default.runTask(this.kubeConfig, this.kubeClient, this.jobName, this.podName, 'main', this.namespace, true);
cloud_runner_logger_1.default.log(JSON.stringify((await this.kubeClient.listNamespacedEvent(this.namespace)).body.items
.map((x) => {
return {
message: x.message || ``,
name: x.metadata.name || ``,
reason: x.reason || ``,
};
})
.filter((x) => x.name.includes(this.podName)), undefined, 4));
break;
}
if (follow_log_stream_service_1.FollowLogStreamService.DidReceiveEndOfTransmission) {
break;
}
}
podStatus = await kubernetes_pods_1.default.GetPodStatus(this.podName, this.namespace, this.kubeClient);
cloud_runner_logger_1.default.log(`Pod status ${podStatus}, retrying log stream...`);
}
catch (error) {
let errorParsed;
try {
errorParsed = JSON.parse(error);
}
catch {
errorParsed = error;
}
const errorMessage = errorParsed.name || errorParsed.reason || errorParsed.response?.body?.reason || errorParsed.message;
const continueStreaming = errorMessage.includes(`dial timeout, backstop`) ||
errorMessage.includes(`HttpError`) ||
errorMessage.includes(`HttpError: HTTP request failed`) ||
errorMessage.includes(`an error occurred when try to find container`) ||
errorMessage.includes(`not found`) ||
errorMessage.includes(`Not Found`);
if (continueStreaming) {
cloud_runner_logger_1.default.log('Log Stream Container Not Found');
await new Promise((resolve) => resolve(5000));
continue;
}
else {
cloud_runner_logger_1.default.log(`error running k8s workflow ${error}`);
throw error;
}
}
try {
cloud_runner_logger_1.default.log('Job does not exist');
await this.createJob(commands, image, mountdir, workingdir, environment, secrets);
cloud_runner_logger_1.default.log('Watching pod until running');
await kubernetes_task_runner_1.default.watchUntilPodRunning(this.kubeClient, this.podName, this.namespace);
cloud_runner_logger_1.default.log('Pod running, streaming logs');
output += await kubernetes_task_runner_1.default.runTask(this.kubeConfig, this.kubeClient, this.jobName, this.podName, kubernetes_job_spec_factory_1.default.MainContainerName, this.namespace);
}
catch (error) {
cloud_runner_logger_1.default.log(`error running k8s workflow ${error}`);
await this.cleanupTaskResources();
throw error;
}
await this.cleanupTaskResources();
return output;
@ -3459,38 +3402,6 @@ KubernetesJobSpecFactory.MainContainerName = 'main';
exports["default"] = KubernetesJobSpecFactory;
/***/ }),
/***/ 90740:
/***/ (function(__unused_webpack_module, exports, __nccwpck_require__) {
"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", ({ value: true }));
const cloud_runner_logger_1 = __importDefault(__nccwpck_require__(22855));
class KubernetesPods {
static async IsPodRunning(podName, namespace, kubeClient) {
const pods = (await kubeClient.listNamespacedPod(namespace)).body.items.filter((x) => podName === x.metadata?.name);
const running = pods.length > 0 && (pods[0].status?.phase === `Running` || pods[0].status?.phase === `Pending`);
const phase = pods[0]?.status?.phase || 'undefined status';
cloud_runner_logger_1.default.log(`Getting pod status: ${phase}`);
if (phase === `Failed`) {
throw new Error(`K8s pod failed`);
}
return running;
}
static async GetPodStatus(podName, namespace, kubeClient) {
const pods = (await kubeClient.listNamespacedPod(namespace)).body.items.find((x) => podName === x.metadata?.name);
const phase = pods?.status?.phase || 'undefined status';
return phase;
}
}
exports["default"] = KubernetesPods;
/***/ }),
/***/ 95875:
@ -3752,11 +3663,11 @@ const follow_log_stream_service_1 = __nccwpck_require__(64121);
const cloud_runner_system_1 = __nccwpck_require__(99393);
const cloud_runner_1 = __importDefault(__nccwpck_require__(79144));
class KubernetesTaskRunner {
static async runTask(kubeConfig, kubeClient, jobName, podName, containerName, namespace, alreadyFinished = false) {
static async runTask(kubeConfig, kubeClient, jobName, podName, containerName, namespace) {
const lastReceivedMessage = this.lastReceivedMessage !== ``
? `\nLast Log Message "${this.lastReceivedMessage}" ${this.lastReceivedTimestamp}`
: ``;
cloud_runner_logger_1.default.log(`Streaming logs from pod: ${podName} container: ${containerName} namespace: ${namespace} finished ${alreadyFinished} ${cloud_runner_1.default.buildParameters.kubeVolumeSize}/${cloud_runner_1.default.buildParameters.containerCpu}/${cloud_runner_1.default.buildParameters.containerMemory}\n${lastReceivedMessage}`);
cloud_runner_logger_1.default.log(`Streaming logs from pod: ${podName} container: ${containerName} namespace: ${namespace} ${cloud_runner_1.default.buildParameters.kubeVolumeSize}/${cloud_runner_1.default.buildParameters.containerCpu}/${cloud_runner_1.default.buildParameters.containerMemory}\n${lastReceivedMessage}`);
let output = '';
let didStreamAnyLogs = false;
let shouldReadLogs = true;
@ -3775,51 +3686,57 @@ class KubernetesTaskRunner {
const tzd = `${symbolOffset + hourOffset + Math.floor(timeOffsetInHours)}:${minOffset}${minZone}`;
const dateTZDformat = currentDateTime + tzd;
try {
const sinceTime = KubernetesTaskRunner.lastReceivedTimestamp ? `--since-time="${dateTZDformat}" ` : ` `;
let lastMessageSeenIncludedInChunk = false;
let lastMessageSeen = false;
// using this instead of Kube
const logs = await cloud_runner_system_1.CloudRunnerSystem.Run(`kubectl logs ${podName} -f -c ${containerName} --timestamps ${sinceTime}`, false, true);
const splitLogs = logs.split(`\n`);
for (const chunk of splitLogs) {
if (chunk.replace(/\s/g, ``) === KubernetesTaskRunner.lastReceivedMessage.replace(/\s/g, ``) &&
KubernetesTaskRunner.lastReceivedMessage.replace(/\s/g, ``) !== ``) {
cloud_runner_logger_1.default.log(`Previous log message found ${chunk}`);
lastMessageSeenIncludedInChunk = true;
// eslint-disable-next-line no-constant-condition
while (true) {
const sinceTime = `${KubernetesTaskRunner.lastReceivedTimestamp}` !== `` ? `--since-time="${dateTZDformat}" ` : ` `;
let lastMessageSeenIncludedInChunk = false;
let lastMessageSeen = false;
// using this instead of Kube
const logs = await cloud_runner_system_1.CloudRunnerSystem.Run(`kubectl logs ${podName} -f -c ${containerName} --timestamps ${sinceTime}`, false, true);
const splitLogs = logs.split(`\n`);
for (const chunk of splitLogs) {
if (chunk.replace(/\s/g, ``) === KubernetesTaskRunner.lastReceivedMessage.replace(/\s/g, ``) &&
KubernetesTaskRunner.lastReceivedMessage.replace(/\s/g, ``) !== ``) {
cloud_runner_logger_1.default.log(`Previous log message found ${chunk}`);
lastMessageSeenIncludedInChunk = true;
}
}
for (const chunk of splitLogs) {
const newDate = Date.parse(`${chunk.toString().split(`Z `)[0]}Z`);
if (chunk.replace(/\s/g, ``) === KubernetesTaskRunner.lastReceivedMessage.replace(/\s/g, ``)) {
lastMessageSeen = true;
}
if (lastMessageSeenIncludedInChunk && !lastMessageSeen) {
continue;
}
didStreamAnyLogs = true;
const message = cloud_runner_1.default.buildParameters.cloudRunnerDebug ? chunk : chunk.split(`Z `)[1];
KubernetesTaskRunner.lastReceivedMessage = chunk;
KubernetesTaskRunner.lastReceivedTimestamp = newDate;
({ shouldReadLogs, shouldCleanup, output } = follow_log_stream_service_1.FollowLogStreamService.handleIteration(message, shouldReadLogs, shouldCleanup, output));
}
if (!didStreamAnyLogs) {
core.error('Failed to stream any logs, listing namespace events, check for an error with the container');
core.error(JSON.stringify({
events: (await kubeClient.listNamespacedEvent(namespace)).body.items
.filter((x) => {
return x.involvedObject.name === podName || x.involvedObject.name === jobName;
})
.map((x) => {
return {
type: x.involvedObject.kind,
name: x.involvedObject.name,
message: x.message,
};
}),
}, undefined, 4));
throw new Error(`No logs streamed from k8s`);
}
if (follow_log_stream_service_1.FollowLogStreamService.DidReceiveEndOfTransmission) {
cloud_runner_logger_1.default.log('end of log stream');
break;
}
}
for (const chunk of splitLogs) {
const newDate = Date.parse(`${chunk.toString().split(`Z `)[0]}Z`);
if (chunk.replace(/\s/g, ``) === KubernetesTaskRunner.lastReceivedMessage.replace(/\s/g, ``)) {
lastMessageSeen = true;
}
if (lastMessageSeenIncludedInChunk && !lastMessageSeen) {
continue;
}
didStreamAnyLogs = true;
const message = cloud_runner_1.default.buildParameters.cloudRunnerDebug ? chunk : chunk.split(`Z `)[1];
KubernetesTaskRunner.lastReceivedMessage = chunk;
KubernetesTaskRunner.lastReceivedTimestamp = newDate;
({ shouldReadLogs, shouldCleanup, output } = follow_log_stream_service_1.FollowLogStreamService.handleIteration(message, shouldReadLogs, shouldCleanup, output));
}
if (!didStreamAnyLogs) {
core.error('Failed to stream any logs, listing namespace events, check for an error with the container');
core.error(JSON.stringify({
events: (await kubeClient.listNamespacedEvent(namespace)).body.items
.filter((x) => {
return x.involvedObject.name === podName || x.involvedObject.name === jobName;
})
.map((x) => {
return {
type: x.involvedObject.kind,
name: x.involvedObject.name,
message: x.message,
};
}),
}, undefined, 4));
throw new Error(`No logs streamed from k8s`);
}
cloud_runner_logger_1.default.log('end of log stream');
}
catch (error) {
cloud_runner_logger_1.default.log(`k8s stream watching failed ${JSON.stringify(error, undefined, 4)}`);

2
dist/index.js.map generated vendored

File diff suppressed because one or more lines are too long

View File

@ -14,8 +14,6 @@ import { CoreV1Api } from '@kubernetes/client-node';
import CloudRunner from '../../cloud-runner';
import { ProviderResource } from '../provider-resource';
import { ProviderWorkflow } from '../provider-workflow';
import KubernetesPods from './kubernetes-pods';
import { FollowLogStreamService } from '../../services/follow-log-stream-service';
class Kubernetes implements ProviderInterface {
public static Instance: Kubernetes;
@ -133,98 +131,27 @@ class Kubernetes implements ProviderInterface {
this.containerName = `main`;
await KubernetesSecret.createSecret(secrets, this.secretName, this.namespace, this.kubeClient);
let output = '';
// eslint-disable-next-line no-constant-condition
while (true) {
try {
let existsAlready = false;
try {
(await this.kubeClient.readNamespacedPodStatus(this.podName, this.namespace)).body.status;
existsAlready = true;
} catch {
// empty
}
if (!existsAlready) {
CloudRunnerLogger.log('Job does not exist');
await this.createJob(commands, image, mountdir, workingdir, environment, secrets);
CloudRunnerLogger.log('Watching pod until running');
await KubernetesTaskRunner.watchUntilPodRunning(this.kubeClient, this.podName, this.namespace);
}
CloudRunnerLogger.log('Pod running, streaming logs');
const running = await KubernetesPods.IsPodRunning(this.podName, this.namespace, this.kubeClient);
output += await KubernetesTaskRunner.runTask(
this.kubeConfig,
this.kubeClient,
this.jobName,
this.podName,
KubernetesJobSpecFactory.MainContainerName,
this.namespace,
running,
);
let podStatus = await KubernetesPods.GetPodStatus(this.podName, this.namespace, this.kubeClient);
try {
CloudRunnerLogger.log('Job does not exist');
await this.createJob(commands, image, mountdir, workingdir, environment, secrets);
CloudRunnerLogger.log('Watching pod until running');
await KubernetesTaskRunner.watchUntilPodRunning(this.kubeClient, this.podName, this.namespace);
if (!running) {
if (!FollowLogStreamService.DidReceiveEndOfTransmission && podStatus === `Succeeded`) {
output += await KubernetesTaskRunner.runTask(
this.kubeConfig,
this.kubeClient,
this.jobName,
this.podName,
'main',
this.namespace,
true,
);
CloudRunnerLogger.log(
JSON.stringify(
(await this.kubeClient.listNamespacedEvent(this.namespace)).body.items
.map((x) => {
return {
message: x.message || ``,
name: x.metadata.name || ``,
reason: x.reason || ``,
};
})
.filter((x) => x.name.includes(this.podName)),
undefined,
4,
),
);
break;
}
if (FollowLogStreamService.DidReceiveEndOfTransmission) {
break;
}
}
podStatus = await KubernetesPods.GetPodStatus(this.podName, this.namespace, this.kubeClient);
CloudRunnerLogger.log(`Pod status ${podStatus}, retrying log stream...`);
} catch (error: any) {
let errorParsed;
try {
errorParsed = JSON.parse(error);
} catch {
errorParsed = error;
}
const errorMessage =
errorParsed.name || errorParsed.reason || errorParsed.response?.body?.reason || errorParsed.message;
const continueStreaming =
errorMessage.includes(`dial timeout, backstop`) ||
errorMessage.includes(`HttpError`) ||
errorMessage.includes(`HttpError: HTTP request failed`) ||
errorMessage.includes(`an error occurred when try to find container`) ||
errorMessage.includes(`not found`) ||
errorMessage.includes(`Not Found`);
if (continueStreaming) {
CloudRunnerLogger.log('Log Stream Container Not Found');
await new Promise((resolve) => resolve(5000));
continue;
} else {
CloudRunnerLogger.log(`error running k8s workflow ${error}`);
throw error;
}
}
CloudRunnerLogger.log('Pod running, streaming logs');
output += await KubernetesTaskRunner.runTask(
this.kubeConfig,
this.kubeClient,
this.jobName,
this.podName,
KubernetesJobSpecFactory.MainContainerName,
this.namespace,
);
} catch (error: any) {
CloudRunnerLogger.log(`error running k8s workflow ${error}`);
await this.cleanupTaskResources();
throw error;
}
await this.cleanupTaskResources();
return output;

View File

@ -16,14 +16,13 @@ class KubernetesTaskRunner {
podName: string,
containerName: string,
namespace: string,
alreadyFinished: boolean = false,
) {
const lastReceivedMessage =
this.lastReceivedMessage !== ``
? `\nLast Log Message "${this.lastReceivedMessage}" ${this.lastReceivedTimestamp}`
: ``;
CloudRunnerLogger.log(
`Streaming logs from pod: ${podName} container: ${containerName} namespace: ${namespace} finished ${alreadyFinished} ${CloudRunner.buildParameters.kubeVolumeSize}/${CloudRunner.buildParameters.containerCpu}/${CloudRunner.buildParameters.containerMemory}\n${lastReceivedMessage}`,
`Streaming logs from pod: ${podName} container: ${containerName} namespace: ${namespace} ${CloudRunner.buildParameters.kubeVolumeSize}/${CloudRunner.buildParameters.containerCpu}/${CloudRunner.buildParameters.containerMemory}\n${lastReceivedMessage}`,
);
let output = '';
let didStreamAnyLogs: boolean = false;
@ -45,70 +44,77 @@ class KubernetesTaskRunner {
const dateTZDformat = currentDateTime + tzd;
try {
const sinceTime = KubernetesTaskRunner.lastReceivedTimestamp ? `--since-time="${dateTZDformat}" ` : ` `;
let lastMessageSeenIncludedInChunk = false;
let lastMessageSeen = false;
// eslint-disable-next-line no-constant-condition
while (true) {
const sinceTime =
`${KubernetesTaskRunner.lastReceivedTimestamp}` !== `` ? `--since-time="${dateTZDformat}" ` : ` `;
let lastMessageSeenIncludedInChunk = false;
let lastMessageSeen = false;
// using this instead of Kube
const logs = await CloudRunnerSystem.Run(
`kubectl logs ${podName} -f -c ${containerName} --timestamps ${sinceTime}`,
false,
true,
);
const splitLogs = logs.split(`\n`);
for (const chunk of splitLogs) {
if (
chunk.replace(/\s/g, ``) === KubernetesTaskRunner.lastReceivedMessage.replace(/\s/g, ``) &&
KubernetesTaskRunner.lastReceivedMessage.replace(/\s/g, ``) !== ``
) {
CloudRunnerLogger.log(`Previous log message found ${chunk}`);
lastMessageSeenIncludedInChunk = true;
}
}
for (const chunk of splitLogs) {
const newDate = Date.parse(`${chunk.toString().split(`Z `)[0]}Z`);
if (chunk.replace(/\s/g, ``) === KubernetesTaskRunner.lastReceivedMessage.replace(/\s/g, ``)) {
lastMessageSeen = true;
}
if (lastMessageSeenIncludedInChunk && !lastMessageSeen) {
continue;
}
didStreamAnyLogs = true;
const message = CloudRunner.buildParameters.cloudRunnerDebug ? chunk : chunk.split(`Z `)[1];
KubernetesTaskRunner.lastReceivedMessage = chunk;
KubernetesTaskRunner.lastReceivedTimestamp = newDate;
({ shouldReadLogs, shouldCleanup, output } = FollowLogStreamService.handleIteration(
message,
shouldReadLogs,
shouldCleanup,
output,
));
}
if (!didStreamAnyLogs) {
core.error('Failed to stream any logs, listing namespace events, check for an error with the container');
core.error(
JSON.stringify(
{
events: (await kubeClient.listNamespacedEvent(namespace)).body.items
.filter((x) => {
return x.involvedObject.name === podName || x.involvedObject.name === jobName;
})
.map((x) => {
return {
type: x.involvedObject.kind,
name: x.involvedObject.name,
message: x.message,
};
}),
},
undefined,
4,
),
// using this instead of Kube
const logs = await CloudRunnerSystem.Run(
`kubectl logs ${podName} -f -c ${containerName} --timestamps ${sinceTime}`,
false,
true,
);
throw new Error(`No logs streamed from k8s`);
const splitLogs = logs.split(`\n`);
for (const chunk of splitLogs) {
if (
chunk.replace(/\s/g, ``) === KubernetesTaskRunner.lastReceivedMessage.replace(/\s/g, ``) &&
KubernetesTaskRunner.lastReceivedMessage.replace(/\s/g, ``) !== ``
) {
CloudRunnerLogger.log(`Previous log message found ${chunk}`);
lastMessageSeenIncludedInChunk = true;
}
}
for (const chunk of splitLogs) {
const newDate = Date.parse(`${chunk.toString().split(`Z `)[0]}Z`);
if (chunk.replace(/\s/g, ``) === KubernetesTaskRunner.lastReceivedMessage.replace(/\s/g, ``)) {
lastMessageSeen = true;
}
if (lastMessageSeenIncludedInChunk && !lastMessageSeen) {
continue;
}
didStreamAnyLogs = true;
const message = CloudRunner.buildParameters.cloudRunnerDebug ? chunk : chunk.split(`Z `)[1];
KubernetesTaskRunner.lastReceivedMessage = chunk;
KubernetesTaskRunner.lastReceivedTimestamp = newDate;
({ shouldReadLogs, shouldCleanup, output } = FollowLogStreamService.handleIteration(
message,
shouldReadLogs,
shouldCleanup,
output,
));
}
if (!didStreamAnyLogs) {
core.error('Failed to stream any logs, listing namespace events, check for an error with the container');
core.error(
JSON.stringify(
{
events: (await kubeClient.listNamespacedEvent(namespace)).body.items
.filter((x) => {
return x.involvedObject.name === podName || x.involvedObject.name === jobName;
})
.map((x) => {
return {
type: x.involvedObject.kind,
name: x.involvedObject.name,
message: x.message,
};
}),
},
undefined,
4,
),
);
throw new Error(`No logs streamed from k8s`);
}
if (FollowLogStreamService.DidReceiveEndOfTransmission) {
CloudRunnerLogger.log('end of log stream');
break;
}
}
CloudRunnerLogger.log('end of log stream');
} catch (error: any) {
CloudRunnerLogger.log(`k8s stream watching failed ${JSON.stringify(error, undefined, 4)}`);
}