(* Immutable iterators. As in Chapter 4 of the poly. *)

(* I am now using the word "cascade" because it is shorter than
   "immutable iterator" and suggests a cascade of elements.
   I am now using "cascade_now" instead of "head" to suggest a
   cascade whose first element has been demanded and is available now. *)

(* A [cascade_now] is what one obtains by demanding the first element of a
   cascade: either [Nil] (there is no element) or [Cons (x, xs)], that is,
   an element [x] together with the cascade [xs] of the remaining elements. *)
type 'a cascade_now =
| Nil
| Cons of 'a * 'a cascade

(* A cascade is a function: applying it to [()] demands its first element
   and yields a [cascade_now]. *)
and 'a cascade =
  unit -> 'a cascade_now

(* ------------------------------------------------------------------------ *)

(* Constructors. *)

(* Global switch: while it holds [true], [cons] prints a message each time
   an element is produced. *)
let debug : bool ref = ref true

(* [nil] is the result of demanding an element when there is none. *)
let nil = Nil

(* [cons who x xs] builds [Cons (x, xs)]. When [!debug] is set, it first
   prints a message that names the producer [who] and the element [x]. *)
let cons who x xs =
  let () =
    if !debug then Printf.printf "%s: producing %d\n" who x
  in
  Cons (x, xs)

(* The debug message inside [cons] helps see when elements are produced.
   In this message, we assume that [x] is an integer. This makes [cons]
   specialized to cascades of integers. Otherwise, it would work with
   any element type, of course. *)

(* ------------------------------------------------------------------------ *)

(* Printing. *)

(* [print n xs] prints up to [n] elements of the cascade [xs]. For ease of
   use, it is specialized to cascades of integers. *)

(* [print_int x] prints the integer [x] on a line of its own. *)
let print_int (x : int) : unit =
  Printf.printf "%d\n" x

(* [print n xs] demands and prints elements of [xs] one at a time. It stops
   once [n] elements have been printed or [xs] is exhausted. When [n] is not
   positive, [xs] is not demanded at all. *)
let rec print n xs =
  if n <= 0 then
    ()
  else
    match xs () with
    | Cons (x, rest) ->
        print_int x;
        print (n - 1) rest
    | Nil ->
        ()

(* ------------------------------------------------------------------------ *)

(* Producers. *)

(* [interval j k] is the cascade of the integers from [j] (included) up to
   [k] (excluded). It is empty when [j >= k]. *)
let rec interval (j : int) (k : int) : int cascade =
  fun () ->
    if j >= k then
      nil
    else
      let rest = interval (j + 1) k in
      cons "interval" j rest

(* [from j] is the infinite cascade [j], [j + 1], [j + 2], ... *)
let rec from (j : int) : int cascade =
  fun () ->
    let rest = from (j + 1) in
    cons "from" j rest

(* Examples. *)

(* A finite cascade: up to 10 elements are requested, but only 20..24 exist. *)
let () =
  interval 20 25 |> print 10

(* An infinite cascade: exactly 10 elements are demanded. *)
let () =
  from 20 |> print 10

(* Building the cascade produces no element yet. *)
let from20 =
  from 20

(* Printing the same cascade twice runs the producer twice. *)
let () =
  from20 |> print 10

let () =
  from20 |> print 10

(* ------------------------------------------------------------------------ *)

(* A consumer. *)

(* [find p xs] demands the elements of [xs] in order and returns [Some x] for
   the first element [x] that satisfies [p]. It returns [None] if [xs] ends
   before such an element is found. On an infinite cascade with no matching
   element, it does not terminate. *)
let rec find (p : 'a -> bool) (xs : 'a cascade) : 'a option =
  match xs () with
  | Cons (x, rest) ->
      if not (p x) then find p rest else Some x
  | Nil ->
      None

(* Example. *)

(* The first multiple of 7 at or above 33. *)
let m : int option =
  from 33 |> find (fun x -> x mod 7 = 0)

(* ------------------------------------------------------------------------ *)

(* Transformers. *)

(* [map f xs] is the cascade of the images through [f] of the elements of
   [xs]. [f] is applied to an element only when that element is demanded. *)
let rec map (f : 'a -> 'b) (xs : 'a cascade) : 'b cascade =
  fun () ->
    match xs () with
    | Cons (x, rest) ->
        cons "map" (f x) (map f rest)
    | Nil ->
        Nil

(* [sum accu xs] is the cascade of the running totals of [xs], where [accu]
   is the total accumulated so far. Its i-th element is [accu] plus the sum
   of the first i elements of [xs]. *)
let rec sum accu (xs : int cascade) : int cascade =
  fun () ->
    match xs () with
    | Cons (x, rest) ->
        let total = accu + x in
        cons "sum" total (sum total rest)
    | Nil ->
        Nil

(* [zip xs ys] pairs up the elements of [xs] and [ys] and stops as soon as
   either cascade is exhausted. Each demand on the result places one demand
   on [xs] and one on [ys]. *)
let rec zip (xs : 'a cascade) (ys : 'b cascade) : ('a * 'b) cascade =
  fun () ->
    match xs (), ys () with
    | Cons (x, xtl), Cons (y, ytl) ->
        Cons ((x, y), zip xtl ytl)
    | Nil, _
    | _, Nil ->
        Nil

(* ------------------------------------------------------------------------ *)

(* Examples. *)

(* A pipeline. Here, we connect a producer, a transformer, and a consumer. *)

let m : int option =
  let doubled = map (fun x -> 2 * x) (from 33) in
  find (fun x -> x mod 7 = 0) doubled

(* The same pipeline as above. *)

let m : int option =
  let double x = 2 * x
  and multiple_of_7 x = x mod 7 = 0 in
  from 33
    |> map double
    |> find multiple_of_7

(* Another pipeline. *)

(* The first running total of 0, 1, 2, ... that reaches 50. *)
let m : int option =
  let running_totals = from 0 |> sum 0 in
  running_totals |> find (fun s -> s >= 50)

(* Another pipeline, where [xs] is used twice, so each of its elements
   is requested and computed twice. *)

let m : (int * int) option =
  let xs = from 0 in                (* one producer *)
  let sums = sum 0 xs in            (* first use of [xs] *)
  zip xs sums                       (* second use of [xs] *)
    |> find (fun (_, s) -> s >= 50)

(* The search takes constant space, even though it allocates many closures
   in the heap (which become unreachable immediately after they are called). *)

let m : (int * int) option =
  (* Turn off the debugging messages for the duration of this example. *)
  debug := false;
  let result =
    let limit = 32 * 1024 * 1024 in   (* 2^25, about 33.5 million *)
    let xs = from 0 in
    zip xs (xs |> sum 0)
      |> find (fun (x, _) -> x >= limit)
  in
  debug := true;
  result

(* Of course, a non-modular, imperative version of the same computation is
   about 10x faster. The speed difference between the two styles would be
   less dramatic if producing the next element actually demanded a non-trivial
   computation. *)

(* Imperative version: add up the integers 0, 1, ..., limit - 1 in a single
   loop. The result is the pair of [limit] and that sum. *)
let m : int * int =
  let limit = 32 * 1024 * 1024 in
  let s = ref 0 in
  for x = 0 to limit - 1 do
    s := !s + x
  done;
  limit, !s

(* ------------------------------------------------------------------------ *)

(* Converting a cascade to a mutable iterator. *)

(* [cascade_to_iterator xs] returns a stateful function. Each call demands
   one element from the remaining cascade, stores the rest in a reference,
   and returns [Some x]. It returns [None] once the cascade is exhausted. *)
let cascade_to_iterator (xs : 'a cascade) : unit -> 'a option =
  let remaining = ref xs in
  let next () =
    match (!remaining) () with
    | Cons (x, rest) ->
        remaining := rest;
        Some x
    | Nil ->
        None
  in
  next

let it =
  from 0 |> cascade_to_iterator

(* Each call advances the iterator by one element. *)
let m : int option =
  it ()

let m : int option =
  it ()

(* ------------------------------------------------------------------------ *)

(* Producing a cascade of the elements of a binary tree, in infix order. *)

(* A binary tree: either an empty [Leaf] or a [Node] that carries a left
   subtree, an element, and a right subtree. *)
type 'a tree =
| Leaf
| Node of 'a tree * 'a * 'a tree

(* [elements t accu] is the cascade of the elements of [t], in infix order,
   followed by the already-demanded cascade [accu]. No part of [t] is
   visited until the cascade is demanded. *)
let rec elements (t : 'a tree) (accu : 'a cascade_now) : 'a cascade =
  fun () -> elements_now t accu

(* [elements_now t accu] walks down the leftmost branch of [t]. For each
   node met on the way, it pushes the node's element in front of a delayed
   traversal of the node's right subtree. *)
and elements_now (t : 'a tree) (accu : 'a cascade_now) : 'a cascade_now =
  match t with
  | Node (left, x, right) ->
      let tail = elements right accu in
      elements_now left (Cons (x, tail))
  | Leaf ->
      accu

(* The public entry point starts with nothing after the tree. *)
let elements t =
  let nothing = nil in
  elements t nothing

(* A sample tree whose infix traversal is 1, 2, 3, 4. *)
let t : int tree =
  let single x = Node (Leaf, x, Leaf) in
  Node (single 1, 2, Node (single 3, 4, Leaf))

let () =
  elements t |> print 5

(* ------------------------------------------------------------------------ *)

(* Duplicated computation. *)

(* The definition of [from33double] causes no computation. *)

let from33double =
  let double x = 2 * x in
  map double (from 33)

(* But if we use [from33double] twice, then its elements are computed
   twice: *)

let m : int option =
  let multiple_of_7 x = x mod 7 = 0 in
  find multiple_of_7 from33double

let m : int option =
  let multiple_of_13 x = x mod 13 = 0 in
  find multiple_of_13 from33double

(* Naively, one might wish to avoid this repeated computation by transforming
   the cascade to a list first. The following code works for finite cascades: *)

(* [unfold xs] demands every element of [xs] and returns them as a list.
   The recursive call is not a tail call, so the stack grows with the
   number of elements. *)
let rec unfold (xs : 'a cascade) : 'a list =
  match xs () with
  | Cons (x, rest) ->
      let tail = unfold rest in
      x :: tail
  | Nil ->
      []

(* However, the cascade [from33double] is infinite, so the following call
   would loop. Try it -- you will get a Stack_overflow exception. *)

(*
let _ =
  debug := false;
  unfold from33double
 *)
(* Make sure that debugging messages are enabled from here on. *)
let () = debug := true

(* ------------------------------------------------------------------------ *)

(* The above considerations explain why we introduce streams, where the
   computation of the next element is not only delayed, but also memoised. *)

(* A [stream_now] is the result of forcing a stream: either [Nil] or
   [Cons (x, xs)], an element [x] together with the stream [xs] of the
   remaining elements. From here on, the constructors [Nil] and [Cons]
   refer to this type and hide those of [cascade_now]. *)
type 'a stream_now =
| Nil
| Cons of 'a * 'a stream

(* A stream is a suspension (a value of type [Lazy.t]) that yields a
   [stream_now] when it is forced. *)
and 'a stream =
  'a stream_now Lazy.t

(* [force s] forces the suspension [s]. This is a short name for
   [Lazy.force]. *)
let force s = Lazy.force s

(* ------------------------------------------------------------------------ *)

(* Constructors. As above. *)

(* [nil] is the result of forcing a stream that has no element. *)
let nil = Nil

(* [cons who x xs] builds [Cons (x, xs)]. When [!debug] is set, it first
   prints a message that names the producer [who] and the element [x]. *)
let cons who x xs =
  let () =
    if !debug then Printf.printf "%s: producing %d\n" who x
  in
  Cons (x, xs)

(* ------------------------------------------------------------------------ *)

(* Printing. As above. *)

(* [print n xs] forces and prints elements of the stream [xs] one at a time.
   It stops once [n] elements have been printed or [xs] is exhausted. When
   [n] is not positive, [xs] is not forced at all. *)
let rec print n xs =
  if n <= 0 then
    ()
  else
    match force xs with
    | Cons (x, rest) ->
        print_int x;
        print (n - 1) rest
    | Nil ->
        ()

(* ------------------------------------------------------------------------ *)

(* Producers. As above, except delaying now uses [lazy (...)]. *)

(* [interval j k] is the stream of the integers from [j] (included) up to
   [k] (excluded). It is empty when [j >= k]. *)
let rec interval (j : int) (k : int) : int stream =
  lazy (
    if j >= k then
      nil
    else
      let rest = interval (j + 1) k in
      cons "interval" j rest
  )

(* [from j] is the infinite stream [j], [j + 1], [j + 2], ... *)
let rec from (j : int) : int stream =
  lazy (
    let rest = from (j + 1) in
    cons "from" j rest
  )

(* Examples. *)

let () =
  interval 20 25 |> print 10

let () =
  from 20 |> print 10

let from20 =
  from 20

let () =
  from20 |> print 10

let () =
  from20 |> print 10 (* note: no new computation! *)

let () =
  from20 |> print 20 (* note: new computation only from 30 and on. *)

(* ------------------------------------------------------------------------ *)

(* A consumer. As above, except forcing now uses [force]. *)

(* [find p xs] forces the elements of [xs] in order and returns [Some x] for
   the first element [x] that satisfies [p]. It returns [None] if [xs] ends
   before such an element is found. On an infinite stream with no matching
   element, it does not terminate. *)
let rec find (p : 'a -> bool) (xs : 'a stream) : 'a option =
  match force xs with
  | Cons (x, rest) ->
      if not (p x) then find p rest else Some x
  | Nil ->
      None

(* The first multiple of 7 at or above 33. *)
let m : int option =
  from 33 |> find (fun x -> x mod 7 = 0)

(* ------------------------------------------------------------------------ *)

(* Transformers. *)

(* [map f xs] is the stream of the images through [f] of the elements of
   [xs]. [f] is applied to an element when the corresponding element of the
   result is first forced. *)
let rec map (f : 'a -> 'b) (xs : 'a stream) : 'b stream =
  lazy (
    match force xs with
    | Cons (x, rest) ->
        cons "map" (f x) (map f rest)
    | Nil ->
        Nil
  )

(* [sum accu xs] is the stream of the running totals of [xs], where [accu]
   is the total accumulated so far. *)
let rec sum accu (xs : int stream) : int stream =
  lazy (
    match force xs with
    | Cons (x, rest) ->
        let total = accu + x in
        cons "sum" total (sum total rest)
    | Nil ->
        Nil
  )

(* [zip xs ys] pairs up the elements of [xs] and [ys] and stops as soon as
   either stream is exhausted. Forcing the result forces both [xs] and
   [ys]. *)
let rec zip (xs : 'a stream) (ys : 'b stream) : ('a * 'b) stream =
  lazy (
    match force xs, force ys with
    | Cons (x, xtl), Cons (y, ytl) ->
        Cons ((x, y), zip xtl ytl)
    | Nil, _
    | _, Nil ->
        Nil
  )

(* ------------------------------------------------------------------------ *)

(* A pipeline. As above. *)

let m : int option =
  let double x = 2 * x
  and multiple_of_7 x = x mod 7 = 0 in
  from 33
    |> map double
    |> find multiple_of_7

(* Here, we see memoisation at work. When we request a prefix of the
   stream [from33double], some elements are produced, and are memoised.
   Later, when we request a longer prefix of this stream, the elements
   that have been memoised are obtained immediately, without the need for
   re-computing them. *)

let from33double =
  let double x = 2 * x in
  map double (from 33)

let m : int option =               (* produces 33 to 35 *)
  let multiple_of_7 x = x mod 7 = 0 in
  find multiple_of_7 from33double

let m : int option =               (* searches 33 to 35, without producing them again *)
  let multiple_of_7 x = x mod 7 = 0 in
  find multiple_of_7 from33double

let m : int option =               (* searches 33 to 36, producing just 36 anew *)
  let multiple_of_12 x = x mod 12 = 0 in
  find multiple_of_12 from33double

(* ------------------------------------------------------------------------ *)

(* In the following pipeline, [xs] is used twice, so each of its elements
   is requested twice, yet is computed only once, thanks to memoisation. *)

let m : (int * int) option =
  let xs = from 0 in
  let sums = sum 0 xs in            (* first use of [xs] *)
  zip xs sums                       (* second use of [xs] *)
    |> find (fun (_, s) -> s >= 50)

(* ------------------------------------------------------------------------ *)

(* Now, for fun, let's do merge sort. *)

(* We use OCaml's polymorphic comparison operator < so as to simplify things.
   In principle, it would be preferable to parameterize the code over a
   comparison function. *)

(* Merging two sorted streams is just like merging two sorted lists: *)

(* [merge xs ys] interleaves two sorted streams into one sorted stream.
   Forcing the result forces the first element of both [xs] and [ys]. *)
let rec merge xs ys =
  lazy (
    match force xs, force ys with
    | Cons (x, xtl), Cons (y, ytl) ->
        (* Both inputs are nonempty: emit the smaller head and keep the
           other input whole. When neither head is smaller, the head of
           [ys] goes first. *)
        if x < y then
          cons "merge" x (merge xtl ys)
        else
          cons "merge" y (merge xs ytl)
    | Cons (x, xtl), Nil ->
        cons "merge" x xtl
        (* or: force xs *)
    | Nil, Cons (y, ytl) ->
        cons "merge" y ytl
        (* or: force ys *)
        (* [cons] is used here, even though it is more costly, so that a
           message is printed in debugging mode. *)
    | Nil, Nil ->
        Nil
  )

let s =
  let finite = interval 0 5
  and infinite = from 2 in
  merge finite infinite

let () =
  s |> print 15 (* note the subtle interleaving of messages *)

let () =
  s |> print 15 (* memoised, so no more messages *)

(* ------------------------------------------------------------------------ *)

(* [take n xs] is the stream [xs], truncated to length at most [n]. *)

(* [take n xs] keeps at most the first [n] elements of [xs]. A count [n]
   that is zero or negative yields the empty stream, and in that case [xs]
   is never forced. (Testing [n = 0] alone would let a negative [n] slip
   through, so the stream would not be truncated at all.) *)
let rec take (n : int) (xs : 'a stream) : 'a stream =
  lazy (
    if n <= 0 then
      nil
    else
      match force xs with
      | Nil ->
          nil
      | Cons (x, xs) ->
          cons "take" x (take (n-1) xs)
  )

(* ------------------------------------------------------------------------ *)

(* [drop n xs] is the stream [xs], deprived of its [n] first elements.
   Note that requesting the first element of [drop n xs] causes [n+1]
   elements to be immediately demanded from [xs]. *)

(* Eager helper: [drop n xs] forces [xs], skips its first [n] elements, and
   returns what is left in already-forced form. A count [n] that is zero or
   negative skips nothing. (Matching on [0] alone would never stop for a
   negative [n], so an infinite stream would be forced forever.) *)
let rec drop (n : int) (xs : 'a stream) : 'a stream_now =
  if n <= 0 then
    force xs
  else
    match force xs with
    | Nil ->
        Nil
    | Cons (_, rest) ->
        drop (n-1) rest

(* Public version: the skipping is delayed until the result is forced. *)
let drop (n : int) (xs : 'a stream) : 'a stream =
  lazy (drop n xs)

(* ------------------------------------------------------------------------ *)

(* The length of a stream. This works for finite streams only. *)

(* [length accu xs] forces all of [xs] and returns [accu] plus its number
   of elements. The recursive call is a tail call. *)
let rec length accu xs =
  match force xs with
  | Cons (_, rest) ->
      length (accu + 1) rest
  | Nil ->
      accu

(* The public version starts counting from zero. *)
let length xs =
  let start = 0 in
  length start xs

(* ------------------------------------------------------------------------ *)

(* Merge sort. *)

(* In three steps. *)

(* 1. The main recursive function. Here, [n] is the useful length of [xs].
      That is, we ignore any elements beyond the first [n] elements. *)

(* [sort xs n] sorts the first [n] elements of [xs]: it sorts each half
   recursively, then merges the two sorted halves. A segment of fewer than
   two elements is already sorted. *)
let rec sort xs n =
  if n >= 2 then begin
    let half = n / 2 in
    let left = sort xs half
    and right = sort (drop half xs) (n - half) in
    merge left right
  end else
    take n xs

(* 2. We supply [length xs] as the initial value of [n]. This computation
      evaluates the whole stream [xs] and takes time O(n). *)

let sort xs =
  let n = length xs in
  sort xs n

(* 3. (For purists.) In principle, one should delay the computation of
      [length xs] until the first element of the sorted list is demanded.
      This can be done by building one last suspension, as follows. *)

let sort xs =
  lazy (sort xs |> force)

(* ------------------------------------------------------------------------ *)

(* An application of [sort]. *)

(* As noted above, O(n) computation is required in order to produce just
   the first element of the sorted list. This cannot be avoided, since
   this element is the minimum element of the input list. During this
   initial computation, [sort] computes the length of the input list
   and builds a binary tree of [merge] nodes. Then, every time one more
   element is demanded, elements flow down this tree (which re-arranges
   itself, as a [merge] transformer disappears once one of its argument
   streams becomes empty). Every element beyond the first one is produced
   in time O(log n). The total cost is O(n log n). *)

let () =
  interval 0 16 |> sort |> print 16
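  (* With the [cons]-based [merge] above, the number of "merge: producing x"
     messages is the number of "1" bits in the number 15-x, plus one when
     x > 0 (the extra message comes from the branch of [merge] where the
     left stream is empty).
     The total number of "merge:" messages is O(n log n). *)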

(* ------------------------------------------------------------------------ *)

(* Another application of [sort]. *)

(* The minimum element of the list [xs] can be computed in linear time simply by
   building the stream [sort xs] and demanding its first element. More generally,
   for any [k], one can compute the [k] smallest elements of [xs] in time
   O(n + k log n) simply by demanding the first [k] elements of [sort xs]. *)

(* [head xs] forces [xs] and returns its first element, if there is one. *)
let head (xs : 'a stream) : 'a option =
  match force xs with
  | Cons (x, _) ->
      Some x
  | Nil ->
      None

(* [min xs] is the first element of the sorted version of [xs], if any. *)
let min xs =
  xs |> sort |> head

(* [min] sorts its argument itself, so the stream is passed as is. Sorting
   it beforehand would only repeat the work. *)
let m : int option =
  min (interval 0 16)

(* ------------------------------------------------------------------------ *)

(* Converting a mutable iterator to a stream. *)

(* Thanks to memoisation, even though the iterator is ephemeral (can be
   used only once), the resulting stream is persistent and can be used
   as many times as desired. Of course, the cost is that all elements
   of the stream are kept in memory as long as there exists a pointer
   to the stream. *)

(* [iterator_to_stream it] is a stream whose elements are obtained by
   calling [it] once per element, in order, each time a new element is
   forced. The stream ends when [it] returns [None]. *)
let rec iterator_to_stream (it : unit -> 'a option) : 'a stream =
  lazy (
    match it () with
    | Some x ->
        Cons (x, iterator_to_stream it)
    | None ->
        Nil
  )

(* [read filename] opens the file at once and returns an iterator over its
   characters. Each call returns [Some c] for the next character. At the
   end of the file, the channel is closed and [None] is returned.

   Once the end has been reached, every further call also returns [None].
   Without the [finished] flag, a further call would apply [input_char] to
   the closed channel and raise [Sys_error].

   Opening the file may raise [Sys_error], as [open_in] does. *)
let read filename : unit -> char option =
  let channel = open_in filename in
  let finished = ref false in
  fun () ->
    if !finished then
      None
    else
      try
        Some (input_char channel)
      with End_of_file ->
        finished := true;
        close_in channel;
        None

let s : char stream =
  read "Transcript.ml" |> iterator_to_stream

let c =
  Lazy.force s (* forces the head of the stream, which reads the first
                  character of the file *)