LJ Archive

Coroutines and Channels in C Using libmill

Amit Saha

Issue #268, August 2016

Want to try a different approach to writing concurrent applications in C? This article looks at libmill, a library that brings Go-style concurrency to C.

libmill is a C library that brings Golang-style concurrency to C. Using it, you can call a function like f(arg1, arg2) using go(f(arg1, arg2)), and the function will be executed in a separate coroutine. If you have multiple coroutines executing, libmill's scheduler takes care of scheduling the coroutines. Data will be passed to and from coroutines using channels. In this article, I introduce the key libmill features—creating coroutines and using channels to pass data to and from them. In addition, I take a look at some of libmill's other convenience functions that make it more than a coroutine and channels library.

Installing libmill

libmill currently is considered stable. Its latest release at the time of this writing is version 1.8 (released in March 2016). To install it, download the gzipped tarball from libmill.org/download.html, extract it and do the following steps:

$ ./configure
$ make
$ sudo make install

Note that you likely will need an extra step to be able to link to the libmill shared library when compiling your programs. On Ubuntu/Debian, run sudo ldconfig. On Fedora, manually add /usr/local/lib to a file in /etc/ld.so.conf.d/, and then run sudo ldconfig.

Hello Coroutine

Now that you've installed libmill, let's write a first example of using coroutines in programs (Listing 1a).

Let's compile and run this program:

$ gcc -o listing1 listing1.c -lmill
$ ./listing1
Worker 1
Worker 2
Worker 3
Worker 4
Worker 5
Worker 6
Worker 7
Worker 8
Worker 9
Worker 10

The function f() in Listing 1a is defined almost like an ordinary C function. The only difference is the coroutine specifier at the beginning:

coroutine void f(int index)

The coroutine specifier tells libmill that you plan to call this function in a coroutine using the go() construct, as you saw in the main() function in Listing 1a:

go(f(i));

Here, you call the function f() in a coroutine ten times, one after the other. The output appears in the order in which the coroutines were started (as opposed to any other order) because the program runs in a single process. Even so, if libmill finds that a coroutine can be scheduled out, implicitly or explicitly, and another coroutine is ready to run, the latter will start running.

libmill provides a function called msleep() that a coroutine can use to tell libmill's scheduler explicitly that it wants to sleep and let other runnable coroutines run. This function will allow you to see libmill's scheduling in action (Listing 1b).

When you compile and run this code, you'll see output where the coroutines are no longer executed in the order they are started:

$ gcc -o listing1-msleep listing1-msleep.c -lmill
$ ./listing1-msleep
Worker 1
Worker 4
Worker 10
Worker 6
Worker 3
Worker 9
Worker 8
Worker 5
Worker 7
Worker 2

The key statement in Listing 1b is msleep(now() + rand() % 50 ); in the function f(). When you use go(f(i)) to start the function f() in a coroutine, it first goes to “sleep” using the msleep() function. This tells the libmill scheduler that it can schedule other coroutines in the meantime.

Unlike the standard library's sleep() function, the argument to the msleep() function is a “deadline”: a libmill concept meaning a point in time after which libmill will continue the function's execution. The now() function returns the current time in milliseconds, so msleep(now() + rand() % 50) sets each coroutine's deadline to a random time between 0 and 49 milliseconds from now. You'll also want to wait for all the coroutines to finish in the main() function, so insert the statement msleep(now() + 60) in it.
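The deadline arithmetic can be sketched in plain C without libmill. This is only an illustration: deadline_after(), random_deadline() and deadline_reached() are hypothetical helper names, and the current time is passed in as a parameter where a libmill program would call now().

```c
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical helpers, not part of libmill. All times are in milliseconds. */

/* A deadline is an absolute point in time: the current time plus a delay. */
int64_t deadline_after(int64_t current_ms, int64_t delay_ms)
{
    return current_ms + delay_ms;
}

/* Mirrors now() + rand() % 50: rand() % 50 yields 0..49, so the
 * deadline is at most 49 ms after current_ms. */
int64_t random_deadline(int64_t current_ms)
{
    return deadline_after(current_ms, rand() % 50);
}

/* Non-zero once the clock has reached the deadline (an assumption about
 * how a scheduler would compare the two values). */
int deadline_reached(int64_t deadline_ms, int64_t current_ms)
{
    return current_ms >= deadline_ms;
}
```

With these helpers, a 60-millisecond wait in main() ends later than every worker's random deadline, which is why all ten workers get to print before the program exits.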

One point to mention here is that if you instead use the sleep() function, the entire thread of execution will block, and no scheduling will be possible among the coroutines. This is the reason libmill comes with its own set of non-blocking alternatives for input/output operations, such as tcprecv(), mfread() and others.

Using Channels for Communication

Channels are both a messaging and a synchronization mechanism when working with coroutines. A channel is unidirectional and typed. When creating a channel, you have to specify the type of data it will carry. Using a channel, you can send data to a coroutine, and then in the sending coroutine, wait to receive data back on a different channel.

Next, let's look at an example of using a channel to send work to a coroutine, wait for it to process the work and send the result back over another channel. The work here is simple. You pass the first command-line argument to the program and send it to the coroutine, which returns the length of the string (Listing 2).

In the main() function, create two channels, one for input and another for output, and call them input and output, respectively:

chan input = chmake(char*, 0);
chan output = chmake(int, 0);

chan is a type defined by libmill to represent channels. A channel is created using the chmake() function. The first argument to chmake() is the type of data it will carry, and the second argument is its size. The default behavior of channels is that a sender will block until there is a receiver and vice versa. When the size of the channel is 0, it is called an unbuffered channel: it stores no messages, so a send completes only when a receiver is ready to take the data. When the size of the channel is a non-zero number, that many messages can be sent to it before a sender blocks.
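The blocking rule for senders can be captured in a small model. This is a toy sketch, not libmill's implementation: struct toy_chan and toy_send_would_block() are hypothetical names that only track the numbers needed to decide whether a send has to wait.

```c
#include <stddef.h>

/* Toy model of a channel (hypothetical; libmill's real chan is opaque). */
struct toy_chan {
    size_t capacity;       /* size given at creation; 0 means unbuffered */
    size_t queued;         /* messages currently held in the buffer */
    int receiver_waiting;  /* non-zero if a receiver is blocked on the channel */
};

/* Returns 1 if a send would have to block, 0 if it can complete now. */
int toy_send_would_block(const struct toy_chan *ch)
{
    if (ch->receiver_waiting)
        return 0;  /* the message is handed straight to the receiver */
    /* Otherwise the message needs a free buffer slot; an unbuffered
     * channel (capacity 0) never has one. */
    return ch->queued >= ch->capacity;
}
```

In this model, a channel of size 0 lets a send through only when a receiver is already waiting, while a channel of size 2 accepts two sends before the third one blocks.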

The two channels created above are unbuffered channels. This suits the purpose here since you want the coroutine to wait until you send it some work to do on the input channel, and then you want to wait for the coroutine to send you the result back in the output channel.

The coroutine function worker() accepts the two channels as parameters: chan input and chan output. When it starts, you use the chr(input, char*) function (chr stands for “ch”annel “r”eceive) to read a char* data item from the input channel. Until there is a data item available to be read, worker() blocks. Once there is a data item to read, it creates a copy of it in work, then uses the chs(output, int, strlen(work)) function (chs stands for “ch”annel “s”end) to write the length of the string to the output channel.

Let's compile and run the above program:

$ gcc -o listing2 listing2.c -lmill
$ ./listing2
Please specify one command line argument
$ ./listing2 hello
Processing: hello
Result: 5

In the main() function, you start the coroutine using go(worker(input, output)). Next, you write the string argv[1] to the input channel using the chs() function. Then, you use the chr() function to read the data on the output channel and print it. Note that you don't have to wait explicitly before reading from the output channel; it is all taken care of for you automatically. Finally, you close both the channels, which frees the resources:

chclose(input);
chclose(output);

Writing a Coroutine-Based TCP Server

libmill comes with a number of helper functions that are customized for writing coroutine-based network server programs. Next, I describe how to write a simple TCP server that handles each client in a separate coroutine (Listing 3).

The function handler() handles a new client connection. It accepts an argument of type tcpsock, which represents the new client connection. It prints the string “New connection!” and closes the client connection using the tcpclose() function.

Let's now look at the main() function that creates the server. The following two statements create a listening socket:

ipaddr addr = iplocal(NULL, port, 0);
tcpsock server = tcplisten(addr, 10);

The iplocal() function is used to convert a human-friendly IP address to listen on to an address of the ipaddr type. The first argument to the function is the IP address or network interface to listen on; specifying NULL will make the server listen on all local network interfaces. The second argument to the function is the port to listen on, and the third argument specifies whether you want an IPv4 or an IPv6 address. Leaving it as 0 defaults to IPv4 for now.

Next, you call the tcplisten() function to create the listening socket. The first argument to tcplisten() is the address you obtained in the previous step. The second argument is the backlog—the maximum number of incoming connections that have not yet been accepted, but will not be refused. It returns a value of type tcpsock representing the listening socket.

If for some reason the tcplisten() function cannot set up the listening socket, it returns NULL and sets errno to an error code specifying the reason it failed. Hence, you check for that and use the perror() function to report the error when there is one.
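For comparison, here is a sketch of the plain POSIX calls needed to obtain a listening socket with a given backlog, following the same convention of reporting failure through errno. It is not a description of how libmill implements tcplisten(); listen_on_port() is a hypothetical helper name.

```c
#include <arpa/inet.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical helper: listen on all local IPv4 interfaces.
 * Returns the listening descriptor, or -1 with errno set on failure. */
int listen_on_port(uint16_t port, int backlog)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0)
        return -1;

    /* Allow quick restarts of the server on the same port. */
    int yes = 1;
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes);

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof addr);
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(port);

    /* backlog bounds the queue of connections not yet accepted. */
    if (bind(fd, (struct sockaddr *)&addr, sizeof addr) < 0 ||
        listen(fd, backlog) < 0) {
        int saved = errno;  /* close() may overwrite errno */
        close(fd);
        errno = saved;
        return -1;
    }
    return fd;
}
```

A caller would check for -1 and call perror(), much as the article's server checks tcplisten() for NULL.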

Once you have set up the listening socket correctly, you create an infinite loop that will accept an incoming connection using tcpaccept(). The first argument to the tcpaccept() function is server, the listening socket you created earlier. The second argument is a deadline for the function. The deadline is a time in the future up to which tcpaccept() will wait for a client connection to come in before returning. A value of -1 indicates that tcpaccept() will block indefinitely for an incoming connection:

/* Server loop */
while (1) {
  tcpsock as = tcpaccept(server, -1);
  if (!as)
    continue;
  /* Dispatch this request */
  go(handler(as));
}

tcpaccept() returns NULL upon error, so you check for that and, if there was an error, go back to waiting for the next connection. If the connection was accepted, you call the handler() function with the accepted connection socket in a coroutine.
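The idea of waiting for a connection only up to some limit can be illustrated with the standard poll() call, which reports a listening socket as readable once a connection is pending. This is a sketch under stated assumptions rather than libmill code: wait_readable() is a hypothetical helper, and poll() takes a relative timeout in milliseconds, whereas a libmill deadline is an absolute time such as now() + 1000. Both use -1 to mean "wait indefinitely".

```c
#include <errno.h>
#include <poll.h>

/* Hypothetical helper: wait until fd is ready for reading.
 * Returns 1 if ready, 0 if timeout_ms elapsed first, -1 on error
 * (errno set by poll()). A timeout_ms of -1 waits indefinitely. */
int wait_readable(int fd, int timeout_ms)
{
    struct pollfd pfd = { .fd = fd, .events = POLLIN };
    int rc;

    /* Retry if a signal interrupts the wait. For simplicity the full
     * timeout is reused on each retry. */
    do {
        rc = poll(&pfd, 1, timeout_ms);
    } while (rc < 0 && errno == EINTR);

    if (rc < 0)
        return -1;
    /* rc > 0 also covers hangup/error conditions, which the following
     * accept() or read() call will then report. */
    return rc > 0 ? 1 : 0;
}
```

A plain POSIX server could call wait_readable(listen_fd, -1) before accept() to get the same "block until a client arrives" behavior as passing -1 in the loop above.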

Let's compile and run the program:

$ gcc -o listing3 listing3.c -lmill
$ ./listing3
Server listening on 9090

In a different terminal session, you can connect using telnet 127.0.0.1 9090, and you will see “New connection!” being printed for every new client connection.

The repository for this article's code (https://github.com/amitsaha/lj_libmill) contains a more elaborate example of using libmill's functions to create an HTTP server in the directory named hashid_service. It handles each incoming connection in a new coroutine.

Choosing among Multiple Channels

When you have multiple channels and you want to perform an operation when a channel is ready for reading or writing, the choose construct allows you to do that. Listing 4 is a fictional example of when this might be useful. It downloads data from remote URLs specified as command-line arguments, making the request to each URL in a separate coroutine.

This program uses three channels: work, result and error. The work channel is used to send work from the main program to the coroutine, and the coroutine uses the result and error channels to send back the result of a successful request and error, respectively. The channels are created in the main() function with the following code:

chan work = chmake(char*, 0);
chan result = chmake(struct data, 0);
chan error = chmake(struct data, 0);

The first channel, work, is of type char*, and the second and third channels are of type struct data, which is defined earlier in the code as:

struct data {
  char *url;
  char *data;
};

Since you can get the result of processing the URL in any order, you wrap the result of processing in the above structure.

Next, in the main() function, you start as many coroutines as there are URLs and then use the chs() function to send each one a URL to download from. In the f() function, you don't actually attempt to connect to a remote URL, but simulate a scenario where there is some error in downloading 30% of the time, due to a faulty network. If there was an error, you write to the error channel; otherwise, you write to the result channel.
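One simple way to simulate a failure rate of roughly 30% is to reduce a random number modulo 10 and treat three of the ten outcomes as errors. The listing's actual approach is not shown in the text, so the following is only an assumption about how it might be done; should_fail() and simulate_download_fails() are hypothetical names.

```c
#include <stdlib.h>

/* Hypothetical helper: maps a raw non-negative random value to a
 * failure decision. Three of every ten residues (0, 1, 2) count as
 * a failure. */
int should_fail(int r)
{
    return (r % 10) < 3;
}

/* Draws from rand() and reports whether this simulated download fails. */
int simulate_download_fails(void)
{
    return should_fail(rand());
}
```

A coroutine could call simulate_download_fails() and, depending on the answer, send its struct data value on either the error channel or the result channel.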

Back in the main() function, you have created all the coroutines, so now you want to just wait and report back on the result of trying to download data from each of the supplied URLs. You do this using the choose construct as follows:


for (int i = 1; i < argc; i++) {
    choose {
    in(result, struct data, value):
        printf("Processed URL: %s, Result: %s\n", value.url, value.data);
    in(error, struct data, value):
        printf("Processed URL: %s, Error: %s\n", value.url, value.data);
    end
    }
}

The number of data items you expect to have on both channels, result and error, is equal to the number of URLs. Hence, you create a for loop that runs for as many times. In the body of the loop, you create the choose construct.

The first in clause is used to wait for data to be available for reading in the channel result of type struct data. When there is a data item available to be read, a single data item is read. The variable specified, value, is declared by the in clause itself and refers to the data item read from the channel. Similarly, the second in clause is used to wait for data on the error channel. Every choose construct must have an end clause at the end.

When either of the above two activities happens, you use printf() to write the URL processed and the result of the processing. When you compile and run the program, you'll see a result similar to the following:

$ gcc -o listing4 listing4.c -lmill

$ ./listing4 https://raw.githubusercontent.com/amitsaha/lj_libmill/master/listing3/listing3.c https://raw.githubusercontent.com/amitsaha/lj_libmill/master/listing1/listing1-msleep.c

Processed URL: https://raw.githubusercontent.com/amitsaha/lj_libmill/master/listing3/listing3.c, Error: Error retrieving data
Processed URL: https://raw.githubusercontent.com/amitsaha/lj_libmill/master/listing1/listing1-msleep.c, Result: Data at the URL

In addition to the in clause, choose supports a number of other clauses. The out clause can be used to wait until you can write to a channel, ch, and has the syntax out(ch, <data type>, data). The deadline clause allows you to set a deadline in the choose construct; it will fire if no other clause has fired by the time specified as the deadline. The otherwise clause will execute if none of the other clauses can proceed immediately.

Using Multiple Processors

By default, a program written using libmill will use only one processor core, even if your computer has multiple processor cores. The approach to using multiple cores is to use libmill's mfork() function to create a new process. Each process then becomes capable of doing the same work on a different processor core. Listing 5 shows an example of using mfork() to create a version of the earlier TCP server that creates multiple processes listening for incoming connections. Each process continues to handle an incoming connection in a separate coroutine as earlier.

When you compile and run the above program, you will see messages like the following, which tell you that you have three processes listening on port 9090:

$ ./listing5
Listening on 9090 (PID: 3760)
Listening on 9090 (PID: 3762)
Listening on 9090 (PID: 3761)

The process tree looks like this:

$ pstree 3760
-+= 03760 amit ./listing5
 |--- 03761 amit ./listing5
 \--- 03762 amit ./listing5

If you try connecting to port 9090 from multiple clients, you will see that the connections will be handled by different processes, which is what you want.

The key change in this listing from the previous server is the following section of code in the main() function:


/* Set up the server processes - the main process is listening 
  as well, so we fork NUM_PROCESSES-1 child processes
*/
for (int i = 1; i < NUM_PROCESSES; i++ ) {
    pid_t pid = mfork();
    /* Child process?*/
    if (pid == 0)
      break;
}

Similar to the fork() function, mfork() creates a new child process and returns 0 in the child process and the child process ID in the parent process. Hence, if the value of pid is 0, you are in the child and break from the loop; otherwise, you continue until you have forked NUM_PROCESSES-1 child processes.
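The same parent/child pattern can be tried out with the standard fork() call. This sketch is an analogue rather than libmill code: run_worker_pool() is a hypothetical name, each child exits immediately at the point where a real server process would enter its accept loop, and the parent waits for the children so that the example terminates.

```c
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical helper: forks num_processes-1 children (the calling
 * process counts as one worker) and returns how many were reaped. */
int run_worker_pool(int num_processes)
{
    int started = 0;

    for (int i = 1; i < num_processes; i++) {
        pid_t pid = fork();
        if (pid < 0)
            break;      /* fork failed: stop creating children */
        if (pid == 0) {
            /* Child: must not continue the loop, or it would fork
             * children of its own. A server would serve clients here. */
            _exit(0);
        }
        started++;      /* parent: keep forking */
    }

    /* Parent: collect the children that were started. */
    int reaped = 0;
    for (int i = 0; i < started; i++) {
        int status;
        if (waitpid(-1, &status, 0) > 0)
            reaped++;
    }
    return reaped;
}
```

Calling run_worker_pool(3) creates two children, matching the three-process tree shown earlier once the parent is counted.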

Inspecting Coroutines

libmill comes with a function called goredump() that you can use to dump the state of coroutines and channels. You can call it as a function from your program or from the gdb prompt. Let's look at an example of the latter.

Consider the program in Listing 4 that uses multiple channels to send work and receives the results from the coroutines. Let's say you forgot to send any work to the coroutine, because you accidentally removed the statement: chs(work, char*, argv[i]); (Listing 6). When you run the program, it simply will hang, doing nothing. So, let's see what's going on. First, find the process ID of the process and start gdb:

(gdb) attach 2132
..

Now, call the goredump() function and print its result:


(gdb) p goredump()

COROUTINE  state                          current            created
----------------------------------------------------------------------
{0}        choose(<2>,<3>)                 ---              <main>
{1}        chr(<1>)                       listing6.c:13     listing6.c:34

CHANNEL  msgs/max    senders/receivers      refs  done  created
---------------------------------------------------------------------
<1>      0/0         r:{1}                  1     no    listing6.c:29
<2>      0/0         r:{0}                  1     no    listing6.c:30
<3>      0/0         r:{0}                  1     no    listing6.c:31

The output of goredump() has two sections: one for coroutines and the other for channels.

For each coroutine, it displays the current state—or what it is doing currently, where it is currently executing and where it was created. You can see that the main coroutine (0) is waiting in the choose construct to read from the result and error channels (channel numbers 2 and 3, respectively).

The second coroutine (that is, the worker coroutine) is waiting in the chr() function at line 13 to read from channel 1. That explains why the program hangs: nothing has been written to channel 1.

The section on channels displays, for each channel, the number of messages currently in the channel and the maximum number it can hold, the sender/receiver coroutines, the number of references to the channel, whether the sender is done sending to the channel and where the channel was created. You can see that the receiver for both channels 2 and 3 is coroutine 0, and the last column shows the line at which each channel was created.

Conclusion

This article looks at some of libmill's key features, including some convenient functions it makes available. A number of other libmill features provide building blocks to write efficient coroutine-driven network programs in C. See the Resources section for some important links to learn more about libmill.

Amit Saha is a software engineer and the author of Doing Math with Python (No Starch Press). He blogs at echorand.me, and you can send him email at amitsaha.in@gmail.com.
