Transactions

Let's see how we can use Redis transactions with Node. As a reminder, a transaction enables one client to execute a series of commands without another client being able to interrupt them. Transactions guarantee that the commands are executed as a single isolated operation.

Swimming Pool

To put these principles in context, we are once again using a Raspberry Pi to gather temperatures from our swimming pool and send those temperatures on to a cloud-based historian. Unfortunately, the world is not perfect, and we occasionally lose Internet connectivity. Redis to the rescue! In this case, we use a Redis list as a temporary historian to buffer our temperature values until our Internet connection is restored and we can empty the list. Each temperature point is stored as a JSON object as follows (where t represents a timestamp and v represents a value):

{"t":"1/6/2018 10:01","v":75.59}

First, we'll employ the following function to supply some swimming pool temperature values, which we will use to populate a Redis list (poolTemps) and so simulate data that has been streamed in from our pool temperature sensor:

/**
 * Builds the sample pool temperature readings.
 * Each reading has a timestamp string `t` and a numeric value `v`.
 * A fresh array of fresh objects is returned on every call.
 */
const getData = () => {
    const samples = [
        ['1/6/2018 10:01', 75.59],
        ['1/6/2018 10:02', 75.60],
        ['1/6/2018 10:03', 75.63],
    ];
    return samples.map(([t, v]) => ({ t, v }));
};

Our challenge is that we want to retrieve all the values stored in our poolTemps list and upload them to our cloud-based historian. We also want to empty the list; however, we have another process running that is faithfully adding new temperature sensor values. There is a remote possibility that we could retrieve all the temperature values from the list and the other process could add another sensor value in the split second before we delete the list. One of our history points would be lost. :cry: Let's solve this problem with the help of Redis transactions and Node.js.

Populate Pool Temperatures in a Redis List

We start by populating the poolTemps list with swimming pool temperature data points:

const Redis = require('ioredis');

const redis = new Redis();

/**
 * Builds the sample pool temperature readings used to seed the Redis list.
 * Each reading has a timestamp string `t` and a numeric value `v`.
 * A fresh array of fresh objects is returned on every call.
 */
const getData = () => {
    const samples = [
        ['1/6/2018 10:01', 75.59],
        ['1/6/2018 10:02', 75.60],
        ['1/6/2018 10:03', 75.63],
    ];
    return samples.map(([t, v]) => ({ t, v }));
};

/**
 * Appends the given records to the tail of the Redis list stored at `key`.
 *
 * Each record is serialized with JSON.stringify and all of them are sent in
 * a single RPUSH call.
 *
 * @param {string} key - Name of the Redis list to append to.
 * @param {Array<object>} data - Records to append; may be empty.
 * @returns {Promise<number|undefined>} Whatever Redis reports for the list
 *     length, or undefined if the command failed (the error is logged, not
 *     rethrown).
 */
async function populateWithHistory(key, data) {
    try {
        // RPUSH needs at least one value, so when there is nothing to add we
        // report the current list length instead of sending an invalid command.
        if (data.length === 0) {
            return await redis.llen(key);
        }

        const strData = data.map((d) => JSON.stringify(d));
        // `return await` keeps a rejected command inside this try/catch.
        return await redis.rpush(key, ...strData);
    } catch (error) {
        console.error(error);
    }
}

// Seeds the 'poolTemps' list with the sample readings, prints what
// populateWithHistory resolved to, then closes the Redis connection.
async function main() {
    const listKey = 'poolTemps';
    const readings = getData();
    const pushResult = await populateWithHistory(listKey, readings);

    console.log(pushResult);
    redis.disconnect();
}

// Run the script. main() is async; the promise it returns is not awaited here.
main();

As shown above, this is accomplished by serializing the individual pool temperature objects and using rpush to add the objects to the Redis list. We are now ready to implement transactions.

Let's retrieve all the swimming pool temperature values from the Redis list and empty the list using one transaction:

const Redis = require('ioredis');

const redis = new Redis();

async function getRedisData(key) {
    // Reads, parses and returns every entry of the list at `key`, emptying the list ([] on failure).
    const data = [];
    try {
        // Wrap this in a transaction so we don't delete new list items that might get added.
        const result = await redis.multi()
            .lrange(key, 0, -1)
            .del(key)
            .exec();

        // exec() resolves to one [error, reply] pair per queued command.  The pair for
        // lrange is the first element of the result array; within that pair, the
        // history (the reply) is stored in the second element.
        const items = result[0][1];

        // The first element of the pair is the error, if lrange failed.
        const lrangeError = result[0][0];
        if (lrangeError) {
            throw lrangeError;
        }

        // The list has already been deleted at this point, so a malformed entry is
        // logged and skipped rather than allowed to discard the entries after it.
        for (const item of items) {
            try {
                data.push(JSON.parse(item));
            } catch (parseError) {
                console.error(parseError);
            }
        }
    } catch (error) {
        console.error(error);
    }
    return data;
}

// Fetches the 'poolTemps' points through getRedisData, prints them,
// then closes the Redis connection.
async function main() {
    const points = await getRedisData('poolTemps');
    console.log(points);

    // We have the data and could send it to the cloud from here.

    redis.disconnect();
}

// Run the script. main() is async; the promise it returns is not awaited here.
main();

In line 10, we mark the beginning of the transaction block with the multi function.

We queue our lrange command in line 11 to retrieve the pool temperatures from the poolTemps list. Likewise, we queue our del command in line 12 to wipe all of the elements from the list.

We use the exec command in line 13 to execute all our previously queued commands.

The exec command returns an array containing the execution results of each queued command. In our example, result[0] contains the result of the lrange command and result[1] contains the result of our del command. We are interested in result[0] because it contains a list of pool temperature values (retrieved from the lrange command) to send to the cloud. Here is the specific structure of the result[0] array:

[ null,
  [ '{"t":"1/6/2018 10:01","v":75.59}',
    '{"t":"1/6/2018 10:02","v":75.6}',
    '{"t":"1/6/2018 10:03","v":75.63}' ] ]

The pool temperature values are stored in the second element of the result[0] array and thus we use result[0][1] rather than result[0][0] (see line 18) to retrieve the pool temperatures.

Using Redis Transactions (The Full Example)

Let's combine everything together and populate the pool temperature data and retrieve and empty the list using transactions so we have a complete working example:

const Redis = require('ioredis');

const redis = new Redis();

/**
 * Builds the sample pool temperature readings used to seed the Redis list.
 * Each reading has a timestamp string `t` and a numeric value `v`.
 * A fresh array of fresh objects is returned on every call.
 */
const getData = () => {
    const samples = [
        ['1/6/2018 10:01', 75.59],
        ['1/6/2018 10:02', 75.60],
        ['1/6/2018 10:03', 75.63],
    ];
    return samples.map(([t, v]) => ({ t, v }));
};

/**
 * Appends the given records to the tail of the Redis list stored at `key`.
 *
 * Each record is serialized with JSON.stringify and all of them are sent in
 * a single RPUSH call.
 *
 * @param {string} key - Name of the Redis list to append to.
 * @param {Array<object>} data - Records to append; may be empty.
 * @returns {Promise<number|undefined>} Whatever Redis reports for the list
 *     length, or undefined if the command failed (the error is logged, not
 *     rethrown).
 */
async function populateWithHistory(key, data) {
    try {
        // RPUSH needs at least one value, so when there is nothing to add we
        // report the current list length instead of sending an invalid command.
        if (data.length === 0) {
            return await redis.llen(key);
        }

        const strData = data.map((d) => JSON.stringify(d));
        // `return await` keeps a rejected command inside this try/catch.
        return await redis.rpush(key, ...strData);
    } catch (error) {
        console.error(error);
    }
}

/**
 * Reads every entry of the Redis list at `key`, deletes the list in the same
 * MULTI/EXEC transaction, and returns the entries parsed from JSON.
 *
 * @param {string} key - Name of the Redis list to read and empty.
 * @returns {Promise<Array>} The parsed entries in list order. Entries that
 *     are not valid JSON are logged and skipped. If the transaction itself
 *     fails, the error is logged and an empty array is returned.
 */
async function getRedisData(key) {
    const data = [];

    try {
        // Wrap this in a transaction so we don't delete new list items that might get added.
        const result = await redis.multi()
            .lrange(key, 0, -1)
            .del(key)
            .exec();

        // exec() resolves to one [error, reply] pair per queued command. The pair for
        // lrange is the first element of the result array; within that pair, the error
        // (if any) is the first element and the history is the second.
        const [lrangeError, items] = result[0];
        if (lrangeError) {
            throw lrangeError;
        }

        // The list has already been deleted at this point, so a malformed entry is
        // logged and skipped rather than allowed to discard the entries after it.
        for (const item of items) {
            try {
                data.push(JSON.parse(item));
            } catch (parseError) {
                console.error(parseError);
            }
        }
    } catch (error) {
        console.error(error);
    }
    return data;
}

// End-to-end walk-through: seed the 'poolTemps' list, read it back twice
// through getRedisData, then close the Redis connection.
async function main() {
    const listKey = 'poolTemps';

    // Seed the list and print what populateWithHistory resolved to.
    const sample = getData();
    const pushResult = await populateWithHistory(listKey, sample);
    console.log(pushResult);

    // First read. An array of 3 pool temperature points should be returned.
    const firstRead = await getRedisData(listKey);
    console.log(firstRead);

    // We have the data and could send it to the cloud from here.

    // Second read. An empty array should be returned.
    const secondRead = await getRedisData(listKey);
    console.log(secondRead);

    redis.disconnect();
}

// Run the script. main() is async; the promise it returns is not awaited here.
main();

There you have it! Our pool temperature data is safe (with no lost history) and transferred to the cloud :cloud: with the help of Redis transactions and Node.

results matching ""

    No results matching ""