diff --git a/cmd/tools/vbuild-examples.v b/cmd/tools/vbuild-examples.v index 7d1e535b869de3..d7179a22164ce9 100644 --- a/cmd/tools/vbuild-examples.v +++ b/cmd/tools/vbuild-examples.v @@ -11,6 +11,7 @@ const efolders = [ 'examples/vweb_orm_jwt', 'examples/vweb_fullstack', 'examples/vanilla_http_server', + 'examples/fasthttp', ] pub fn normalised_vroot_path(path string) string { diff --git a/examples/fasthttp/README.md b/examples/fasthttp/README.md new file mode 100644 index 00000000000000..49e358828244b5 --- /dev/null +++ b/examples/fasthttp/README.md @@ -0,0 +1,71 @@ +# Fasthttp Example + +A simple HTTP server example using the `fasthttp` module from `vlib/fasthttp`. + +## Features + +- Handles GET and POST requests +- Routes requests to different controllers based on HTTP method and path +- Returns appropriate HTTP responses with status codes and content + +## Building + +```sh +./v examples/fasthttp +``` + +## Running + +```sh +./examples/fasthttp/fasthttp +``` + +The server will listen on `http://localhost:3000` + +## Testing + +### Home endpoint + +```sh +curl http://localhost:3000/ +``` + +### Get user by ID + +```sh +curl http://localhost:3000/user/123 +``` + +### Create user + +```sh +curl -X POST http://localhost:3000/user +``` + +### 404 response + +```sh +curl http://localhost:3000/notfound +``` + +## File Structure + +- `main.v` - Entry point and request router +- `controllers.v` - Request handlers for different routes +- `v.mod` - Module metadata + +## Architecture + +The example demonstrates: + +1. **Request Routing**: The `handle_request()` function routes incoming HTTP requests based on + method and path +2. **Response Handling**: Controllers return HTTP responses with proper headers and status codes +3. **Content Type**: All responses are returned as `[]u8` (byte arrays) + +The fasthttp module handles: + +- Low-level socket management +- Request parsing +- Connection handling +- Non-blocking I/O with epoll (Linux) or kqueue (macOS) diff --git a/examples/fasthttp/controllers.v b/examples/fasthttp/controllers.v new file mode 100644 index 00000000000000..c062493e72f296 --- /dev/null +++ b/examples/fasthttp/controllers.v @@ -0,0 +1,27 @@ +module main + +fn home_controller() ![]u8 { + response := 'HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 13\r\n\r\nHello, World!' + return response.bytes() +} + +fn get_user_controller(id string) ![]u8 { + body := 'User ID: ${id}' + content_length := body.len + response := 'HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: ${content_length}\r\n\r\n${body}' + return response.bytes() +} + +fn create_user_controller() ![]u8 { + body := 'User created successfully' + content_length := body.len + response := 'HTTP/1.1 201 Created\r\nContent-Type: text/plain\r\nContent-Length: ${content_length}\r\n\r\n${body}' + return response.bytes() +} + +fn not_found_response() ![]u8 { + body := '404 Not Found' + content_length := body.len + response := 'HTTP/1.1 404 Not Found\r\nContent-Type: text/plain\r\nContent-Length: ${content_length}\r\n\r\n${body}' + return response.bytes() +} diff --git a/examples/fasthttp/main.v b/examples/fasthttp/main.v new file mode 100644 index 00000000000000..136f592c910ca2 --- /dev/null +++ b/examples/fasthttp/main.v @@ -0,0 +1,37 @@ +module main + +import fasthttp + +fn handle_request(req fasthttp.HttpRequest) ![]u8 { + method := req.buffer[req.method.start..req.method.start + req.method.len].bytestr() + path := req.buffer[req.path.start..req.path.start + req.path.len].bytestr() + + if method == 'GET' { + if path == '/' { + return home_controller()! + } else if path.starts_with('/user/') { + id := path[6..] + return get_user_controller(id)! + } + } else if method == 'POST' { + if path == '/user' { + return create_user_controller()! + } + } + + return not_found_response()! +} + +fn main() { + mut server := fasthttp.new_server(fasthttp.ServerConfig{ + port: 3000 + handler: handle_request + }) or { + eprintln('Failed to create server: ${err}') + return + } + + println('Starting fasthttp server on port http://localhost:3000...') + + server.run() or { eprintln('error: ${err}') } +} diff --git a/examples/fasthttp/v.mod b/examples/fasthttp/v.mod new file mode 100644 index 00000000000000..dddc1ad77fb056 --- /dev/null +++ b/examples/fasthttp/v.mod @@ -0,0 +1,6 @@ +Module { + name: 'fasthttp_example' + description: 'A simple HTTP server example using the fasthttp module' + version: '0.0.1' + license: 'MIT' +} diff --git a/vlib/fasthttp/README.md b/vlib/fasthttp/README.md new file mode 100644 index 00000000000000..ddc9bdd875d1c9 --- /dev/null +++ b/vlib/fasthttp/README.md @@ -0,0 +1,161 @@ +# fasthttp + +The `fasthttp` module is a high-performance HTTP server library for V that provides low-level socket management and non-blocking I/O. + +## Features + +- **High Performance**: Uses platform-specific I/O multiplexing: + - `epoll` on Linux for efficient connection handling + - `kqueue` on macOS for high-performance event notification +- **Non-blocking I/O**: Handles multiple concurrent connections efficiently +- **Simple API**: Easy-to-use request handler pattern +- **Cross-platform**: Supports Linux and macOS + +## Installation + +The module is part of the standard V library. Import it in your V code: + +```v +import fasthttp +``` + +## Quick Start + +Here's a minimal HTTP server example: + +```v +import fasthttp + +fn handle_request(req fasthttp.HttpRequest) ![]u8 { + path := req.buffer[req.path.start..req.path.start + req.path.len].bytestr() + + if path == '/' { + return 'Hello, World!'.bytes() + } + + return '404 Not Found'.bytes() +} + +fn main() { + mut server := fasthttp.new_server(fasthttp.ServerConfig{ + port: 3000 + handler: handle_request + }) or { + eprintln('Failed to create server: ${err}') + return + } + + println('Server listening on http://localhost:3000') + server.run() or { eprintln('error: ${err}') } +} +``` + +## API Reference + +### `HttpRequest` Struct + +Represents an incoming HTTP request. + +**Fields:** + +- `buffer: []u8` - The raw request buffer containing the complete HTTP request +- `method: Slice` - The HTTP method (GET, POST, etc.) +- `path: Slice` - The request path +- `version: Slice` - The HTTP version (e.g., "HTTP/1.1") +- `client_conn_fd: int` - Internal socket file descriptor + +### `Slice` Struct + +Represents a slice of the request buffer. + +**Fields:** + +- `start: int` - Starting index in the buffer +- `len: int` - Length of the slice + +**Usage:** + +```v ignore +method := req.buffer[req.method.start..req.method.start + req.method.len].bytestr() +path := req.buffer[req.path.start..req.path.start + req.path.len].bytestr() +``` + +## Request Handler Pattern + +The handler function receives an `HttpRequest` and must return either: + +- `[]u8` - A byte array containing the HTTP response body +- An error if processing failed + +The handler should extract method and path information from the request and route accordingly. + +**Example:** + +```v ignore +fn my_handler(req fasthttp.HttpRequest) ![]u8 { + method := req.buffer[req.method.start..req.method.start + req.method.len].bytestr() + path := req.buffer[req.path.start..req.path.start + req.path.len].bytestr() + + match method { + 'GET' { + if path == '/' { + return 'Home page'.bytes() + } + } + 'POST' { + if path == '/api/data' { + return 'Data received'.bytes() + } + } + else {} + } + + return '404 Not Found'.bytes() +} +``` + +## Response Format + +Responses should be returned as byte arrays. +The server will send them directly to the client as HTTP response bodies. + +```v ignore +// Simple text response +return 'Hello, World!'.bytes() + +// HTML response +return 'Hello'.bytes() + +// JSON response +return '{"message": "success"}'.bytes() +``` + +## Example + +See the complete example in `examples/fasthttp/` for a more +detailed server implementation with multiple routes and controllers. + +```sh +./v examples/fasthttp +./examples/fasthttp/fasthttp +``` + +## Platform Support + +- **Linux**: Uses `epoll` for high-performance I/O multiplexing +- **macOS**: Uses `kqueue` for event notification +- **Windows**: Currently not supported + +## Performance Considerations + +- The `fasthttp` module is designed for high throughput and low latency +- Handler functions should be efficient; blocking operations will affect other connections +- Use goroutines within handlers if you need to perform long-running operations without + blocking the I/O loop + +## Notes + +- HTTP headers are currently not parsed; the entire request is available in the buffer +- Only the request method, path, and version are parsed automatically +- Response status codes and headers must be manually constructed if needed +- The module provides low-level access for maximum control and performance diff --git a/vlib/fasthttp/fasthttp.v b/vlib/fasthttp/fasthttp.v index 6c69f2e74af27c..08b8d168b44ed1 100644 --- a/vlib/fasthttp/fasthttp.v +++ b/vlib/fasthttp/fasthttp.v @@ -3,79 +3,51 @@ // that can be found in the LICENSE file. module fasthttp -import os -import time -import term +import runtime import net -#include -#include -#include -#include -#include #include -#include -#include -#include -#include #include -#include -// Generic POSIX/C definitions -fn C.setsockopt(sockfd int, level int, optname int, optval voidptr, optlen u32) int -fn C.bind(sockfd int, addr voidptr, addrlen u32) int -fn C.listen(sockfd int, backlog int) int -fn C.accept(sockfd int, addr voidptr, addrlen voidptr) int -fn C.fcntl(fd int, cmd int, arg int) int -fn C.pipe(pipefd &int) int +$if !windows { + #include + #include + #include +} + +const max_thread_pool_size = runtime.nr_cpus() +const max_connection_size = 65536 // Max events per epoll_wait + +const tiny_bad_request_response = 'HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\nConnection: close\r\n\r\n'.bytes() +const status_444_response = 'HTTP/1.1 444 No Response\r\nContent-Length: 0\r\nConnection: close\r\n\r\n'.bytes() +const status_413_response = 'HTTP/1.1 413 Payload Too Large\r\nContent-Length: 0\r\nConnection: close\r\n\r\n'.bytes() + +fn C.socket(domain net.AddrFamily, typ net.SocketType, protocol int) int + +fn C.bind(sockfd int, addr &net.Addr, addrlen u32) int + +fn C.send(__fd int, __buf voidptr, __n usize, __flags int) int + +fn C.recv(__fd int, __buf voidptr, __n usize, __flags int) int + +fn C.setsockopt(__fd int, __level int, __optname int, __optval voidptr, __optlen u32) int + +fn C.listen(__fd int, __n int) int + +fn C.perror(s &u8) + fn C.close(fd int) int -fn C.read(fd int, buf voidptr, count int) int -fn C.write(fd int, buf voidptr, count int) int -fn C.malloc(size int) &u8 -fn C.free(ptr voidptr) -fn C.memset(dest voidptr, ch int, count int) voidptr -fn C.memcmp(s1 voidptr, s2 voidptr, n int) int -fn C.memmem(haystack voidptr, haystacklen int, needle voidptr, needlelen int) voidptr -fn C.strchr(s &u8, c int) &u8 -fn C.perror(s &char) - -// fn C.snprintf(str &char, size usize, format &char, args ...voidptr) int -fn C.pthread_create(thread &C.pthread_t, attr voidptr, start_routine fn (voidptr) voidptr, arg voidptr) int -fn C.pthread_mutex_init(mutex &C.pthread_mutex_t, attr voidptr) int -fn C.pthread_mutex_lock(mutex &C.pthread_mutex_t) int -fn C.pthread_mutex_unlock(mutex &C.pthread_mutex_t) int -fn C.pthread_cond_init(cond &C.pthread_cond_t, attr voidptr) int -fn C.pthread_cond_wait(cond &C.pthread_cond_t, mutex &C.pthread_mutex_t) int -fn C.pthread_cond_signal(cond &C.pthread_cond_t) int -fn C.htons(__hostshort u16) u16 -struct C.sockaddr_in { -mut: - sin_len u8 - sin_family u8 - sin_port u16 - sin_addr u32 - sin_zero [8]char -} +fn C.htons(__hostshort u16) u16 -const backlog = 128 -const buf_size = 8_000 -const num_threads = 8 +fn C.fcntl(fd int, cmd int, arg int) int -// Slice represents a part of a larger buffer, without owning the memory. -// It's useful for representing parts of the request buffer like the method and path. pub struct Slice { -pub mut: - buf &u8 = unsafe { nil } - len int +pub: + start int + len int } -pub fn (s Slice) str() string { - return unsafe { s.buf.vstring_with_len(s.len) } -} - -// HttpRequest represents a parsed HTTP request. The slices point to memory -// within the connection's buffer and are only valid for the duration of the request. pub struct HttpRequest { pub mut: buffer []u8 // A V slice of the read buffer for convenience @@ -83,130 +55,66 @@ pub mut: path Slice version Slice client_conn_fd int + user_data voidptr // User-defined context data } -// Internal struct to hold connection-specific data -struct Conn { -mut: - fd int - read_buf [buf_size]u8 - read_len int - write_buf voidptr - write_len int - write_pos int +// ServerConfig bundles the parameters needed to start a fasthttp server. +pub struct ServerConfig { +pub: + port int = 3000 + max_request_buffer_size int = 8192 + handler fn (HttpRequest) ![]u8 @[required] + user_data voidptr } -// Task for the worker thread pool -struct Task { -mut: - c &Conn = unsafe { nil } - req HttpRequest // Pass the parsed request to the worker - next &Task = unsafe { nil } - // req_buffer []u8 // The worker will own this copied buffer -} - -// Completed task data -struct Done { -mut: - c &Conn - resp voidptr - len int - next &Done -} +@[direct_array_access] +fn parse_request_line(buffer []u8) !HttpRequest { + mut req := HttpRequest{ + buffer: buffer + } -// Shared data for worker threads -struct WorkerData { -mut: - task_mutex C.pthread_mutex_t - task_cond C.pthread_cond_t - task_head &Task = unsafe { nil } - task_tail &Task = unsafe { nil } - done_mutex C.pthread_mutex_t - done_head &Done = unsafe { nil } - done_tail &Done = unsafe { nil } - quit bool - wake_pipe [2]int -} + mut i := 0 + // Parse HTTP method + for i < buffer.len && buffer[i] != ` ` { + i++ + } + req.method = Slice{ + start: 0 + len: i + } + i++ -// Server holds the entire state of the web server instance. -pub struct Server { -pub mut: - port int - socket_fd int - poll_fd int // kqueue/epoll fd - request_handler fn (req HttpRequest) ![]u8 = unsafe { nil } - worker_data WorkerData - threads [num_threads]C.pthread_t -} + // Parse path + mut path_start := i + for i < buffer.len && buffer[i] != ` ` { + i++ + } + req.path = Slice{ + start: path_start + len: i - path_start + } + i++ -// new_server creates and initializes a new Server instance. -pub fn new_server(port int, handler fn (req HttpRequest) ![]u8) !&Server { - mut s := &Server{ - port: port - request_handler: handler + // Parse HTTP version + mut version_start := i + for i < buffer.len && buffer[i] != `\r` { + i++ + } + req.version = Slice{ + start: version_start + len: i - version_start } - return s -} -fn (mut s Server) close_conn(c &Conn) { - if c.write_buf != unsafe { nil } { - C.free(c.write_buf) + // Move to the end of the request line + if i + 1 < buffer.len && buffer[i] == `\r` && buffer[i + 1] == `\n` { + i += 2 + } else { + return error('Invalid HTTP request line') } - C.close(c.fd) - unsafe { C.free(c) } + + return req } -// worker_func is the function executed by each worker thread. -// It processes tasks from the queue, calls the request handler, -// and puts the result in the 'done' queue. -fn worker_func(arg voidptr) voidptr { - // println('worker func') - mut s := unsafe { &Server(arg) } - for { - C.pthread_mutex_lock(&s.worker_data.task_mutex) - for s.worker_data.task_head == unsafe { nil } && !s.worker_data.quit { - C.pthread_cond_wait(&s.worker_data.task_cond, &s.worker_data.task_mutex) - } - if s.worker_data.quit && s.worker_data.task_head == unsafe { nil } { - C.pthread_mutex_unlock(&s.worker_data.task_mutex) - break - } - mut t := s.worker_data.task_head - s.worker_data.task_head = t.next - if s.worker_data.task_head == unsafe { nil } { - s.worker_data.task_tail = unsafe { nil } - } - C.pthread_mutex_unlock(&s.worker_data.task_mutex) - - // Call the user-provided request handler to get the response body - mut body := s.request_handler(t.req) or { panic('Request handler failed: ${err}') } - - // Copy the body with headers to done.resp - body_with_headers := body.bytestr() - resp := C.malloc(buf_size) - C.snprintf(resp, buf_size, c'%s', body_with_headers.str) - len := body_with_headers.len - - // Enqueue done - mut d := unsafe { &Done(C.malloc(sizeof(Done))) } - d.c = t.c - d.resp = resp - d.len = int(len) - d.next = unsafe { nil } - - C.pthread_mutex_lock(&s.worker_data.done_mutex) - if s.worker_data.done_tail != unsafe { nil } { - s.worker_data.done_tail.next = d - } else { - s.worker_data.done_head = d - } - s.worker_data.done_tail = d - C.pthread_mutex_unlock(&s.worker_data.done_mutex) - - // Wake IO thread (Pipe write is generic POSIX) - x := u8(`x`) - C.write(s.worker_data.wake_pipe[1], &x, 1) - unsafe { C.free(t) } - } - return unsafe { nil } +fn decode_http_request(buffer []u8) !HttpRequest { + return parse_request_line(buffer) } diff --git a/vlib/fasthttp/fasthttp_darwin.v b/vlib/fasthttp/fasthttp_darwin.v index 709d8d63b8ab3a..c5d98644f13689 100644 --- a/vlib/fasthttp/fasthttp_darwin.v +++ b/vlib/fasthttp/fasthttp_darwin.v @@ -2,6 +2,14 @@ module fasthttp #include +const buf_size = max_connection_size +const kqueue_max_events = 128 +const backlog = max_connection_size + +fn C.accept(sockfd int, address &C.sockaddr_in, addrlen &u32) int +fn C.kevent(kq int, changelist &C.kevent, nchanges int, eventlist &C.kevent, nevents int, timeout &C.timespec) int +fn C.kqueue() int + struct C.kevent { ident u64 filter i16 @@ -11,9 +19,6 @@ struct C.kevent { udata voidptr } -fn C.kqueue() int -fn C.kevent(kq int, changelist &C.kevent, nchanges int, eventlist &C.kevent, nevents int, timeout &C.timespec) int - // Helper to set fields of a kevent struct fn ev_set(mut ev C.kevent, ident u64, filter i16, flags u16, fflags u32, data isize, udata voidptr) { ev.ident = ident @@ -24,122 +29,214 @@ fn ev_set(mut ev C.kevent, ident u64, filter i16, flags u16, fflags u32, data is ev.udata = udata } -// process_dones handles connections that have been processed by a worker thread. -// It manipulates the Kqueue state (EVFILT_WRITE/READ) -fn (mut s Server) process_dones() { - C.pthread_mutex_lock(&s.worker_data.done_mutex) - mut local_head := s.worker_data.done_head - s.worker_data.done_head = unsafe { nil } - s.worker_data.done_tail = unsafe { nil } - C.pthread_mutex_unlock(&s.worker_data.done_mutex) - - for local_head != unsafe { nil } { - d := local_head - local_head = d.next - mut c := d.c - c.write_buf = d.resp - c.write_len = d.len - c.write_pos = 0 - - // Try to write immediately - write_ptr := unsafe { &u8(c.write_buf) + c.write_pos } - written := C.write(c.fd, write_ptr, c.write_len - c.write_pos) - if written > 0 { - c.write_pos += int(written) - } else if written < 0 && C.errno != C.EAGAIN && C.errno != C.EWOULDBLOCK { - s.close_conn(c) - unsafe { C.free(d) } - continue +struct Conn { + fd int +mut: + read_buf [buf_size]u8 + read_len int + write_buf []u8 + write_pos int +} + +pub struct Server { +pub mut: + port int + socket_fd int + poll_fd int // kqueue fd + request_handler fn (HttpRequest) ![]u8 @[required] +} + +// new_server creates and initializes a new Server instance. +pub fn new_server(config ServerConfig) !&Server { + mut server := &Server{ + port: config.port + request_handler: config.handler + } + return server +} + +fn set_nonblocking(fd int) { + flags := C.fcntl(fd, C.F_GETFL, 0) + if flags == -1 { + return + } + C.fcntl(fd, C.F_SETFL, flags | C.O_NONBLOCK) +} + +fn add_event(kq int, ident u64, filter i16, flags u16, udata voidptr) int { + mut ev := C.kevent{} + ev_set(mut &ev, ident, filter, flags, u32(0), isize(0), udata) + return C.kevent(kq, &ev, 1, unsafe { nil }, 0, unsafe { nil }) +} + +fn delete_event(kq int, ident u64, filter i16, udata voidptr) { + mut ev := C.kevent{} + ev_set(mut &ev, ident, filter, u16(C.EV_DELETE), u32(0), isize(0), udata) + C.kevent(kq, &ev, 1, unsafe { nil }, 0, unsafe { nil }) +} + +fn close_conn(kq int, c_ptr voidptr) { + mut c := unsafe { &Conn(c_ptr) } + delete_event(kq, u64(c.fd), i16(C.EVFILT_READ), c) + delete_event(kq, u64(c.fd), i16(C.EVFILT_WRITE), c) + C.close(c.fd) + c.write_buf.clear() + unsafe { free(c_ptr) } +} + +fn send_pending(c_ptr voidptr) bool { + mut c := unsafe { &Conn(c_ptr) } + if c.write_buf.len == 0 { + return false + } + remaining := c.write_buf.len - c.write_pos + if remaining <= 0 { + return false + } + write_ptr := unsafe { &c.write_buf[0] + c.write_pos } + sent := C.send(c.fd, write_ptr, remaining, 0) + if sent > 0 { + c.write_pos += int(sent) + } + if sent < 0 && (C.errno == C.EAGAIN || C.errno == C.EWOULDBLOCK) { + return true + } + return c.write_pos < c.write_buf.len +} + +fn send_bad_request(fd int) { + C.send(fd, tiny_bad_request_response.data, tiny_bad_request_response.len, 0) +} + +fn handle_write(kq int, c_ptr voidptr) { + mut c := unsafe { &Conn(c_ptr) } + if send_pending(c_ptr) { + return + } + delete_event(kq, u64(c.fd), i16(C.EVFILT_WRITE), c) + close_conn(kq, c_ptr) +} + +fn handle_read(mut s Server, kq int, c_ptr voidptr) { + mut c := unsafe { &Conn(c_ptr) } + n := C.recv(c.fd, &c.read_buf[c.read_len], buf_size - c.read_len, 0) + if n <= 0 { + if n < 0 && C.errno != C.EAGAIN && C.errno != C.EWOULDBLOCK { + // Unexpected recv error - send 444 No Response + C.send(c.fd, status_444_response.data, status_444_response.len, 0) + close_conn(kq, c_ptr) + return } + // Normal client closure (n == 0 or would block) + close_conn(kq, c_ptr) + return + } + + c.read_len += int(n) + if c.read_len == 0 { + return + } + + // Check if request exceeds buffer size + if c.read_len >= buf_size { + C.send(c.fd, status_413_response.data, status_413_response.len, 0) + close_conn(kq, c_ptr) + return + } + + mut req_buf := []u8{cap: c.read_len} + unsafe { + req_buf.push_many(&c.read_buf[0], c.read_len) + } + + mut decoded := decode_http_request(req_buf) or { + send_bad_request(c.fd) + close_conn(kq, c_ptr) + return + } + decoded.client_conn_fd = c.fd + + resp := s.request_handler(decoded) or { + send_bad_request(c.fd) + close_conn(kq, c_ptr) + return + } + + c.write_buf = resp.clone() + c.write_pos = 0 + c.read_len = 0 + + if send_pending(c_ptr) { + add_event(kq, u64(c.fd), i16(C.EVFILT_WRITE), u16(C.EV_ADD | C.EV_ENABLE | C.EV_CLEAR), + c) + return + } + + close_conn(kq, c_ptr) +} - if c.write_pos < c.write_len { - // Add write event if not all data was sent - mut ev := C.kevent{} - ev_set(mut &ev, u64(c.fd), i16(C.EVFILT_WRITE), u16(C.EV_ADD | C.EV_EOF), - u32(0), isize(0), c) - C.kevent(s.poll_fd, &ev, 1, unsafe { nil }, 0, unsafe { nil }) - } else { - // Response sent, re-enable reading for keep-alive - C.free(c.write_buf) - c.write_buf = unsafe { nil } - mut ev := C.kevent{} - ev_set(mut &ev, u64(c.fd), i16(C.EVFILT_READ), u16(C.EV_ADD | C.EV_EOF), u32(0), - isize(0), c) - C.kevent(s.poll_fd, &ev, 1, unsafe { nil }, 0, unsafe { nil }) - c.read_len = 0 +fn accept_clients(kq int, listen_fd int) { + for { + client_fd := C.accept(listen_fd, unsafe { nil }, unsafe { nil }) + if client_fd < 0 { + if C.errno == C.EAGAIN || C.errno == C.EWOULDBLOCK { + break + } + C.perror(c'accept') + break + } + set_nonblocking(client_fd) + mut c := &Conn{ + fd: client_fd } - unsafe { C.free(d) } + add_event(kq, u64(client_fd), i16(C.EVFILT_READ), u16(C.EV_ADD | C.EV_ENABLE | C.EV_CLEAR), + c) } } // run starts the server and enters the main event loop (Kqueue version). pub fn (mut s Server) run() ! { - // Create server socket - s.socket_fd = C.socket(.ip, .tcp, 0) + s.socket_fd = C.socket(C.AF_INET, C.SOCK_STREAM, 0) if s.socket_fd < 0 { C.perror(c'socket') - return error('socket creation failed') + return } opt := 1 C.setsockopt(s.socket_fd, C.SOL_SOCKET, C.SO_REUSEADDR, &opt, sizeof(int)) mut addr := C.sockaddr_in{} - C.memset(&addr, 0, sizeof(addr)) - addr.sin_family = C.AF_INET + unsafe { + C.memset(&addr, 0, sizeof(addr)) + } + addr.sin_family = u16(C.AF_INET) addr.sin_port = u16(C.htons(u16(s.port))) if C.bind(s.socket_fd, voidptr(&addr), sizeof(addr)) < 0 { C.perror(c'bind') - return error('socket bind failed') + return } if C.listen(s.socket_fd, backlog) < 0 { C.perror(c'listen') - return error('socket listen failed') + return } - C.fcntl(s.socket_fd, C.F_SETFL, C.O_NONBLOCK) - // Create kqueue + set_nonblocking(s.socket_fd) + s.poll_fd = C.kqueue() if s.poll_fd < 0 { C.perror(c'kqueue') - return error('kqueue creation failed') + return } - mut ev := C.kevent{} - ev_set(mut &ev, u64(s.socket_fd), i16(C.EVFILT_READ), u16(C.EV_ADD), u32(0), isize(0), + add_event(s.poll_fd, u64(s.socket_fd), i16(C.EVFILT_READ), u16(C.EV_ADD | C.EV_ENABLE | C.EV_CLEAR), unsafe { nil }) - C.kevent(s.poll_fd, &ev, 1, unsafe { nil }, 0, unsafe { nil }) - - // Initialize worker data - C.pthread_mutex_init(&s.worker_data.task_mutex, unsafe { nil }) - C.pthread_cond_init(&s.worker_data.task_cond, unsafe { nil }) - C.pthread_mutex_init(&s.worker_data.done_mutex, unsafe { nil }) - - // Create wake pipe - if C.pipe(&s.worker_data.wake_pipe[0]) < 0 { - C.perror(c'pipe') - return error('pipe creation failed') - } - C.fcntl(s.worker_data.wake_pipe[0], C.F_SETFL, C.O_NONBLOCK) - C.fcntl(s.worker_data.wake_pipe[1], C.F_SETFL, C.O_NONBLOCK) - // Add wake pipe to kqueue - ev_set(mut &ev, u64(s.worker_data.wake_pipe[0]), i16(C.EVFILT_READ), u16(C.EV_ADD), - u32(0), isize(0), unsafe { nil }) - C.kevent(s.poll_fd, &ev, 1, unsafe { nil }, 0, unsafe { nil }) + println('listening on http://localhost:${s.port}/') - // Create worker threads - for i := 0; i < num_threads; i++ { - C.pthread_create(&s.threads[i], unsafe { nil }, worker_func, s) - } - - println('Server listening on port ${s.port}') - - // Event loop - events := [64]C.kevent{} + mut events := [kqueue_max_events]C.kevent{} for { - nev := C.kevent(s.poll_fd, unsafe { nil }, 0, &events[0], 64, unsafe { nil }) + nev := C.kevent(s.poll_fd, unsafe { nil }, 0, &events[0], kqueue_max_events, unsafe { nil }) if nev < 0 { C.perror(c'kevent') break @@ -147,130 +244,39 @@ pub fn (mut s Server) run() ! { for i := 0; i < nev; i++ { event := events[i] - mut c := unsafe { &Conn(event.udata) } - if event.flags & u16(C.EV_ERROR) != 0 { - if c != unsafe { nil } { - s.close_conn(c) - } - continue - } - - if event.ident == u64(s.socket_fd) { // New connection - client_fd := C.accept(s.socket_fd, unsafe { nil }, unsafe { nil }) - if client_fd < 0 { - continue - } - mut new_c := unsafe { &Conn(C.malloc(sizeof(Conn))) } - C.memset(new_c, 0, sizeof(Conn)) - new_c.fd = client_fd - C.fcntl(new_c.fd, C.F_SETFL, C.O_NONBLOCK) - ev_set(mut &ev, u64(new_c.fd), i16(C.EVFILT_READ), u16(C.EV_ADD | C.EV_EOF), - u32(0), isize(0), new_c) - C.kevent(s.poll_fd, &ev, 1, unsafe { nil }, 0, unsafe { nil }) - } else if event.ident == u64(s.worker_data.wake_pipe[0]) { // Worker is done - buf := [1024]u8{} - for C.read(s.worker_data.wake_pipe[0], &buf[0], sizeof(buf)) > 0 {} - s.process_dones() - } else if event.filter == i16(C.EVFILT_READ) { // Data from client - if event.flags & u16(C.EV_EOF) != 0 { - s.close_conn(c) + if event.ident == u64(s.socket_fd) { + C.perror(c'listener error') continue } - n := C.read(c.fd, &c.read_buf[c.read_len], buf_size - c.read_len) - if n <= 0 { - if n < 0 && C.errno != C.EAGAIN && C.errno != C.EWOULDBLOCK { - s.close_conn(c) - } - continue + if event.udata != unsafe { nil } { + close_conn(s.poll_fd, event.udata) } - c.read_len += int(n) - - header_end := C.memmem(&c.read_buf[0], c.read_len, c'\r\n\r\n', 4) - if header_end == unsafe { nil } { - if c.read_len >= buf_size { - s.close_conn(c) - } - continue - } - - if C.memcmp(&c.read_buf[0], c'GET ', 4) != 0 { - s.close_conn(c) - continue - } - path_start := &c.read_buf[4] - path_end := C.strchr(path_start, ` `) - if path_end == unsafe { nil } { - s.close_conn(c) - continue - } - path_len := unsafe { path_end - path_start } - - req := HttpRequest{ - buffer: c.read_buf[..c.read_len] - method: Slice{ - buf: &c.read_buf[0] - len: 3 - } - path: Slice{ - buf: path_start - len: path_len - } - client_conn_fd: c.fd - } - - c.read_len = 0 - - // Offload to worker thread - // DELETE Read event so we don't trigger while worker is busy - ev_set(mut &ev, u64(c.fd), i16(C.EVFILT_READ), u16(C.EV_DELETE), u32(0), - isize(0), c) - C.kevent(s.poll_fd, &ev, 1, unsafe { nil }, 0, unsafe { nil }) + continue + } - mut t := unsafe { &Task(C.malloc(sizeof(Task))) } - t.c = c - t.req = req - t.next = unsafe { nil } + if event.ident == u64(s.socket_fd) { + accept_clients(s.poll_fd, s.socket_fd) + continue + } - C.pthread_mutex_lock(&s.worker_data.task_mutex) - if s.worker_data.task_tail != unsafe { nil } { - s.worker_data.task_tail.next = t - } else { - s.worker_data.task_head = t - } - s.worker_data.task_tail = t - C.pthread_cond_signal(&s.worker_data.task_cond) - C.pthread_mutex_unlock(&s.worker_data.task_mutex) - } else if event.filter == i16(C.EVFILT_WRITE) { // Ready to write more data - if event.flags & u16(C.EV_EOF) != 0 { - s.close_conn(c) - continue - } - write_ptr := unsafe { &u8(c.write_buf) + c.write_pos } - written := C.write(c.fd, write_ptr, c.write_len - c.write_pos) - if written > 0 { - c.write_pos += int(written) - } else if written < 0 && C.errno != C.EAGAIN && C.errno != C.EWOULDBLOCK { - s.close_conn(c) - continue - } + if event.udata == unsafe { nil } { + continue + } - if c.write_pos >= c.write_len { - C.free(c.write_buf) - c.write_buf = unsafe { nil } - // Done writing, delete write filter - ev_set(mut &ev, u64(c.fd), i16(C.EVFILT_WRITE), u16(C.EV_DELETE), - u32(0), isize(0), c) - C.kevent(s.poll_fd, &ev, 1, unsafe { nil }, 0, unsafe { nil }) + if event.flags & u16(C.EV_EOF) != 0 { + close_conn(s.poll_fd, event.udata) + continue + } - c.read_len = 0 - } + if event.filter == i16(C.EVFILT_READ) { + handle_read(mut s, s.poll_fd, event.udata) + } else if event.filter == i16(C.EVFILT_WRITE) { + handle_write(s.poll_fd, event.udata) } } } C.close(s.socket_fd) C.close(s.poll_fd) - C.close(s.worker_data.wake_pipe[0]) - C.close(s.worker_data.wake_pipe[1]) } diff --git a/vlib/fasthttp/fasthttp_linux.v b/vlib/fasthttp/fasthttp_linux.v index 9e04168dd02cc4..40115667a6e906 100644 --- a/vlib/fasthttp/fasthttp_linux.v +++ b/vlib/fasthttp/fasthttp_linux.v @@ -1,19 +1,19 @@ module fasthttp +import net + #include +#include + +fn C.accept4(sockfd int, addr &net.Addr, addrlen &u32, flags int) int + +fn C.epoll_create1(__flags int) int -// Epoll constants -const epoll_ctl_add = 1 -const epoll_ctl_del = 2 -const epoll_ctl_mod = 3 -const epoll_in = 1 -const epoll_out = 4 -const epoll_err = 8 -const epoll_hup = 16 -const epoll_rdhup = 8192 +fn C.epoll_ctl(__epfd int, __op int, __fd int, __event &C.epoll_event) int + +fn C.epoll_wait(__epfd int, __events &C.epoll_event, __maxevents int, __timeout int) int union C.epoll_data { -mut: ptr voidptr fd int u32 u32 @@ -21,268 +21,289 @@ mut: } struct C.epoll_event { -mut: events u32 data C.epoll_data } -fn C.epoll_create1(flags int) int -fn C.epoll_ctl(epfd int, op int, fd int, event &C.epoll_event) int -fn C.epoll_wait(epfd int, events &C.epoll_event, maxevents int, timeout int) int +struct Server { +pub: + port int = 3000 + max_request_buffer_size int = 8192 + user_data voidptr +mut: + listen_fds []int = []int{len: max_thread_pool_size, cap: max_thread_pool_size} + epoll_fds []int = []int{len: max_thread_pool_size, cap: max_thread_pool_size} + threads []thread = []thread{len: max_thread_pool_size, cap: max_thread_pool_size} + request_handler fn (HttpRequest) ![]u8 @[required] +} -// Helper to wrap epoll_ctl for cleaner code -fn control_epoll(epfd int, op int, fd int, events u32, data voidptr) { - mut ev := C.epoll_event{ - events: events +// new_server creates and initializes a new Server instance. +pub fn new_server(config ServerConfig) !&Server { + if config.max_request_buffer_size <= 0 { + return error('max_request_buffer_size must be greater than 0') + } + mut server := &Server{ + port: config.port + max_request_buffer_size: config.max_request_buffer_size + user_data: config.user_data + request_handler: config.handler } - ev.data.ptr = data - C.epoll_ctl(epfd, op, fd, &ev) + unsafe { + server.listen_fds.flags.set(.noslices | .noshrink | .nogrow) + server.epoll_fds.flags.set(.noslices | .noshrink | .nogrow) + server.threads.flags.set(.noslices | .noshrink | .nogrow) + } + return server } -// process_dones handles connections that have been processed by a worker thread. -fn (mut s Server) process_dones() { - C.pthread_mutex_lock(&s.worker_data.done_mutex) - mut local_head := s.worker_data.done_head - s.worker_data.done_head = unsafe { nil } - s.worker_data.done_tail = unsafe { nil } - C.pthread_mutex_unlock(&s.worker_data.done_mutex) - - for local_head != unsafe { nil } { - d := local_head - local_head = d.next - mut c := d.c - c.write_buf = d.resp - c.write_len = d.len - c.write_pos = 0 - - // Try to write immediately - write_ptr := unsafe { &u8(c.write_buf) + c.write_pos } - written := C.write(c.fd, write_ptr, c.write_len - c.write_pos) - if written > 0 { - c.write_pos += int(written) - } else if written < 0 && C.errno != C.EAGAIN && C.errno != C.EWOULDBLOCK { - s.close_conn(c) - unsafe { C.free(d) } - continue - } - - if c.write_pos < c.write_len { - // Not all data sent, add WRITE event - // Note: The connection was removed from epoll before sending to worker, so we ADD here. - control_epoll(s.poll_fd, epoll_ctl_add, c.fd, u32(epoll_out | epoll_rdhup), - c) - } else { - // Response sent, re-enable reading for keep-alive - C.free(c.write_buf) - c.write_buf = unsafe { nil } +fn set_blocking(fd int, blocking bool) { + flags := C.fcntl(fd, C.F_GETFL, 0) + if flags == -1 { + // TODO: better error handling + eprintln(@LOCATION) + return + } + if blocking { + // This removes the O_NONBLOCK flag from flags and set it. + C.fcntl(fd, C.F_SETFL, flags & ~C.O_NONBLOCK) + } else { + // This adds the O_NONBLOCK flag from flags and set it. + C.fcntl(fd, C.F_SETFL, flags | C.O_NONBLOCK) + } +} - // Note: The connection was removed from epoll before sending to worker, so we ADD here. - control_epoll(s.poll_fd, epoll_ctl_add, c.fd, u32(epoll_in | epoll_rdhup), - c) - c.read_len = 0 +fn close_socket(fd int) bool { + ret := C.close(fd) + if ret == -1 { + if C.errno == C.EINTR { + // Interrupted by signal, retry is safe + return close_socket(fd) } - unsafe { C.free(d) } + eprintln('ERROR: close(fd=${fd}) failed with errno=${C.errno}') + return false } + return true } -// run starts the server and enters the main event loop (Epoll version). -pub fn (mut s Server) run() ! { - // Create server socket - s.socket_fd = C.socket(.ip, .tcp, 0) - if s.socket_fd < 0 { - C.perror(c'socket') - return error('socket creation failed') +fn create_server_socket(port int) int { + // Create a socket with non-blocking mode + server_fd := C.socket(net.AddrFamily.ip, net.SocketType.tcp, 0) + if server_fd < 0 { + eprintln(@LOCATION) + C.perror(c'Socket creation failed') + return -1 } - opt := 1 - C.setsockopt(s.socket_fd, C.SOL_SOCKET, C.SO_REUSEADDR, &opt, sizeof(int)) - - mut addr := C.sockaddr_in{} - C.memset(&addr, 0, sizeof(addr)) - addr.sin_family = u16(C.AF_INET) - addr.sin_port = u16(C.htons(u16(s.port))) + set_blocking(server_fd, false) - if C.bind(s.socket_fd, voidptr(&addr), sizeof(addr)) < 0 { - C.perror(c'bind') - return error('socket bind failed') + // Enable SO_REUSEADDR and SO_REUSEPORT + opt := 1 + if C.setsockopt(server_fd, C.SOL_SOCKET, C.SO_REUSEADDR, &opt, sizeof(opt)) < 0 { + eprintln(@LOCATION) + C.perror(c'setsockopt SO_REUSEADDR failed') + close_socket(server_fd) + return -1 } - if C.listen(s.socket_fd, backlog) < 0 { - C.perror(c'listen') - return error('socket listen failed') + if C.setsockopt(server_fd, C.SOL_SOCKET, C.SO_REUSEPORT, &opt, sizeof(opt)) < 0 { + eprintln(@LOCATION) + C.perror(c'setsockopt SO_REUSEPORT failed') + close_socket(server_fd) + return -1 } - C.fcntl(s.socket_fd, C.F_SETFL, C.O_NONBLOCK) - // Create epoll instance - s.poll_fd = C.epoll_create1(0) - if s.poll_fd < 0 { - C.perror(c'epoll_create1') - return error('epoll creation failed') + addr := net.new_ip(u16(port), [u8(0), 0, 0, 0]!) + alen := addr.len() + if C.bind(server_fd, voidptr(&addr), alen) < 0 { + eprintln(@LOCATION) + C.perror(c'Bind failed') + close_socket(server_fd) + return -1 } - - // Add listener socket to epoll - // We pass the fd as the pointer value to identify it later. - control_epoll(s.poll_fd, epoll_ctl_add, s.socket_fd, u32(epoll_in), voidptr(isize(s.socket_fd))) - - // Initialize worker data - C.pthread_mutex_init(&s.worker_data.task_mutex, unsafe { nil }) - C.pthread_cond_init(&s.worker_data.task_cond, unsafe { nil }) - C.pthread_mutex_init(&s.worker_data.done_mutex, unsafe { nil }) - - // Create wake pipe - if C.pipe(&s.worker_data.wake_pipe[0]) < 0 { - C.perror(c'pipe') - return error('pipe creation failed') + if C.listen(server_fd, max_connection_size) < 0 { + eprintln(@LOCATION) + C.perror(c'Listen failed') + close_socket(server_fd) + return -1 } - C.fcntl(s.worker_data.wake_pipe[0], C.F_SETFL, C.O_NONBLOCK) - C.fcntl(s.worker_data.wake_pipe[1], C.F_SETFL, C.O_NONBLOCK) - - // Add wake pipe to epoll - control_epoll(s.poll_fd, epoll_ctl_add, s.worker_data.wake_pipe[0], u32(epoll_in), - voidptr(isize(s.worker_data.wake_pipe[0]))) + return server_fd +} - // Create worker threads - for i := 0; i < num_threads; i++ { - C.pthread_create(&s.threads[i], unsafe { nil }, worker_func, s) +// Function to add a file descriptor to the epoll instance +fn add_fd_to_epoll(epoll_fd int, fd int, events u32) int { + mut ev := C.epoll_event{ + events: events + } + ev.data.fd = fd + if C.epoll_ctl(epoll_fd, C.EPOLL_CTL_ADD, fd, &ev) == -1 { + eprintln(@LOCATION) + C.perror(c'epoll_ctl') + return -1 } + return 0 +} - println('Server listening on port ${s.port}') +// Function to remove a file descriptor from the epoll instance +fn remove_fd_from_epoll(epoll_fd int, fd int) bool { + ret := C.epoll_ctl(epoll_fd, C.EPOLL_CTL_DEL, fd, C.NULL) + if ret == -1 { + eprintln('ERROR: epoll_ctl(DEL, fd=${fd}) failed with errno=${C.errno}') + return false + } + return true +} - // Event loop - events := [64]C.epoll_event{} +fn handle_accept_loop(epoll_fd int, listen_fd int) { for { - nev := C.epoll_wait(s.poll_fd, &events[0], 64, -1) - if nev < 0 { - C.perror(c'epoll_wait') + client_fd := C.accept4(listen_fd, C.NULL, C.NULL, C.SOCK_NONBLOCK) + if client_fd < 0 { + if C.errno == C.EAGAIN || C.errno == C.EWOULDBLOCK { + break // No more incoming connections; exit loop. + } + eprintln(@LOCATION) + C.perror(c'Accept failed') break } + // Enable TCP_NODELAY for lower latency + opt := 1 + C.setsockopt(client_fd, C.IPPROTO_TCP, C.TCP_NODELAY, &opt, sizeof(opt)) + // Register client socket with epoll + if add_fd_to_epoll(epoll_fd, client_fd, u32(C.EPOLLIN | C.EPOLLET)) == -1 { + close_socket(client_fd) + } + } +} - for i := 0; i < nev; i++ { - event := events[i] - ptr_val := isize(event.data.ptr) - - // 1. Check for Listener Socket - if ptr_val == s.socket_fd { - client_fd := C.accept(s.socket_fd, unsafe { nil }, unsafe { nil }) - if client_fd < 0 { - continue - } - mut new_c := unsafe { &Conn(C.malloc(sizeof(Conn))) } - C.memset(new_c, 0, sizeof(Conn)) - new_c.fd = client_fd - C.fcntl(new_c.fd, C.F_SETFL, C.O_NONBLOCK) - - control_epoll(s.poll_fd, epoll_ctl_add, new_c.fd, u32(epoll_in | epoll_rdhup), - new_c) - continue - } +fn handle_client_closure(epoll_fd int, client_fd int) { + // Never close the listening socket here + if client_fd == 0 { + return + } + if client_fd <= 0 { + eprintln('ERROR: Invalid FD=${client_fd} for closure') + return + } + remove_fd_from_epoll(epoll_fd, client_fd) + close_socket(client_fd) +} - // 2. Check for Wake Pipe (Worker finished a task) - if ptr_val == s.worker_data.wake_pipe[0] { - buf := [1024]u8{} - for C.read(s.worker_data.wake_pipe[0], &buf[0], sizeof(buf)) > 0 {} - s.process_dones() +fn process_events(mut server Server, epoll_fd int, listen_fd int) { + mut events := [max_connection_size]C.epoll_event{} + mut request_buffer := []u8{len: server.max_request_buffer_size, cap: server.max_request_buffer_size} + unsafe { + request_buffer.flags.set(.noslices | .nogrow | .noshrink) + } + for { + num_events := C.epoll_wait(epoll_fd, &events[0], max_connection_size, -1) + for i := 0; i < num_events; i++ { + // Accept new connections when the listening socket is readable + if unsafe { events[i].data.fd } == listen_fd { + handle_accept_loop(epoll_fd, listen_fd) continue } - // 3. Client Connection - mut c := unsafe { &Conn(event.data.ptr) } - - // Handle Errors or HUP - if (event.events & u32(epoll_err | epoll_hup | epoll_rdhup)) != 0 { - s.close_conn(c) + if events[i].events & u32((C.EPOLLHUP | C.EPOLLERR)) != 0 { + client_fd := unsafe { events[i].data.fd } + if client_fd == listen_fd { + eprintln('ERROR: listen fd had HUP/ERR') + continue + } + if client_fd > 0 { + // Try to send 444 No Response before closing abnormal connection + C.send(client_fd, status_444_response.data, status_444_response.len, + C.MSG_NOSIGNAL) + handle_client_closure(epoll_fd, client_fd) + } else { + eprintln('ERROR: Invalid FD from epoll: ${client_fd}') + } continue } - - // Handle Read - if (event.events & u32(epoll_in)) != 0 { - n := C.read(c.fd, &c.read_buf[c.read_len], buf_size - c.read_len) - if n <= 0 { - if n < 0 && C.errno != C.EAGAIN && C.errno != C.EWOULDBLOCK { - s.close_conn(c) - } else if n == 0 { - s.close_conn(c) + if events[i].events & u32(C.EPOLLIN) != 0 { + client_fd := unsafe { events[i].data.fd } + bytes_read := C.recv(client_fd, unsafe { &request_buffer[0] }, server.max_request_buffer_size - 1, + 0) + if bytes_read > 0 { + // Check if request exceeds buffer size + if bytes_read >= server.max_request_buffer_size - 1 { + C.send(client_fd, status_413_response.data, status_413_response.len, + C.MSG_NOSIGNAL) + handle_client_closure(epoll_fd, client_fd) + continue } - continue - } - c.read_len += int(n) - - header_end := C.memmem(&c.read_buf[0], c.read_len, c'\r\n\r\n', 4) - if header_end == unsafe { nil } { - if c.read_len >= buf_size { - s.close_conn(c) + mut readed_request_buffer := []u8{cap: bytes_read} + unsafe { + readed_request_buffer.push_many(&request_buffer[0], bytes_read) } - continue - } - - if C.memcmp(&c.read_buf[0], c'GET ', 4) != 0 { - s.close_conn(c) - continue - } - path_start := &c.read_buf[4] - path_end := C.strchr(path_start, ` `) - if path_end == unsafe { nil } { - s.close_conn(c) - continue - } - path_len := unsafe { path_end - path_start } - - req := HttpRequest{ - buffer: c.read_buf[..c.read_len] - method: Slice{ - buf: &c.read_buf[0] - len: 3 + mut decoded_http_request := decode_http_request(readed_request_buffer) or { + eprintln('Error decoding request ${err}') + C.send(client_fd, tiny_bad_request_response.data, tiny_bad_request_response.len, + C.MSG_NOSIGNAL) + handle_client_closure(epoll_fd, client_fd) + continue + } + decoded_http_request.client_conn_fd = client_fd + decoded_http_request.user_data = server.user_data + response_buffer := server.request_handler(decoded_http_request) or { + eprintln('Error handling request ${err}') + C.send(client_fd, tiny_bad_request_response.data, tiny_bad_request_response.len, + C.MSG_NOSIGNAL) + handle_client_closure(epoll_fd, client_fd) + continue } - path: Slice{ - buf: path_start - len: path_len + // Send response + sent := C.send(client_fd, response_buffer.data, response_buffer.len, + C.MSG_NOSIGNAL | C.MSG_DONTWAIT) + if sent < 0 && C.errno != C.EAGAIN && C.errno != C.EWOULDBLOCK { + eprintln('ERROR: send() failed with errno=${C.errno}') + handle_client_closure(epoll_fd, client_fd) + continue } - client_conn_fd: c.fd + // Leave the connection open; closure is driven by client FIN or errors + } else if bytes_read == 0 { + // Normal client closure (FIN received) + handle_client_closure(epoll_fd, client_fd) + } else if bytes_read < 0 && C.errno != C.EAGAIN && C.errno != C.EWOULDBLOCK { + // Unexpected recv error - send 444 No Response + C.send(client_fd, status_444_response.data, status_444_response.len, + C.MSG_NOSIGNAL) + handle_client_closure(epoll_fd, client_fd) } + } + } + } +} - c.read_len = 0 - - // Offload to worker thread - // Remove from epoll so we don't trigger while worker is busy - control_epoll(s.poll_fd, epoll_ctl_del, c.fd, 0, unsafe { nil }) - - mut t := unsafe { &Task(C.malloc(sizeof(Task))) } - t.c = c - t.req = req - t.next = unsafe { nil } +// run starts the server and begins listening for incoming connections. +pub fn (mut server Server) run() ! { + $if windows { + eprintln('Windows is not supported yet') + return + } + for i := 0; i < max_thread_pool_size; i++ { + server.listen_fds[i] = create_server_socket(server.port) + if server.listen_fds[i] < 0 { + return + } - C.pthread_mutex_lock(&s.worker_data.task_mutex) - if s.worker_data.task_tail != unsafe { nil } { - s.worker_data.task_tail.next = t - } else { - s.worker_data.task_head = t - } - s.worker_data.task_tail = t - C.pthread_cond_signal(&s.worker_data.task_cond) - C.pthread_mutex_unlock(&s.worker_data.task_mutex) - } else if (event.events & u32(epoll_out)) != 0 { // Handle Write - write_ptr := unsafe { &u8(c.write_buf) + c.write_pos } - written := C.write(c.fd, write_ptr, c.write_len - c.write_pos) - if written > 0 { - c.write_pos += int(written) - } else if written < 0 && C.errno != C.EAGAIN && C.errno != C.EWOULDBLOCK { - s.close_conn(c) - continue - } + server.epoll_fds[i] = C.epoll_create1(0) + if server.epoll_fds[i] < 0 { + C.perror(c'epoll_create1 failed') + close_socket(server.listen_fds[i]) + return + } - if c.write_pos >= c.write_len { - C.free(c.write_buf) - c.write_buf = unsafe { nil } - // Done writing, modify epoll to stop listening for OUT and start listening for IN - control_epoll(s.poll_fd, epoll_ctl_mod, c.fd, u32(epoll_in | epoll_rdhup), - c) - c.read_len = 0 - } - } + // Register the listening socket with each worker epoll for distributed accepts (edge-triggered) + if add_fd_to_epoll(server.epoll_fds[i], server.listen_fds[i], u32(C.EPOLLIN | C.EPOLLET)) == -1 { + close_socket(server.listen_fds[i]) + close_socket(server.epoll_fds[i]) + return } + + server.threads[i] = spawn process_events(mut server, server.epoll_fds[i], server.listen_fds[i]) } - C.close(s.socket_fd) - C.close(s.poll_fd) - C.close(s.worker_data.wake_pipe[0]) - C.close(s.worker_data.wake_pipe[1]) + println('listening on http://localhost:${server.port}/') + // Main thread waits for workers; accepts are handled in worker epoll loops + for i in 0 .. max_thread_pool_size { + server.threads[i].wait() + } } diff --git a/vlib/fasthttp/fasthttp_test.v b/vlib/fasthttp/fasthttp_test.v index ca15d9d8e91481..7cc22d1380248e 100644 --- a/vlib/fasthttp/fasthttp_test.v +++ b/vlib/fasthttp/fasthttp_test.v @@ -1,3 +1,5 @@ +module fasthttp + /* $if darwin { import fasthttp @@ -36,7 +38,10 @@ $if darwin { fn test_lol() { // Create a new server instance on port 8092, passing our handler function. - mut server := fasthttp.new_server(8092, request_handler) or { + mut server := fasthttp.new_server(fasthttp.ServerConfig{ + port: 8092 + handler: request_handler + }) or { eprintln('Failed to create server: ${err}') return } @@ -47,6 +52,91 @@ $if darwin { } */ -fn test_x() { - assert true +fn test_parse_request_line() { + // Test basic GET request + request := 'GET / HTTP/1.1\r\n'.bytes() + req := parse_request_line(request) or { + assert false, 'Failed to parse valid request: ${err}' + return + } + + assert req.buffer.len == request.len + assert req.method.start == 0 + assert req.method.len == 3 + assert req.path.start == 4 + assert req.path.len == 1 + assert req.version.start == 6 + assert req.version.len == 8 + + method := req.buffer[req.method.start..req.method.start + req.method.len].bytestr() + path := req.buffer[req.path.start..req.path.start + req.path.len].bytestr() + version := req.buffer[req.version.start..req.version.start + req.version.len].bytestr() + + assert method == 'GET' + assert path == '/' + assert version == 'HTTP/1.1' +} + +fn test_parse_request_line_with_path() { + // Test GET request with path + request := 'GET /users/123 HTTP/1.1\r\n'.bytes() + req := parse_request_line(request) or { + assert false, 'Failed to parse valid request: ${err}' + return + } + + path := req.buffer[req.path.start..req.path.start + req.path.len].bytestr() + assert path == '/users/123' +} + +fn test_parse_request_line_post() { + // Test POST request + request := 'POST /api/data HTTP/1.1\r\n'.bytes() + req := parse_request_line(request) or { + assert false, 'Failed to parse valid request: ${err}' + return + } + + method := req.buffer[req.method.start..req.method.start + req.method.len].bytestr() + path := req.buffer[req.path.start..req.path.start + req.path.len].bytestr() + + assert method == 'POST' + assert path == '/api/data' +} + +fn test_parse_request_line_invalid() { + // Test invalid request (missing \r\n) + request := 'GET / HTTP/1.1'.bytes() + parse_request_line(request) or { + assert err.msg() == 'Invalid HTTP request line' + return + } + assert false, 'Should have failed to parse invalid request' +} + +fn test_decode_http_request() { + request := 'GET /test HTTP/1.1\r\n'.bytes() + req := decode_http_request(request) or { + assert false, 'Failed to decode request: ${err}' + return + } + + method := req.buffer[req.method.start..req.method.start + req.method.len].bytestr() + assert method == 'GET' +} + +fn test_new_server() { + handler := fn (req HttpRequest) ![]u8 { + return 'HTTP/1.1 200 OK\r\n\r\nHello'.bytes() + } + + server := new_server(ServerConfig{ + port: 8080 + handler: handler + }) or { + assert false, 'Failed to create server: ${err}' + return + } + + assert server.port == 8080 } diff --git a/vlib/fasthttp/fasthttp_windows.v b/vlib/fasthttp/fasthttp_windows.v new file mode 100644 index 00000000000000..757fceaef8a1fa --- /dev/null +++ b/vlib/fasthttp/fasthttp_windows.v @@ -0,0 +1,23 @@ +module fasthttp + +struct Server { +pub: + port int = 3000 +mut: + request_handler fn (HttpRequest) ![]u8 @[required] +} + +// new_server creates and initializes a new Server instance. +pub fn new_server(config ServerConfig) !&Server { + mut server := &Server{ + port: config.port + request_handler: config.handler + } + + return server +} + +// run starts the server and begins listening for incoming connections. +pub fn (mut server Server) run() ! { + println('TODO: implement fasthttp.Server.run on windows') +} diff --git a/vlib/net/http/request.v b/vlib/net/http/request.v index ff5c9a32a45805..6c1633cf1e2e18 100644 --- a/vlib/net/http/request.v +++ b/vlib/net/http/request.v @@ -405,15 +405,15 @@ pub fn parse_request_head(mut reader io.BufferedReader) !Request { mut pos := parse_header_fast(line)! key := line.substr_unsafe(0, pos) for pos < line.len - 1 && line[pos + 1].is_space() { - if line[pos + 1].is_space() { - // Skip space or tab in value name - pos++ - } + // Skip space or tab in value name + pos++ + } + if pos + 1 < line.len { + value := line.substr_unsafe(pos + 1, line.len) + _, _ = key, value + // println('key,value=${key},${value}') + header.add_custom(key, value)! } - value := line.substr_unsafe(pos + 1, line.len) - _, _ = key, value - // println('key,value=${key},${value}') - header.add_custom(key, value)! line = reader.read_line()! } // header.coerce(canonicalize: true) @@ -436,29 +436,16 @@ pub fn parse_request_head(mut reader io.BufferedReader) !Request { // parse_request_head parses *only* the header of a raw HTTP request into a Request object pub fn parse_request_head_str(s string) !Request { // TODO called by veb twice!? - // println('parse_request_head_str s ="${s}"') - println('skek=') - println(s) - println('==========================') - // println('FIRST') - // println(s[0].ascii_str()) - // println(s.bytes()) - pos0 := s.index('\n') or { 0 } lines := s.split('\n') - println('nr lines=${lines.len}') line0 := s[..pos0].trim_space() - println('line0="${line0}"') method, target, version := parse_request_line(line0)! - println(method) - println(target) - println(version) // headers mut header := new_header() for i := 1; i < lines.len; i++ { - line := lines[i] + mut line := lines[i].trim_right('\r') if !line.contains(':') { continue } @@ -466,15 +453,15 @@ pub fn parse_request_head_str(s string) !Request { mut pos := parse_header_fast(line)! key := line.substr_unsafe(0, pos) for pos < line.len - 1 && line[pos + 1].is_space() { - if line[pos + 1].is_space() { - // Skip space or tab in value name - pos++ - } + // Skip space or tab in value name + pos++ + } + if pos + 1 < line.len { + value := line.substr_unsafe(pos + 1, line.len) + _, _ = key, value + // println('key,value=${key},${value}') + header.add_custom(key, value)! } - value := line.substr_unsafe(pos + 1, line.len) - _, _ = key, value - // println('key,value=${key},${value}') - header.add_custom(key, value)! // header.coerce(canonicalize: true) } diff --git a/vlib/net/http/request_test.v b/vlib/net/http/request_test.v index 44bc78a68e462f..73f136182284c4 100644 --- a/vlib/net/http/request_test.v +++ b/vlib/net/http/request_test.v @@ -230,3 +230,38 @@ fn test_parse_multipart_form_issue_24974_cooked() { assert files['files'][0].filename == 'mikhail-vasilyev-IFxjDdqK_0U-unsplash.jpg' assert files['files'][0].content_type == 'image/jpeg' } + +fn test_parse_request_head_str_basic() { + s := 'GET / HTTP/1.1\r\nHost: example.com\r\nContent-Length: 0\r\n\r\n' + req := http.parse_request_head_str(s) or { panic('did not parse: ${err}') } + assert req.method == .get + assert req.url == '/' + assert req.version == .v1_1 + assert req.host == 'example.com' +} + +fn test_parse_request_head_str_post_with_headers() { + s := 'POST /api HTTP/1.1\r\nHost: test.com\r\nContent-Type: application/json\r\nContent-Length: 10\r\n\r\n' + req := http.parse_request_head_str(s) or { panic('did not parse: ${err}') } + assert req.method == .post + assert req.url == '/api' + assert req.version == .v1_1 + assert req.host == 'test.com' + assert req.header.custom_values('Content-Type') == ['application/json'] +} + +fn test_parse_request_head_str_with_spaces_in_header_values() { + s := 'GET /path HTTP/1.1\r\nX-Custom-Header: value with spaces\r\n\r\n' + req := http.parse_request_head_str(s) or { panic('did not parse: ${err}') } + assert req.method == .get + assert req.url == '/path' + assert req.header.custom_values('X-Custom-Header') == ['value with spaces'] +} + +fn test_parse_request_head_str_multiple_same_header() { + s := 'GET / HTTP/1.1\r\nHost: example.com\r\nSet-Cookie: session=abc\r\nSet-Cookie: user=xyz\r\n\r\n' + req := http.parse_request_head_str(s) or { panic('did not parse: ${err}') } + assert req.method == .get + assert req.host == 'example.com' + assert req.header.custom_values('Set-Cookie') == ['session=abc', 'user=xyz'] +} diff --git a/vlib/veb/veb_d_new_veb.v b/vlib/veb/veb_d_new_veb.v index 82f2f5770f51f6..4846ea9d9bcc59 100644 --- a/vlib/veb/veb_d_new_veb.v +++ b/vlib/veb/veb_d_new_veb.v @@ -6,9 +6,7 @@ module veb import fasthttp import net.http import time -import net import net.urllib -import os struct RequestParams { global_app voidptr @@ -16,12 +14,6 @@ struct RequestParams { routes &map[string]Route } -// TODO remove global hack -//__global gparams RequestParams -const gparams = RequestParams{ - routes: unsafe { nil } -} - const http_ok_response = 'HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 0\r\nConnection: keep-alive\r\n\r\n'.bytes() pub fn run_at[A, X](mut global_app A, params RunParams) ! { @@ -38,16 +30,20 @@ pub fn run_new[A, X](mut global_app A, port int) ! { // Generate routes and controllers just like the original run() function. routes := generate_routes[A, X](global_app)! controllers_sorted := check_duplicate_routes_in_controllers[A](global_app, routes)! - unsafe { - gparams = &RequestParams{ - global_app: global_app - controllers_sorted: controllers_sorted - routes: &routes - // timeout_in_seconds: - } + + // Allocate params on the heap to keep it valid for the server lifetime + params := &RequestParams{ + global_app: global_app + controllers_sorted: controllers_sorted + routes: &routes } + // Configure and run the fasthttp server - mut server := fasthttp.new_server(port, parallel_request_handler[A, X]) or { + mut server := fasthttp.new_server(fasthttp.ServerConfig{ + port: port + handler: parallel_request_handler[A, X] + user_data: voidptr(params) + }) or { eprintln('Failed to create server: ${err}') return } @@ -64,19 +60,19 @@ fn parallel_request_handler[A, X](req fasthttp.HttpRequest) ![]u8 { return test_text.bytes() } */ - // mut global_app := unsafe { &A(gapp) } - mut global_app := unsafe { &A(gparams.global_app) } - // println('parallel_request_handler() params.routes=${gparams.routes}') + // Get parameters from user_data - copy to avoid use-after-free + params := unsafe { *(&RequestParams(req.user_data)) } + mut global_app := unsafe { &A(params.global_app) } + // println('parallel_request_handler() params.routes=${params.routes}') // println('global_app=$global_app') - // println('params=$gparams') + // println('params=$params') // println('req=$req') // println('buffer=${req.buffer.bytestr()}') s := req.buffer.bytestr() - method := unsafe { tos(req.method.buf, req.method.len) } - println('method=${method}') - path := unsafe { tos(req.path.buf, req.path.len) } - println('path=${path}') - req_bytes := req.buffer + // method := unsafe { tos(&req.buffer[req.method.start], req.method.len) } + // println('method=${method}') + // path := unsafe { tos(&req.buffer[req.path.start], req.path.len) } + // println('path=${path}') client_fd := req.client_conn_fd // Parse the raw request bytes into a standard `http.Request`. @@ -89,16 +85,15 @@ fn parallel_request_handler[A, X](req fasthttp.HttpRequest) ![]u8 { } // Create and populate the `veb.Context`. completed_context := handle_request_and_route[A, X](mut global_app, req2, client_fd, - gparams.routes, gparams.controllers_sorted) + params.routes, params.controllers_sorted) // Serialize the final `http.Response` into a byte array. if completed_context.takeover { eprintln('[veb] WARNING: ctx.takeover_conn() was called, but this is not supported by this server backend. The connection will be closed after this response.') } // The fasthttp server expects a complete response buffer to be returned. return completed_context.res.bytes() -} +} // handle_request_and_route is a unified function that creates the context, -// handle_request_and_route is a unified function that creates the context, // runs middleware, and finds the correct route for a request. fn handle_request_and_route[A, X](mut app A, req http.Request, client_fd int, routes &map[string]Route, controllers []&ControllerPath) &Context { /* @@ -155,7 +150,7 @@ fn handle_request_and_route[A, X](mut app A, req http.Request, client_fd int, ro // Create a new user context and pass veb's context mut user_context := X{} user_context.Context = ctx - println('calling handle_route') + // println('calling handle_route') handle_route[A, X](mut app, mut user_context, url, host, routes) return &user_context.Context } diff --git a/vlib/veb/veb_picoev.v b/vlib/veb/veb_picoev.v index 989cf25609fdc9..2b0c513a584e4b 100644 --- a/vlib/veb/veb_picoev.v +++ b/vlib/veb/veb_picoev.v @@ -1,8 +1,7 @@ module veb -import os - $if !new_veb ? { + import os import picoev import time import net