What's new in SwiftMQ Streams 10.2.0

New Admin Streams

We now provide a set of pre-installed so-called Admin Streams. They monitor Router resources and send alerts on condition, others provide services inside the Router.

SwiftMQ 10.2.0 contains the following Admin Streams:

  • Monitors to observe the inactivity of clients, routes and durable subscribers.
  • A monitor to observe any property of the Management Tree against a threshold.
  • Predefined mail streams that makes it easy to connect to different mail servers on different accounts.
  • A new message scheduler stream that is able to handle hundreds of thousands of message schedules.

Read more here.

New Features of the Stream Interface

Read docs of the Stream Interface here.

Code Completion Support

We provide TypeScript stubs that can be installed in IDEs like IntelliJ IDEA so that the IDE supports JavaScript code completion for the Streams Interface.

Queue Wiretap

You can wiretap any queue by adding a QueueWireTapInput to it. This is fully dynamic and doesn't require any configuration changes. Once a WireTap has been added to a queue, it sends a copy of each message send to this queue also to the WireTap. There can be multiple WireTaps on a queue, each will receive a message copy. So any Stream can register it's own WireTap with an optional message selector.

A WireTap can have multiple subscribers (Streams). With this it is possible to scale up processing of messages sent to the WireTap. Messages are distributed in a round-robin fashion to the subscribers of the WireTap.

Time Support

The Streams Interface provides a convenience class TimeSupport with various time methods, e.g. getting the current time, format/parse time, getting seconds, minute, day, week, month, year from a time. This class is accessible from Streams through variable "time".

Example:

        print("Week+45: "+time.week(time.currentTime(),45));

Output:

        Week+45: 7

Time Unit Change Limit

A Time Unit Change Limit limits a Memory by time but synchronizes it to a time unit. It is a tumbling window.

It can be used to retire windows at a change of the time unit, e.g. a limit of seconds(10) retires all Messages up to 12:00:09 when it changes to 12:00:10.

When a Stream is started it is mostly in the middle of a time unit. For example, if a Streams is started at 12:27:00 and has a limit of hours(1), the time unit changes ar 13:00:00 and all Messages with a time less than 13:00:00 will retire.

At Timer, Next Timer

An At Timer executes the onTimer callback at specific absolute times. Multiple times can be specified. The Timer is recurring. That is, when all times have been exuted, it continues the next day. The time format is either "HH:MM" or "HH:MM:SS".

The following Timer fires on 10:00, 11:35:45 and 17:30 each day:

        stream.create().timer("trigger").at().time("10:00").time("11:35:45").time("17:30").onTimer(function (timer) {
          // Do something
        });

A Next Timer is a one-shot Timer that executes the onTimer callback at the begin of a specific time: begin of the next second, minute, hour or day.

        stream.create().timer("daystarter").next().beginOfDay().onTimer(function (timer) {
          // Do something
        });

Inputs on same Destination

Sometimes it might be necessary to create multiple Inputs on the same destination, e.g. to have multiple subscribers on the same topic, each with a different message selector. To archive this, create the Inputs under a different name and use "destinationName(name)" to set the destination name.

        // Input 1
        stream.create().input("input1").topic().destinationName(inputTopic)
              .selector("ORDERHEADID between 1 and 10000")
              .onInput(function (input) {
                // Do something
              });

        // Input 2
        stream.create().input("input2").topic().destinationName(inputTopic)
              .selector("ORDERHEADID between 100001 and 20000")
              .onInput(function (input) {
                // Do something
              });

forEach for MemoryGroup, GroupResult

Memories contained in a Memory Group can be accessed by using a forEach callback:

        stream.memoryGroup("hour").forEach(function(mem){
          print("hour contains memory: "+mem.name());
        });

Memories contained in a GroupResult can be accessed by using a forEach callback:

        stream.memory("itemstats").group("ITEMNO").forEach(function(mem){
          // Do something
        });

JNDI Lookup

When multiple Streams communicate, one Stream will use a TempQueue as input and registers it in JNDI under a name. To be able that other Streams can send to this queue, a JNDI lookup can be performed and an Output can be created.

        // One Stream registers its Input queue in JNDI
        stream.create().input(stream.create().tempQueue("requestQueue").registerJNDI()).queue();
        // The other stream performs a lookup and creates an Output
        stream.create().output("requestQueue").forAddress(stream.lookupJNDI("requestQueue"));

Management Input: include

On an Management Input an onChange Message per default includes only the Property that has changed its value. To include additional Properties, use "include(names)". The Message will then also contain these Properties, even if the value has not changed.

        stream.create().input("sys$net/usage")
            .management()
            .include("swiftlet throughput-input throughput-output")
            .selector("swiftlet is not null")
            .onAdd(function (input) {
                stream.memoryGroup("hour").add(input.current());
            })
            .onChange(function (input) {
                stream.memoryGroup("hour").add(input.current());
            });

onStop Callback

A Stream can set an onStop callback which is called just before all Stream resources are closed. The intention of this callback is to let others know that the Stream is now unavailable. This can be accomplished by sending a message to a registry to unregister or send an eMail to someone.

The onStop callback is a function without parameters:

        stream.onStop(function() {
          stream.output("registry")
                .send(stream.create()
                            .message().textMessage()
                                      .body(stream.name()+": done")));
        });

Changes, Bugfixes

  • Output: Suppress exceptions when sending to a temp queue (which might be deleted in the meantime).
  • MemoryGroup: Missing the first message on add.
  • TopicInput: Close of nondurable with selector throws NullPointerException.