Skip to content

Commit 48dae22

Browse files
authored
Merge branch 'main' into 59-support-orderedmaprange-callbacks-mutating-the-map
2 parents 4db1198 + 2c70257 commit 48dae22

File tree

10 files changed

+442
-34
lines changed

10 files changed

+442
-34
lines changed

README.md

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,21 @@ making it suitable both for highest performance (in-place), and for LRU caches (
1717

1818
See the available types by underlying storage
1919

20-
| Type | Slice | Map | List | List+sync.Pool | List+int. pool | Recommended |
21-
|------------|:-----:|:---:|:----:|:--------------:|:--------------:|----------------------|
22-
| OrderedMap | Y | | | | | Slice with size hint |
23-
| Queue | Y | | Y | Y | Y | Slice with size hint |
24-
| Set | | Y | | | | Map with size hint |
25-
| Stack | Y | | Y | Y | Y | Slice with size hint |
26-
27-
**CAVEAT**: In order to optimize performance,
20+
| Type | Slice | Map | List | List+sync.Pool | List+int. pool | Recommended |
21+
|---------------|:-----:|:---:|:----:|:--------------:|:--------------:|----------------------|
22+
| OrderedMap | Y | | | | | Slice with size hint |
23+
| Queue | Y | | Y | Y | Y | Slice with size hint |
24+
| WaitableQueue | Y | | | | | Slice with size hint |
25+
| Set | | Y | | | | Map with size hint |
26+
| Stack | Y | | Y | Y | Y | Slice with size hint |
27+
28+
29+
**CAVEAT**: In order to optimize performance, except for WaitableQueue,
2830
all of these implementations are unsafe for concurrent execution,
2931
so they need protection in concurrency situations.
3032

33+
WaitableQueue being designed for concurrent code, on the other hand, is concurrency-safe.
34+
3135
Generally speaking, in terms of performance:
3236

3337
- Slice > list+internal pool > plain List > list+sync.Pool
@@ -51,23 +55,44 @@ om := orderedmap.NewSlice[Key, Value](sizeHint, stable)
5155
om.Store(k, v)
5256
om.Range(func (k K, v V) bool { fmt.Println(k, v); return true })
5357
v, loaded := om.Load(k)
54-
if !loaded { fmt.Printf("No entry for key %v\n", k)}
58+
if !loaded {
59+
fmt.Fprintf(w, "No entry for key %v\n", k)
60+
}
5561
om.Delete(k) // Idempotent: does not fail on nonexistent keys.
5662
```
5763

58-
### Queues
64+
### Classic Queues without flow control
5965

6066
```go
6167
var e Element
6268
q := queue.NewSliceQueue[Element](sizeHint)
6369
q.Enqueue(e)
6470
if lq, ok := q.(container.Countable); ok {
65-
fmt.Printf("elements in queue: %d\n", lq.Len())
71+
fmt.Fprintf(w, "elements in queue: %d\n", lq.Len())
6672
}
6773
for i := 0; i < 2; i++ {
68-
e, ok := q.Dequeue()
69-
fmt.Printf("Element: %v, ok: %t\n", e, ok)
74+
e, ok := q.Dequeue()
75+
fmt.Fprintf(w, "Element: %v, ok: %t\n", e, ok)
76+
}
77+
```
78+
79+
### WaitableQueue: a concurrent queue with flow control
80+
81+
```go
82+
var e Element
83+
q, _ := queue.NewWaitableQueue[Element](sizeHint, lowWatermark, highWatermark)
84+
go func() {
85+
wqs := q.Enqueue(e)
86+
if lq, ok := q.(container.Countable); ok {
87+
fmt.Fprintf(w, "elements in queue: %d, status: %s\n", lq.Len(), wqs)
88+
}
89+
}
90+
<-q.WaitChan() // Wait for elements to be available to dequeue
91+
for i := 0; i < 2; i++ { // Then dequeue them
92+
e, ok, wqs := q.Dequeue() // Non-blocking, ok will be true for the first and false for the second
93+
fmt.Fprintf(w, "Element: %v, ok: %t, status: %s\n", e, ok, wqs)
7094
}
95+
q.Close() // Only needed if consumers may still be waiting on <-q.WaitChan
7196
```
7297

7398
### Sets
@@ -78,10 +103,10 @@ s := set.NewBasicMap[Element](sizeHint)
78103
s.Add(e)
79104
s.Add(e)
80105
if cs, ok := q.(container.Countable); ok {
81-
fmt.Printf("elements in set: %d\n", cs.Len()) // 1
106+
fmt.Fprintf(w, "elements in set: %d\n", cs.Len()) // 1
82107
}
83108
for e := range s.Items() {
84-
fmt.Fprintln(w, e)
109+
fmt.Fprintln(w, e)
85110
}
86111

87112
```
@@ -92,11 +117,11 @@ fmt.Fprintln(w, e)
92117
s := stack.NewSliceStack[Element](sizeHint)
93118
s.Push(e)
94119
if ls, ok := s.(container.Countable); ok {
95-
fmt.Printf("elements in stack: %d\n", ls.Len())
120+
fmt.Printf("elements in stack: %d\n", ls.Len())
96121
}
97122
for i := 0; i < 2; i++ {
98-
e, ok := s.Pop()
99-
fmt.Printf("Element: %v, ok: %t\n", e, ok)
123+
e, ok := s.Pop()
124+
fmt.Printf("Element: %v, ok: %t\n", e, ok)
100125
}
101126
```
102127

cmd/waitablequeue/main.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package main
2+
3+
import (
4+
"os"
5+
)
6+
7+
func main() {
8+
os.Exit(realMain(os.Stdout, 60, 20, 30))
9+
}

cmd/waitablequeue/real_main.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"time"
7+
8+
"github.com/fgm/container"
9+
"github.com/fgm/container/queue"
10+
)
11+
12+
type Element int
13+
14+
func consumer(w io.Writer, wq container.WaitableQueue[Element]) {
15+
lq := wq.(container.Countable) // This implementation provides Countable, so the assertion cannot fail.
16+
17+
for {
18+
_, ok := <-wq.WaitChan()
19+
if !ok {
20+
fmt.Fprintf(w, "Consumer exiting on WaitableQueue closure, %d remaining in queue\n", lq.Len())
21+
return
22+
}
23+
fmt.Fprintln(w, "Queue might not be empty, looking for items")
24+
var (
25+
item Element
26+
wqs container.WaitableQueueState
27+
)
28+
for ok {
29+
item, ok, wqs = wq.Dequeue()
30+
if !ok {
31+
fmt.Fprintln(w, "Back to wait")
32+
break
33+
}
34+
fmt.Fprintf(w, "Received: %v, %d in queue, status %s\n", item, lq.Len(), wqs)
35+
time.Sleep(30 * time.Millisecond) // Consume more slowly than producer
36+
}
37+
}
38+
}
39+
40+
func producer(w io.Writer, wq container.WaitableQueue[Element]) {
41+
delay := time.After(3 * time.Second)
42+
for i := range 60 {
43+
wqs := wq.Enqueue(Element(i))
44+
fmt.Fprintf(w, "Sent: %v, status %s\n", i, wqs)
45+
time.Sleep(2 * time.Millisecond)
46+
}
47+
<-delay
48+
wq.Close()
49+
}
50+
51+
func realMain(w io.Writer, sizeHint, low, high int) int {
52+
wq, err := queue.NewWaitableQueue[Element](sizeHint, low, high)
53+
if err != nil {
54+
fmt.Fprintf(w, "Failed to create WaitableQueue: %v\n", err)
55+
return 1
56+
}
57+
58+
go consumer(w, wq)
59+
producer(w, wq)
60+
time.Sleep(100 * time.Millisecond) // Leave time for consumer() to display its exit message.
61+
return 0
62+
}

0 commit comments

Comments
 (0)