Options
All
  • Public
  • Public/Protected
  • All
Menu

Status

Build Status NPM Maintainability Test Coverage

Description

node-celery-ts is a Celery client for Node.js written in TypeScript. node-celery-ts supports RabbitMQ and Redis result brokers and RPC (over RabbitMQ) and Redis result backends. node-celery-ts provides higher performance than Celery on PyPy and provides greater feature support than node-celery, including Redis Sentinel and Cluster, RPC result backends, YAML serialization, zlib task compression, and Promise-based interfaces. node-celery-ts uses amqplib and ioredis for RabbitMQ and Redis, respectively. node-celery-ts does not support Amazon SQS or Zookeeper message brokers, nor does it support SQLAlchemy, Memcached, Cassandra, Elasticsearch, IronCache, Couchbase, CouchDB, filesystem, or Consul result backends.

Usage

Basic

import * as Celery from "celery-ts";

const client: Celery.Client = Celery.createClient({
    brokerUrl: "amqp://localhost",
    resultBackend: "redis://localhost",
});

const task: Celery.Task<number> = client.createTask<number>("tasks.add");
const result: Celery.Result<number> = task.applyAsync({
    args: [0, 1],
    kwargs: { },
});

const promise: Promise<number> = result.get();

promise.then(console.log)
    .catch(console.error);

Advanced

import * as Celery from "celery-ts";

const id = "7a5b72ab-03d1-47d9-8a9d-54af7c26bd59";
const brokers: Array<Celery.MessageBroker> = [
    Celery.createBroker("amqp://localhost"),
];
const backend: Celery.ResultBackend = Celery.createBackend("redis://localhost");

const client: Celery.Client = new Celery.Client({
    backend,
    brokers,
    id,
});

Message Broker Failover

const id = "7a5b72ab-03d1-47d9-8a9d-54af7c26bd59";
const brokers: Array<Celery.MessageBroker> = [
    Celery.createBroker("amqp://localhost"),
    Celery.createBroker("amqp://localhost:5673"),
];
const backend: Celery.ResultBackend = Celery.createBackend("redis://localhost");

const failoverStrategy: Celery.FailoverStrategy = (
    brokers: Array<Celery.MessageBroker>,
): Celery.MessageBroker => {
    return brokers[Math.floor(Math.random() * 2)];
};

const client: Celery.Client = new Celery.Client({
    backend,
    brokers,
    failoverStrategy,
    id,
});

Task Options

const client: Celery.Client = Celery.createClient({
    brokerUrl: "amqp://localhost",
    resultBackend: "redis://localhost",
});

const task: Celery.Task<number> = client.createTask<number>("tasks.add");
const result: Celery.Result<number> = task.applyAsync({
    args: [0, 1],
    compression: Celery.Compressor.Zlib,
    eta: new Date(Date.now() + 1000),
    expires: new Date(Date.now() + 5000),
    kwargs: { },
    serializer: Celery.Serializer.Yaml,
});

const promise: Promise<number> = result.get();

promise.then(console.log)
    .catch(console.error);

RabbitMQ

AmqpBroker

const options: Celery.AmqpOptions = {
    hostname: "localhost",
    protocol: "amqp",
};
const broker = new Celery.AmqpBroker(options);

RpcBackend

const id = "7a5b72ab-03d1-47d9-8a9d-54af7c26bd59";
const options: Celery.AmqpOptions = {
    hostname: "localhost",
    protocol: "amqp",
};
const backend = new Celery.RpcBackend(id, options);

Redis

RedisBackend and RedisBroker both accept a RedisOptions object, which is an interface that can be extended by the user to allow new creational patterns.

TCP

const tcp: RedisOptions = new Celery.RedisTcpOptions({
    host: "localhost",
    protocol: "redis",
});

Unix Socket

const socket: RedisOptions = new Celery.RedisSocketOptions({
    path: "/tmp/redis.sock",
    protocol: "redis+socket",
});

If you so desire, you may also provide options directly to ioredis when using a TCP or Unix Socket connection. See BasicRedisOptions for the full list.

Sentinel

const sentinel: RedisOptions = new Celery.RedisSentinelOptions({
    sentinels: [
        { host: "localhost", port: 26379 },
        { host: "localhost", port: 26380 },
    ],
    name: "mymaster",
});

Cluster

const cluster: RedisOptions = new Celery.RedisClusterOptions({
    nodes: [
        { host: "localhost", port: 6379 },
        { host: "localhost", port: 6380 },
    ],
});

Thanks

node-celery-ts was inspired by node-celery. Special thanks to Cameron Will for his guidance.

License

node-celery-ts is licensed under the BSD-3-Clause license.

Index

Enumerations

Classes

Interfaces

Type aliases

Variables

Functions

Object literals

Type aliases

Args

Args: Array<any>

ContentType

ContentType: "json" | "yaml" | "application/json" | "application/x-yaml"

FailoverStrategy

FailoverStrategy: function

Type declaration

NativeOptions

NativeOptions: IoRedis.RedisOptions | BasicRedisClusterOptions

Parser

Parser<T>: function

Type parameters

  • T

Type declaration

    • (raw: string | Array<string>): T
    • Parameters

      • raw: string | Array<string>

      Returns T

Priority

Priority: number

Variables

Const DEFAULT_REDIS_OPTIONS

DEFAULT_REDIS_OPTIONS: RedisTcpOptions = new RedisTcpOptions({protocol: "redis"})

Functions

Const asArray

  • asArray<T>(scalarOrArray: T | Array<T>): Array<T>
  • Type parameters

    • T

    Parameters

    • scalarOrArray: T | Array<T>

      A value to assert as an Array.

    Returns Array<T>

    If scalarOrArray is a scalar, [scalarOrArray]. If scalarOrArray is an Array, scalarOrArray.

Const asScalar

  • asScalar<T>(scalarOrArray: T | Array<T>): T
  • If scalarOrArray is an Array, it cannot be empty.

    Type parameters

    • T

    Parameters

    • scalarOrArray: T | Array<T>

      A value to assert as a scalar.

    Returns T

    If scalarOrArray is a scalar, scalarOrArray. If scalarOrArray is an Array, scalarOrArray[scalarOrArray.length - 1].

Const createBackend

  • Supports Redis over TCP or Unix Socket, Redis Sentinel, or RabbitMQ RPC.

    Parameters

    • id: string

      The UUID of this app.

    • rawUri: string

      The URI where a result backend can be found.

    Returns ResultBackend

    A ResultBackend with settings parsed from rawUri.

Const createBase64Encoder

  • uses Buffer.toString() and Buffer.from() with encoding "base64"

    Returns Encoder

    an Encoder that uses the Base64 encoding method

Const createBooleanQueryDescriptor

  • createBooleanQueryDescriptor(source: string, target?: undefined | string): QueryDescriptor<boolean>
  • If multiple values are provided, the last one provided is used.

    Parameters

    • source: string

      The property name to map from.

    • Optional target: undefined | string

      The property name to map into. Defaults to source.

    Returns QueryDescriptor<boolean>

    A QueryDescriptor that transforms to a boolean.

Const createBroker

  • Supports Redis over TCP and Sentinel, Redis Sentinel, and RabbitMQ.

    Parameters

    • rawUri: string

      The URI where a message broker can be found.

    Returns MessageBroker

    A MessageBroker with settings parsed from rawUri.

Const createClient

  • createClient(__namedParameters: object): Client
  • Delegates to createBackend and createBroker.

    Parameters

    • __namedParameters: object

    Returns Client

    A newly constructed Client which will use the provided message broker(s) and result backend.

Const createDefaultPacker

  • createDefaultPacker(): Packer
  • creates a Packer with the default serializer, compressor, and encoder

    Returns Packer

    a Packer using JSON serialization, no compression, and Base64 encoding

Const createGzipCompressor

Const createIdentityCompressor

Const createIntegerQueryDescriptor

  • createIntegerQueryDescriptor(source: string, target?: undefined | string): QueryDescriptor<number>
  • If multiple values are provided, the last one provided is used.

    Parameters

    • source: string

      The property name to map from.

    • Optional target: undefined | string

      The property name to map into. Defaults to source.

    Returns QueryDescriptor<number>

    A QueryDescriptor that transforms to a number.

Const createJsonSerializer

Const createOptions

Const createPacker

  • createPacker(__namedParameters: object): Packer
  • Parameters

    • __namedParameters: object

    Returns Packer

    a Packer that uses the specified serializer, compressor, and encoder

Const createPathArrayQueryDescriptor

  • createPathArrayQueryDescriptor(source: string, target?: undefined | string): QueryDescriptor<Array<Buffer>>
  • Interprets the values as paths, then reads it into a buffer with Fs.readFileSync.

    Parameters

    • source: string

      The property name to map from.

    • Optional target: undefined | string

      The property name to map into. Defaults to source.

    Returns QueryDescriptor<Array<Buffer>>

    A QueryDescriptor that transforms into an Array<Buffer>.

Const createPathQueryDescriptor

  • createPathQueryDescriptor(source: string, target?: undefined | string): QueryDescriptor<Buffer>
  • Interprets the value as a path, then reads from each path's corresponding file using Fs.readFileSync. If multiple values are provided, the last one provided is used.

    Parameters

    • source: string

      The property name to map from.

    • Optional target: undefined | string

      The property name to map into. Defaults to source.

    Returns QueryDescriptor<Buffer>

    A QueryDescriptor that maps to a Buffer.

Const createPlaintextEncoder

  • createPlaintextEncoder(): Encoder
  • uses Buffer.toString() and Buffer.from() with encoding "utf8"

    Returns Encoder

    an Encoder that interprets its inputs as UTF-8 encoded text

Const createTimeoutPromise

  • createTimeoutPromise<T>(promise: T | PromiseLike<T>, timeout?: undefined | number): Promise<T>
  • Implemented using Promise.race and createTimerPromise. If timeout is undefined, will not set a timeout.

    Type parameters

    • T

    Parameters

    • promise: T | PromiseLike<T>

      The Promise to race.

    • Optional timeout: undefined | number

      The time (in milliseconds) to wait before rejecting.

    Returns Promise<T>

    A Promise that will follow promise or reject after at least timeout milliseconds, whichever comes first.

Const createTimerPromise

  • createTimerPromise(timeout: number): Promise<never>
  • Implemented using setTimeout.

    Parameters

    • timeout: number

      The time (in milliseconds) to wait before rejecting.

    Returns Promise<never>

    A Promise that rejects after at least timeout milliseconds.

Const createYamlSerializer

  • implemented using js-yaml's safeDump and safeLoad cannot serialize undefined inputs

    Returns Serializer

    a Serializer that serializes data to and from YAML

Const createZlibCompressor

Const filterMapEvent

  • filterMapEvent<T>(__namedParameters: object): Promise<T>
  • Type parameters

    • T

    Parameters

    • __namedParameters: object

    Returns Promise<T>

    A Promise that settles when the specified emitter emits an event with matching name and the filtering condition is met.

Const getRoundRobinStrategy

  • Parameters

    • size: number

      The number of brokers to cycle through.

    Returns FailoverStrategy

    A FailoverStrategy that will cycle through each result broker sequentially with modulo arithmetic.

Const getScheme

  • getScheme(rawUri: string): Scheme
  • Only looks at the beginning of the string to match a scheme.

    Parameters

    • rawUri: string

      A valid URI.

    Returns Scheme

    An enum corresponding to the scheme of rawUri.

Const isNull

  • isNull<T>(value: T | null): boolean
  • Type parameters

    • T

    Parameters

    • value: T | null

      A potentially null value.

    Returns boolean

    value is null.

Const isNullOrUndefined

  • isNullOrUndefined<T>(value: T | null | undefined): boolean
  • Type parameters

    • T

    Parameters

    • value: T | null | undefined

      A potentially null or undefined value.

    Returns boolean

    value is null | undefined.

Const isUndefined

  • isUndefined<T>(value: T | undefined): boolean
  • Type parameters

    • T

    Parameters

    • value: T | undefined

      A potentially undefined value.

    Returns boolean

    value is undefined.

Const parse

  • uri should be of the format: redis[s]://[user[:pass]@]host[:port][/db][?query=value[&query=value]...] snake_case query keys will be converted to camelCase. Supported queries are "noDelay" and "password", uri should be of the format: redis[s]+socket://path[?query=value[&query=value]...] Valid queries are "noDelay" and "password". snake_case will be converted to camelCase. If multiple duplicate queries are provided, the last one provided will be used.

    Parameters

    • rawUri: string

      The URI to parse.

    Returns Options

    The Options parsed from uri.

  • uri should be of the format: redis[s]://[user[:pass]@]host[:port][/db][?query=value[&query=value]...] snake_case query keys will be converted to camelCase. Supported queries are "noDelay" and "password", uri should be of the format: redis[s]+socket://path[?query=value[&query=value]...] Valid queries are "noDelay" and "password". snake_case will be converted to camelCase. If multiple duplicate queries are provided, the last one provided will be used.

    Parameters

    • uri: string

      The URI to parse.

    Returns Options

    The Options parsed from uri.

Const parseAmqpUri

  • Parses a URI formatted according to the rules set forth by RabbitMQ. https://www.rabbitmq.com/uri-spec.html https://www.rabbitmq.com/uri-query-parameters.html Potential queries are authMechanism, channelMax, connectionTimeout, frameMax, heartbeat, and locale. snake_case and camelCase are accepted. Should be formatted roughly as follows: amqp[s]://[user[:pass]@]host[:port][/vhost][?key0=value0[&key1=value1]...] Or as: rpc[s]://[user[:pass]@]host[:port][/vhost][?key0=value0[&key1=value1]...]

    Parameters

    • rawUri: string

      A RabbitMQ URI.

    Returns AmqpOptions

    An object representation of uri.

Const parseBoolean

  • parseBoolean(maybeBoolean: string): boolean
  • Strips whitespace on the left and right and parses case-insensitively. "true" | "1" | "on" | "yes" -> true "false" | "0" | "off" | "no" -> false

    Parameters

    • maybeBoolean: string

      A boolean value.

    Returns boolean

    The parsed boolean value.

Const parseInteger

  • parseInteger(maybeInt: string): number
  • Follows the same rules as C/C++ integral literal parsing, stripping away whitespace on the left or right before attempting to parse. Examples: "0644" -> 420, "0x20" -> 32, "0b101" -> 5, " 15 " => 15

    Parameters

    • maybeInt: string

      A a non-negative base-{2,8,10,16} integer.

    Returns number

    The parsed integer.

Const parseRedisQuery

Const parseSentinelUri

  • parseSentinelUri(rawUris: string): Options
  • The URI should be formatted as follows: sentinel://host:port[;sentinel://host:port...] Any one of the URIs can have name or role specified in their query. The last query values will be used. name is the name of the master to connect to and must be specified. role determines the specific node from the Sentinel group that is connected to. Must be either "master" or "slave".

    Parameters

    • rawUris: string

    Returns Options

    Options parsed from rawUri.

Const parseUri

  • parseUri(toParse: string): Uri
  • Parameters

    • toParse: string

      A valid URI.

    Returns Uri

    A normalized object representation of a URI.

Const promisifyEvent

  • promisifyEvent<T>(emitter: Events.EventEmitter, name: string | symbol): Promise<T>
  • Implemented using EventEmitter#once.

    Type parameters

    • T

    Parameters

    • emitter: Events.EventEmitter

      The emitter to listen to.

    • name: string | symbol

      The name of the event.

    Returns Promise<T>

    A Promise that settles when the specified emitter emits an event with matching name.

Const toCamelCase

  • toCamelCase(toConvert: string): string
  • Converts from snake_case to camelCase.

    Parameters

    • toConvert: string

      The string to convert.

    Returns string

    A camelCase string.

Object literals

Const DEFAULT_AMQP_OPTIONS

DEFAULT_AMQP_OPTIONS: object

hostname

hostname: string = "localhost"

protocol

protocol: string = "amqp"

Const DEFAULT_CONFIG

DEFAULT_CONFIG: object

brokerFailoverStrategy

brokerFailoverStrategy: "round-robin" = "round-robin"

brokerUrl

brokerUrl: string = "amqp://localhost"

resultPersistent

resultPersistent: false = false

resultSerializer

resultSerializer: "json" = "json"

taskDefaultDeliveryMode

taskDefaultDeliveryMode: "persistent" = "persistent"

taskDefaultQueue

taskDefaultQueue: string = "celery"

taskIgnoreResult

taskIgnoreResult: false = false

taskProtocol

taskProtocol: 2 = 2

taskSerializer

taskSerializer: "json" = "json"

Legend

  • Module
  • Object literal
  • Variable
  • Function
  • Function with type parameter
  • Index signature
  • Type alias
  • Type alias with type parameter
  • Enumeration
  • Enumeration member
  • Property
  • Method
  • Interface
  • Interface with type parameter
  • Constructor
  • Property
  • Method
  • Index signature
  • Class
  • Class with type parameter
  • Constructor
  • Property
  • Method
  • Accessor
  • Index signature
  • Inherited constructor
  • Inherited property
  • Inherited method
  • Inherited accessor
  • Protected property
  • Protected method
  • Protected accessor
  • Private property
  • Private method
  • Private accessor
  • Static property
  • Static method

Generated using TypeDoc