Overview
A semaphore with an async interface that limits simultaneous access to a resource. When the resource is ready, the semaphore "signals" the acquirer by resolving the promise returned from acquire.
It goes beyond a basic semaphore with two extra pairs of primitives, grant | revoke and freeze | unfreeze, which are introduced below. For more, see the examples.
Terminology:
- permit: each acquirer acquires a permit from the semaphore. Once it holds the permit, it runs its own logic and then releases the permit so that other acquirers can take it.
create a semaphore
Constructs a semaphore with a given number of permits, which can later be used to limit concurrency. A semaphore may also have an unlimited number of permits.
const sem = new Semaphore(5)
acquire & release
You can acquire a permit from the semaphore; the returned promise won't resolve until the semaphore has a permit remaining. Don't forget to release the permit once you're done.
Synchronous variants are also available; see the API.
const sem = new Semaphore(5)
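// at most 5 downloads run at once; `pkg` is used because `package` is reserved in strict mode
const tarballs = await Promise.all(packages.map(async (pkg) => {
  const release = await sem.acquire()
  try {
    return await download(pkg)
  } finally {
    release() // always give the permit back, even if the download fails
  }
}))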
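To make the acquire and release contract concrete, here is a minimal sketch of how such a semaphore could work internally. `MiniSemaphore` and `demo` are hypothetical names made up for this illustration; this is not the library's implementation, only one plausible way to "signal" a waiter by resolving its promise.

```javascript
// Hypothetical minimal semaphore, for illustration only (not the library's code).
class MiniSemaphore {
  constructor(permits) {
    this.remain = permits // permits currently available
    this.waiters = []     // pending acquirers, served first-in first-out
  }

  // Returns a promise that resolves with a release function once a permit is free.
  acquire() {
    return new Promise((resolve) => {
      const take = () => {
        this.remain--
        let released = false
        resolve(() => {
          if (released) return // releasing twice is a no-op
          released = true
          this.remain++
          const next = this.waiters.shift()
          if (next) next()     // "signal" the next waiting acquirer
        })
      }
      if (this.remain > 0) take()
      else this.waiters.push(take)
    })
  }
}

// Run 6 tasks with at most 2 in flight and record the peak concurrency.
async function demo() {
  const sem = new MiniSemaphore(2)
  let running = 0
  let peak = 0
  await Promise.all([1, 2, 3, 4, 5, 6].map(async () => {
    const release = await sem.acquire()
    try {
      running++
      peak = Math.max(peak, running)
      await new Promise((r) => setTimeout(r, 5)) // stand-in for real work
    } finally {
      running--
      release()
    }
  }))
  return peak
}

demo().then((peak) => console.log('peak concurrency:', peak))
```

Releasing in a finally block means a failed task cannot leak its permit and starve the remaining acquirers.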
grant & revoke
The grant and revoke primitives let you give a semaphore more permits or take permits away, indicating that more or fewer resources are available.
const sem = new Semaphore(5)
const emitter = new EventEmitter()
const queue = []
emitter.on('event', (e) => {
  queue.push(e)
  sem.grant()
})
// this is basically the same as how flowp's Channel works
while (await sem.revoke()) {
  const data = queue.pop()
}
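The producer and consumer pattern above can be sketched without the library to show why each granted permit corresponds to exactly one queued item. `PermitPool` and `demo` are hypothetical names, and the behavior shown (revoke waits for a permit, then destroys it and resolves to true) is an assumption based on the description above, not the library's confirmed semantics. The sketch starts with zero permits and reads with shift() so that items come out in arrival order.

```javascript
// Hypothetical permit counter with grant/revoke, for illustration only.
class PermitPool {
  constructor(permits) {
    this.remain = permits
    this.waiters = []
  }

  // Add one permit; hand it straight to a waiting revoker if there is one.
  grant() {
    const next = this.waiters.shift()
    if (next) next()
    else this.remain++
  }

  // Wait for a permit, then destroy it instead of giving it back later.
  revoke() {
    return new Promise((resolve) => {
      if (this.remain > 0) {
        this.remain--
        resolve(true)
      } else {
        this.waiters.push(() => resolve(true))
      }
    })
  }
}

async function demo() {
  const pool = new PermitPool(0) // no permits yet: nothing to consume
  const queue = []
  const received = []

  // Producer: every pushed item grants one permit.
  setTimeout(() => { queue.push('a'); pool.grant() }, 5)
  setTimeout(() => { queue.push('b'); pool.grant() }, 10)

  // Consumer: every revoked permit pays for exactly one queued item.
  while (received.length < 2 && await pool.revoke()) {
    received.push(queue.shift())
  }
  return received
}

demo().then((items) => console.log(items))
```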
freeze & unfreeze
You can freeze the semaphore so that it stops handing out permits while the resource is temporarily unavailable. revoke is affected as well, since it has to collect (acquire) permits before destroying them.
const sem = new Semaphore(5)
sem.freeze()
sem.frozen // true
await sem.revoke() // hangs for as long as the semaphore stays frozen
await sem.acquire() // hangs for as long as the semaphore stays frozen
// a separate example: pause writers while a socket reconnects
const sem = new Semaphore(1)
let socket = net.connect(host, port)
// socket disconnected, freeze the semaphore until reconnection
socket.on('close', () => {
  sem.freeze()
  socket = net.connect(host, port)
  socket.on('connect', () => sem.unfreeze())
})
// some other place
const release = await sem.acquire()
socket.write(buf)
release() // give the permit back so the next writer can proceed
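The freezing behavior can be illustrated with a small self-contained sketch. `FreezableSemaphore` and `demo` are hypothetical names, and the rule implemented here (pending acquirers are parked while frozen and served as soon as unfreeze is called) is an assumption drawn from the description above rather than the library's actual code.

```javascript
// Hypothetical freezable semaphore, for illustration only.
class FreezableSemaphore {
  constructor(permits) {
    this.remain = permits
    this.frozen = false
    this.waiters = []
  }

  freeze() { this.frozen = true }

  unfreeze() {
    this.frozen = false
    this.drain() // serve acquirers that piled up while frozen
  }

  // Hand permits to waiters, but only while not frozen.
  drain() {
    while (!this.frozen && this.remain > 0 && this.waiters.length > 0) {
      this.remain--
      this.waiters.shift()(this.makeRelease())
    }
  }

  makeRelease() {
    let released = false
    return () => {
      if (released) return
      released = true
      this.remain++
      this.drain()
    }
  }

  acquire() {
    return new Promise((resolve) => {
      this.waiters.push(resolve)
      this.drain()
    })
  }
}

async function demo() {
  const sem = new FreezableSemaphore(1)
  const log = []
  sem.freeze()
  // A permit is free, but the acquirer must wait because the semaphore is frozen.
  const pending = sem.acquire().then((release) => { log.push('acquired'); release() })
  await new Promise((r) => setTimeout(r, 5))
  log.push('still frozen')
  sem.unfreeze()
  await pending
  return log
}

demo().then((log) => console.log(log))
```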
inspect the semaphore
Several getters let you inspect the current state of the semaphore.
const sem = new Semaphore(5)
sem.isEmpty // true
sem.isFull // false
sem.permits // 5
sem.remain // 5
Mutex
Mutex is a special kind of semaphore with only 1 permit and a different interface. You can optionally wrap an object to protect it from concurrent access.
lock & release
const mutex = new Mutex()
// canLock and tryLock are two separate operations, but that is safe because this code runs on a single main thread
if (!mutex.canLock()) return
const release = mutex.tryLock()
// do something
release()
// wrap an object
const mutex = new Mutex({ a: 1 })
const { release, value } = await mutex.lock()
const ref = value // value is a temporary reference, not the object stored in the mutex itself
ref.a // => 1
release()
ref.a // => TypeError, temporary reference destroyed
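One way to obtain a temporary reference that throws a TypeError after release is JavaScript's built-in Proxy.revocable. The sketch below uses it in a hypothetical `MiniMutex`; whether the library does the same internally is an assumption, and the class and `demo` are made up for this illustration.

```javascript
// Hypothetical mutex wrapping a value, for illustration only.
class MiniMutex {
  constructor(value) {
    this.value = value
    this.locked = false
    this.waiters = []
  }

  canLock() { return !this.locked }

  // Resolves with { release, value } once the lock is held.
  lock() {
    return new Promise((resolve) => {
      const take = () => {
        this.locked = true
        // The revocable proxy is the temporary reference to the protected object.
        const { proxy, revoke } = Proxy.revocable(this.value, {})
        let released = false
        const release = () => {
          if (released) return
          released = true
          revoke() // any later access through the proxy throws TypeError
          this.locked = false
          const next = this.waiters.shift()
          if (next) next()
        }
        resolve({ release, value: proxy })
      }
      if (this.locked) this.waiters.push(take)
      else take()
    })
  }
}

async function demo() {
  const mutex = new MiniMutex({ a: 1 })
  const { release, value } = await mutex.lock()
  const before = value.a // reads through the proxy while the lock is held
  const lockedWhileHeld = !mutex.canLock()
  release()
  let threw = false
  try {
    value.a // the proxy has been revoked
  } catch (err) {
    threw = err instanceof TypeError
  }
  return { before, lockedWhileHeld, threw }
}

demo().then((result) => console.log(result))
```

Revoking the reference on release prevents code from touching the protected object after it has given up the lock.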
freeze & unfreeze
Same as the Semaphore's freeze & unfreeze.