import { CoreV1Api, KubeConfig } from '@kubernetes/client-node';
import { Writable } from 'stream';
import CloudRunnerLogger from '../../services/cloud-runner-logger';
import * as core from '@actions/core';
import waitUntil from 'async-wait-until';
import { FollowLogStreamService } from '../../services/follow-log-stream-service';
import { CloudRunnerSystem } from '../../services/cloud-runner-system';

/**
 * Helpers for reading the logs of a Kubernetes pod via `kubectl` and for
 * waiting until a pod has left the `Pending` phase.
 */
class KubernetesTaskRunner {
  /**
   * Epoch milliseconds of the most recent log timestamp that was parsed
   * successfully. When set, the next `runTask` call passes it to
   * `kubectl logs --since-time`.
   */
  static lastReceivedTimestamp: number;

  /** Text that ends the timestamp `kubectl logs --timestamps` prefixes to every line. */
  private static readonly timestampSeparator = `Z `;

  /** Overall time budget (ms) handed to `waitUntil` in `watchUntilPodRunning`. */
  private static readonly podWatchTimeoutMs = 2000000;

  /** Delay (ms) between pod status polls in `watchUntilPodRunning`. */
  private static readonly podWatchIntervalMs = 15000;

  /**
   * Splits one timestamped log line into its timestamp and message.
   *
   * The message is everything after the FIRST `Z ` separator, so messages that
   * themselves contain `Z ` are kept intact. A line without any separator is
   * returned unchanged (minus trailing whitespace) and without a timestamp.
   *
   * @param line A single line of `kubectl logs --timestamps` output.
   * @returns The message, plus `timestamp` (epoch ms) when the prefix parsed as a date.
   */
  private static parseLogLine(line: string): { timestamp?: number; message: string } {
    const separator = KubernetesTaskRunner.timestampSeparator;
    const separatorIndex = line.indexOf(separator);
    if (separatorIndex === -1) {
      return { message: line.trimEnd() };
    }

    // Keep the trailing `Z` so the prefix is parsed as a complete timestamp.
    const timestamp = Date.parse(line.slice(0, separatorIndex + 1));
    const message = line.slice(separatorIndex + separator.length).trimEnd();

    return Number.isNaN(timestamp) ? { message } : { timestamp, message };
  }

  /**
   * Fetches the logs of one container with `kubectl logs`, feeds every
   * non-blank line through `FollowLogStreamService.handleIteration` and
   * returns the accumulated output.
   *
   * Failures are logged rather than rethrown: whatever output was collected
   * before the failure is still returned.
   *
   * @param kubeConfig Unused by the current kubectl-based implementation; kept for interface compatibility.
   * @param kubeClient Client used to list namespace events when no logs were received.
   * @param jobName Job name used to filter namespace events.
   * @param podName Pod whose logs are read.
   * @param containerName Container inside the pod whose logs are read.
   * @param namespace Namespace of the pod.
   * @param alreadyFinished Only included in the initial log message.
   * @returns The output accumulated by `FollowLogStreamService.handleIteration`.
   */
  static async runTask(
    kubeConfig: KubeConfig,
    kubeClient: CoreV1Api,
    jobName: string,
    podName: string,
    containerName: string,
    namespace: string,
    alreadyFinished: boolean = false,
  ): Promise<string> {
    CloudRunnerLogger.log(
      `Streaming logs from pod: ${podName} container: ${containerName} namespace: ${namespace} finished ${alreadyFinished}`,
    );
    const stream = new Writable();
    let output = '';
    let didStreamAnyLogs = false;
    let shouldReadLogs = true;
    let shouldCleanup = true;

    stream._write = (chunk, encoding, next) => {
      const line: string = chunk.toString();

      // Splitting the log text on `\n` leaves an empty entry after a trailing
      // newline (and a single empty entry for empty output). Such entries are
      // not log lines and must not count as "streamed logs".
      if (line.trim() === '') {
        next();

        return;
      }

      didStreamAnyLogs = true;
      const { timestamp, message } = KubernetesTaskRunner.parseLogLine(line);
      if (timestamp !== undefined) {
        KubernetesTaskRunner.lastReceivedTimestamp = timestamp;
      }
      ({ shouldReadLogs, shouldCleanup, output } = FollowLogStreamService.handleIteration(
        message,
        shouldReadLogs,
        shouldCleanup,
        output,
      ));
      next();
    };

    // NOTE: logs are read through the kubectl CLI rather than the client
    // library's `Log` streaming API, which is why `kubeConfig` is unused here.
    try {
      const sinceTime = KubernetesTaskRunner.lastReceivedTimestamp
        ? `--since-time="${new Date(KubernetesTaskRunner.lastReceivedTimestamp).toISOString()}" `
        : ` `;

      // The namespace is passed explicitly so the pod is looked up in the same
      // namespace that is used for the event listing below.
      await CloudRunnerSystem.Run(
        `kubectl logs ${podName} -c ${containerName} -n ${namespace} --timestamps ${sinceTime}> app.log`,
        false,
        true,
      );
      const logs = await CloudRunnerSystem.Run(`cat app.log`, false, true);
      for (const element of logs.split(`\n`)) {
        stream.write(element);
      }

      if (!didStreamAnyLogs) {
        core.error('Failed to stream any logs, listing namespace events, check for an error with the container');
        const events = (await kubeClient.listNamespacedEvent(namespace)).body.items
          .filter((x) => x.involvedObject.name === podName || x.involvedObject.name === jobName)
          .map((x) => ({
            type: x.involvedObject.kind,
            name: x.involvedObject.name,
            message: x.message,
          }));
        core.error(JSON.stringify({ events }, undefined, 4));
        throw new Error(`No logs streamed from k8s`);
      }
    } catch (error: any) {
      CloudRunnerLogger.log('k8s task runner failed');

      // JSON.stringify(new Error(...)) yields "{}", so log the message/stack explicitly.
      if (error instanceof Error) {
        CloudRunnerLogger.log(error.stack ?? error.message);
      }
      CloudRunnerLogger.log(JSON.stringify(error?.response?.body, undefined, 4));
      CloudRunnerLogger.log(JSON.stringify(error, undefined, 4));
    } finally {
      stream.destroy();
    }
    CloudRunnerLogger.log('end of log stream');

    return output;
  }

  /**
   * Polls the pod status until the pod is no longer `Pending`, logging the
   * namespace events whose name contains the pod name on every poll.
   *
   * @param kubeClient Client used to read the pod status and list events.
   * @param podName Pod to watch.
   * @param namespace Namespace of the pod.
   * @returns `true` when the last observed phase was `Running`, otherwise `false`.
   */
  static async watchUntilPodRunning(kubeClient: CoreV1Api, podName: string, namespace: string): Promise<boolean> {
    let success = false;
    let message = ``;
    CloudRunnerLogger.log(`Watching ${podName} ${namespace}`);
    await waitUntil(
      async () => {
        const status = await kubeClient.readNamespacedPodStatus(podName, namespace);
        const podStatus = status?.body.status;
        const phase = podStatus?.phase;

        // `conditions` can be an empty array, so the first entry is optional too.
        const firstCondition = podStatus?.conditions?.[0];
        success = phase === 'Running';
        message = `Phase:${phase} \n Reason:${firstCondition?.reason || ''} \n Message:${
          firstCondition?.message || ''
        }`;

        const podEvents = (await kubeClient.listNamespacedEvent(namespace)).body.items
          .map((x) => ({
            message: x.message || ``,
            name: x.metadata.name || ``,
            reason: x.reason || ``,
          }))
          .filter((x) => x.name.includes(podName));
        CloudRunnerLogger.log(JSON.stringify(podEvents, undefined, 4));

        // Stop polling as soon as the pod has left the Pending phase, whatever it moved to.
        return success || phase !== 'Pending';
      },
      {
        timeout: KubernetesTaskRunner.podWatchTimeoutMs,
        intervalBetweenAttempts: KubernetesTaskRunner.podWatchIntervalMs,
      },
    );
    CloudRunnerLogger.log(message);

    return success;
  }
}

export default KubernetesTaskRunner;