Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions npi/cloudbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,23 @@ steps:
- '${_REGISTRY}/${_PROJECT}/gcsfuse-benchmarks/orbax-emulated-benchmark-${_GCSFUSE_VERSION}:latest'
- 'fio'

# Build and push the lssd-benchmark image.
# This uses a separate standalone Dockerfile but shares the same registry location.
- name: 'gcr.io/cloud-builders/docker'
id: 'lssd-raid0-benchmark'
waitFor: ['buildx-setup']
args:
- 'buildx'
- 'build'
- '--push'
- '-f'
- 'lssd_throughput_tests/lssd.dockerfile'
- '--platform'
- 'linux/amd64,linux/arm64'
- '-t'
- '${_REGISTRY}/${_PROJECT}/gcsfuse-benchmarks/lssd-raid0-benchmark-${_GCSFUSE_VERSION}:latest'
- '.'

options:
# Using a more powerful machine is recommended for multi-platform builds.
machineType: 'E2_HIGHCPU_32'
21 changes: 21 additions & 0 deletions npi/lssd_throughput_tests/lssd.dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Benchmark image for the LSSD RAID0 throughput test (lssd_benchmark.py).
FROM python:3.11-slim-bookworm

# Install system dependencies:
#   mdadm              - assembles the local SSDs into a RAID 0 array
#   fio                - the benchmark workload generator
#   xfsprogs/e2fsprogs - mkfs tools for formatting the array
#   libaio1            - runtime library required by fio's libaio ioengine
# apt lists are removed afterwards to keep the image small.
RUN apt-get update && apt-get install -y --no-install-recommends \
    mdadm \
    fio \
    xfsprogs \
    e2fsprogs \
    libaio1 \
    && rm -rf /var/lib/apt/lists/*

# Install BigQuery client (used by fio_benchmark_runner to upload results)
RUN pip install --no-cache-dir google-cloud-bigquery

WORKDIR /app

# BigQuery uploader helper; lssd_benchmark.py imports it when present.
COPY fio/fio_benchmark_runner.py /app/fio_benchmark_runner.py

COPY lssd_throughput_tests/lssd_benchmark.py /app/lssd_benchmark.py

# Arguments given to `docker run` are forwarded to lssd_benchmark.py.
ENTRYPOINT ["python3", "lssd_benchmark.py"]
226 changes: 226 additions & 0 deletions npi/lssd_throughput_tests/lssd_benchmark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
import os
import sys
import glob
import subprocess
import json
import logging
import argparse
import shutil

# --- BigQuery Import ---
# fio_benchmark_runner.py is copied next to this script by the Dockerfile;
# when it is absent we degrade gracefully and skip the BigQuery upload step.
fio_benchmark_runner = None
try:
    import fio_benchmark_runner
except ImportError:
    print("WARNING: fio_benchmark_runner module not found. Results will NOT be uploaded to BigQuery.")

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

MOUNT_POINT = "/mnt/lssd"  # where the RAID 0 array is mounted for fio runs
RAID_DEVICE = "/dev/md0"   # md device node created by mdadm --create

def check_dependencies(required_tools=('mdadm', 'fio', 'mkfs.ext4')):
    """Verifies that required system tools are on PATH.

    Args:
        required_tools: Iterable of executable names to look up. Defaults to
            the tools this benchmark needs (mdadm, fio, mkfs.ext4).

    Exits the process with status 1 if any tool is missing.
    """
    # Collect every missing tool so the operator sees the full list at once
    # instead of discovering them one failed run at a time.
    missing = [tool for tool in required_tools if not shutil.which(tool)]
    if missing:
        for tool in missing:
            logger.error(f"Required tool '{tool}' is not installed.")
        sys.exit(1)

def get_lssd_devices(pattern='/dev/disk/by-id/google-local-nvme-ssd-*'):
    """Finds Google Local NVMe SSD device paths.

    Args:
        pattern: Glob pattern for the device symlinks. Defaults to the
            stable by-id names GCE assigns to local NVMe SSDs.

    Returns:
        Sorted list of matching device paths.

    Exits with status 0 when no devices match: "no LSSDs attached" is an
    expected, non-error condition for the caller.
    """
    # Sort so the RAID member order (and thus the array layout) is
    # deterministic across runs; glob order is unspecified.
    devices = sorted(glob.glob(pattern))

    if not devices:
        logger.warning(f"No devices found matching pattern: {pattern}")
        # Diagnostic check for volume mount
        if not os.path.exists('/dev/disk/by-id'):
            logger.error("CRITICAL ERROR: /dev/disk/by-id does not exist inside the container.")
            logger.error("FIX: Ensure '-v /dev:/dev' is passed in npi.py definitions.")
        # Marker string consumed by the calling harness; do not change.
        print("no lssd")
        sys.exit(0)

    return devices

def create_raid_array(devices):
    """Assembles the given block devices into a RAID 0 array at RAID_DEVICE.

    Args:
        devices: Non-empty list of device paths
            (e.g. /dev/disk/by-id/google-local-nvme-ssd-*).

    Exits the process with status 1 if mdadm fails.
    """
    num_devices = len(devices)
    logger.info(f"Found {num_devices} local SSDs. Creating RAID 0 array...")

    # Stop array if it already exists to avoid conflicts from a previous run.
    if os.path.exists(RAID_DEVICE):
        logger.warning(f"{RAID_DEVICE} already exists. Attempting to stop and remove...")
        subprocess.run(['mdadm', '--stop', RAID_DEVICE], check=False, stderr=subprocess.DEVNULL)
        subprocess.run(['mdadm', '--remove', RAID_DEVICE], check=False, stderr=subprocess.DEVNULL)

    # Zero out any stale superblocks so mdadm --create never prompts
    # interactively (piping 'y' to it is fragile if the prompt changes).
    for device in devices:
        subprocess.run(['mdadm', '--zero-superblock', device], check=False,
                       stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

    # mdadm --create /dev/md0 --level=0 --raid-devices=N /dev/disk/...
    cmd = [
        'mdadm', '--create', RAID_DEVICE,
        '--level=0',
        f'--raid-devices={num_devices}',
        '--force', '--run'
    ] + devices

    try:
        # With superblocks zeroed there is nothing left to confirm, so a plain
        # check_call is simpler and surfaces the real exit status.
        subprocess.check_call(cmd)
        logger.info("RAID array created successfully.")
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        logger.error(f"Error during RAID creation: {str(e)}")
        sys.exit(1)
Comment on lines +69 to +81
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current method of creating the RAID array by piping 'y' to mdadm is fragile and can fail if mdadm has other prompts. A more robust approach is to first zero out the superblocks on the devices to prevent any interactive prompts, and then use subprocess.check_call for simpler and cleaner execution and error handling.

Suggested change
try:
process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
process.communicate(input=b'y\n')
if process.returncode != 0:
logger.error("Failed to create RAID array.")
sys.exit(1)
logger.info("RAID array created successfully.")
except Exception as e:
logger.error(f"Error during RAID creation: {str(e)}")
sys.exit(1)
# Zero out any old superblocks to avoid mdadm prompts.
for device in devices:
subprocess.run(['mdadm', '--zero-superblock', device], check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
try:
# With superblocks zeroed, mdadm shouldn't need to prompt.
# Using check_call for simpler error handling.
subprocess.check_call(cmd)
logger.info("RAID array created successfully.")
except (subprocess.CalledProcessError, FileNotFoundError) as e:
logger.error(f"Error during RAID creation: {str(e)}")
sys.exit(1)


def format_and_mount():
    """Formats RAID_DEVICE with ext4 and mounts it world-writable at MOUNT_POINT.

    Exits the process with status 1 if any format/mount step fails.
    """
    try:
        logger.info("Formatting RAID array with ext4...")
        # -F: force formatting a whole block device without prompting;
        # -q: suppress mkfs progress output.
        subprocess.check_call(['mkfs.ext4', '-F', '-q', RAID_DEVICE])

        logger.info(f"Mounting {RAID_DEVICE} to {MOUNT_POINT}...")
        os.makedirs(MOUNT_POINT, exist_ok=True)

        # Best-effort unmount of any stale mount from a previous run
        # (check=False: a failure just means nothing was mounted).
        subprocess.run(['umount', MOUNT_POINT], check=False, stderr=subprocess.DEVNULL)
        subprocess.check_call(['mount', RAID_DEVICE, MOUNT_POINT])
        # fio may run as a non-root user inside the container; open up the mount.
        subprocess.check_call(['chmod', 'a+w', MOUNT_POINT])

        logger.info("Mount successful.")

    except subprocess.CalledProcessError as e:
        logger.error(f"Failed during format/mount: {str(e)}")
        sys.exit(1)

def run_fio_benchmarks(iterations=1, project_id=None, dataset_id=None, table_id=None):
"""Runs FIO tests and uploads results to BigQuery."""
logger.info("Starting FIO benchmarks for LSSD Throughput...")

# Tests configured for LSSD throughput limits
tests = [
{"name": "seq_write_fill", "desc": "Sequential Write", "args": ["--rw=write", "--bs=1M", "--iodepth=64", "--numjobs=16"]},
{"name": "seq_read_throughput", "desc": "Sequential Read", "args": ["--rw=read", "--bs=1M", "--iodepth=64", "--numjobs=16"]},
{"name": "rand_read_iops", "desc": "Random Read 4k", "args": ["--rw=randread", "--bs=4k", "--iodepth=64", "--numjobs=16"]}
]

all_results = []

for i in range(iterations):
logger.info(f"--- Iteration {i+1}/{iterations} ---")
iter_results = {"iteration": i + 1}

for test in tests:
logger.info(f"Running Test: {test['desc']}...")
fio_file = os.path.join(MOUNT_POINT, "fio_test_file")

# We output to a JSON file so the uploader can read it
json_output_path = f"/tmp/{test['name']}_iter{i+1}.json"

cmd = [
'fio',
'--name=' + test['name'],
'--filename=' + fio_file,
'--size=10G',
'--ioengine=libaio',
'--direct=1',
'--group_reporting',
'--runtime=60',
'--time_based',
'--output-format=json',
f'--output={json_output_path}'
] + test['args']

try:
# Run FIO
subprocess.check_output(cmd, stderr=subprocess.STDOUT)

# Parse for local logging
with open(json_output_path, 'r') as f:
data = json.load(f)

job_data = data['jobs'][0]
if 'read' in job_data:
stats = job_data['read']
else:
stats = job_data['write']
bw = stats['bw_bytes'] / (1024 * 1024)
iops = stats['iops']

logger.info(f"Result {test['name']}: BW={bw:.2f} MiB/s, IOPS={iops:.2f}")
iter_results[test['name']] = {"bw_MiBps": bw, "iops": iops}

# --- UPLOAD TO BIGQUERY ---
if fio_benchmark_runner and project_id and dataset_id and table_id:
logger.info("Uploading results to BigQuery...")
# Using a generic context since this isn't standard GCSFuse
fio_env = {
"TEST_TYPE": "lssd_raid0",
"TEST_NAME": test['name'],
"ITERATION": i + 1
}

try:
fio_benchmark_runner.upload_results_to_bq(
project_id=project_id,
dataset_id=dataset_id,
table_id=table_id,
fio_json_path=json_output_path,
iteration=(i + 1),
gcsfuse_flags="LSSD_RAID0_NATIVE",
fio_env=fio_env,
cpu_limit_list=None
)
except Exception as e:
logger.error(f"BQ Upload failed: {e}")
# --------------------------

# Cleanup
if os.path.exists(fio_file):
os.remove(fio_file)
if os.path.exists(json_output_path):
os.remove(json_output_path)

except subprocess.CalledProcessError as e:
logger.error(f"FIO Test failed: {e.output.decode() if e.output else 'Unknown error'}")

all_results.append(iter_results)

return all_results

if __name__ == "__main__":
# Parse arguments passed by npi.py
parser = argparse.ArgumentParser()
parser.add_argument('--iterations', type=int, default=1)
parser.add_argument('--project-id', required=True)
parser.add_argument('--bq-dataset-id', required=True)
parser.add_argument('--bq-table-id', required=True)
# npi.py passes these, but we don't strictly need them for LSSD logic
parser.add_argument('--bucket-name', help="Ignored for LSSD test")
parser.add_argument('--gcsfuse-flags', help="Ignored for LSSD test")

# parse_known_args allows us to ignore other random flags if npi.py adds them
args, unknown = parser.parse_known_args()

check_dependencies()
devices = get_lssd_devices()

create_raid_array(devices)
format_and_mount()

metrics = run_fio_benchmarks(
iterations=args.iterations,
project_id=args.project_id,
dataset_id=args.bq_dataset_id,
table_id=args.bq_table_id
)

print("\n--- Final LSSD Performance Report ---")
print(json.dumps(metrics, indent=2))
print("-------------------------------------")
27 changes: 22 additions & 5 deletions npi/npi.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ def get_available_benchmarks(self):

def _create_docker_command(self, benchmark_image_suffix, bq_table_id,
bucket_name, project_id, bq_dataset_id,
gcsfuse_flags=None, cpu_list=None, bind_fio=None):
gcsfuse_flags=None, cpu_list=None, bind_fio=None,
extra_docker_flags=None): # <--- [NEW ARGUMENT]
"""Helper to construct the full docker run command.

This method assembles the final `docker run` command string with all
Expand All @@ -111,15 +112,17 @@ def _create_docker_command(self, benchmark_image_suffix, bq_table_id,
gcsfuse_flags (str, optional): Additional flags for GCSfuse.
cpu_list (str, optional): The list of CPUs to pin the container to.
bind_fio (bool, optional): Whether to bind FIO to the same CPUs.

Returns:
str: The complete Docker command.
extra_docker_flags (str, optional): Extra flags for the docker run command (e.g. volumes).
"""
container_temp_dir = "/gcsfuse-temp"
volume_mount = ""

# Handle existing temp_dir logic
if self.temp_dir == "memory":
volume_mount = f"--mount type=tmpfs,destination={container_temp_dir}"
elif self.temp_dir == "boot-disk":
# FIX: Use the <temp_dir_path> placeholder.
# run_benchmark() replaces this with the actual path later.
volume_mount = f"-v <temp_dir_path>:{container_temp_dir}"

default_gcsfuse_flags = f"--temp-dir={container_temp_dir} -o allow_other"
Expand All @@ -130,9 +133,14 @@ def _create_docker_command(self, benchmark_image_suffix, bq_table_id,
else:
gcsfuse_flags = default_gcsfuse_flags

# [NEW LOGIC] Handle extra docker flags
docker_flags_str = ""
if extra_docker_flags:
docker_flags_str = f" {extra_docker_flags}"

base_cmd = (
"docker run --pull=always --network=host --privileged --rm "
f"{volume_mount} "
f"{volume_mount}{docker_flags_str} " # <--- [INJECTED HERE]
f"us-docker.pkg.dev/{project_id}/gcsfuse-benchmarks/{benchmark_image_suffix}-{self.gcsfuse_version}:latest "
f"--iterations={self.iterations} "
f"--bucket-name={bucket_name} "
Expand Down Expand Up @@ -239,6 +247,15 @@ def _get_benchmark_definitions(self):
bq_table_id=bq_table_id,
**config_params
)

# --- Add LSSD RAID0 Benchmark ---
definitions["lssd_raid0_benchmark"] = functools.partial(
self._create_docker_command,
benchmark_image_suffix="lssd-raid0-benchmark",
bq_table_id="lssd_raid0_benchmark",
# CRITICAL: This mounts the host's /dev directory so the container can see the NVMe disks
extra_docker_flags="-v /dev:/dev"
)
return definitions


Expand Down