RunsOn implem

pull/1662/head
Cyril Rohr 2024-02-13 18:58:13 +00:00
parent 13aacd865c
commit e477012777
11 changed files with 140945 additions and 44 deletions

dist/restore/index.js (vendored): 34050 lines changed, diff suppressed
dist/save-only/index.js (vendored): 34053 lines changed, diff suppressed
dist/save/index.js (vendored): 34053 lines changed, diff suppressed
package-lock.json (generated): 3943 lines changed, diff suppressed

package.json

@@ -5,7 +5,7 @@
"description": "Cache dependencies and build outputs",
"main": "dist/restore/index.js",
"scripts": {
"build": "tsc && ncc build -o dist/restore src/restore.ts && ncc build -o dist/save src/save.ts && ncc build -o dist/restore-only src/restoreOnly.ts && ncc build -o dist/save-only src/saveOnly.ts",
"build": "ncc build -o dist/restore src/restore.ts && ncc build -o dist/save src/save.ts && ncc build -o dist/restore-only src/restoreOnly.ts && ncc build -o dist/save-only src/saveOnly.ts",
"test": "tsc --noEmit && jest --coverage",
"lint": "eslint **/*.ts --cache",
"format": "prettier --write **/*.ts",
@@ -23,10 +23,13 @@
"author": "GitHub",
"license": "MIT",
"dependencies": {
"@actions/cache": "^3.2.3",
"@actions/cache": "^3.2.4",
"@actions/core": "^1.10.0",
"@actions/exec": "^1.1.1",
"@actions/io": "^1.1.2"
"@actions/io": "^1.1.2",
"@aws-sdk/client-s3": "^3.511.0",
"@aws-sdk/lib-storage": "^3.513.0",
"@aws-sdk/s3-request-presigner": "^3.513.0"
},
"devDependencies": {
"@types/jest": "^27.5.2",
@@ -44,7 +47,7 @@
"jest": "^28.1.3",
"jest-circus": "^27.5.1",
"nock": "^13.2.9",
"prettier": "^2.8.0",
"prettier": "^2.8.8",
"ts-jest": "^28.0.8",
"typescript": "^4.9.3"
}

src/custom/backend.ts (new file)

@@ -0,0 +1,202 @@
import {
S3Client,
GetObjectCommand,
ListObjectsV2Command
} from "@aws-sdk/client-s3";
import { getSignedUrl } from "@aws-sdk/s3-request-presigner";
import { createReadStream } from "fs";
import * as crypto from "crypto";
import {
DownloadOptions,
getDownloadOptions
} from "@actions/cache/lib/options";
import { CompressionMethod } from "@actions/cache/lib/internal/constants";
import * as core from "@actions/core";
import * as utils from "@actions/cache/lib/internal/cacheUtils";
import { Upload } from "@aws-sdk/lib-storage";
import { downloadCacheHttpClientConcurrent } from "./downloadUtils";
export interface ArtifactCacheEntry {
cacheKey?: string;
scope?: string;
cacheVersion?: string;
creationTime?: string;
archiveLocation?: string;
}
const versionSalt = "1.0";
const bucketName = process.env.RUNS_ON_S3_BUCKET_CACHE;
const region =
process.env.RUNS_ON_AWS_REGION ||
process.env.AWS_REGION ||
process.env.AWS_DEFAULT_REGION;
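// Cache entries end up under s3://<bucket>/cache/<owner>/<repo>/<version>/<key>,
// where <version> is the hash computed by getCacheVersion below.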
export function getCacheVersion(
paths: string[],
compressionMethod?: CompressionMethod,
enableCrossOsArchive = false
): string {
// don't pass changes upstream
const components = paths.slice();
// Add compression method to cache version to restore
// compressed cache as per compression method
if (compressionMethod) {
components.push(compressionMethod);
}
// Only check for windows platforms if enableCrossOsArchive is false
if (process.platform === "win32" && !enableCrossOsArchive) {
components.push("windows-only");
}
// Add salt to cache version to support breaking changes in cache entry
components.push(versionSalt);
return crypto
.createHash("sha256")
.update(components.join("|"))
.digest("hex");
}
function getS3Prefix(
paths: string[],
{ compressionMethod, enableCrossOsArchive }
): string {
const repository = process.env.GITHUB_REPOSITORY;
const version = getCacheVersion(
paths,
compressionMethod,
enableCrossOsArchive
);
return ["cache", repository, version].join("/");
}
export async function getCacheEntry(
keys,
paths,
{ compressionMethod, enableCrossOsArchive }
) {
const cacheEntry: ArtifactCacheEntry = {};
const s3Client = new S3Client({ region });
// Find the most recent key matching one of the restoreKeys prefixes
for (const restoreKey of keys) {
const s3Prefix = getS3Prefix(paths, {
compressionMethod,
enableCrossOsArchive
});
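// Listing objects under <prefix>/<restoreKey> gives prefix matching against
// previously saved cache keys, mirroring the restore-keys behaviour of the hosted cache.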
const listObjectsParams = {
Bucket: bucketName,
Prefix: [s3Prefix, restoreKey].join("/")
};
try {
const { Contents = [] } = await s3Client.send(
new ListObjectsV2Command(listObjectsParams)
);
if (Contents.length > 0) {
// Sort keys by LastModified time in descending order
const sortedKeys = Contents.sort(
(a, b) => Number(b.LastModified) - Number(a.LastModified)
);
const s3Path = sortedKeys[0].Key; // Return the most recent key
cacheEntry.cacheKey = s3Path?.replace(`${s3Prefix}/`, "");
cacheEntry.archiveLocation = `s3://${bucketName}/${s3Path}`;
return cacheEntry;
}
} catch (error) {
console.error(
`Error listing objects with prefix ${restoreKey} in bucket ${bucketName}:`,
error
);
}
}
return cacheEntry; // No keys found
}
export async function downloadCache(
archiveLocation: string,
archivePath: string,
options?: DownloadOptions
): Promise<void> {
if (!bucketName) {
throw new Error("Environment variable RUNS_ON_S3_BUCKET_CACHE not set");
}
if (!region) {
throw new Error("Environment variable RUNS_ON_AWS_REGION not set");
}
const s3Client = new S3Client({ region });
const archiveUrl = new URL(archiveLocation);
const objectKey = archiveUrl.pathname.slice(1);
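// archiveLocation is the s3:// URL produced by getCacheEntry; its path (minus the
// leading slash) is the object key. A presigned HTTPS URL is generated so the plain
// HTTP range downloader below can fetch the object without AWS credentials.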
const command = new GetObjectCommand({
Bucket: bucketName,
Key: objectKey
});
const url = await getSignedUrl(s3Client, command, {
expiresIn: 3600
});
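// Override the toolkit defaults with a higher concurrency for the segmented download.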
const downloadOptions = getDownloadOptions({
...options,
downloadConcurrency: 14,
concurrentBlobDownloads: true
});
await downloadCacheHttpClientConcurrent(url, archivePath, downloadOptions);
}
export async function saveCache(
key: string,
paths: string[],
archivePath: string,
{ compressionMethod, enableCrossOsArchive, cacheSize: archiveFileSize }
): Promise<void> {
if (!bucketName) {
throw new Error("Environment variable RUNS_ON_S3_BUCKET_CACHE not set");
}
if (!region) {
throw new Error("Environment variable RUNS_ON_AWS_REGION not set");
}
const s3Client = new S3Client({ region });
const s3Prefix = getS3Prefix(paths, {
compressionMethod,
enableCrossOsArchive
});
const s3Key = `${s3Prefix}/${key}`;
const multipartUpload = new Upload({
client: s3Client,
params: {
Bucket: bucketName,
Key: s3Key,
Body: createReadStream(archivePath)
},
// Part size in bytes
partSize: 32 * 1024 * 1024,
// Max concurrency
queueSize: 14
});
// Commit Cache
const cacheSize = utils.getArchiveFileSizeInBytes(archivePath);
core.info(
`Cache Size: ~${Math.round(
cacheSize / (1024 * 1024)
)} MB (${cacheSize} B)`
);
core.info(`Uploading cache from ${archivePath} to ${bucketName}/${s3Key}`);
multipartUpload.on("httpUploadProgress", progress => {
core.info(`Uploaded part ${progress.part} (${progress.loaded} bytes transferred).`);
});
await multipartUpload.done();
core.info(`Cache saved successfully.`);
}

src/custom/cache.ts (new file, 236 lines)

@@ -0,0 +1,236 @@
// https://github.com/actions/toolkit/blob/%40actions/cache%403.2.2/packages/cache/src/cache.ts
import * as core from "@actions/core";
import * as path from "path";
import * as utils from "@actions/cache/lib/internal/cacheUtils";
import * as cacheHttpClient from "./backend";
import {
createTar,
extractTar,
listTar
} from "@actions/cache/lib/internal/tar";
import { DownloadOptions, UploadOptions } from "@actions/cache/lib/options";
export class ValidationError extends Error {
constructor(message: string) {
super(message);
this.name = "ValidationError";
Object.setPrototypeOf(this, ValidationError.prototype);
}
}
export class ReserveCacheError extends Error {
constructor(message: string) {
super(message);
this.name = "ReserveCacheError";
Object.setPrototypeOf(this, ReserveCacheError.prototype);
}
}
function checkPaths(paths: string[]): void {
if (!paths || paths.length === 0) {
throw new ValidationError(
`Path Validation Error: At least one directory or file path is required`
);
}
}
function checkKey(key: string): void {
if (key.length > 512) {
throw new ValidationError(
`Key Validation Error: ${key} cannot be larger than 512 characters.`
);
}
const regex = /^[^,]*$/;
if (!regex.test(key)) {
throw new ValidationError(
`Key Validation Error: ${key} cannot contain commas.`
);
}
}
/**
* isFeatureAvailable to check the presence of Actions cache service
*
* @returns boolean return true if Actions cache service feature is available, otherwise false
*/
export function isFeatureAvailable(): boolean {
return !!process.env["ACTIONS_CACHE_URL"];
}
/**
* Restores cache from keys
*
* @param paths a list of file paths to restore from the cache
* @param primaryKey an explicit key for restoring the cache
* @param restoreKeys an optional ordered list of keys to use for restoring the cache if no cache hit occurred for key
* @param downloadOptions cache download options
* @param enableCrossOsArchive an optional boolean enabled to restore on windows any cache created on any platform
* @returns string returns the key for the cache hit, otherwise returns undefined
*/
export async function restoreCache(
paths: string[],
primaryKey: string,
restoreKeys?: string[],
options?: DownloadOptions,
enableCrossOsArchive = false
): Promise<string | undefined> {
checkPaths(paths);
restoreKeys = restoreKeys || [];
const keys = [primaryKey, ...restoreKeys];
core.debug("Resolved Keys:");
core.debug(JSON.stringify(keys));
if (keys.length > 10) {
throw new ValidationError(
`Key Validation Error: Keys are limited to a maximum of 10.`
);
}
for (const key of keys) {
checkKey(key);
}
const compressionMethod = await utils.getCompressionMethod();
let archivePath = "";
try {
// paths are needed to compute the version
const cacheEntry = await cacheHttpClient.getCacheEntry(keys, paths, {
compressionMethod,
enableCrossOsArchive
});
if (!cacheEntry?.archiveLocation) {
// Cache not found
return undefined;
}
if (options?.lookupOnly) {
core.info("Lookup only - skipping download");
return cacheEntry.cacheKey;
}
archivePath = path.join(
await utils.createTempDirectory(),
utils.getCacheFileName(compressionMethod)
);
core.debug(`Archive Path: ${archivePath}`);
// Download the cache from the cache entry
await cacheHttpClient.downloadCache(
cacheEntry.archiveLocation,
archivePath,
options
);
if (core.isDebug()) {
await listTar(archivePath, compressionMethod);
}
const archiveFileSize = utils.getArchiveFileSizeInBytes(archivePath);
core.info(
`Cache Size: ~${Math.round(
archiveFileSize / (1024 * 1024)
)} MB (${archiveFileSize} B)`
);
await extractTar(archivePath, compressionMethod);
core.info("Cache restored successfully");
return cacheEntry.cacheKey;
} catch (error) {
const typedError = error as Error;
if (typedError.name === ValidationError.name) {
throw error;
} else {
// Suppress all non-validation cache-related errors because caching should be optional
core.warning(`Failed to restore: ${(error as Error).message}`);
}
} finally {
// Try to delete the archive to save space
try {
await utils.unlinkFile(archivePath);
} catch (error) {
core.debug(`Failed to delete archive: ${error}`);
}
}
return undefined;
}
/**
* Saves a list of files with the specified key
*
* @param paths a list of file paths to be cached
* @param key an explicit key for restoring the cache
* @param enableCrossOsArchive an optional boolean enabled to save cache on windows which could be restored on any platform
* @param options cache upload options
* @returns number returns cacheId if the cache was saved successfully and throws an error if save fails
*/
export async function saveCache(
paths: string[],
key: string,
options?: UploadOptions,
enableCrossOsArchive = false
): Promise<number> {
checkPaths(paths);
checkKey(key);
const compressionMethod = await utils.getCompressionMethod();
let cacheId = -1;
const cachePaths = await utils.resolvePaths(paths);
core.debug("Cache Paths:");
core.debug(`${JSON.stringify(cachePaths)}`);
if (cachePaths.length === 0) {
throw new Error(
`Path Validation Error: Path(s) specified in the action for caching do(es) not exist, hence no cache is being saved.`
);
}
const archiveFolder = await utils.createTempDirectory();
const archivePath = path.join(
archiveFolder,
utils.getCacheFileName(compressionMethod)
);
core.debug(`Archive Path: ${archivePath}`);
try {
await createTar(archiveFolder, cachePaths, compressionMethod);
if (core.isDebug()) {
await listTar(archivePath, compressionMethod);
}
const archiveFileSize = utils.getArchiveFileSizeInBytes(archivePath);
core.debug(`File Size: ${archiveFileSize}`);
await cacheHttpClient.saveCache(key, paths, archivePath, {
compressionMethod,
enableCrossOsArchive,
cacheSize: archiveFileSize
});
// dummy cacheId: if we get here without throwing, the cache has been saved
cacheId = 1;
} catch (error) {
const typedError = error as Error;
if (typedError.name === ValidationError.name) {
throw error;
} else if (typedError.name === ReserveCacheError.name) {
core.info(`Failed to save: ${typedError.message}`);
} else {
core.warning(`Failed to save: ${typedError.message}`);
}
} finally {
// Try to delete the archive to save space
try {
await utils.unlinkFile(archivePath);
} catch (error) {
core.debug(`Failed to delete archive: ${error}`);
}
}
return cacheId;
}
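For illustration, a minimal sketch of how this module is meant to be consumed from an async context (the paths and keys are hypothetical; restoreImpl.ts and saveImpl.ts below make the real calls):

import * as custom from "./custom/cache";

async function example(): Promise<void> {
    // Restore: resolves to the matched cache key, or undefined on a miss.
    const hitKey = await custom.restoreCache(
        ["node_modules"],       // hypothetical paths to restore
        "npm-linux-abc123",     // hypothetical primary key
        ["npm-linux-"],         // restore-keys prefixes, matched against stored keys
        { lookupOnly: false }
    );

    if (!hitKey) {
        // Save: resolves to a dummy cacheId (1) on success, -1 if saving failed.
        await custom.saveCache(["node_modules"], "npm-linux-abc123");
    }
}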

src/custom/downloadUtils.ts (new file)

@@ -0,0 +1,328 @@
// A copy of the original downloadUtils.ts from the actions/toolkit cache package, with a change to the byte ranges used in the downloadCacheHttpClientConcurrent function.
import * as core from "@actions/core";
import { HttpClient } from "@actions/http-client";
import { TransferProgressEvent } from "@azure/ms-rest-js";
import * as fs from "fs";
import { DownloadOptions } from "@actions/cache/lib/options";
import { retryHttpClientResponse } from "@actions/cache/lib/internal/requestUtils";
/**
* Class for tracking the download state and displaying stats.
*/
export class DownloadProgress {
contentLength: number;
segmentIndex: number;
segmentSize: number;
segmentOffset: number;
receivedBytes: number;
startTime: number;
displayedComplete: boolean;
timeoutHandle?: ReturnType<typeof setTimeout>;
constructor(contentLength: number) {
this.contentLength = contentLength;
this.segmentIndex = 0;
this.segmentSize = 0;
this.segmentOffset = 0;
this.receivedBytes = 0;
this.displayedComplete = false;
this.startTime = Date.now();
}
/**
* Progress to the next segment. Only call this method when the previous segment
* is complete.
*
* @param segmentSize the length of the next segment
*/
nextSegment(segmentSize: number): void {
this.segmentOffset = this.segmentOffset + this.segmentSize;
this.segmentIndex = this.segmentIndex + 1;
this.segmentSize = segmentSize;
this.receivedBytes = 0;
core.debug(
`Downloading segment at offset ${this.segmentOffset} with length ${this.segmentSize}...`
);
}
/**
* Sets the number of bytes received for the current segment.
*
* @param receivedBytes the number of bytes received
*/
setReceivedBytes(receivedBytes: number): void {
this.receivedBytes = receivedBytes;
}
/**
* Returns the total number of bytes transferred.
*/
getTransferredBytes(): number {
return this.segmentOffset + this.receivedBytes;
}
/**
* Returns true if the download is complete.
*/
isDone(): boolean {
return this.getTransferredBytes() === this.contentLength;
}
/**
* Prints the current download stats. Once the download completes, this will print one
* last line and then stop.
*/
display(): void {
if (this.displayedComplete) {
return;
}
const transferredBytes = this.segmentOffset + this.receivedBytes;
const percentage = (
100 *
(transferredBytes / this.contentLength)
).toFixed(1);
const elapsedTime = Date.now() - this.startTime;
const downloadSpeed = (
transferredBytes /
(1024 * 1024) /
(elapsedTime / 1000)
).toFixed(1);
core.info(
`Received ${transferredBytes} of ${this.contentLength} (${percentage}%), ${downloadSpeed} MBs/sec`
);
if (this.isDone()) {
this.displayedComplete = true;
}
}
/**
* Returns a function used to handle TransferProgressEvents.
*/
onProgress(): (progress: TransferProgressEvent) => void {
return (progress: TransferProgressEvent) => {
this.setReceivedBytes(progress.loadedBytes);
};
}
/**
* Starts the timer that displays the stats.
*
* @param delayInMs the delay between each write
*/
startDisplayTimer(delayInMs = 1000): void {
const displayCallback = (): void => {
this.display();
if (!this.isDone()) {
this.timeoutHandle = setTimeout(displayCallback, delayInMs);
}
};
this.timeoutHandle = setTimeout(displayCallback, delayInMs);
}
/**
* Stops the timer that displays the stats. As this typically indicates the download
* is complete, this will display one last line, unless the last line has already
* been written.
*/
stopDisplayTimer(): void {
if (this.timeoutHandle) {
clearTimeout(this.timeoutHandle);
this.timeoutHandle = undefined;
}
this.display();
}
}
/**
* Download the cache using the Actions toolkit http-client concurrently
*
* @param archiveLocation the URL for the cache
* @param archivePath the local path where the cache is saved
*/
export async function downloadCacheHttpClientConcurrent(
archiveLocation: string,
archivePath: fs.PathLike,
options: DownloadOptions
): Promise<void> {
const archiveDescriptor = await fs.promises.open(archivePath, "w");
const httpClient = new HttpClient("actions/cache", undefined, {
socketTimeout: options.timeoutInMs,
keepAlive: true
});
try {
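// Request only the first two bytes; the Content-Range header of the partial
// response reveals the total size of the archive without downloading it.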
const res = await retryHttpClientResponse(
"downloadCacheMetadata",
async () =>
await httpClient.request("GET", archiveLocation, null, {
Range: "bytes=0-1"
})
);
const contentRange = res.message.headers["content-range"];
if (!contentRange) {
throw new Error("Range request not supported by server");
}
const match = contentRange?.match(/bytes \d+-\d+\/(\d+)/);
if (!match) {
throw new Error(
"Content-Range header in server response not in correct format"
);
}
const length = parseInt(match[1]);
if (Number.isNaN(length)) {
throw new Error(`Could not interpret Content-Length: ${length}`);
}
const downloads: {
offset: number;
promiseGetter: () => Promise<DownloadSegment>;
}[] = [];
const blockSize = 32 * 1024 * 1024;
for (let offset = 0; offset < length; offset += blockSize) {
const count = Math.min(blockSize, length - offset);
downloads.push({
offset,
promiseGetter: async () => {
return await downloadSegmentRetry(
httpClient,
archiveLocation,
offset,
count
);
}
});
}
// reverse to use .pop instead of .shift
downloads.reverse();
let actives = 0;
let bytesDownloaded = 0;
const progress = new DownloadProgress(length);
progress.startDisplayTimer();
const progressFn = progress.onProgress();
const activeDownloads: { [offset: number]: Promise<DownloadSegment> } =
[];
let nextDownload:
| { offset: number; promiseGetter: () => Promise<DownloadSegment> }
| undefined;
const waitAndWrite: () => Promise<void> = async () => {
const segment = await Promise.race(Object.values(activeDownloads));
await archiveDescriptor.write(
segment.buffer,
0,
segment.count,
segment.offset
);
actives--;
delete activeDownloads[segment.offset];
bytesDownloaded += segment.count;
progressFn({ loadedBytes: bytesDownloaded });
};
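// Keep up to downloadConcurrency segment requests in flight; whenever the pool is
// full, wait for one segment to finish and write it at its offset in the archive file.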
while ((nextDownload = downloads.pop())) {
activeDownloads[nextDownload.offset] = nextDownload.promiseGetter();
actives++;
if (actives >= (options.downloadConcurrency ?? 10)) {
await waitAndWrite();
}
}
while (actives > 0) {
await waitAndWrite();
}
} finally {
httpClient.dispose();
await archiveDescriptor.close();
}
}
async function downloadSegmentRetry(
httpClient: HttpClient,
archiveLocation: string,
offset: number,
count: number
): Promise<DownloadSegment> {
const retries = 5;
let failures = 0;
while (true) {
try {
const timeout = 30000;
const result = await promiseWithTimeout(
timeout,
downloadSegment(httpClient, archiveLocation, offset, count)
);
if (typeof result === "string") {
throw new Error("downloadSegmentRetry failed due to timeout");
}
return result;
} catch (err) {
if (failures >= retries) {
throw err;
}
failures++;
}
}
}
async function downloadSegment(
httpClient: HttpClient,
archiveLocation: string,
offset: number,
count: number
): Promise<DownloadSegment> {
const partRes = await retryHttpClientResponse(
"downloadCachePart",
async () =>
await httpClient.get(archiveLocation, {
Range: `bytes=${offset}-${offset + count - 1}`
})
);
if (!partRes.readBodyBuffer) {
throw new Error(
"Expected HttpClientResponse to implement readBodyBuffer"
);
}
return {
offset,
count,
buffer: await partRes.readBodyBuffer()
};
}
declare class DownloadSegment {
offset: number;
count: number;
buffer: Buffer;
}
const promiseWithTimeout = async <T>(
timeoutMs: number,
promise: Promise<T>
): Promise<T | string> => {
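// Resolves with the literal string "timeout" if the wrapped promise does not settle
// in time; downloadSegmentRetry treats that sentinel as a retryable failure.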
let timeoutHandle: NodeJS.Timeout;
const timeoutPromise = new Promise<string>(resolve => {
timeoutHandle = setTimeout(() => resolve("timeout"), timeoutMs);
});
return Promise.race([promise, timeoutPromise]).then(result => {
clearTimeout(timeoutHandle);
return result;
});
};

src/restoreImpl.ts

@@ -9,6 +9,9 @@ import {
} from "./stateProvider";
import * as utils from "./utils/actionUtils";
+ import * as custom from "./custom/cache";
+ const canSaveToS3 = process.env["RUNS_ON_S3_BUCKET_CACHE"] !== undefined;
export async function restoreImpl(
stateProvider: IStateProvider
): Promise<string | undefined> {
@@ -41,13 +44,27 @@
const failOnCacheMiss = utils.getInputAsBool(Inputs.FailOnCacheMiss);
const lookupOnly = utils.getInputAsBool(Inputs.LookupOnly);
- const cacheKey = await cache.restoreCache(
- cachePaths,
- primaryKey,
- restoreKeys,
- { lookupOnly: lookupOnly },
- enableCrossOsArchive
- );
+ let cacheKey: string | undefined;
+ if (canSaveToS3) {
+ core.info(
+ "The cache action detected a local S3 bucket cache. Using it."
+ );
+ cacheKey = await custom.restoreCache(
+ cachePaths,
+ primaryKey,
+ restoreKeys,
+ { lookupOnly: lookupOnly }
+ );
+ } else {
+ cacheKey = await cache.restoreCache(
+ cachePaths,
+ primaryKey,
+ restoreKeys,
+ { lookupOnly: lookupOnly },
+ enableCrossOsArchive
+ );
+ }
if (!cacheKey) {
if (failOnCacheMiss) {

src/saveImpl.ts

@@ -9,6 +9,9 @@ import {
} from "./stateProvider";
import * as utils from "./utils/actionUtils";
+ import * as custom from "./custom/cache";
+ const canSaveToS3 = process.env["RUNS_ON_S3_BUCKET_CACHE"] !== undefined;
// Catch and log any unhandled exceptions. These exceptions can leak out of the uploadChunk method in
// @actions/toolkit when a failed upload closes the file descriptor causing any in-process reads to
// throw an uncaught exception. Instead of failing this action, just warn.
@@ -62,12 +65,29 @@
Inputs.EnableCrossOsArchive
);
- cacheId = await cache.saveCache(
- cachePaths,
- primaryKey,
- { uploadChunkSize: utils.getInputAsInt(Inputs.UploadChunkSize) },
- enableCrossOsArchive
- );
+ if (canSaveToS3) {
+ core.info(
+ "The cache action detected a local S3 bucket cache. Using it."
+ );
+ cacheId = await custom.saveCache(
+ cachePaths,
+ primaryKey,
+ {
+ uploadChunkSize: utils.getInputAsInt(Inputs.UploadChunkSize)
+ },
+ enableCrossOsArchive
+ );
+ } else {
+ cacheId = await cache.saveCache(
+ cachePaths,
+ primaryKey,
+ {
+ uploadChunkSize: utils.getInputAsInt(Inputs.UploadChunkSize)
+ },
+ enableCrossOsArchive
+ );
+ }
if (cacheId != -1) {
core.info(`Cache saved with key: ${primaryKey}`);