unity-builder/src/model/cloud-runner/k8s/kubernetes-task-runner.ts

97 lines
3.1 KiB
TypeScript
Raw Normal View History

import { CoreV1Api, KubeConfig, Log } from '@kubernetes/client-node';
import { Writable } from 'stream';
import CloudRunnerLogger from '../services/cloud-runner-logger';
2021-12-29 15:43:32 +00:00
import * as core from '@actions/core';
2021-12-25 19:57:44 +00:00
import { CloudRunnerStatics } from '../cloud-runner-statics';
2021-12-29 17:25:38 +00:00
import waitUntil from 'async-wait-until';
2021-12-30 19:35:20 +00:00
import { Input } from '../..';
2021-12-29 15:21:52 +00:00
2021-12-29 17:25:38 +00:00
/**
 * Helpers for following a Kubernetes pod: streaming its container logs and
 * waiting for it to leave the `Pending` phase.
 */
class KubernetesTaskRunner {
  /**
   * Follows the log stream of one container and forwards every chunk to `logCallback`.
   *
   * @param kubeConfig Configuration used to construct the log client.
   * @param kubeClient Core API client, used only to list namespace events when no logs arrive.
   * @param jobName Name of the job; used to filter namespace events for diagnostics.
   * @param podName Pod whose logs are streamed.
   * @param containerName Container inside the pod whose logs are streamed.
   * @param namespace Namespace that contains the pod.
   * @param logCallback Invoked with each prefixed log message.
   * @returns The concatenated prefixed messages when `Input.cloudRunnerTests` is truthy,
   *          otherwise an empty string.
   * @throws The error reported by the log stream, any failure to start the stream, or
   *         `Error('No logs streamed from k8s')` when the stream ends without any output.
   */
  static async runTask(
    kubeConfig: KubeConfig,
    kubeClient: CoreV1Api,
    jobName: string,
    podName: string,
    containerName: string,
    namespace: string,
    logCallback: any,
  ) {
    CloudRunnerLogger.log(`Streaming logs from pod: ${podName} container: ${containerName} namespace: ${namespace}`);

    let output = '';
    let didStreamAnyLogs = false;

    const stream = new Writable({
      write(chunk, encoding, next) {
        didStreamAnyLogs = true;
        // trimEnd() strips ALL trailing whitespace (including the final newline). This is what the
        // previous `trimRight('\n')` call did as well, since that method ignores its argument.
        const message = `[${CloudRunnerStatics.logPrefix}] ${chunk.toString().trimEnd()}`;
        if (Input.cloudRunnerTests) {
          // Output is only accumulated for test runs, where callers inspect the returned text.
          output += message;
        }
        logCallback(message);
        next();
      },
    });

    const logOptions = {
      follow: true,
      pretty: false,
      previous: false,
    };

    // The executor is deliberately NOT async: an async executor would swallow a rejection from
    // `log(...)` and leave this promise pending forever. `resolve` doubles as the `done` callback,
    // so the promise settles with the stream's error (if any) once the stream ends.
    const resultError = await new Promise<unknown>((resolve, reject) => {
      Promise.resolve(
        new Log(kubeConfig).log(namespace, podName, containerName, stream, resolve, logOptions),
      ).catch(reject);
    });
    if (resultError) {
      throw resultError;
    }

    if (!didStreamAnyLogs) {
      core.error('Failed to stream any logs, listing namespace events, check for an error with the container');
      // Surface the events that reference this pod or its job to help diagnose why nothing ran.
      const namespaceEvents = (await kubeClient.listNamespacedEvent(namespace)).body.items;
      const events = namespaceEvents
        .filter((x) => x.involvedObject.name === podName || x.involvedObject.name === jobName)
        .map((x) => ({
          type: x.involvedObject.kind,
          name: x.involvedObject.name,
          message: x.message,
        }));
      core.error(JSON.stringify({ events }, undefined, 4));
      throw new Error(`No logs streamed from k8s`);
    }

    CloudRunnerLogger.log('end of log stream');

    return output;
  }

  /**
   * Polls the pod status until the pod is no longer `Pending`.
   *
   * Polling uses `timeout: 2000000` and `intervalBetweenAttempts: 15000`, which are
   * milliseconds per the async-wait-until options (roughly 33 minutes and 15 seconds).
   *
   * @param kubeClient Core API client used to read the pod status.
   * @param podName Pod to watch.
   * @param namespace Namespace that contains the pod.
   * @returns `true` if the pod reached the `Running` phase, `false` if it left `Pending`
   *          for any other phase.
   */
  static async watchUntilPodRunning(kubeClient: CoreV1Api, podName: string, namespace: string) {
    let success = false;
    CloudRunnerLogger.log(`Watching ${podName} ${namespace}`);
    await waitUntil(
      async () => {
        const status = await kubeClient.readNamespacedPodStatus(podName, namespace);
        const podStatus = status.body.status;
        const phase = podStatus?.phase;
        success = phase === 'Running';
        // `conditions` may be missing or empty, so guard the first element as well.
        CloudRunnerLogger.log(`${phase} ${podStatus?.conditions?.[0]?.message || ''}`);

        // Stop waiting on success, and also on any terminal/unknown phase so we do not
        // poll until the timeout for a pod that will never start running.
        return success || phase !== 'Pending';
      },
      {
        timeout: 2000000,
        intervalBetweenAttempts: 15000,
      },
    );

    return success;
  }
}
2021-12-29 17:25:38 +00:00
export default KubernetesTaskRunner;