#!/bin/bash

# Create a private scratch directory that the rest of the script uses for
# cached bucket dumps and per-mapreduce working directories. Set the TMP
# environment variable to place it somewhere other than /tmp.
temp_root=${TMP:-/tmp}
temp_dir=$(mktemp --tmpdir="${temp_root}" --directory) || {
    # Without a scratch directory every later path would resolve against "/",
    # so stop here instead of carrying on with an empty ${temp_dir}.
    printf 'error: cannot create a temporary directory in %s\n' \
        "${temp_root}" >&2
    exit 1
}

# Remove the scratch directory when the script ends. The handler does not
# call exit itself, so the exit status the script was finishing with is
# what the caller sees.
cleanup_temp_dir() {
    rm -fr -- "${temp_dir}"
}
trap cleanup_temp_dir EXIT
# Turn INT/TERM into a normal exit so the EXIT handler above still runs,
# and report the conventional 128+signal status.
trap 'exit 130' INT
trap 'exit 143' TERM

# The MR pipelines to run, one "bucket:command" entry each. The part before
# the colon names the bucket to dump; the part after it is the mapreduce
# program, optionally followed by extra arguments for it.
pipelines=()
pipelines+=("song:mr_davfs")
pipelines+=("song:mr_stats")
pipelines+=("tasks:mr_clean_tasks --max-age=604800")
pipelines+=("playlog:mr_markov")


# Dump a bucket to standard output and cache the result.
#
# Globals:   temp_dir (read) - directory holding the cached "<bucket>.keys"
# Arguments: $1 - name of the bucket to dump
# Outputs:   the bucket dump on stdout (from the cache file when it exists)
# Returns:   0 on success; otherwise the status of the failing command
#            (mr_dump_bucket, tee, cat or mv)
dump_bucket() {
    local bucket="$1"
    local bucket_file="${temp_dir}/${bucket}.keys"
    local -a stage_status

    # Serve an earlier dump straight from the cache.
    if [[ -e "${bucket_file}" ]]; then
        cat -- "${bucket_file}"
        return
    fi

    # Stream the dump to the caller while writing it to a temporary name, so
    # a half-written file is never visible under the cache name.
    mr_dump_bucket --bucket="${bucket}" \
        | tee -- "${bucket_file}.temp"
    stage_status=("${PIPESTATUS[@]}")

    # Promote the copy only when both the dump and the copy succeeded;
    # otherwise a truncated dump would be cached and reused by later calls.
    if (( stage_status[0] != 0 )); then
        rm -f -- "${bucket_file}.temp"
        return "${stage_status[0]}"
    fi
    if (( stage_status[1] != 0 )); then
        rm -f -- "${bucket_file}.temp"
        return "${stage_status[1]}"
    fi
    mv -f -- "${bucket_file}.temp" "${bucket_file}"
}

# Run a mapreduce (with data on standard input).
#
# Globals:   temp_dir (read) - parent of the per-mapreduce working directory
# Arguments: $1   - name of the mapreduce program to execute
#            $2.. - extra arguments, passed through after --mapreduce
# Returns:   non-zero if the working directory cannot be created or entered;
#            otherwise the exit status of the mapreduce program
run_mr() {
    local mrname="$1"
    shift
    local outputdir="${temp_dir}/${mrname}"
    mkdir -p -- "${outputdir}" || return
    # The subshell keeps the directory change from leaking into the caller.
    # Everything is quoted so paths and arguments containing whitespace or
    # glob characters reach the program intact.
    (cd -- "${outputdir}" && "${mrname}" --mapreduce "$@")
}

# Run the pipelines sequentially. Each entry is split at the colon into the
# bucket to dump and the mapreduce command line that consumes the dump.
for pl in "${pipelines[@]}" ; do
    bucket="${pl%:*}"
    mr="${pl#*:}"
    # Split the command line into words explicitly. Relying on an unquoted
    # ${mr} would also glob-expand the arguments against the current
    # directory.
    read -r -a mr_cmd <<< "${mr}"
    dump_bucket "${bucket}" | run_mr "${mr_cmd[@]}"
done