Transducers

From Clojure to C++

















a presentation brought to you by
Juan Pedro
Bolivar Puente

from Ableton

1. What?

"A transducer is a device that converts one form of energy to another. Energy types include (but are not limited to): electrical, mechanical, electromagnetic (including light), chemical, acoustic, and thermal energy. Usually a transducer converts a signal in one form of energy to a signal in another[1] (for example, a loudspeaker driver converts an electric signal to sound), but any variable attenuation of energy may serve as input; for example, the light reflecting off the landscape, although it is not a signal, conveys information that image sensors, one form of transducer, can convert. A sensor is a transducer whose purpose is to sense (i.e. detect) some characteristic of its environs; it is used to detect a parameter in one form and report it in another form of energy, often an electrical signal. For example, a pressure sensor might detect pressure (a mechanical form of energy) and convert it to electrical signal for display at a remote gauge. Transducers are widely used in measuring instruments."

Invented by Rich Hickey. Introduced in Clojure 1.7.

Research from Ableton on declarative data models

A transducer is a transformation
over a sequential process

     // A transducer expression: map paired with a lambda that turns an int into its std::to_string() text.
     map([](int x) { return std::to_string(x); });
 

A transformation as a value


// The transducer is an ordinary value: it is stored in `xf` without being applied to any data.
auto xf = map([](int x) { return std::to_string(x); });
          

 

A composable transformation


// Two transducers combined into one with comp: a filter on x > 0 is listed first,
// followed by a map that converts the int to a string.
auto xf = comp(
    filter([](int x) { return x > 0; }),
    map([](int x) { return std::to_string(x); }));
          

$$comp(f, g) = f \circ g$$ $$(f \circ g)(x) = f(g(x))$$

A transformation independent of the source


// `xf` names no container, stream or signal: it only describes the transformation
// (keep x > 0, then convert to string).
auto xf = comp(
    filter([](int x) { return x > 0; }),
    map([](int x) { return std::to_string(x); }));
          

// Applying xf to a container: sequence(xf, data) yields something with begin()/end()
// that is then copied out. The `...` are placeholders from the slide, not C++ syntax.
auto data = std::vector<int>{ ... };
auto xformed = sequence(xf, data);
std::copy(xformed.begin(), xformed.end(), ...)
          

// Applying xf to streams: it is composed between a reader of ints on std::cin
// and a writer on std::cout, and the whole composition is handed to run.
run(comp(read<int>(std::cin),
         xf,
         write(std::cout)));
          

// Applying xf to signals: sig2 is built from sig1 through xf, and a slot that
// prints each string is connected to sig2.
auto sig1 = boost::signal<void(int)>{};
auto sig2 = make_signal(xf, sig1);
sig2.connect([] (std::string s) { cout << s << endl; });
// -10 does not satisfy the x > 0 filter in xf, so only 41 and 10 reach the slot.
sig1(41); sig1(-10); sig1(10); // "41", "10", ...
          





Sequential processes are

not just ranges!

  • An iterator
  • GUI events
  • CSP Channels
  • A network socket

2. How?

"Extract the essence
of map and filter"

// Writes fn(x) through `out` for every x in [first, last).
// Returns the output iterator advanced past the last element written.
template <typename In, typename Out, typename F>
Out transform(In first, In last, Out out, F fn) {
    while (first != last) {
        *out = fn(*first);
        ++out;
        ++first;
    }
    return out;
}
// Copies through `out` only those elements of [first, last) for which pred is true.
// Returns the output iterator advanced past the last element written.
template <typename In, typename Out, typename P>
Out filter(In first, In last, Out out, P pred) {
    while (first != last) {
        if (pred(*first)) {
            *out = *first;
            ++out;
        }
        ++first;
    }
    return out;
}
// Left fold: threads `state` through step(state, x) for every x in [first, last)
// and returns the final state (the initial one when the range is empty).
template <typename In, typename S, typename Rf>
S accumulate(In first, In last, S state, Rf step) {
    while (first != last) {
        state = step(state, *first);
        ++first;
    }
    return state;
}
// Reducing function whose state is an output iterator: stores `input` through it
// and returns the iterator advanced by one position.
auto output_rf = [] (auto out, auto input) {
    *out = input;
    ++out;
    return out;
};
// transform expressed as a fold: the state is the output iterator, and every
// step writes fn(input) through it using output_rf.
template <typename In, typename Out, typename F>
Out transform(In first, In last, Out out, F fn) {
    return accumulate(first, last, out, [&](auto state, auto input) {
           // The lambda parameter is `input`; the slide's `in` was never declared.
           return output_rf(state, fn(input));
    });
}
// filter expressed as a fold: the state is the output iterator; inputs that
// satisfy pred are written with output_rf, the others leave the state untouched.
template <typename In, typename Out, typename P>
Out filter(In first, In last, Out out, P pred) {
    auto keep_matching = [&](auto state, auto input) {
        return pred(input) ? output_rf(state, input) : state;
    };
    return accumulate(first, last, out, keep_matching);
}
// map transducer: given a mapping `fn`, returns a function that wraps a reducing
// function `step` so that step receives fn(inputs...) instead of the raw inputs.
auto map = [] (auto fn) {
  return [fn] (auto step) {
    return [fn, step] (auto state, auto... inputs) {
      return step(state, fn(inputs...));
    };
  };
};
              
// filter transducer: given a predicate, returns a function that wraps a reducing
// function `step` so that step only runs for inputs accepted by the predicate;
// rejected inputs return the state as it came in.
auto filter = [] (auto pred) {
  return [pred] (auto step) {
    return [pred, step] (auto state, auto... inputs) {
      return pred(inputs...) ? step(state, inputs...) : state;
    };
  };
};

// transform(first, last, out, fn) ==
// (the fold below: map(fn) wraps output_rf, so each input is mapped before it is written)

accumulate(first, last, out,
    map(fn)(output_rf));


            

// filter(first, last, out, pred) ==
// (the fold below: filter(pred) wraps output_rf, so only accepted inputs are written)

accumulate(first, last, out,
    filter(pred)(output_rf));



            
// map transducer: takes the mapping `fn`, then the downstream reducing function
// `step`, and yields a reducing function that calls step(state, fn(inputs...)).
auto map = [] (auto fn) {
  return [fn] (auto step) {
    return [fn, step] (auto state, auto... inputs) {
      return step(state, fn(inputs...));
    };
  };
};
              
// filter transducer: takes the predicate, then the downstream reducing function
// `step`, and yields a reducing function that calls step only when the predicate
// accepts the inputs; otherwise the state is returned unchanged.
auto filter = [] (auto pred) {
  return [pred] (auto step) {
    return [pred, step] (auto state, auto... inputs) {
      return pred(inputs...) ? step(state, inputs...) : state;
    };
  };
};
              



Definitions

  • Reducing function
  • Transducer
  • State
  • Inputs
  • Outputs

Mmmh, smells
like essence of map and filter!










The pipeline goes left to right

      // comp(f, g)(x) is f(g(x)), so both calls build the same reducing function:
      // filter wraps map, which wraps output_rf.
      accumulate(first, last, out, comp(filter(pred), map(fn)) (output_rf));
      accumulate(first, last, out, filter(pred) (map(fn) (output_rf)));
// Per input, the resulting step behaves like this: filter runs first, then map,
// then the write.
if (pred(inputs)) {
    inputs = fn(inputs);
    *out++ = inputs;
}

Atria uses Clojure's nomenclature

  • reduce(step, initial, ranges...)
  • transduce(xform, step, initial, ranges...)
  • into(col, xform, ranges...)
  • into_vector(xform, ranges...)
  • sequence(xform, ranges...)

3. Advanced stuff

1. Variadic transducers

Most transducers have arity 0...n


// 0 inputs, generator

// No input range is passed: the mapped function std::rand takes no arguments.
sequence(map(&std::rand));
            

// 1 input

auto v1 = { 1, 2, 3, 4, 5 }
transduce(map([](int x){ return x * 2 }),
          std::plus<>{},
          0,
          v1);
            

// n inputs, zipping...

// Several input ranges: the mapped function receives one element from each range
// per step (here an int and a double).
auto v1 = { 1, 2, 3, 4, 5 };
auto v2 = { 3.3, 2.2, 1.1 };
into_vector(map(std::plus<>{}), v1, v2);
into_vector(identity, v1, v2);
            

// n outputs

// The first map takes no inputs and produces a 3-tuple of rand() values.
// NOTE(review): unzip presumably spreads the tuple into three separate inputs,
// which is what the three-argument filter predicate expects — confirm against the library.
sequence(comp(
  map([] { return make_tuple(rand(), rand(), rand()); }),
  unzip,
  filter([] (int x, int y, int z) { return x<y && y<z; }),
  interleave));
            

2. State

enumerate is a stateful transducer


// With no input range, enumerate on its own produces the running index.
sequence(enumerate);
// 0, 1, 2, 3, ...

// With an input range, every element is paired with its index.
into_vector(enumerate, "abc");
// (0, 'a'), (1, 'b'), (2, 'c')
          

The Clojure way is to hide the state
in the reducing function


// enumerate transducer, Clojure style: the running index lives inside the
// reducing function itself, as a mutable init-capture `n`.
// Each call passes the current index to `step` ahead of the inputs, then bumps it.
auto enumerate = [] (auto step) {
    // `step` must be captured explicitly alongside the counter, and a generic
    // lambda's parameter pack is spelled `auto... ins`.
    return [step, n = 0] (auto s, auto... ins) mutable {
        return step(s, n++, ins...);
    };
};
          

Hidden state makes me sad... 😿

💀😻💀
Mutable lambdas
killed my mommy

state goes here

// enumerate with hidden state: the counter `n` is an init-capture of the inner,
// mutable lambda, so it lives inside the reducing function rather than in the
// state `s` that is threaded through the reduction.
auto enumerate = [] (auto step) {
    // `step` is captured explicitly next to the counter; the pack is `auto... ins`.
    return [step, n = 0] (auto s, auto... ins) mutable {
        return step(s, n++, ins...);
    };
};

not here!

state_wrapper<
    Tag,
    State,
    Data
>

// enumerate without hidden mutation: the counter travels inside the reduction
// state. Each step reads the counter from the incoming state (0 on the first
// step), calls the wrapped `step` on the unwrapped state with the counter ahead
// of the inputs, and re-wraps the result together with the next counter value.
auto enumerate = [] (auto step) {
    return [=] (auto s, auto... ins) {
        auto n = state_data(s, [] { return 0; });   // default used when s carries no data yet
        auto w = state_unwrap(s);
        // The index has to be handed to step; the slide dropped it from the call.
        return state_wrap(step(w, n, ins...), n + 1);
    };
};
enumerate step
state
[♦, 1] [♦, 1] [♦, 1] [♦, 1] [♦, 2] [♦, 2]
input
0 0 1 1
// Folds `range` from left to right: starts from `state` and replaces it with
// step(state, element) for every element. Returns the final state, which is the
// initial one for an empty range.
template <typename Fn, typename State, typename Range>
auto reduce(Fn step, State state, const Range& range)
{
   // begin/end are found through argument-dependent lookup on Range.
   auto first = begin(range), last = end(range);
   // Compare against the `last` iterator; `end` is the function, not a position.
   while (first != last)
       state = step(state, *first++);
   return state;
}

The state type changes on the first iteration

// reduce for stateful transducers: the first call to `step` may return a
// different (wrapped) state type than `init`, so the first iteration is peeled
// out of the loop and the loop variable takes the type returned by step.
// NOTE(review): with a deduced `auto` return type both return statements must
// yield the same type — confirm that state_complete(state) has the type of init.
template <typename Fn, typename State, typename Range>
auto reduce(Fn step, State init, const Range& range)
{
   auto first = begin(range), last = end(range);
   // Compare against the `last` iterator; `end` is the function, not a position.
   if (first != last) {
     auto state = step(init, *first++);
     while (first != last)
         state = step(state, *first++);
     return state_complete(state);
   }
   return init;
}

Filter became
problematic










// filter transducer in its simple form: the wrapped reducing function returns
// either step(s, ins...) or the untouched `s`, so both branches of the
// conditional need a common type.
auto filter = [] (auto pred) {
    return [pred] (auto step) {
        return [pred, step] (auto state, auto... inputs) {
            return pred(inputs...)
                ? step(state, inputs...)
                : state;
        };
    };
};
filter
enumerate step
state
[♦, 1] [♦, ?]
input
¿♥? ¿♥? ¿♥?
// filter transducer using the call/skip helpers: accepted inputs go through
// call(step, ...), rejected ones through skip(step, ...), so the two branches
// no longer have to be the raw step result and the raw state.
auto filter = [] (auto pred) {
    return [pred] (auto step) {
        return [pred, step] (auto state, auto... inputs) {
            return pred(inputs...)
                ? call(step, state, inputs...)
                : skip(step, state, inputs...);
        };
    };
};
Tries to find a common type for step(s, ins...) and s

Use a union-like type otherwise

3. Early termination

Take needs to terminate early


// take(4) stops the reduction after four elements, even for an unbounded source.
into_vector(comp(enumerate, take(4)));
// { 0, 1, 2, 3 }

into(string{}, comp(map(tolower), take(4)), "ABCDEG...");
// "abcd"
          
// Tag identifying the state wrapper produced by take.
struct take_tag {};

// take transducer: the number of elements still allowed is carried in the
// reduction state. Each step reads the remaining count (initially `count`),
// runs the wrapped `step` on the unwrapped state and re-wraps the result,
// tagged with take_tag, together with the count decreased by one.
auto take = [] (auto count) {
   // Every nested lambda that uses `count` or `step` has to capture it.
   return [=] (auto step) {
       return [=] (auto s, auto... ins) {
          auto n = state_data(s, [=] { return count; });
          auto w = state_unwrap(s);
          return state_wrap<take_tag>(
             step(w, ins...), n - 1);
       };
   };
};

Use tag to define reduced state

// Overload selected by take_tag: reports whether the data `n` stored in the
// wrapper equals zero.
template <typename T>
bool state_wrapper_data_is_reduced(take_tag, T n) {
    const bool exhausted = (n == 0);
    return exhausted;
}
// reduce with early termination: like the stateful version, but the loop also
// stops as soon as state_is_reduced(state) reports that no more input is wanted.
// NOTE(review): with a deduced `auto` return type both return statements must
// yield the same type — confirm that state_complete(state) has the type of init.
template <typename Fn, typename State, typename Range>
auto reduce(Fn step, State init, const Range& range)
{
   auto first = begin(range), last = end(range);
   // Compare against the `last` iterator; `end` is the function, not a position.
   if (first != last) {
     auto state = step(init, *first++);
     while (!state_is_reduced(state) && first != last)
         state = step(state, *first++);
     return state_complete(state);
   }
   return init;
}

4. Batteries included

  • reduce_nested
  • reductor
  • transducer

// cat transducer: forwards the wrapped reducing function, the current state and
// the inputs to reduce_nested.
// NOTE(review): presumably each input is itself a range that reduce_nested folds
// with `step` — confirm against reduce_nested.
auto cat = [] (auto step) {
    return [=] (auto s, auto... ins) {
        return reduce_nested(
            step, s, ins...);
    };
};
                
  • reductor<RF, State, Ins...>
  • operator () (Ins...);
  • operator bool ();
  • current() -> StateT
  • complete() -> State
// A composed pipeline (map to_string, cat, enumerate) stored in a variable of
// the named type transducer<...> instead of `auto`.
// NOTE(review): the template arguments presumably describe the input type and
// the pack of output types — confirm against the library.
transducer<int, pack<int, char> >
    xform = comp(
        map([](auto x) {
            return to_string(x); }),
        cat,
        enumerate);
                    
"std::function for transducers"
  cat   dedupe   distinct   drop   drop_while   filter   interpose   mapcat   map   map_indexed   partition_by   partition   random_sample   remove   replace   take   take_nth   take_while
     chain   count   cycle   enumerate   product   range   repeat

Exclusive!

  each   eager   interleave   iter   readbuf   read   sink   traced   transducer   unzip   writebuf   write   zip

5. Performance

// reduce tuned for performance: the previous state is moved into `step`
// instead of copied on every iteration.
// NOTE(review): with a deduced `auto` return type both return statements must
// yield the same type — confirm that state_complete(state) has the type of init.
template <typename Fn, typename State, typename Range>
auto reduce(Fn step, State init, const Range& range)
{
   auto first = begin(range), last = end(range);
   // Compare against the `last` iterator; `end` is the function, not a position.
   if (first != last) {
     auto state = step(init, *first++);
     while (!state_is_reduced(state) && first != last) {
         // Named `prev` so it does not shadow the `last` iterator used by the loop condition.
         auto prev = std::move(state);
         state = step(std::move(prev), *first++);
     }
     return state_complete(state);
   }
   return init;
}

The good

State and inputs forward through the chain

Small functions, easy to inline

Benchmarks say that most transducer pipelines perform like hand-written code

Often faster than boost.range

The ugly

Type erasure is expensive

Filtering before a stateful transducer has overhead
in the current design

Adapting as iterator has overhead

4. Conclusion

transform
value
composable
independent

pure
fast
wip
std?

Thank
you
very
much

!

Extras





Tail recursive accumulate


// Left fold written as a tail call: an empty range yields `state`; otherwise the
// function recurses on the rest of the range with fn(state, *first) as the new state.
template <typename InT, typename StateT, typename FnT>
StateT accumulate(InT first, InT last, StateT state, FnT fn)
{
  if (first == last)
      return state;
  // Advance a copy by hand rather than calling an unqualified next(), which is
  // only found through ADL and therefore does not compile for raw pointers.
  InT rest = first;
  ++rest;
  // The parentheses suppress ADL, so the recursion cannot become ambiguous with
  // std::accumulate when the iterators come from namespace std.
  return (accumulate)(rest, last, fn(state, *first), fn);
}
                




C++11 transducer


// Reducing-function generator for map, written without generic lambdas.
struct map_rf_gen {
    // The reducing function produced by map: it stores the downstream reducing
    // function `step` and the user's `mapping`.
    template <typename ReducingFnT,
              typename MappingT>
    struct apply {
        ReducingFnT step;
        MappingT mapping;

        // Forwards the state to `step` together with the result of invoking
        // `mapping` on the forwarded inputs.
        // NOTE(review): ABL_DECLTYPE_RETURN is defined elsewhere; presumably it
        // expands to the decltype of the expression plus a body returning it — confirm.
        template <typename State, typename ...Inputs>
        auto operator() (State&& s, Inputs&& ...is)
          -> ABL_DECLTYPE_RETURN(
            step(std::forward<State>(s),
                 estd::invoke(mapping, std::forward<Inputs>(is)...)))
    };
};

// Factory for the map transducer: wraps the decayed mapping in a
// transducer_impl parameterized with map_rf_gen.
template <typename MappingT>
auto map(MappingT&& mapping)
    -> transducer_impl<detail::map_rf_gen, estd::decay_t<MappingT> >
{
    using xform_t = transducer_impl<detail::map_rf_gen, estd::decay_t<MappingT> >;
    return xform_t{ std::forward<MappingT>(mapping) };
}
                

// Generic transducer object. It keeps its parameters (ParamTs...) in a
// std::tuple base and, when called with a reducing function, builds
// ReducingFnGenT::apply<...> from that function plus the stored parameters.
template<typename ReducingFnGenT,
         typename ...ParamTs>
struct transducer_impl : std::tuple<ParamTs...>
{
    using base_t = std::tuple<ParamTs...>;

    // Takes the parameters by value and moves them into the tuple base.
    template <typename ...Ts>
    transducer_impl(Ts ...ts)
        : base_t(std::move(ts)...) {}

    // Applies the transducer to the reducing function `step`. The index sequence
    // has one index per stored parameter and is consumed by make().
    template<typename ReducingFnT>
    constexpr auto operator() (ReducingFnT&& step) const
      -> typename ReducingFnGenT::template apply<
        estd::decay_t<ReducingFnT>,
        estd::decay_t<ParamTs>...
      >
    {
        using indexes_t = estd::make_index_sequence<sizeof...(ParamTs)>;
        return this->make(std::forward<ReducingFnT>(step), indexes_t());
    }

    // Brace-initializes the generated reducing function from `step` followed by
    // every stored parameter, read from the tuple base with std::get<index>.
    template<typename ReducingFnT, std::size_t...indexes_t>
    constexpr auto make(ReducingFnT&& step, estd::index_sequence<indexes_t...>) const
      -> typename ReducingFnGenT::template apply<
        estd::decay_t<ReducingFnT>,
        estd::decay_t<ParamTs>...
      >
    {
        return { std::forward<ReducingFnT>(step),
                 std::get<indexes_t>(*this)... };
    }
};
          




Completion


// partition(3) groups the input into runs of three elements.
into_vector(partition(3), "abcdef");
// {'a', 'b', 'c'}, {'d', 'e', 'f'}

// A trailing, incomplete group is still emitted when the reduction completes.
into_vector(partition(3), "abcd");
// {'a', 'b', 'c'}, {'d'}
                

struct partition_tag {};

auto partition = [] (auto count) {
    return [] (auto step) {
        return [] (auto s, auto... ins) {
            using elem_t = decltype(tuplify(ins...));
            auto data = state_data(s, [] {
                return make_tuple(std::vector<elem_t>{}, step);
            });
            auto& group = get<0>(data);
            auto& step_ = get<1>(data);
            group.push_back(tuplify(ins...));
            auto w = group == count
              ? call(step_, state_unwrap(s), group)
              : skip(step_, state_unwrap(s), group);
            return wrap_state(w, data);
        }
    };
}
                

// Completion hook selected by partition_tag: calls the stored downstream step
// (tuple element 1) once more with the unwrapped state and the stored group
// (tuple element 0), then completes the resulting state.
template <typename T>
auto state_wrapper_complete(partition_tag, T s) {
    auto data = state_wrapper_data(s);
    auto& pending = std::get<0>(data);
    auto& step = std::get<1>(data);
    return state_complete(step(state_unwrap(s), pending));
}
                




Jokes


The real smell of freaking essence of map and filter!