aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPolesznyák Márk <contact@pml68.dev>2026-04-14 10:45:32 +0200
committerPolesznyák Márk <contact@pml68.dev>2026-04-14 23:33:36 +0200
commit8a4664ac4c0e3c68a6a32bf451d35452c9409f2b (patch)
treebd83026cca79d388ff072cbdc0a8cee89def56f8
parentfeat: Hare updates (rt -> sys), fix sorting and parsing (diff)
downloadhare-1brc-8a4664ac4c0e3c68a6a32bf451d35452c9409f2b.tar.gz
feat: mmap + multithreading, down to 7s on i5 9400f
-rw-r--r--Makefile11
-rw-r--r--hashmap.ha39
-rw-r--r--main.ha203
-rw-r--r--pthread.ha28
4 files changed, 176 insertions, 105 deletions
diff --git a/Makefile b/Makefile
index 2a5eda8..6eb9b92 100644
--- a/Makefile
+++ b/Makefile
@@ -1,13 +1,18 @@
-1brc: main.ha hashmap.ha
- hare build -o 1brc .
+JOBS ?= $(shell nproc)
+
+1brc: main.ha hashmap.ha pthread.ha
+ hare build -D JOBS=$(JOBS) -lpthread -j $(JOBS) -o 1brc .
measurements.txt:
./generate.py 1_000_000_000
+run: 1brc measurements.txt
+ ./1brc
+
clean:
rm -f 1brc
check:
hare test
-.PHONY: clean check
+.PHONY: run clean check
diff --git a/hashmap.ha b/hashmap.ha
index 41235ea..81c9f94 100644
--- a/hashmap.ha
+++ b/hashmap.ha
@@ -1,28 +1,29 @@
-def BUCKETS: size = 1 << 17;
+def HASHMAP_SIZE: size = 1 << 14;
-type hashmap = [BUCKETS][](size, data);
+type hashmap = [HASHMAP_SIZE](size, stat);
-fn getitem(map: *hashmap, hash: size) data = {
- let bucket = &map[hash & (BUCKETS - 1)];
- for (let i = 0z; i < len(bucket); i += 1) {
- if (bucket[i].0 == hash)
- return bucket[i].1;
+fn getitem(map: *hashmap, hash: size) stat = {
+ let i = hash & (HASHMAP_SIZE - 1);
+ for (true) {
+ if (map[i].0 == 0 && map[i].1.count == 0)
+ break;
+ if (map[i].0 == hash)
+ return map[i].1;
+ i = (i + 1) & (HASHMAP_SIZE - 1);
};
- return data { ... };
+ return stat { ... };
};
-fn setitem(map: *hashmap, hash: size, value: data) void = {
- let bucket = &map[hash & (BUCKETS - 1)];
- for (let i = 0z; i < len(bucket); i += 1) {
- if (bucket[i].0 == hash) {
- bucket[i].1 = value;
+fn setitem(map: *hashmap, hash: size, value: stat) void = {
+ let i = hash & (HASHMAP_SIZE - 1);
+ for (true) {
+ if (map[i].0 == 0 && map[i].1.count == 0)
+ break;
+ if (map[i].0 == hash) {
+ map[i].1 = value;
return;
};
+ i = (i + 1) & (HASHMAP_SIZE - 1);
};
- append(bucket, (hash, value))!;
-};
-
-fn finish(map: *hashmap) void = {
- for (let i = 0z; i < len(map); i += 1)
- free(map[i]);
+ map[i] = (hash, value);
};
diff --git a/main.ha b/main.ha
index 05c9dab..287cbfd 100644
--- a/main.ha
+++ b/main.ha
@@ -1,8 +1,5 @@
-use bufio;
use bytes;
use fmt;
-use hash;
-use hash::fnv;
use io;
use os;
use sys;
@@ -11,13 +8,24 @@ use sort::cmp;
use strings;
use types;
-type data = struct {
+def JOBS: size = 4;
+
+type stat = struct {
min: i64,
max: i64,
sum: i64,
count: size
};
+type thread_job = struct {
+ chunk: []u8,
+ map: hashmap,
+ names_buf: [10000](size, str),
+ names: [](size, str)
+};
+
+const jobs: [JOBS]thread_job = [thread_job { ... }...];
+
fn i64parse(bytes: []u8) (i64, size) = {
let index = 0z;
let u: i64 = 0;
@@ -51,146 +59,175 @@ fn i64parse(bytes: []u8) (i64, size) = {
assert(i64parse(strings::toutf8("-98.2")).0 == -982);
};
+fn work(arg: *opaque) nullable *opaque = {
+ let job = arg: *thread_job;
+ for (true) {
+ let hash = 14695981039346656037z;
+ let sep_idx = 0z;
+ let has_semi = false;
+ for (const char .. job.chunk) {
+ if (char == ';') {
+ has_semi = true;
+ break;
+ };
+
+ hash ^= char;
+ hash *= 1099511628211;
+ sep_idx += 1;
+ };
+
+ if (!has_semi) break;
+
+ let after = job.chunk[sep_idx + 1..];
+ const name = strings::fromutf8_unsafe(job.chunk[..sep_idx]);
+ const (temp, index) = i64parse(after);
+
+ job.chunk = after[index..];
+
+ let stat = getitem(&job.map, hash);
+
+ if (stat.count == 0) {
+ static append(job.names, (hash, strings::dup(name)!))!;
+ stat.min = temp;
+ stat.max = temp;
+ } else {
+ if (stat.min > temp)
+ stat.min = temp;
+ if (stat.max < temp)
+ stat.max = temp;
+ };
+
+ stat.sum += temp;
+ stat.count += 1;
+
+ setitem(&job.map, hash, stat);
+ };
+
+ return null;
+};
+
fn mmap(fd: io::file) []u8 = {
- let sb = sys::st { ... };
- sys::fstat(fd, &sb)!;
+ let st = sys::st { ... };
+ sys::fstat(fd, &st)!;
- let ptr = sys::mmap(null, sb.sz, sys::PROT_READ, sys::MAP_PRIVATE, fd, 0)!;
+ let ptr = sys::mmap(null, st.sz, sys::PROT_READ, sys::MAP_PRIVATE, fd, 0)!;
// Hardcoded value for MADV_SEQUENTIAL, from libc-rs it seems like it's
// 2 on all currently supported platforms (Linux, FreeBSD, NetBSD,
// OpenBSD, Dragonfly).
- assert(sys::syscall(sys::SYS_madvise, ptr: uintptr: u64, sb.sz: u64, 2) == 0);
+ assert(sys::syscall(sys::SYS_madvise, ptr: uintptr: u64, st.sz: u64, 2) == 0);
return *(&types::slice {
data = ptr,
- length = sb.sz,
+ length = st.sz,
capacity = 0
}: *[]u8);
};
-fn free_mmap(map: []u8) void = {
+fn munmap(map: []u8) void = {
let map = &map: *types::slice;
sys::munmap(map.data: *opaque, map.length)!;
};
fn names_cmp(a: *opaque, b: *opaque) int = {
- const a = *(a: *(u64, str));
- const b = *(b: *(u64, str));
+ const a = *(a: *(size, str));
+ const b = *(b: *(size, str));
return cmp::strs(&a.1, &b.1);
};
export fn main() void = {
- let map: hashmap = [[]...];
- defer finish(&map);
- if (len(os::args) != 2)
+ if (len(os::args) > 2)
fmt::fatalf("usage: {} <file>", os::args[0]);
- const handle = os::open(os::args[1])!;
+ let file = "measurements.txt";
+ if (len(os::args) == 2)
+ file = os::args[1];
+
+ const handle = os::open(file)!;
defer io::close(handle)!;
- const fmap = mmap(handle);
- free_mmap(fmap);
+ const map = mmap(handle);
+ defer munmap(map);
- let buf: *[1024 * 1024]u8 = alloc([0...])!;
- defer free(buf);
- let read_start = 0z;
+ const chunk_size = len(map) / JOBS;
+ let threads: [JOBS]pthread_t = [0...];
- let names_buf: *[10000](u64, str) = alloc([(0, "")...])!;
- let names = names_buf[..0];
+ let at = 0z;
- let fnv = fnv::fnv64a();
- for (true) {
- const n = match (io::read(handle, buf[read_start..])) {
- case let n: size =>
- yield n;
- case io::EOF =>
- break;
- };
-
- if (read_start + n == 0) break;
-
- let chunk = buf[..read_start + n];
+ for (let i = 0z; i < JOBS; i += 1) {
+ const start = at;
+ let end = at + chunk_size;
+ if (end < len(map)) {
+ let newline_at = match(bytes::index(map[end..], '\n')) {
+ case let idx: size =>
+ yield idx;
+ case void =>
+ abort();
+ };
- let newline = match(bytes::rindex(chunk, '\n': u8)) {
- case let idx: size =>
- yield idx;
- case void =>
- break;
+ end = end + newline_at + 1;
+ } else {
+ end = len(map);
};
- let remaining = chunk[newline + 1..];
- chunk = chunk[..newline + 1];
-
- for (true) {
- let sep_idx = 0z;
- let has_semi = false;
- for (const char .. chunk) {
- if (char == ';': u8) {
- has_semi = true;
- break;
- };
-
- fnv.v ^= char;
- fnv.v *= 1099511628211;
- sep_idx += 1;
- };
- if (!has_semi) break;
+ jobs[i].chunk = map[start..end];
+ jobs[i].names = jobs[i].names_buf[..0];
+ pthread_create(&threads[i], null, &work, &jobs[i])!;
- const hash = fnv::sum64(&fnv);
- hash::reset(&fnv);
+ at = end;
+ };
- let after = chunk[sep_idx + 1..];
- const name = strings::fromutf8_unsafe(chunk[..sep_idx]);
- const (temp, index) = i64parse(after);
+ for (const thread .. threads)
+ pthread_join(thread, null)!;
- chunk = after[index..];
+ let names_buf: [10000](size, str) = [(0, "")...];
+ let names = names_buf[..0];
+ let stats: hashmap = [(0, stat { ... })...];
- let item = getitem(&map, hash);
+ for (let job &.. jobs) {
+ for (let (hash, name) .. job.names) {
+ let stat = getitem(&stats, hash);
+ let new_stat = getitem(&job.map, hash);
- if (item.count == 0) {
+ if (stat.count == 0) {
static append(names, (hash, strings::dup(name)!))!;
- item.min = temp;
- item.max = temp;
+
+ stat.min = new_stat.min;
+ stat.max = new_stat.max;
} else {
- if (item.min > temp)
- item.min = temp;
- if (item.max < temp)
- item.max = temp;
+ if (stat.min > new_stat.min)
+ stat.min = new_stat.min;
+ if (stat.max < new_stat.max)
+ stat.max = new_stat.max;
};
- item.sum += temp;
- item.count += 1;
+ stat.sum += new_stat.sum;
+ stat.count += new_stat.count;
- setitem(&map, hash, item);
+ setitem(&stats, hash, stat);
};
-
- read_start = len(remaining);
- buf[..read_start] = remaining[..];
};
- sort::sort(names, size((u64, str)), &names_cmp)!;
+ sort::sort(names, size((size, str)), &names_cmp)!;
fmt::print("{")!;
for (let i = 0z; i < len(names); i += 1) {
let station = names[i].1;
- let item = getitem(&map, names[i].0);
+ let item = getitem(&stats, names[i].0);
if (i > 0)
fmt::print(", ")!;
- fmt::printf("\"{}\"={:.1f}/{:.1f}/{:.1f}",
+ fmt::printf("{}={:.1f}/{:.1f}/{:.1f}",
station,
item.min: f64 / 10.0,
- item.max: f64 / 10.0,
item.sum: f64 / 10.0 / item.count: f64,
+ item.max: f64 / 10.0,
)!;
free(names[i].1);
};
fmt::println("}")!;
-
- free(names);
};
diff --git a/pthread.ha b/pthread.ha
new file mode 100644
index 0000000..82c4900
--- /dev/null
+++ b/pthread.ha
@@ -0,0 +1,28 @@
+use sys::{errno};
+
+def __SIZEOF_PTHREAD_CONDATTR_T: size = 56;
+
+type pthread_attr_t = union {
+ __size: [__SIZEOF_PTHREAD_CONDATTR_T]i8,
+ __align: i64
+};
+
+type pthread_t = u64;
+
+@symbol("pthread_create") fn _pthread_create(thread: *pthread_t, attr: const nullable *pthread_attr_t, start_routine: *fn(arg: *opaque) nullable *opaque, arg: *opaque) int;
+
+fn pthread_create(thread: *pthread_t, attr: const nullable *pthread_attr_t, start_routine: *fn(arg: *opaque) nullable *opaque, arg: *opaque) (void | errno) = {
+ const ret = _pthread_create(thread, attr, start_routine, arg);
+
+ if (ret != 0)
+ return ret: errno;
+};
+
+@symbol("pthread_join") fn _pthread_join(thread: pthread_t, value_ptr: nullable **opaque) int;
+
+fn pthread_join(thread: pthread_t, value_ptr: nullable **opaque) (void | errno) = {
+ const ret = _pthread_join(thread, value_ptr);
+
+ if (ret != 0)
+ return ret: errno;
+};