
Processed request response stuck when workers are not available #247

@Pebin

Description

Hello,

I started a bjoern server with 2 workers and sent 6 requests in parallel (scripts below).

The server just sleeps for 1 second and then responds.

What I would expect is that two requests finish every second, so the durations would look like this:
1, 1, 2, 2, 3, 3

But the actual durations are 2, 3, 3, 3, 3, 3:

IO task 1: Request duration 0:00:02.009872
IO task 4: Request duration 0:00:03.010739
IO task 5: Request duration 0:00:03.010668
IO task 6: Request duration 0:00:03.010534
IO task 2: Request duration 0:00:03.011427
IO task 3: Request duration 0:00:03.011170
Elapsed time: 0:00:03.012082

Why is this happening? Am I setting up the workers incorrectly?

The times before and after the sleep call are exactly what I would expect; the following output is just sorted by worker PID:

1294942 - start 15:41:18.793109
1294942 - stop 15:41:19.794185
1294942 - start 15:41:19.794448
1294942 - stop 15:41:20.795519
1294942 - start 15:41:20.795629
1294942 - stop 15:41:21.796366

1294941 - start 15:41:18.793265
1294941 - stop 15:41:19.794420
1294941 - start 15:41:19.794570
1294941 - stop 15:41:20.795605
1294941 - start 15:41:20.795678
1294941 - stop 15:41:21.796336

If I send just 2 requests, both are returned in 1 second:

IO task 1: Request duration 0:00:01.005457
IO task 2: Request duration 0:00:01.005327
Elapsed time: 0:00:01.006112

If I send three, then 2 of them already take 2 seconds 👀

IO task 1: Request duration 0:00:01.005968
IO task 3: Request duration 0:00:02.006177
IO task 2: Request duration 0:00:02.006436
Elapsed time: 0:00:02.007061

But shouldn't the request end after the return? It seems like the worker is already busy with the next request before the previous response is sent.

bjoern 3.2.2

Server script:

from time import sleep

import bjoern
import os, signal
from datetime import datetime

HOST = '0.0.0.0'
PORT = 8080
N_WORKERS = 2

worker_pids = []

def app(e, s):
    s('200 OK', [])
    print(f"worker id: {os.getpid()} - {e['PATH_INFO']} - start {datetime.now().strftime('%H:%M:%S.%f')}")
    sleep(1)
    print(f"worker id: {os.getpid()} - {e['PATH_INFO']} - stop {datetime.now().strftime('%H:%M:%S.%f')}")
    return b'%i: %s' % (
        os.getpid(),
        str(datetime.now()).encode('utf-8')
    )

bjoern.listen(app, HOST, PORT)
for _ in range(N_WORKERS):
    pid = os.fork()
    if pid > 0:  # parent
        worker_pids.append(pid)
    elif pid == 0:  # worker
        try:
            bjoern.run()
        except KeyboardInterrupt:
            pass
        exit()

try:
    for _ in range(N_WORKERS):
        os.wait()
except KeyboardInterrupt:
    for pid in worker_pids:
        os.kill(pid, signal.SIGINT)
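
For reference, this is a variant of the worker setup I'm considering, where each forked child opens its own listening socket with SO_REUSEPORT instead of inheriting the parent's socket, so the kernel decides which worker gets each connection. This is only a sketch and assumes the reuse_port flag is available in this bjoern version; I don't know whether it changes the behaviour above.

import os
import bjoern

HOST = '0.0.0.0'
PORT = 8080
N_WORKERS = 2

def app(e, s):
    # same handler as above, trimmed to the essentials
    s('200 OK', [])
    return b'hello'

worker_pids = []
for _ in range(N_WORKERS):
    pid = os.fork()
    if pid == 0:  # child: bind its own socket after forking (SO_REUSEPORT)
        bjoern.run(app, HOST, PORT, reuse_port=True)
        os._exit(0)
    worker_pids.append(pid)  # parent keeps track of worker PIDs

for _ in range(N_WORKERS):
    os.wait()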
Client script:

import random
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
import requests

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

def task():
    now = datetime.now()
    request_id = random.randint(1, 100)
    print(f"request_id: {request_id} - start: {now}")
    requests.get(f"http://localhost:8080/request_id={request_id}")
    request_end = datetime.now()
    print(f"request_id: {request_id} - stop: {request_end}")
    return f"Request duration {(request_end - now)}, ID: {request_id}"

now = datetime.now()
run_io_tasks_in_parallel([
    lambda: print(f'IO task 1: {task()}'),
    lambda: print(f'IO task 2: {task()}'),
    lambda: print(f'IO task 3: {task()}'),
    # lambda: print(f'IO task 4: {task()}'),
    # lambda: print(f'IO task 5: {task()}'),
    # lambda: print(f'IO task 6: {task()}'),
])

print("Elapsed time:", datetime.now() - now)
