diff options
| author | Polesznyák Márk <contact@pml68.dev> | 2026-04-14 10:45:32 +0200 |
|---|---|---|
| committer | Polesznyák Márk <contact@pml68.dev> | 2026-04-14 23:33:36 +0200 |
| commit | 8a4664ac4c0e3c68a6a32bf451d35452c9409f2b (patch) | |
| tree | bd83026cca79d388ff072cbdc0a8cee89def56f8 | |
| parent | feat: Hare updates (rt -> sys), fix sorting and parsing (diff) | |
| download | hare-1brc-8a4664ac4c0e3c68a6a32bf451d35452c9409f2b.tar.gz | |
feat: mmap + multithreading, down to 7s on i5 9400f
| -rw-r--r-- | Makefile | 11 | ||||
| -rw-r--r-- | hashmap.ha | 39 | ||||
| -rw-r--r-- | main.ha | 203 | ||||
| -rw-r--r-- | pthread.ha | 28 |
4 files changed, 176 insertions, 105 deletions
@@ -1,13 +1,18 @@ -1brc: main.ha hashmap.ha - hare build -o 1brc . +JOBS ?= $(shell nproc) + +1brc: main.ha hashmap.ha pthread.ha + hare build -D JOBS=$(JOBS) -lpthread -j $(JOBS) -o 1brc . measurements.txt: ./generate.py 1_000_000_000 +run: 1brc measurements.txt + ./1brc + clean: rm -f 1brc check: hare test -.PHONY: clean check +.PHONY: run clean check @@ -1,28 +1,29 @@ -def BUCKETS: size = 1 << 17; +def HASHMAP_SIZE: size = 1 << 14; -type hashmap = [BUCKETS][](size, data); +type hashmap = [HASHMAP_SIZE](size, stat); -fn getitem(map: *hashmap, hash: size) data = { - let bucket = &map[hash & (BUCKETS - 1)]; - for (let i = 0z; i < len(bucket); i += 1) { - if (bucket[i].0 == hash) - return bucket[i].1; +fn getitem(map: *hashmap, hash: size) stat = { + let i = hash & (HASHMAP_SIZE - 1); + for (true) { + if (map[i].0 == 0 && map[i].1.count == 0) + break; + if (map[i].0 == hash) + return map[i].1; + i = (i + 1) & (HASHMAP_SIZE - 1); }; - return data { ... }; + return stat { ... }; }; -fn setitem(map: *hashmap, hash: size, value: data) void = { - let bucket = &map[hash & (BUCKETS - 1)]; - for (let i = 0z; i < len(bucket); i += 1) { - if (bucket[i].0 == hash) { - bucket[i].1 = value; +fn setitem(map: *hashmap, hash: size, value: stat) void = { + let i = hash & (HASHMAP_SIZE - 1); + for (true) { + if (map[i].0 == 0 && map[i].1.count == 0) + break; + if (map[i].0 == hash) { + map[i].1 = value; return; }; + i = (i + 1) & (HASHMAP_SIZE - 1); }; - append(bucket, (hash, value))!; -}; - -fn finish(map: *hashmap) void = { - for (let i = 0z; i < len(map); i += 1) - free(map[i]); + map[i] = (hash, value); }; @@ -1,8 +1,5 @@ -use bufio; use bytes; use fmt; -use hash; -use hash::fnv; use io; use os; use sys; @@ -11,13 +8,24 @@ use sort::cmp; use strings; use types; -type data = struct { +def JOBS: size = 4; + +type stat = struct { min: i64, max: i64, sum: i64, count: size }; +type thread_job = struct { + chunk: []u8, + map: hashmap, + names_buf: [10000](size, str), + names: [](size, str) +}; + +const jobs: [JOBS]thread_job = [thread_job { ... }...]; + fn i64parse(bytes: []u8) (i64, size) = { let index = 0z; let u: i64 = 0; @@ -51,146 +59,175 @@ fn i64parse(bytes: []u8) (i64, size) = { assert(i64parse(strings::toutf8("-98.2")).0 == -982); }; +fn work(arg: *opaque) nullable *opaque = { + let job = arg: *thread_job; + for (true) { + let hash = 14695981039346656037z; + let sep_idx = 0z; + let has_semi = false; + for (const char .. job.chunk) { + if (char == ';') { + has_semi = true; + break; + }; + + hash ^= char; + hash *= 1099511628211; + sep_idx += 1; + }; + + if (!has_semi) break; + + let after = job.chunk[sep_idx + 1..]; + const name = strings::fromutf8_unsafe(job.chunk[..sep_idx]); + const (temp, index) = i64parse(after); + + job.chunk = after[index..]; + + let stat = getitem(&job.map, hash); + + if (stat.count == 0) { + static append(job.names, (hash, strings::dup(name)!))!; + stat.min = temp; + stat.max = temp; + } else { + if (stat.min > temp) + stat.min = temp; + if (stat.max < temp) + stat.max = temp; + }; + + stat.sum += temp; + stat.count += 1; + + setitem(&job.map, hash, stat); + }; + + return null; +}; + fn mmap(fd: io::file) []u8 = { - let sb = sys::st { ... }; - sys::fstat(fd, &sb)!; + let st = sys::st { ... }; + sys::fstat(fd, &st)!; - let ptr = sys::mmap(null, sb.sz, sys::PROT_READ, sys::MAP_PRIVATE, fd, 0)!; + let ptr = sys::mmap(null, st.sz, sys::PROT_READ, sys::MAP_PRIVATE, fd, 0)!; // Hardcoded value for MADV_SEQUENTIAL, from libc-rs it seems like it's // 2 on all currently supported platforms (Linux, FreeBSD, NetBSD, // OpenBSD, Dragonfly). - assert(sys::syscall(sys::SYS_madvise, ptr: uintptr: u64, sb.sz: u64, 2) == 0); + assert(sys::syscall(sys::SYS_madvise, ptr: uintptr: u64, st.sz: u64, 2) == 0); return *(&types::slice { data = ptr, - length = sb.sz, + length = st.sz, capacity = 0 }: *[]u8); }; -fn free_mmap(map: []u8) void = { +fn munmap(map: []u8) void = { let map = &map: *types::slice; sys::munmap(map.data: *opaque, map.length)!; }; fn names_cmp(a: *opaque, b: *opaque) int = { - const a = *(a: *(u64, str)); - const b = *(b: *(u64, str)); + const a = *(a: *(size, str)); + const b = *(b: *(size, str)); return cmp::strs(&a.1, &b.1); }; export fn main() void = { - let map: hashmap = [[]...]; - defer finish(&map); - if (len(os::args) != 2) + if (len(os::args) > 2) fmt::fatalf("usage: {} <file>", os::args[0]); - const handle = os::open(os::args[1])!; + let file = "measurements.txt"; + if (len(os::args) == 2) + file = os::args[1]; + + const handle = os::open(file)!; defer io::close(handle)!; - const fmap = mmap(handle); - free_mmap(fmap); + const map = mmap(handle); + defer munmap(map); - let buf: *[1024 * 1024]u8 = alloc([0...])!; - defer free(buf); - let read_start = 0z; + const chunk_size = len(map) / JOBS; + let threads: [JOBS]pthread_t = [0...]; - let names_buf: *[10000](u64, str) = alloc([(0, "")...])!; - let names = names_buf[..0]; + let at = 0z; - let fnv = fnv::fnv64a(); - for (true) { - const n = match (io::read(handle, buf[read_start..])) { - case let n: size => - yield n; - case io::EOF => - break; - }; - - if (read_start + n == 0) break; - - let chunk = buf[..read_start + n]; + for (let i = 0z; i < JOBS; i += 1) { + const start = at; + let end = at + chunk_size; + if (end < len(map)) { + let newline_at = match(bytes::index(map[end..], '\n')) { + case let idx: size => + yield idx; + case void => + abort(); + }; - let newline = match(bytes::rindex(chunk, '\n': u8)) { - case let idx: size => - yield idx; - case void => - break; + end = end + newline_at + 1; + } else { + end = len(map); }; - let remaining = chunk[newline + 1..]; - chunk = chunk[..newline + 1]; - - for (true) { - let sep_idx = 0z; - let has_semi = false; - for (const char .. chunk) { - if (char == ';': u8) { - has_semi = true; - break; - }; - - fnv.v ^= char; - fnv.v *= 1099511628211; - sep_idx += 1; - }; - if (!has_semi) break; + jobs[i].chunk = map[start..end]; + jobs[i].names = jobs[i].names_buf[..0]; + pthread_create(&threads[i], null, &work, &jobs[i])!; - const hash = fnv::sum64(&fnv); - hash::reset(&fnv); + at = end; + }; - let after = chunk[sep_idx + 1..]; - const name = strings::fromutf8_unsafe(chunk[..sep_idx]); - const (temp, index) = i64parse(after); + for (const thread .. threads) + pthread_join(thread, null)!; - chunk = after[index..]; + let names_buf: [10000](size, str) = [(0, "")...]; + let names = names_buf[..0]; + let stats: hashmap = [(0, stat { ... })...]; - let item = getitem(&map, hash); + for (let job &.. jobs) { + for (let (hash, name) .. job.names) { + let stat = getitem(&stats, hash); + let new_stat = getitem(&job.map, hash); - if (item.count == 0) { + if (stat.count == 0) { static append(names, (hash, strings::dup(name)!))!; - item.min = temp; - item.max = temp; + + stat.min = new_stat.min; + stat.max = new_stat.max; } else { - if (item.min > temp) - item.min = temp; - if (item.max < temp) - item.max = temp; + if (stat.min > new_stat.min) + stat.min = new_stat.min; + if (stat.max < new_stat.max) + stat.max = new_stat.max; }; - item.sum += temp; - item.count += 1; + stat.sum += new_stat.sum; + stat.count += new_stat.count; - setitem(&map, hash, item); + setitem(&stats, hash, stat); }; - - read_start = len(remaining); - buf[..read_start] = remaining[..]; }; - sort::sort(names, size((u64, str)), &names_cmp)!; + sort::sort(names, size((size, str)), &names_cmp)!; fmt::print("{")!; for (let i = 0z; i < len(names); i += 1) { let station = names[i].1; - let item = getitem(&map, names[i].0); + let item = getitem(&stats, names[i].0); if (i > 0) fmt::print(", ")!; - fmt::printf("\"{}\"={:.1f}/{:.1f}/{:.1f}", + fmt::printf("{}={:.1f}/{:.1f}/{:.1f}", station, item.min: f64 / 10.0, - item.max: f64 / 10.0, item.sum: f64 / 10.0 / item.count: f64, + item.max: f64 / 10.0, )!; free(names[i].1); }; fmt::println("}")!; - - free(names); }; diff --git a/pthread.ha b/pthread.ha new file mode 100644 index 0000000..82c4900 --- /dev/null +++ b/pthread.ha @@ -0,0 +1,28 @@ +use sys::{errno}; + +def __SIZEOF_PTHREAD_CONDATTR_T: size = 56; + +type pthread_attr_t = union { + __size: [__SIZEOF_PTHREAD_CONDATTR_T]i8, + __align: i64 +}; + +type pthread_t = u64; + +@symbol("pthread_create") fn _pthread_create(thread: *pthread_t, attr: const nullable *pthread_attr_t, start_routine: *fn(arg: *opaque) nullable *opaque, arg: *opaque) int; + +fn pthread_create(thread: *pthread_t, attr: const nullable *pthread_attr_t, start_routine: *fn(arg: *opaque) nullable *opaque, arg: *opaque) (void | errno) = { + const ret = _pthread_create(thread, attr, start_routine, arg); + + if (ret != 0) + return ret: errno; +}; + +@symbol("pthread_join") fn _pthread_join(thread: pthread_t, value_ptr: nullable **opaque) int; + +fn pthread_join(thread: pthread_t, value_ptr: nullable **opaque) (void | errno) = { + const ret = _pthread_join(thread, value_ptr); + + if (ret != 0) + return ret: errno; +}; |
