By user10664542


2019-12-02 18:56:36 8 Comments

await is not blocking as expected when a block of code updates the db (using postgres / node)

https://node-postgres.com

I have a list of async function calls, each call updates a database, and each subsequent call works on data updated by the previous call.

There are about eight calls in a row, and each call must update the complete set of data it is working with, 100% to completion, before going to the next.

I tried to make everything not async, but it appears I am forced to make everything async/await because of the library I am using (postgres / node).


Each function call must complete 100% before going on to the next function call, because the next step does a select on rows where a field is not null (where the previous step fills in a value).

I have an await in front of each call, that does something (see code below):

  1. loads the db from a csv,
  2. next step selects all rows just inserted, calls an API and updates the database,
  3. and so on,

but at one point, when the next function executes, NONE of the rows have been updated (as I trace through and verify, a SQL statement returns nothing back),

the code seems to pass right through to the second function call without blocking, honoring the await, or completing its code block.

If I comment out some of the latter rows (dependent on the previous), and let the program run to completion, the database gets updated.

There is nothing functionally wrong with the code, everything works, just not from beginning to completion.

After running two function calls at the beginning, letting that run, I can then comment out those rows, uncomment the later rows in the flow, and run again, and everything works as expected, but I cannot run to completion with both uncommented.

What can I do to make sure each function call completes 100%, has all updates completed in the database, before going to the next step?

async/await is not working for me.

This is not pseudo-code; it's the actual code I am working with, with only the function names changed. It is real working code, cut-and-pasted directly from my IDE.

// these are functions I call below (each in their own .js)
const insert_rows_to_db_from_csv = require('./insert_rows_to_db_from_csv')
const call_api_using_rows_from_function_above = require('./call_api_using_rows_from_function_above')
const and_so_on = require('./and_so_on')
const and_so_on_and_on = require('./and_so_on_and_on')
const and_so_on_and_on_and_on = require('./and_so_on_and_on_and_on')

// each of the above exports a main() function where I can call func.main() just
// like this one defined below (this is my main() entry point)

module.exports = {
    main: async function (csvFilePath) {
        console.log('service: upload.main()')
        try {
            const csvList = []

            let rstream = fs.createReadStream(csvFilePath)
                .pipe(csv())
                .on('data', (data) => csvList.push(data))
                .on('end', async () => {
                    let num_rows = csvList.length

                    //step one (if I run these two, with step two calls below commented out, this works)
                    await insert_rows_to_db_from_csv.main(csvList);
                    await call_api_using_rows_from_function_above.main();

                    // step two
                    // blows up here, on the next function call,
                    // no rows selected in sql statements, must comment out, let the above run to
                    // completion, then comment out the rows above, and let these run separate
                    await work_with_rows_updated_in_previous_call_above.main();   // sets
                    await and_so_on.main();
                    await and_so_on_and_on.main();
                    await and_so_on_and_on_and_on.main();
                })
        } catch (err) {
            console.log(err.stack)
        } finally {
        }
    }
};

here is the one liner I am using to call the insert/update to the DB:

 return await pool.query(sql, values);

that's it, nothing more. This is from using: https://node-postgres.com/

npm install pg


PART 2 - continuing on,

I think the problem might be here. This is where I am doing each API call, then insert (that the next function call is dependent upon), some code smell here that I can't sort out.

processBatch(batch) is called; it calls the API, gets a response back, and then calls handleResponseDetail(response), where the insert happens. I think the problem is here. Any ideas?

this is a code block inside: await call_api_using_rows_from_function_above.main();

It completes with no errors, inserts rows, and commits, then the next function is called, and this next function finds no rows (inserted here). But the await on the entire main() .js blocks and waits, so I don't understand.

/**
 * API call, and within call handleResponse which does the DB insert.
 * @param batch
 * @returns {Promise<*>}
 */
async function processBatch(batch) {
    console.log('Processing batch');
    return await client.send(batch).then(res => {
        return handleResponseDetail(res);
    }).catch(err => handleError(err));
}

// should this be async?
function handleResponseDetail(response) {

    response.lookups.forEach(async function (lookup) {

        if (typeof lookup.result[0] == "undefined") {   // result[0] is Candidate #0
            ++lookup_fail;
            console.log('No response from API for this address.')
        } else {
            ++lookup_success;

            const id = await insert(lookup);
        }
    });
}

2 comments

@Klaycon 2019-12-02 23:03:59

Given the code block from your Part 2 edit, the problem is now clear: all of your insert()s are being scheduled outside of the blocking context of the rest of your async/await code! This is because of that .forEach, see this question for more details.

I've annotated your existing code to show the issue:

function handleResponseDetail(response) { //synchronous function

    response.lookups.forEach(async function (lookup) { //asynchronous function
        //these async functions all get scheduled simultaneously
        //without waiting for the previous one to complete - that's why you can't use forEach like this
        if (typeof lookup.result[0] == "undefined") {   // result[0] is Candidate #0
            ++lookup_fail;
            console.log('No response from API for this address.')
        } else {
            ++lookup_success;

            const id = await insert(lookup); //this ONLY blocks the inner async function, not the outer `handleResponseDetail`
        }
    });
}

Here is a fixed version of that function which should work as you expect:

async function handleResponseDetail(response) {

    for(const lookup of response.lookups) {

        if (typeof lookup.result[0] == "undefined") {   // result[0] is Candidate #0
            ++lookup_fail;
            console.log('No response from API for this address.')
        } else {
            ++lookup_success;

            const id = await insert(lookup); //blocks handleResponseDetail until done
        }
    }
}

Alternatively, if the order of insertion doesn't matter, you can use Promise.all for efficiency:

async function handleResponseDetail(response) {

    await Promise.all(response.lookups.map(async lookup => {

        if (typeof lookup.result[0] == "undefined") {   // result[0] is Candidate #0
            ++lookup_fail;
            console.log('No response from API for this address.')
        } else {
            ++lookup_success;

            const id = await insert(lookup);
        }
    })); //waits until all insertions have completed before returning
}

To reiterate, you cannot easily use .forEach() with async/await because .forEach() simply calls the given function for each element of the array synchronously, with no regard for awaiting each promise before calling the next. If you need the loop to block between each element, or to wait for all elements to complete processing before returning from the function (this is your use case), you need to use a different for loop or alternatively a Promise.all() as above.
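The difference is easy to reproduce without a database. Here is a minimal sketch, assuming a hypothetical fakeInsert() that stands in for insert() and simply resolves after a short timer; the three function names are made up for illustration. Each function pushes 'returned' onto its log at the moment it is about to return, so you can see what had finished by then.

```javascript
// Hypothetical stand-in for insert(): records the row after a short delay.
function fakeInsert(row, log) {
  return new Promise((resolve) => {
    setTimeout(() => {
      log.push(`inserted ${row}`);
      resolve(row);
    }, 10);
  });
}

// Broken: forEach discards the promises returned by the async callback,
// so 'returned' is logged before any insert has finished.
async function withForEach(rows) {
  const log = [];
  rows.forEach(async (row) => {
    await fakeInsert(row, log);
  });
  log.push('returned');
  return log;
}

// Sequential: each insert finishes before the next one starts.
async function withForOf(rows) {
  const log = [];
  for (const row of rows) {
    await fakeInsert(row, log);
  }
  log.push('returned');
  return log;
}

// Concurrent: all inserts start at once, and the function waits for all of them.
async function withPromiseAll(rows) {
  const log = [];
  await Promise.all(rows.map((row) => fakeInsert(row, log)));
  log.push('returned');
  return log;
}
```

With two rows, the log from withForEach contains only 'returned' at the moment the function resolves, while the other two contain both inserts followed by 'returned'.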

@user10664542 2019-12-03 01:01:01

OK, making those changes, based on the last response resolved the problem (using the alternate looping mechanism), I will test/explore the second solution, Promise.all() as well. All responses were helpful to fix and refine the code, and I learned a few things. This really saved me a lot of time, and I was able to move on to other things to meet a deadline, and would like to send a cash reward for your thorough response (@Klaycon), sometime before Christmas (any Crypto that you accept), or by any other means. Thank you.

@Max 2019-12-02 20:10:13

What your main function currently does is merely create a stream, attach listeners, and return immediately. It does not wait for the listeners to finish, which is what you are trying to have it do.

You need to extract your file reading logic into another function that returns a Promise which resolves only when the entire file has been read, then await that Promise inside main:

function getCsvList(csvFilePath) {
  return new Promise((resolve, reject) => {
    const csvList = []
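    fs.createReadStream(csvFilePath)
      .on('error', reject) // pipe() does not forward read errors (e.g. a missing file)
      .pipe(csv())
      .on('data', (data) => csvList.push(data))
      .on('end', () => {
        resolve(csvList)
      })
      .on('error', (e) => reject(e)) // errors raised by the csv parser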
  })
}

module.exports = {
  main: async function (csvFilePath) {
    try {
      const csvList = await getCsvList(csvFilePath)
      await insert_rows_to_db_from_csv.main(csvList);
      await call_api_using_rows_from_function_above.main();
      await work_with_rows_updated_in_previous_call_above.main();
      await and_so_on.main();
      await and_so_on_and_on.main();
      await and_so_on_and_on_and_on.main();
    } catch (err) {
      console.log(err.stack)
    } finally {
    }
  }
};
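To see the two patterns side by side without a CSV file or a database, here is a small sketch. It assumes an in-memory stream built with Readable.from, and slowStep, listenerMain, readAll and promiseMain are hypothetical names used only for this illustration.

```javascript
const { Readable } = require('node:stream');

// Hypothetical async step standing in for a database call.
function slowStep(log, label) {
  return new Promise((resolve) => {
    setTimeout(() => {
      log.push(label);
      resolve();
    }, 10);
  });
}

// Pattern from the question: the work happens inside the 'end' listener.
async function listenerMain(log) {
  Readable.from(['a', 'b'])
    .on('data', () => {})
    .on('end', async () => {
      await slowStep(log, 'step done');
    });
  // Nothing here waits for the listener, so the returned promise resolves right away.
}

// Pattern from this answer: turn "stream finished" into a Promise.
function readAll(stream) {
  return new Promise((resolve, reject) => {
    const items = [];
    stream
      .on('data', (item) => items.push(item))
      .on('end', () => resolve(items))
      .on('error', reject);
  });
}

async function promiseMain(log) {
  const items = await readAll(Readable.from(['a', 'b']));
  await slowStep(log, `step done with ${items.length} items`);
}
```

Awaiting listenerMain(log) leaves log empty at that moment, because the listener's work is still pending. Awaiting promiseMain(log) only resolves after the step has been logged.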

@user10664542 2019-12-02 22:04:46

This is a much better, cleaner design. Thank you. I have updated my code to reflect the example you have given, and I believe this fixed one problem: it does block before going to the next step. But it still fails in a function call that depends on a previous call having finished inserting rows and committing.

@user10664542 2019-12-02 22:06:25

In step one in my example above, after reading a csv, inserting to the DB (success there), I iterate through those rows, call a 3rd party API, and populate another database table from the response with new rows, and that code executes, and inserts new rows (no failures), and that step blocks and completes, I traced through the try/finally{} to verify, and I am issuing an explicit commit in the finally{}, but the new rows inserted are not visible to the next step in the process. And also, Postgres by default is explicit autocommit, unless turned off, so my commit is redundant.

@Klaycon 2019-12-02 22:36:13

@user10664542 It seems more likely given the information about a 3rd party API that at some point you've got a loose callback/promise escaping from the "synchronous" context of async/await. By that I mean a typical node-style callback not wrapped in a promise or a promise not awaited. This would cause a potential race condition leading to out-of-order queries, causing effects identical to the ones you're observing
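As a rough illustration of what a "loose callback" looks like and one way to wrap it, here is a sketch. lookupAddress is a hypothetical callback-style function, not part of any real client library; util.promisify is the standard Node helper for wrapping node-style callbacks.

```javascript
const { promisify } = require('node:util');

// Hypothetical callback-style client method (not a real library call).
function lookupAddress(address, callback) {
  setTimeout(() => callback(null, `result for ${address}`), 10);
}

// Loose callback: the async function returns before the callback has run,
// so awaiting saveLoose() does not wait for the result to be saved.
async function saveLoose(address, saved) {
  lookupAddress(address, (err, result) => {
    if (!err) saved.push(result);
  });
}

// Wrapped: promisify turns the callback into a promise that await can wait on.
const lookupAddressAsync = promisify(lookupAddress);

async function saveWrapped(address, saved) {
  const result = await lookupAddressAsync(address);
  saved.push(result);
}
```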

@user10664542 2019-12-02 22:43:05

I updated another code block, where I think the problem is (something amiss there, I feel - but not sure what ), in section beginning PART 2 above. I feel I'm close to the problem there.
