22 changes: 22 additions & 0 deletions npi/cloudbuild.yaml
@@ -158,6 +158,28 @@ steps:
- '${_REGISTRY}/${_PROJECT}/gcsfuse-benchmarks/orbax-emulated-benchmark-${_GCSFUSE_VERSION}:latest'
- 'fio'

# Build and push the filecache-benchmark image.
- name: 'gcr.io/cloud-builders/docker'
id: 'filecache-benchmark'
waitFor: ['base']
args:
- 'buildx'
- 'build'
- '--push'
- '-f'
- 'fio/filecache.dockerfile'
- '--platform'
- 'linux/amd64,linux/arm64'
- '--build-arg'
- 'GCSFUSE_VERSION=${_GCSFUSE_VERSION}'
- '--build-arg'
- 'REGISTRY=${_REGISTRY}'
- '--build-arg'
- 'PROJECT=${_PROJECT}'
- '-t'
- '${_REGISTRY}/${_PROJECT}/gcsfuse-benchmarks/fio-filecache-benchmark-${_GCSFUSE_VERSION}:latest'
- 'fio'

options:
# Using a more powerful machine is recommended for multi-platform builds.
machineType: 'E2_HIGHCPU_32'
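For reference, a minimal sketch of submitting this build by hand via Python's subprocess. The substitution values are assumptions taken from the Dockerfile defaults below (your build trigger supplies the real ones), and the paths assume you run from the repository root:

```python
import subprocess

# Hypothetical substitution values; the real ones come from your build trigger.
substitutions = ",".join([
    "_REGISTRY=us-docker.pkg.dev",
    "_PROJECT=gcs-fuse-test",
    "_GCSFUSE_VERSION=master",
])

# Submit the multi-platform build defined in npi/cloudbuild.yaml.
subprocess.run(
    ["gcloud", "builds", "submit", "npi",
     "--config", "npi/cloudbuild.yaml",
     "--substitutions", substitutions],
    check=True,
)
```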
13 changes: 13 additions & 0 deletions npi/fio/filecache.dockerfile
@@ -0,0 +1,13 @@
ARG UBUNTU_VERSION=24.04
ARG GO_VERSION=1.24.5
ARG GCSFUSE_VERSION=master
ARG REGISTRY=us-docker.pkg.dev
ARG PROJECT=gcs-fuse-test

FROM ${REGISTRY}/${PROJECT}/gcsfuse-benchmarks/gcsfuse-${GCSFUSE_VERSION}-perf-base:latest
COPY run_fio_benchmark.py /run_fio_benchmark.py
COPY fio_benchmark_runner.py /fio_benchmark_runner.py
COPY run_fio_matrix.py /run_fio_matrix.py
COPY filecache_matrix.csv /filecache_matrix.csv
COPY filecache.fio /filecache.fio
ENTRYPOINT ["/run_fio_matrix.py", "--matrix-config", "/filecache_matrix.csv", "--fio-template", "/filecache.fio"]
28 changes: 28 additions & 0 deletions npi/fio/filecache.fio
@@ -0,0 +1,28 @@
[global]
allrandrepeat=0
create_serialize=0
direct=1 # O_DIRECT: bypass the kernel page cache so reads go straight to gcsfuse
fadvise_hint=0
file_service_type=random
group_reporting=1 # combine results from all threads into a single report
iodepth=64
# ioengine=libaio is for high-performance asynchronous I/O, best for max throughput tests.
# It may produce 'disk util stat' errors in some container environments.
# ioengine=psync is for standard synchronous I/O, which is more stable and mimics typical application behavior.
ioengine=libaio
invalidate=1
numjobs=${NUM_JOBS}
openfiles=1
rw=${READ_TYPE}
thread=1
filename_format=file-cache-read/fs_${FILE_SIZE}/bs_${BLOCK_SIZE}/nf_${NR_FILES}/nj_${NUM_JOBS}/experiment.$jobnum.$filenum

[read]
stonewall
directory=${DIR}
# Update the block size value from the table for different experiments.
bs=${BLOCK_SIZE}
# Update the file size value from the table for different experiments.
filesize=${FILE_SIZE}
# Set nrfiles per thread so that each test runs for 1-2 minutes.
nrfiles=${NR_FILES}
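To make the template concrete: fio expands the ${VAR} placeholders from the environment, so one matrix row can be run directly as sketched below (presumably what run_fio_matrix.py automates; the /gcs mount point is a hypothetical gcsfuse mount):

```python
import os
import subprocess

# One row of filecache_matrix.csv, exported as environment variables that
# fio substitutes into the ${...} placeholders of the template above.
env = dict(os.environ, DIR="/gcs", READ_TYPE="read", FILE_SIZE="10M",
           BLOCK_SIZE="1M", NR_FILES="50", NUM_JOBS="50")

subprocess.run(
    ["fio", "/filecache.fio", "--output-format=json",
     "--output=/tmp/fio_result.json"],
    env=env, check=True,
)
```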
25 changes: 25 additions & 0 deletions npi/fio/filecache_matrix.csv
@@ -0,0 +1,25 @@
READ_TYPE,FILE_SIZE,BLOCK_SIZE,NR_FILES,NUM_JOBS
read,10M,1M,50,50
read,50M,1M,20,50
read,100M,1M,10,50
read,200M,1M,5,50
read,1G,1M,2,50
read,10G,1M,1,50
read,10M,128K,50,50
read,50M,128K,20,50
read,100M,128K,10,50
read,200M,128K,5,50
read,1G,128K,2,50
read,10G,128K,1,50
read,10M,1M,50,96
read,50M,1M,20,96
read,100M,1M,10,96
read,200M,1M,5,96
read,1G,1M,2,96
read,10G,1M,1,96
read,10M,128K,50,96
read,50M,128K,20,96
read,100M,128K,10,96
read,200M,128K,5,96
read,1G,128K,2,96
read,10G,128K,1,96
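Each row's working set is FILE_SIZE x NR_FILES x NUM_JOBS (the 10G rows at 96 jobs, for example, touch 960 GiB), which matters when sizing the file cache against tmpfs or LSSD capacity. A small sketch to tabulate this, assuming the CSV sits in the current directory:

```python
import csv

UNITS = {"K": 2**10, "M": 2**20, "G": 2**30}

def to_bytes(size):
    # Parse fio-style sizes such as "128K", "10M", "1G".
    return int(size[:-1]) * UNITS[size[-1].upper()]

with open("filecache_matrix.csv") as f:
    for row in csv.DictReader(f):
        total = (to_bytes(row["FILE_SIZE"]) * int(row["NR_FILES"])
                 * int(row["NUM_JOBS"]))
        print(f"{row['FILE_SIZE']:>5} x {row['NR_FILES']:>3} files x "
              f"{row['NUM_JOBS']:>3} jobs -> {total / 2**30:7.1f} GiB")
```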
11 changes: 9 additions & 2 deletions npi/fio/fio_benchmark_runner.py
@@ -139,7 +139,7 @@ def parse_fio_output(filename):
"file_size": options.get("filesize", 0),
"nr_files": options.get("nrfiles", 0),
"queue_depth": data["global options"].get("iodepth", 0),
"num_jobs": options.get("numjobs", 0),
"num_jobs": data["global options"].get("numjobs", 0),
"operation": data["global options"].get("rw", "unknown"),
"bw_mibps": bw_mibps,
"iops": iops,
@@ -295,8 +295,15 @@ def run_benchmark(
logging.info("Clearing page cache...")
run_command(["sh", "-c", "echo 3 > /proc/sys/vm/drop_caches"])

mount_gcsfuse(gcsfuse_bin, gcsfuse_flags, bucket_name, mount_point, cpu_limit_list=cpu_limit_list)
# Clear the GCSfuse cache directory to ensure cold reads for file-cache tests.
# The --temp-dir flag for gcsfuse points to the cache location.
# In our Docker setup, this is mounted from the host's LSSD.
temp_dir_in_container = "/gcsfuse-temp"
logging.info(f"Clearing GCSfuse cache directory: {temp_dir_in_container}")
run_command(["sh", "-c", f"rm -rf {temp_dir_in_container}/*"])

mount_gcsfuse(gcsfuse_bin, gcsfuse_flags, bucket_name, mount_point,
cpu_limit_list=cpu_limit_list)
fio_cpu_list = cpu_limit_list if bind_fio else None

run_fio_test(fio_config, mount_point, i, output_dir,
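The num_jobs fix above follows from where fio reports shared options: numjobs is set in the template's [global] section, so it appears under "global options" in fio's JSON output rather than in each job's options. A minimal sketch of the corrected lookup:

```python
import json

def read_num_jobs(fio_json_path):
    # numjobs lives under "global options" when set in the [global]
    # section, so the per-job options dict reports it as absent.
    with open(fio_json_path) as f:
        data = json.load(f)
    return int(data.get("global options", {}).get("numjobs", 0))
```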
2 changes: 1 addition & 1 deletion npi/fio/fullsweep.dockerfile
@@ -10,4 +10,4 @@ COPY fio_benchmark_runner.py /fio_benchmark_runner.py
COPY run_fio_matrix.py /run_fio_matrix.py
COPY fullsweep_matrix.csv /fullsweep_matrix.csv
COPY fullsweep.fio /fullsweep.fio
ENTRYPOINT ["/run_fio_matrix.py", "--matrix-config", "/fullsweep_matrix.csv", "--fio-template", "/fullsweep.fio"]
ENTRYPOINT ["/run_fio_matrix.py", "--matrix-config", "/fullsweep_matrix.csv", "--fio-template", "/fullsweep.fio"]
2 changes: 1 addition & 1 deletion npi/fio/orbax_emulated.dockerfile
@@ -10,4 +10,4 @@ COPY fio_benchmark_runner.py /fio_benchmark_runner.py
COPY run_fio_matrix.py /run_fio_matrix.py
COPY orbax_emulated_matrix.csv /orbax_emulated_matrix.csv
COPY orbax_emulated.fio /orbax_emulated.fio
ENTRYPOINT ["/run_fio_matrix.py", "--matrix-config", "/orbax_emulated_matrix.csv", "--fio-template", "/orbax_emulated.fio"]
ENTRYPOINT ["/run_fio_matrix.py", "--matrix-config", "/orbax_emulated_matrix.csv", "--fio-template", "/orbax_emulated.fio"]
2 changes: 1 addition & 1 deletion npi/fio/read.dockerfile
@@ -10,4 +10,4 @@ COPY fio_benchmark_runner.py /fio_benchmark_runner.py
COPY run_fio_matrix.py /run_fio_matrix.py
COPY read_matrix.csv /read_matrix.csv
COPY read.fio /read.fio
ENTRYPOINT ["/run_fio_matrix.py", "--matrix-config", "/read_matrix.csv", "--fio-template", "/read.fio"]
ENTRYPOINT ["/run_fio_matrix.py", "--matrix-config", "/read_matrix.csv", "--fio-template", "/read.fio"]
2 changes: 1 addition & 1 deletion npi/fio/write.dockerfile
@@ -10,4 +10,4 @@ COPY fio_benchmark_runner.py /fio_benchmark_runner.py
COPY run_fio_matrix.py /run_fio_matrix.py
COPY write_matrix.csv /write_matrix.csv
COPY write.fio /write.fio
ENTRYPOINT ["/run_fio_matrix.py", "--matrix-config", "/write_matrix.csv", "--fio-template", "/write.fio"]
ENTRYPOINT ["/run_fio_matrix.py", "--matrix-config", "/write_matrix.csv", "--fio-template", "/write.fio"]
107 changes: 80 additions & 27 deletions npi/npi.py
@@ -45,7 +45,7 @@ class BenchmarkFactory:
'boot-disk').
"""

def __init__(self, bucket_name, bq_project_id, bq_dataset_id, gcsfuse_version, iterations, temp_dir):
def __init__(self, bucket_name, bq_project_id, bq_dataset_id, gcsfuse_version, iterations, temp_dir, file_cache_size_mb):
"""Initializes the BenchmarkFactory.

Args:
@@ -55,13 +55,16 @@ def __init__(self, bucket_name, bq_project_id, bq_dataset_id, gcsfuse_version, i
gcsfuse_version (str): The GCSfuse version.
iterations (int): The number of benchmark iterations.
temp_dir (str): The temporary directory type.
file_cache_size_mb (int): The size of the file cache in MiB.
-1 for unlimited.
"""
self.bucket_name = bucket_name
self.bq_project_id = bq_project_id
self.bq_dataset_id = bq_dataset_id
self.gcsfuse_version = gcsfuse_version
self.iterations = iterations
self.temp_dir = temp_dir
self.file_cache_size_mb = file_cache_size_mb
self._benchmark_definitions = self._get_benchmark_definitions()

def get_benchmark_command(self, name):
@@ -115,17 +118,23 @@ def _create_docker_command(self, benchmark_image_suffix, bq_table_id,
Returns:
str: The complete Docker command.
"""
container_temp_dir = "/gcsfuse-temp"
# Define a constant for the in-container temp path for consistency.
CONTAINER_TEMP_DIR = "/gcsfuse-temp"
volume_mount = ""
if self.temp_dir == "memory":
volume_mount = f"--mount type=tmpfs,destination={container_temp_dir}"
tmpfs_opts = "tmpfs-size=512g"
if "filecache" in benchmark_image_suffix:
volume_mount = f"--mount type=tmpfs,destination={CONTAINER_TEMP_DIR},{tmpfs_opts} --tmpfs /dev/hugepages:rw,exec,size=1g"
else:
volume_mount = f"--mount type=tmpfs,destination={CONTAINER_TEMP_DIR},{tmpfs_opts}"
elif self.temp_dir == "boot-disk":
volume_mount = f"-v <temp_dir_path>:{container_temp_dir}"
# This placeholder is replaced in the run_benchmark function.
volume_mount = f"-v <temp_dir_placeholder>:{CONTAINER_TEMP_DIR}"

if gcsfuse_flags:
gcsfuse_flags += f" --temp-dir={container_temp_dir}"
gcsfuse_flags += f" --temp-dir={CONTAINER_TEMP_DIR}"
else:
gcsfuse_flags = f"--temp-dir={container_temp_dir}"
gcsfuse_flags = f"--temp-dir={CONTAINER_TEMP_DIR}"

base_cmd = (
"docker run --pull=always --network=host --privileged --rm "
@@ -195,6 +204,8 @@ def _get_benchmark_definitions(self):
},
"read": {"image_suffix": "fio-read-benchmark"},
"write": {"image_suffix": "fio-write-benchmark"},
"filecache": {"image_suffix": "fio-filecache-benchmark",
"bq_table_id_override": "filecache_{config_name}"},
# "full_sweep": {"image_suffix": "fio-fullsweep-benchmark"},  # Disabled for now: the full sweep takes a very long time to run.
}

@@ -229,17 +240,36 @@ def _get_benchmark_definitions(self):
else:
bq_table_id = f"fio_{full_bench_name}"

# Make a copy to avoid modifying the original config dict
final_config_params = config_params.copy()
final_gcsfuse_flags = final_config_params.get("gcsfuse_flags", "")

# For filecache benchmarks, always add the filecache flags.
if bench_name == "filecache":
# Default to unlimited for disk, 512GB for memory, unless overridden.
if self.file_cache_size_mb is not None:
cache_size_mb = self.file_cache_size_mb
else:
cache_size_mb = -1 if self.temp_dir == "boot-disk" else 512000

filecache_flags = (
f"--file-cache-max-size-mb={cache_size_mb} "
"--cache-dir=/gcsfuse-temp "  # Must match CONTAINER_TEMP_DIR in _create_docker_command.
"--file-cache-enable-parallel-downloads=true"
)
final_gcsfuse_flags = f"{filecache_flags} {final_gcsfuse_flags}".strip()
final_config_params["gcsfuse_flags"] = final_gcsfuse_flags

# Use functools.partial to create a command function with pre-filled arguments
definitions[full_bench_name] = functools.partial(
self._create_docker_command,
benchmark_image_suffix=bench_config["image_suffix"],
bq_table_id=bq_table_id,
**config_params
**final_config_params
)
return definitions
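Since the defaulting rule above differs by backing store, it is worth restating on its own; a minimal pure-function sketch using the values from this change:

```python
def resolve_cache_size_mb(file_cache_size_mb, temp_dir):
    # An explicit --file-cache-size-mb always wins; otherwise boot-disk
    # caches are unlimited (-1) and tmpfs caches cap at 512000 MiB (~512 GB).
    if file_cache_size_mb is not None:
        return file_cache_size_mb
    return -1 if temp_dir == "boot-disk" else 512000

assert resolve_cache_size_mb(None, "boot-disk") == -1
assert resolve_cache_size_mb(None, "memory") == 512000
assert resolve_cache_size_mb(100, "memory") == 100
```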


def run_benchmark(benchmark_name, command_str, temp_dir_type):
def run_benchmark(benchmark_name, command_str, temp_dir_type, temp_dir_path=None):
"""Runs a single benchmark command locally.

This function executes a benchmark command using `subprocess.run`. It handles
@@ -251,36 +281,44 @@ def run_benchmark(benchmark_name, command_str, temp_dir_type):
command_str (str): The Docker command string to execute.
temp_dir_type (str): The type of temporary directory ('memory' or
'boot-disk').
temp_dir_path (str, optional): A specific path on the host to use as the
temporary directory. Used with 'boot-disk'.

Returns:
bool: True if the benchmark ran successfully, False otherwise.
"""
print(f"--- Running benchmark: {benchmark_name} on localhost ---")

host_temp_dir = None
if temp_dir_type == "boot-disk":
host_temp_dir = tempfile.mkdtemp(prefix="gcsfuse-npi-")
print(f"Created temporary directory on host: {host_temp_dir}")
command_str = command_str.replace("<temp_dir_path>", host_temp_dir)

command = shlex.split(command_str)
print(f"Command: {' '.join(command)}")

host_temp_dir_path = None
try:
if temp_dir_type == "boot-disk" and "<temp_dir_placeholder>" in command_str:
if temp_dir_path:
host_temp_dir_path = temp_dir_path
print(f"Using specified host temporary directory: {host_temp_dir_path}")
else:
host_temp_dir_path = tempfile.mkdtemp(prefix="gcsfuse-npi-")
print(f"Created temporary directory on host: {host_temp_dir_path}")
command_str = command_str.replace("<temp_dir_placeholder>", host_temp_dir_path, 1)

command = shlex.split(command_str)
print(f"Command: {' '.join(command)}")

subprocess.run(command, check=True)
print(f"--- Benchmark {benchmark_name} on localhost finished successfully ---")
success = True
except FileNotFoundError:
print("Error: Command not found. Ensure docker is in your PATH.", file=sys.stderr)
success = False
except subprocess.CalledProcessError as e:
print(f"--- Benchmark {benchmark_name} on localhost FAILED ---", file=sys.stderr)
print(f"Return code: {e.returncode}", file=sys.stderr)
success = False
except KeyboardInterrupt:
print(f"\n--- Benchmark {benchmark_name} on localhost INTERRUPTED ---", file=sys.stderr)
success = False
# Re-raise the exception so the main loop terminates
raise
finally:
if host_temp_dir:
print(f"Cleaning up temporary directory: {host_temp_dir}")
shutil.rmtree(host_temp_dir)
if host_temp_dir_path and not temp_dir_path: # Only remove dirs we created
print(f"Cleaning up temporary directory: {host_temp_dir_path}")
shutil.rmtree(host_temp_dir_path)

return success

@@ -322,6 +360,16 @@ def main():
default="boot-disk",
help="The temporary directory type to use for benchmark artifacts. 'memory' uses a tmpfs mount, 'boot-disk' uses the host's disk. Default: boot-disk."
)
parser.add_argument(
"--temp-dir-path",
help="Specify a path on the host for the temp directory. Only used when --temp-dir is 'boot-disk'. The script will not clean up this directory."
)
parser.add_argument(
"--file-cache-size-mb",
type=int,
default=None,
help="Set a custom file cache size in MiB. Use -1 for unlimited. Overrides defaults."
)

args = parser.parse_args()

@@ -331,7 +379,8 @@ def main():
bq_dataset_id=args.bq_dataset_id,
gcsfuse_version=args.gcsfuse_version,
iterations=args.iterations,
temp_dir=args.temp_dir
temp_dir=args.temp_dir,
file_cache_size_mb=args.file_cache_size_mb
)

available_benchmarks = factory.get_available_benchmarks()
@@ -357,9 +406,13 @@ def main():
print(f"--- [DRY RUN] Benchmark: {benchmark_name} ---")
print(f"Command: {command_str}\n")
else:
success = run_benchmark(benchmark_name, command_str, args.temp_dir)
if not success:
failed_benchmarks.append(benchmark_name)
try:
success = run_benchmark(benchmark_name, command_str, args.temp_dir, args.temp_dir_path)
if not success:
failed_benchmarks.append(benchmark_name)
except KeyboardInterrupt:
print("\nBenchmark orchestration stopped by user.", file=sys.stderr)
sys.exit(1)

if failed_benchmarks:
print(f"\n--- Some benchmarks failed: {', '.join(failed_benchmarks)} ---", file=sys.stderr)
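Putting the new flags together, a hypothetical invocation (only the flags added in this change are shown; the required bucket and BigQuery arguments depend on your setup and are elided, and the LSSD path is an assumption):

```python
import subprocess

# Run with a persistent cache directory on LSSD and an unlimited file
# cache. Bucket/BigQuery flags are omitted and depend on your setup.
subprocess.run(
    ["python3", "npi/npi.py",
     "--temp-dir", "boot-disk",
     "--temp-dir-path", "/mnt/lssd/gcsfuse-temp",  # hypothetical mount
     "--file-cache-size-mb", "-1"],
    check=True,
)
```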