node-celery-ts is a Celery client for Node.js written in TypeScript. node-celery-ts supports RabbitMQ and Redis message brokers, and RPC (over RabbitMQ) and Redis result backends. node-celery-ts provides higher performance than Celery on PyPy and greater feature support than node-celery, including Redis Sentinel and Cluster, RPC result backends, YAML serialization, zlib task compression, and Promise-based interfaces. node-celery-ts uses amqplib and ioredis for RabbitMQ and Redis, respectively. node-celery-ts does not support Amazon SQS or Zookeeper message brokers, nor does it support SQLAlchemy, Memcached, Cassandra, Elasticsearch, IronCache, Couchbase, CouchDB, filesystem, or Consul result backends.
// Basic usage: create a client, queue the "tasks.add" task, and log its result.
import * as Celery from "celery-ts";
const client: Celery.Client = Celery.createClient({
    brokerUrl: "amqp://localhost",
    resultBackend: "redis://localhost",
});
const task: Celery.Task<number> = client.createTask<number>("tasks.add");
const result: Celery.Result<number> = task.applyAsync({
    args: [0, 1],
    kwargs: { },
});
// get() returns a Promise for the task's return value.
const promise: Promise<number> = result.get();
promise.then(console.log)
    .catch(console.error);
// Explicit construction: build the broker list and result backend yourself.
import * as Celery from "celery-ts";
// The UUID of this app.
const id = "7a5b72ab-03d1-47d9-8a9d-54af7c26bd59";
const brokers: Array<Celery.MessageBroker> = [
    Celery.createBroker("amqp://localhost"),
];
const backend: Celery.ResultBackend = Celery.createBackend("redis://localhost");
const client: Celery.Client = new Celery.Client({
    backend,
    brokers,
    id,
});
// Message broker failover: supply several brokers and a strategy to choose among them.
const id = "7a5b72ab-03d1-47d9-8a9d-54af7c26bd59";
const brokers: Array<Celery.MessageBroker> = [
    Celery.createBroker("amqp://localhost"),
    Celery.createBroker("amqp://localhost:5673"),
];
const backend: Celery.ResultBackend = Celery.createBackend("redis://localhost");
// Pick a broker uniformly at random, whatever the length of the list.
const failoverStrategy: Celery.FailoverStrategy = (
    brokers: Array<Celery.MessageBroker>,
): Celery.MessageBroker => {
    return brokers[Math.floor(Math.random() * brokers.length)];
};
const client: Celery.Client = new Celery.Client({
    backend,
    brokers,
    failoverStrategy,
    id,
});
// Task options: compression, scheduling, expiry, and serialization.
const client: Celery.Client = Celery.createClient({
    brokerUrl: "amqp://localhost",
    resultBackend: "redis://localhost",
});
const task: Celery.Task<number> = client.createTask<number>("tasks.add");
const result: Celery.Result<number> = task.applyAsync({
    args: [0, 1],
    compression: Celery.Compressor.Zlib,
    // Earliest time the task may run: one second from now.
    eta: new Date(Date.now() + 1000),
    // The task expires five seconds from now.
    expires: new Date(Date.now() + 5000),
    kwargs: { },
    serializer: Celery.Serializer.Yaml,
});
const promise: Promise<number> = result.get();
promise.then(console.log)
    .catch(console.error);
AmqpBroker
const options: Celery.AmqpOptions = {
    hostname: "localhost",
    protocol: "amqp",
};
const broker = new Celery.AmqpBroker(options);
RpcBackend
const id = "7a5b72ab-03d1-47d9-8a9d-54af7c26bd59";
const options: Celery.AmqpOptions = {
    hostname: "localhost",
    protocol: "amqp",
};
const backend = new Celery.RpcBackend(id, options);
RedisBackend and RedisBroker both accept a RedisOptions object, which is an interface that can be extended by the user to allow new creational patterns.
const tcp: Celery.RedisOptions = new Celery.RedisTcpOptions({
    host: "localhost",
    protocol: "redis",
});
const socket: Celery.RedisOptions = new Celery.RedisSocketOptions({
    path: "/tmp/redis.sock",
    protocol: "redis+socket",
});
If you so desire, you may also provide options directly to ioredis when using a TCP or Unix Socket connection. See BasicRedisOptions for the full list.
const sentinel: Celery.RedisOptions = new Celery.RedisSentinelOptions({
    sentinels: [
        { host: "localhost", port: 26379 },
        { host: "localhost", port: 26380 },
    ],
    name: "mymaster",
});
const cluster: Celery.RedisOptions = new Celery.RedisClusterOptions({
    nodes: [
        { host: "localhost", port: 6379 },
        { host: "localhost", port: 6380 },
    ],
});
node-celery-ts was inspired by node-celery. Special thanks to Cameron Will for his guidance.
node-celery-ts is licensed under the BSD-3-Clause license.
If scalarOrArray is a scalar, [scalarOrArray]. If scalarOrArray is an Array, scalarOrArray.
If scalarOrArray is an Array, it cannot be empty.
A value to assert as a scalar.
If scalarOrArray is a scalar, scalarOrArray. If scalarOrArray is an Array, scalarOrArray[scalarOrArray.length - 1].
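The two conversions described above can be sketched as follows. The helper names toArray and toScalar are hypothetical, chosen for illustration, and throwing on an empty Array is an assumption about how "cannot be empty" is enforced.

```typescript
// Hypothetical helpers illustrating the documented behavior.
function toArray<T>(scalarOrArray: T | Array<T>): Array<T> {
    // Wrap a scalar in a one-element Array; return an Array unchanged.
    return Array.isArray(scalarOrArray) ? scalarOrArray : [scalarOrArray];
}

function toScalar<T>(scalarOrArray: T | Array<T>): T {
    if (!Array.isArray(scalarOrArray)) {
        return scalarOrArray;
    }
    // Assumption: an empty Array is reported by throwing.
    if (scalarOrArray.length === 0) {
        throw new Error("cannot convert an empty Array to a scalar");
    }
    // The last element wins, matching "the last one provided is used".
    return scalarOrArray[scalarOrArray.length - 1];
}

console.log(toArray("redis://localhost"));
console.log(toScalar(["first", "second"]));
```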
Supports Redis over TCP or Unix Socket, Redis Sentinel, or RabbitMQ RPC.
The UUID of this app.
The URI where a result backend can be found.
A ResultBackend with settings parsed from rawUri.
uses Buffer.toString() and Buffer.from() with encoding "base64"
an Encoder that uses the Base64 encoding method
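A minimal sketch of such an encoder, built on Buffer.toString() and Buffer.from() as described. The Encoder interface shape and the createBase64Encoder name are assumptions made for this example; the library's own types may differ.

```typescript
// Assumed shape of an encoder; not taken from the library.
interface Encoder {
    encode(data: Buffer): string;
    decode(data: string): Buffer;
}

// Hypothetical factory name.
function createBase64Encoder(): Encoder {
    return {
        encode: (data) => data.toString("base64"),
        decode: (data) => Buffer.from(data, "base64"),
    };
}

const encoder = createBase64Encoder();
const encoded = encoder.encode(Buffer.from("hello", "utf8"));
console.log(encoded); // prints "aGVsbG8="
console.log(encoder.decode(encoded).toString("utf8")); // prints "hello"
```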
If multiple values are provided, the last one provided is used.
The property name to map from.
The property name to map into. Defaults to source.
A QueryDescriptor that transforms to a boolean.
Supports Redis over TCP and Unix Socket, Redis Sentinel, and RabbitMQ.
The URI where a message broker can be found.
A MessageBroker with settings parsed from rawUri.
Delegates to createBackend and createBroker.
A newly constructed Client which will use the provided message broker(s) and result backend.
creates a Packer with the default serializer, compressor, and encoder
a Packer using JSON serialization, no compression, and Base64 encoding
uses zlib.g{un,}zipSync
a Compressor that uses the GZip compression method
a Compressor that returns a copy of its inputs
If multiple values are provided, the last one provided is used.
The property name to map from.
The property name to map into. Defaults to source.
A QueryDescriptor that transforms to a number.
implemented using JSON.{stringify,parse}
a Serializer that serializes data to and from JSON
Options that might be used to construct ioredis clients.
A transformed NativeOptions object.
a Packer that uses the specified serializer, compressor, and encoder
Interprets the values as paths, then reads each file into a Buffer with Fs.readFileSync.
The property name to map from.
The property name to map into. Defaults to source.
A QueryDescriptor that transforms into an Array<Buffer>.
Interprets the value as a path, then reads the corresponding file using Fs.readFileSync. If multiple values are provided, the last one provided is used.
The property name to map from.
The property name to map into. Defaults to source.
A QueryDescriptor that maps to a Buffer.
uses Buffer.toString() and Buffer.from() with encoding "utf8"
an Encoder that interprets its inputs as UTF-8 encoded text
Implemented using Promise.race and createTimerPromise. If timeout is undefined, will not set a timeout.
The Promise to race.
The time (in milliseconds) to wait before rejecting.
A Promise that will follow promise or reject after at least timeout milliseconds, whichever comes first.
Implemented using setTimeout.
The time (in milliseconds) to wait before rejecting.
A Promise that rejects after at least timeout milliseconds.
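The timeout behavior described above can be sketched with Promise.race and setTimeout. createTimerPromise is named in the text, but its signature here is assumed. promiseWithTimeout is a hypothetical name for the racing helper, and the rejection message is illustrative.

```typescript
// Rejects after at least `timeout` milliseconds (signature assumed).
function createTimerPromise(timeout: number): Promise<never> {
    return new Promise<never>((_resolve, reject) => {
        setTimeout(
            () => reject(new Error(`timed out after ${timeout} ms`)),
            timeout,
        );
    });
}

// Hypothetical name: follows `promise`, or rejects once the timer fires.
function promiseWithTimeout<T>(
    promise: Promise<T>,
    timeout?: number,
): Promise<T> {
    // With no timeout, the original Promise is returned untouched.
    if (timeout === undefined) {
        return promise;
    }
    return Promise.race([promise, createTimerPromise(timeout)]);
}

promiseWithTimeout(Promise.resolve("done"), 100)
    .then(console.log)
    .catch(console.error);
```

This sketch never clears the timer, so a pending timer can keep the process alive until it fires even when the raced Promise has already settled.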
implemented using js-yaml's safeDump and safeLoad; cannot serialize undefined inputs
a Serializer that serializes data to and from YAML
uses zlib.{de,in}flateSync
a Compressor that uses the Zlib compression method
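As a rough illustration, a compressor backed by zlib.deflateSync and zlib.inflateSync might look like the sketch below. The Compressor interface shape and the createZlibCompressor name are assumptions for this example, not the library's confirmed API.

```typescript
import * as zlib from "zlib";

// Assumed shape of a compressor; the library's interface may differ.
interface Compressor {
    compress(data: Buffer): Buffer;
    decompress(data: Buffer): Buffer;
}

// Hypothetical factory name.
function createZlibCompressor(): Compressor {
    return {
        compress: (data) => zlib.deflateSync(data),
        decompress: (data) => zlib.inflateSync(data),
    };
}

// Round trip: decompressing the compressed bytes restores the input.
const compressor = createZlibCompressor();
const packed = compressor.compress(Buffer.from("tasks.add", "utf8"));
console.log(compressor.decompress(packed).toString("utf8"));
```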
A Promise that settles when the specified emitter emits an event with matching name and the filtering condition is met.
The number of brokers to cycle through.
A FailoverStrategy that will cycle through each message broker sequentially with modulo arithmetic.
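A minimal sketch of a round-robin strategy built on modulo arithmetic. The generic FailoverStrategy type below stands in for the library's type so the example is self-contained, and createRoundRobinStrategy is a hypothetical name.

```typescript
// Stand-in for the library's FailoverStrategy, generic over the broker type.
type FailoverStrategy<Broker> = (brokers: Array<Broker>) => Broker;

// Hypothetical factory: `size` is the number of brokers to cycle through.
function createRoundRobinStrategy<Broker>(
    size: number,
): FailoverStrategy<Broker> {
    let index = 0;
    return (brokers: Array<Broker>): Broker => {
        const broker = brokers[index];
        // Advance and wrap around once the end of the list is reached.
        index = (index + 1) % size;
        return broker;
    };
}

const urls = ["amqp://localhost", "amqp://localhost:5673"];
const next = createRoundRobinStrategy<string>(urls.length);
console.log(next(urls), next(urls), next(urls));
```

The closure keeps its own index, so each strategy instance cycles independently of any other.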
Only looks at the beginning of the string to match a scheme.
A valid URI.
An enum corresponding to the scheme of rawUri.
A potentially null value.
value is null.
A potentially null or undefined value.
value is null | undefined.
A potentially undefined value.
value is undefined.
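These three return types read as TypeScript type predicates. A sketch of what such guards could look like follows; the function names are assumed for illustration.

```typescript
// Hypothetical type guards matching the predicates described above.
function isNull<T>(value: T | null): value is null {
    return value === null;
}

function isNullOrUndefined<T>(
    value: T | null | undefined,
): value is null | undefined {
    return value === null || value === undefined;
}

function isUndefined<T>(value: T | undefined): value is undefined {
    return value === undefined;
}

// The predicates let the compiler narrow types, for example when filtering.
const raw: Array<string | null | undefined> = ["amqp", null, undefined];
const present: Array<string> = raw.filter(
    (value): value is string => !isNullOrUndefined(value),
);
console.log(present);
```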
uri should be of the format:
redis[s]://[user[:pass]@]host[:port][/db][?query=value[&query=value]...]
snake_case query keys will be converted to camelCase. Supported queries are "noDelay" and "password".
uri should be of the format:
redis[s]+socket://path[?query=value[&query=value]...]
Valid queries are "noDelay" and "password". snake_case will be converted to camelCase. If multiple duplicate queries are provided, the last one provided will be used.
The URI to parse.
The Options parsed from uri.
Parses a URI formatted according to the rules set forth by RabbitMQ.
https://www.rabbitmq.com/uri-spec.html
https://www.rabbitmq.com/uri-query-parameters.html
Potential queries are authMechanism, channelMax, connectionTimeout, frameMax, heartbeat, and locale. snake_case and camelCase are accepted.
Should be formatted roughly as follows:
amqp[s]://[user[:pass]@]host[:port][/vhost][?key0=value0[&key1=value1]...]
Or as:
rpc[s]://[user[:pass]@]host[:port][/vhost][?key0=value0[&key1=value1]...]
A RabbitMQ URI.
An object representation of uri.
Strips whitespace on the left and right and parses case-insensitively.
"true" | "1" | "on" | "yes" -> true
"false" | "0" | "off" | "no" -> false
A boolean value.
The parsed boolean value.
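The mapping above can be sketched as a small switch. The name parseBoolean and the choice to throw on unrecognized input are assumptions; the text does not say how invalid values are handled.

```typescript
// Hypothetical name; throwing on invalid input is an assumption.
function parseBoolean(maybeBoolean: string): boolean {
    // Trim both sides and ignore case before matching.
    switch (maybeBoolean.trim().toLowerCase()) {
        case "true":
        case "1":
        case "on":
        case "yes":
            return true;
        case "false":
        case "0":
        case "off":
        case "no":
            return false;
        default:
            throw new Error(`cannot parse "${maybeBoolean}" as a boolean`);
    }
}

console.log(parseBoolean(" Yes ")); // prints true
console.log(parseBoolean("OFF")); // prints false
```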
Follows the same rules as C/C++ integral literal parsing, stripping away whitespace on the left or right before attempting to parse.
Examples: "0644" -> 420, "0x20" -> 32, "0b101" -> 5, " 15 " -> 15
A non-negative base-{2,8,10,16} integer.
The parsed integer.
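One way to sketch these rules is a table of prefix patterns paired with a radix. The name parseInteger and the error thrown on malformed input are assumptions for illustration.

```typescript
// Each entry pairs a literal form with its radix; the capture group holds the digits.
const INTEGER_FORMS: ReadonlyArray<[RegExp, number]> = [
    [/^0x([0-9a-f]+)$/i, 16], // hexadecimal, e.g. "0x20"
    [/^0b([01]+)$/i, 2], // binary, e.g. "0b101"
    [/^0([0-7]+)$/, 8], // octal via a leading zero, e.g. "0644"
    [/^(0|[1-9][0-9]*)$/, 10], // decimal, e.g. "15"
];

// Hypothetical name; throwing on malformed input is an assumption.
function parseInteger(maybeInt: string): number {
    const trimmed = maybeInt.trim();
    for (const [pattern, radix] of INTEGER_FORMS) {
        const match = pattern.exec(trimmed);
        if (match !== null) {
            return parseInt(match[1], radix);
        }
    }
    throw new Error(`cannot parse "${maybeInt}" as an integer`);
}

console.log(parseInteger("0644")); // prints 420
console.log(parseInteger("0x20")); // prints 32
console.log(parseInteger("0b101")); // prints 5
console.log(parseInteger(" 15 ")); // prints 15
```

As in C, a digit outside the radix (for example "08") matches none of the forms and is rejected in this sketch.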
The object representation of a Redis URI.
A RedisQueryOptions object with options present in uri mapped.
The URI should be formatted as follows:
sentinel://host:port[;sentinel://host:port...]
Any one of the URIs can have name or role specified in their query. The last query values will be used. name is the name of the master to connect to and must be specified. role determines the specific node from the Sentinel group that is connected to. Must be either "master" or "slave".
Options parsed from rawUri.
A valid URI.
A normalized object representation of a URI.
Implemented using EventEmitter#once.
The emitter to listen to.
The name of the event.
A Promise that settles when the specified emitter emits an event with matching name.
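A minimal sketch of wrapping EventEmitter#once in a Promise. The name promisifyEvent is hypothetical, and resolving with the first event argument is an assumption about what the Promise settles with.

```typescript
import { EventEmitter } from "events";

// Hypothetical name; resolves with the first argument passed to the event.
function promisifyEvent<T>(
    emitter: EventEmitter,
    name: string | symbol,
): Promise<T> {
    return new Promise<T>((resolve) => {
        // once() removes the listener after its first invocation.
        emitter.once(name, (value: T) => resolve(value));
    });
}

const emitter = new EventEmitter();
const ready = promisifyEvent<string>(emitter, "ready");
emitter.emit("ready", "connected");
ready.then(console.log);
```

Because the listener is registered with once, it does not linger on the emitter after the Promise settles.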
Converts from snake_case to camelCase.
The string to convert.
A camelCase string.
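A sketch of the conversion using a single regular expression. The name toCamelCase is assumed, and the handling of edge cases such as leading or repeated underscores is a choice made for this example only.

```typescript
// Hypothetical name; drops each run of underscores and upper-cases the next character.
function toCamelCase(snake: string): string {
    return snake.replace(
        /_+([a-z0-9])/gi,
        (_match: string, char: string) => char.toUpperCase(),
    );
}

console.log(toCamelCase("no_delay")); // prints "noDelay"
console.log(toCamelCase("connection_timeout")); // prints "connectionTimeout"
```

Keys that contain no underscore, such as "password", pass through unchanged.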
A value to assert as an Array.