Pipeline Language

Pipeline is a language idea that filters data through a pipeline of functions.

A simple pipeline might be System.StdIn -> System.Command(wc -l) -> System.StdOut, which reads text from standard input, counts the lines in it, and writes that count to standard output (swap in wc -m to count characters instead). True, you can do this with bash.

Pipeline is strongly typed. Let's say we want to implement newline-delimited character counting: for each line that comes in, emit the number of characters in that line.

The data enters as a stream. Let's imagine we have the following string coming into StdIn:

"this\nis\na\nnewline\ndelimited\nstring"

The first thing we need to do is split this up. The characters march in from left to right, so we need to consume characters until we hit a newline. We'll need to consume more than one character in a stream in order to make this work. What we need is an accumulator that consumes more than one character and spits out a string when it hits a newline.
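Here is a minimal sketch of that accumulator in Python (Pipeline itself has no syntax yet). The name accumulate_lines is made up for illustration, and emitting a final line that has no trailing newline is an assumption on my part.

```python
from typing import Iterable, Iterator


def accumulate_lines(chars: Iterable[str]) -> Iterator[str]:
    """Consume characters one at a time and emit a string at each newline."""
    buffer = []
    for c in chars:
        if c == "\n":
            yield "".join(buffer)
            buffer = []
        else:
            buffer.append(c)
    # Assumption: a final line without a trailing newline is still emitted.
    if buffer:
        yield "".join(buffer)


print(list(accumulate_lines("this\nis\na\nnewline\ndelimited\nstring")))
```

The generator holds its own buffer between characters, which is the "consume more than one item before producing anything" behaviour the rest of this post tries to fit into the pipeline model.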

A function that takes a character, does something, and produces a character would look like this:

def fn_name(char c) -> char = c + 1

But what we need is something that will take as many characters from the pipeline as possible and only spit a string out when it hits the correct condition. Now would be a good time to talk about the pipes between functions.

In the above function, we take only a single character as input. The function "call" is applied only to the next object in the queue, in this case a single character. So the type of the items in the queue has to match the input type of the function. But now we need a way for a function to say "hey, call me again". Or maybe we need to allow the functions to read indiscriminately from the queues? That doesn't seem like a good idea. But what if each function had its own input and output queue? Then functions could mangle their own queue without disrupting other functions.

In a block diagram it might look something like this:

                      Input Queue                  ┌──────────────┐           Output Queue
                ┌───┬───┬────────────┬───┬───┐     │              │     ┌───┬───┬────────────┬───┬───┐
                │   │   │            │   │   │     │ Fn A         │     │   │   │            │   │   │
         ┌─────►│   │   │  ...       │   │   ├────►│              ├────►│   │   │    ...     │   │   ├─────┐
         │      │   │   │            │   │   │     │              │     │   │   │            │   │   │     │
         │      └───┴───┴────────────┴───┴───┘     │              │     └───┴───┴────────────┴───┴───┘     │
         │                                         └──────────────┘                                        │
         │                                                                                                 ▼

 From Message Bus                                                                                   To Message Bus
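The diagram above could be sketched in Python roughly like this. Stage and split_lines are hypothetical names, the message bus is left out entirely, and a plain deque stands in for each queue.

```python
from collections import deque
from typing import Callable, Deque


class Stage:
    """A function wrapped with its own input and output queues."""

    def __init__(self, fn: Callable[[Deque, Deque], None]) -> None:
        self.fn = fn
        self.input: Deque = deque()
        self.output: Deque = deque()

    def run(self) -> None:
        # The function decides how many items to take from its own input queue.
        self.fn(self.input, self.output)


def split_lines(inp: Deque, out: Deque) -> None:
    """Emit one string per complete line; leave a partial line queued."""
    while "\n" in inp:
        chars = []
        while (c := inp.popleft()) != "\n":
            chars.append(c)
        out.append("".join(chars))


stage = Stage(split_lines)
stage.input.extend("this\nis\npart")
stage.run()
print(list(stage.output))    # complete lines so far
print("".join(stage.input))  # characters still waiting for a newline
```

Because the stage owns its input queue, it can leave "part" sitting there until more characters arrive, and no other function sees the half-read line.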

But wait, what stops us from having arbitrarily many queues per function?

         ...                                                     ...



      Input Queue                                             Output Queue
┌───┬───┬────────────┬───┬───┐                          ┌───┬───┬────────────┬───┬───┐
│   │   │            │   │   │                          │   │   │            │   │   │
│   │   │  ...       │   │   ├──┐                   ┌──►│   │   │    ...     │   │   │
│   │   │            │   │   │  │                   │   │   │   │            │   │   │
└───┴───┴────────────┴───┴───┘  │                   │   └───┴───┴────────────┴───┴───┘
                                │                   │
      Input Queue               │  ┌──────────────┐ │         Output Queue
┌───┬───┬────────────┬───┬───┐  │  │              │ │   ┌───┬───┬────────────┬───┬───┐
│   │   │            │   │   │  │  │ Fn A         │ │   │   │   │            │   │   │
│   │   │  ...       │   │   ├──┼─►│              ├─┼──►│   │   │    ...     │   │   │
│   │   │            │   │   │  │  │              │ │   │   │   │            │   │   │
└───┴───┴────────────┴───┴───┘  │  │              │ │   └───┴───┴────────────┴───┴───┘
                                │  └──────────────┘ │
      Input Queue               │                   │         Output Queue
┌───┬───┬────────────┬───┬───┐  │                   │   ┌───┬───┬────────────┬───┬───┐
│   │   │            │   │   │  │                   │   │   │   │            │   │   │
│   │   │  ...       │   │   ├──┘                   └──►│   │   │    ...     │   │   │
│   │   │            │   │   │                          │   │   │            │   │   │
└───┴───┴────────────┴───┴───┘                          └───┴───┴────────────┴───┴───┘


         ...                                                     ...

But how do we specify where one function's output gets piped to if we have arbitrarily many input queues? Maybe we make input/output queues addressable somehow. The first thing that comes to mind is dot notation to specify a specific input queue. We might have something like this: fn_A.output1 -> fn_B.input97. Almost like Linux file descriptors, except ours would be an object or an address or something like that. This notation leaves a lot to be desired. For example, something like fn_A.input1 -> fn_B.input1 could look reasonable to a user, since both inputs and outputs are dot accessible, even though wiring an input to an input makes no sense.
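One way to keep the dot notation while catching the input-to-input mistake is to give every queue a direction and check it when wiring. This is only a sketch in Python; Port, Node, and connect are invented names, not part of any existing design.

```python
from collections import deque
from typing import Deque, List, Tuple


class Port:
    """One addressable queue that belongs to a function."""

    def __init__(self, owner: str, name: str, direction: str) -> None:
        self.owner = owner
        self.name = name
        self.direction = direction  # "in" or "out"
        self.queue: Deque = deque()

    def __repr__(self) -> str:
        return f"{self.owner}.{self.name}"


class Node:
    """A function with named, dot-accessible input and output ports."""

    def __init__(self, name: str, inputs: List[str], outputs: List[str]) -> None:
        self.name = name
        for port_name in inputs:
            setattr(self, port_name, Port(name, port_name, "in"))
        for port_name in outputs:
            setattr(self, port_name, Port(name, port_name, "out"))


def connect(source: Port, target: Port) -> Tuple[Port, Port]:
    """Wire an output port to an input port, rejecting anything else."""
    if source.direction != "out" or target.direction != "in":
        raise ValueError(f"cannot connect {source!r} -> {target!r}")
    return (source, target)


fn_A = Node("fn_A", inputs=["input1"], outputs=["output1"])
fn_B = Node("fn_B", inputs=["input1", "input97"], outputs=["output1"])

wire = connect(fn_A.output1, fn_B.input97)  # accepted
try:
    connect(fn_A.input1, fn_B.input1)       # input -> input is rejected
except ValueError as err:
    print(err)
```

In a strongly typed Pipeline the same check could presumably happen at compile time rather than at wiring time, but that is speculation.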

I got a little side-tracked here. Our goal is to parse a stream of characters and produce a stream of integers giving the number of characters per line. With per-function queues this shouldn't be too hard. What is our main function? It takes a stream of characters on stdin and produces a stream of integers on stdout. Our type definition could look like this:

fn main : stream<char> -> stream<int>

This reads as "function main takes a stream of characters as input and produces a stream of integers as output". I don't know if this is too far from Algol to be useful to the average reader.

Our function definition could look like this (brackets? semicolon and indentation?! I don't know!!):

fn main : stream<char> -> stream<int> =
  (parse_line -> count_chars)*

That star? That's equivalent to a regular expression 'zero-or-more' modifier. It's also similar to how some flavors of EBNF (Extended Backus-Naur Form) use asterisks. The function parse_line will be attempted at least once. It will either succeed or fail. If the first run succeeds, then it will run again, and so on until there are no more lines to parse. I'm not sure the asterisk is a worthwhile convention here, given how the idea of a pipeline works. We assume a function will act on an item whenever it enters the queue, so to have some notation that says "do this more than once" doesn't seem to fit. Or does it? It might be useful to concatenate functions together that consume an item in the queue and then pass the remainder of the queue to another function. I'm starting to smell Lisp in here somewhere.

Then there's that paren and pipe thing going on: (parse_line -> count_chars). I suppose I could have made this another function definition, but why not have lambdas? Unnamed, inline functions can be fun, right? This defines a nameless function that parses a line, then passes the resulting string to a function that counts the characters in the string.
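In Python terms, that nameless (a -> b) function is just left-to-right composition. Here's a rough sketch; pipe and strip_newline are hypothetical helpers, and Python's built-in len stands in for count_chars.

```python
from functools import reduce
from typing import Any, Callable


def pipe(*fns: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """Compose functions left to right, like `a -> b` in the Pipeline sketch."""
    return lambda value: reduce(lambda acc, fn: fn(acc), fns, value)


def strip_newline(line: str) -> str:
    """Hypothetical helper: drop the line terminator before counting."""
    return line.rstrip("\n")


# A nameless stage built inline, then bound to a name only so we can call it.
line_length = pipe(strip_newline, len)

print(line_length("newline\n"))
```

This version works on one already-parsed line at a time. It says nothing about how the star would drive it repeatedly over a stream.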

Well, here's our example of concatenation. You'll see the type definition reads stream<char> -> string, so a single execution of this function takes in a stream and produces just one string.

fn parse_line : stream<char> -> string =
  parse_char* + parse_newline |
  parse_newline
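To check that the grammar above hangs together, here is a small parser-combinator sketch in Python. The names star, seq, and alt are my stand-ins for *, +, and |. Each parser takes the text plus a position and returns the matched text with the next position, or None on failure. I'm assuming parse_char rejects newlines, because otherwise parse_char* would swallow the terminator.

```python
from typing import Callable, Optional, Tuple

Result = Optional[Tuple[str, int]]  # (matched text, next position) or None
Parser = Callable[[str, int], Result]


def parse_char(text: str, pos: int) -> Result:
    """Match any single character except a newline (an assumption)."""
    if pos < len(text) and text[pos] != "\n":
        return text[pos], pos + 1
    return None


def parse_newline(text: str, pos: int) -> Result:
    """Match exactly one newline."""
    if pos < len(text) and text[pos] == "\n":
        return text[pos], pos + 1
    return None


def star(parser: Parser) -> Parser:
    """Zero-or-more: apply `parser` until it fails; never fails itself."""
    def run(text: str, pos: int) -> Result:
        matched = []
        while (result := parser(text, pos)) is not None:
            value, pos = result
            matched.append(value)
        return "".join(matched), pos
    return run


def seq(first: Parser, second: Parser) -> Parser:
    """Concatenation (`+`): both must match, one after the other."""
    def run(text: str, pos: int) -> Result:
        left = first(text, pos)
        if left is None:
            return None
        right = second(text, left[1])
        if right is None:
            return None
        return left[0] + right[0], right[1]
    return run


def alt(first: Parser, second: Parser) -> Parser:
    """Alternation (`|`): try `first`, fall back to `second`."""
    def run(text: str, pos: int) -> Result:
        result = first(text, pos)
        return result if result is not None else second(text, pos)
    return run


parse_line = alt(seq(star(parse_char), parse_newline), parse_newline)

print(parse_line("is\na\n", 0))
```

Since star already matches zero characters, the second alternative (a bare parse_newline) never gets a chance to do anything the first one can't. It is kept here only to mirror the definition above.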

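Our final functions are pretty simple to implement. One catch: parse_char has to reject the newline, otherwise parse_char* would swallow the line terminator before parse_newline ever sees it.

fn parse_char : char -> char | None =
  if input is char and input is not "\n":
    return input
  else:
    return None

fn parse_newline : char -> char | None =
  if input is "\n":
    return input
  else:
    return None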

fn count_chars : string -> int =
  _output = 0
  for character in input:
    _output += 1
  return _output

I added an additional convention in the type signature. I don't know if it is pedantic, clear, or useful, but there it is: char -> char | None. That is, the function returns a character OR None. I suppose we could use exceptions for control flow instead, but that has been done before. (I can't find the link on Hacker News. If you know the language that uses exceptions for all of its control flow, I'd love to get a link from you.)

Our base functions use basic Python-style control flow. I'd love to replace it with something that fits into the "parser" or "pipeline" syntax thing I've got going, but my brain is tired.

If you made it this far, thanks for taking the time to read my meanderings. God bless you.