Semaphore & Mutex
Semaphore Class
class Semaphore
Semaphore is a simple class without any implements / extends or generic parameters.
Properties
permits
get permits(): number
Returns the maximum number of permits of this semaphore.
remain
get remain(): number
Inspects current remaining permits of this semaphore.
isFull
get isFull(): boolean
Check if the semaphore has drained its permits. Always return true
if permits = 0
isEmpty
get isEmpty(): boolean
Check if the semaphore has not yet give out any permits. Always return true
if permits = 0
Methods
constructor
constructor(permits?: number)
Constructs a new Semaphore with n permits. The number of permits can be adjusted later using grant or revoke methods.
acquire
acquire(timeout?: number): Promise<() => void>
Acquires a permit from the semaphore and returns a Promise which will be fulfilled with a function to release the permit. The release function can only be called once and subsequent calls will have no effect.
tryAcquire
tryAcquire(): () => void
Synchronosly acquires a permit and throws if the semaphore has no remaining permit.
schedule
schedule<T>(fn: () => T): Promise<Awaited<T>>
Schedule a task to run when a permit is available and automatically release after run.
Examples:
const sem = new Semaphore(5)
// concurrency limit
const tarballs = Promise.all(packages.map(async (package) => {
const release = await sem.acquire()
const tarball = await download(package)
release()
return tarball
}))
// drop requests
server.on((request) => {
try {
const release = sem.tryAcquire()
handle(request)
release()
} catch (error) {
console.log('dropped request')
}
})
// or schedule concurrent tasks with automatic release
await Promise.all(tasks => sem.schedule(() => do(task)))
grant
grant(permits?: number): void
Give n permits to semaphore, will immediately start this number of waiting tasks if not frozen.
revoke
revoke(permits?: number): Promise<void>
Revoke n permits from semaphore. It collect this number of permits first by calling acquire and destroys them, then reduce max number of permits by n.
Examples:
const sem = new Semaphore(5)
const emitter = new EventEmitter()
const queue = []
emitter.on('event', (e) => {
queue.push(e)
sem.grant()
})
// this is basically the same as how flowp's Channel works
while (await sem.revoke()) {
const data = queue.pop()
}
freeze
freeze(): void
Freeze this semaphore, calling acquire won't resolve and tryAcquire will throw (release can still be called).
unfreeze
unfreeze(): Promise<void>
Unfreeze this semaphore, queued tasks start to run immediately.
it is synchronos and the returned value should be ignored.
Examples:
const sem = new Semaphore(5)
sem.freeze()
setTimeout(() => sem.unfreeze(), 5000)
await sem.revoke() // hanging until timeout
await sem.acquire() // hanging until timeout
const sem = new Semaphore(1)
let socket = net.connect(host, port)
// socket disconnected, freeze the semaphore until reconnection
socket.on('close', (err) => {
sem.freeze()
socket = net.connect(host, port)
socket.on('connect', () => sem.unfreeze())
})
// some other place
await sem.acquire()
socket.write(buf)
Mutex Class
Interface
MutexGuard
a value created by mutex.lock()
, mutex.tryLock()
or mutex.schedule()
. It contains a value
property which is a temporary reference to the value stored in the mutex, and you can access its properties as long as you didn't release the lock.
/**
* calling the `guard()` or `guard.release()` will release the mutex and revoke `MutexGuard.value`
* so that any subsequent access to the value will throw a TypeError
*/
export type MutexGuard<V> = V extends object
? {
(): void
release: () => void
value: V
}
: () => void
Class
class Mutex<V = void>
Where:
V
: type of the object wrapped by the mutex, and a immutable T does not make sense
Mutex does not extend Semaphore because it removes some unnecessary methods / props from Semaphore.
Properties
canLock
get canLock(): boolean
Alias for Semaphore.isEmpty
frozen
get frozen(): boolean
Check if mutex is frozen
Methods
constructor
constructor()
Constructs a Mutex and its capacity is always 1.
lock
lock(timeout?: number): Promise<MutexGuard<V>>
acquire lock, returns a MutexGuard
const mutex = new Mutex({ a: 1 })
const { release, value } = await mutex.lock()
const ref = value
ref.a // => 1
release()
ref.a // => TypeError, temporary reference destroyed
### tryLock
```typescript
tryLock(): () => void
acquire lock, returns a MutexGuard
const mutex = new Mutex({ a: 1 })
const { release, value } = mutex.tryLock()
const ref = value
ref.a // => 1
release()
ref.a // => TypeError, temporary reference destroyed
schedule
schedule<T>(fn: (v: V) => T): Promise<Awaited<T>>
Same as Semaphore.schedule.
Examples:
const mut = new Mutex()
// Promise.serial
const tarballs = Promise.all(packages.map(async (package) => {
const release = await mut.lock()
const tarball = await download(package)
release()
return tarball
}))
// or serialize concurrent tasks with automatic release
await Promise.all(tasks => mut.schedule(() => do(task)))
freeze
freeze(): void
Same as Semaphore.freeze.
unfreeze
unfreeze(): Promise<void>
Same as Semaphore.unfreeze.
frozen
get frozen(): boolean
Same as Semaphore.frozen.