EIO Redesign
From Erights
As the only implementor of EIO, Kevin Reid feels that the original EIO design is too heavyweight; it effectively requires input streams to have buffers and an elaborate queueing mechanism, and does not map well to POSIX IO operations, thus creating inefficiency and discouraging compositional programming.
XXX Explain which API variant the last kEIO implements.
E-on-CL currently implements version 2.
Redesign goals
- A stream should be able to wrap a POSIX file descriptor without hiding too much of the functionality. In particular, it should be possible to implement, for example, a console-IO process, which must not read from stdin until it actually wants user input (otherwise it will be suspended if it is a background process).
- It should be possible to write interesting and lightweight wrappers around stream objects.
- It should be possible to interconvert between sequences and streams readily.
- Furthermore, given that we can convert a sequence to an input stream, streams solve another design issue: parallel iteration. Currently, there is no way to iterate over multiple collections in parallel without explicit indexing (because iterate/1 is internal iteration); whereas a group of stream objects can easily be read in parallel.
- Doing this introduces another requirement: It should be possible to iterate over a map, and obtain the key-value pairs.
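The parallel-iteration goal above can be illustrated with a small sketch. It is written in Python rather than E purely for illustration, and ListInStream, read_in_parallel, and map_to_stream are hypothetical names, not part of any EIO version. It assumes only a stream with an immediate read(max) that returns an empty list at end of stream.

```python
from typing import Generic, List, Sequence, TypeVar

T = TypeVar("T")


class ListInStream(Generic[T]):
    """Hypothetical input stream over an in-memory sequence (not the EIO API)."""

    def __init__(self, items: Sequence[T]) -> None:
        self._items = list(items)
        self._pos = 0

    def read(self, max_count: int) -> List[T]:
        """Return up to max_count elements; an empty list means end of stream."""
        chunk = self._items[self._pos:self._pos + max_count]
        self._pos += len(chunk)
        return chunk


def map_to_stream(mapping):
    """Stream a map as (key, value) pairs, as the map-iteration goal asks."""
    return ListInStream(list(mapping.items()))


def read_in_parallel(streams):
    """Pull one element from every stream per step until any stream runs out."""
    rows = []
    while True:
        heads = [s.read(1) for s in streams]
        if any(len(h) == 0 for h in heads):
            return rows
        rows.append(tuple(h[0] for h in heads))


names = ListInStream(["a", "b", "c"])
scores = map_to_stream({"x": 1, "y": 2})
pairs = read_in_parallel([names, scores])
print(pairs)
```

Because the caller drives each read, no explicit indexing is needed; with internal iteration (iterate/1) each collection would instead drive its own loop.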
Version 1
This version focused on eliminating complexity which made EIO difficult to implement; for example, available() on InStream as noted.
[My old notes. File date was 2007-03-28.]
Simplified EIO

Stream[Element] {
  to close() :???
  to fail(_ :Broken) :???

  /** See InStream and OutStream for the specific meanings of this message. */
  to available() :IOCount

  /** A promise for the number of elements that may be read or written before the stream becomes closed, or ALL if there is no such limit. Zero if and only if the stream is closed. */
  to remaining() :vow[IOCount]

  to getElementType() :Is[Element]
}

InStream[Element] extends Stream[Element] {
  /** The number of elements which are known to be immediately readable. */
  to available() :IOCount

  to whenAvailable(_ :Thunk) # no count

  /** Always immediate. In the event that an external IO stream is forced to return less than the maximum, it will not return any more elements until the next turn. */
  to read(max :IOCount) :List[Element]

  to skip(min :IOCount, max :IOCount) :void

  # to copy(min :IOCount, max :IOCount, destination :OutStream[Element]) :void

  ... extra methods ...
}

2007 note: removed minimum argument to read() as I can't see anything it's good for

Supporting available()
  - obligates all InStreams to read eagerly, or defer reporting availability until a whenAvailable has supplied a specific count;
  - makes it difficult to support wrapper streams which promptly update their own availability;
therefore it has been removed.

TODO: figure out how to support grow/shrink wrapper stream (e.g. character coding) with a minimum of eager reading or single-element reads

/** A finite buffer is unnecessarily useless, as a client will not likely generate conveniently buffer-sized chunks; furthermore, under the POSIX model the OS-level buffer is of an unknown size and fullness, so a wrapping EIO stream must buffer anyway.
  *
  * However, some feedback is useful, to allow the client to stop generating output until the buffer is less full. */
OutStream[Element] {
  /** Always succeeds; all OutStreams must potentially have an unlimited buffer. */
  to write(elements :List[Element]) :void

  /** An estimate of the best number of elements to supply to the next write. May be ALL. */
  to available() :IOCount

  /** Requires the stream to eventually deliver all elements written so far. Closing implies flushing. */
  to flush() :void

  /** Invokes 'thunk' once, at or after the stream's buffer has emptied to a point defined by the stream. Multiple reactors may be registered simultaneously, and are invoked in FIFO order.
    *
    * 'hint', which may be entirely ignored, requests that the point be when 'available' would return either at least that value, or its current maximum. */
  to whenAvailable(hint :nullOk[IOCount], reactor :Thunk)
}

PeekableStream extends InStream
  /** whenever a parent stream is advanced past the child stream, the child becomes invalid. */
  to peek() :PeekInStream

RandomAccess
  to get(_ :Position) :RandomAccessStream

RandomAccessStream extends Stream
  to position() :Position

InStream impl
  to read(min, max)
  to interest() # ??? necessary?

InStream backend
  to resolveRemaining(_ :IOCount)
  to setInnerAvailable(_ :IOCount) :void

OutStream impl
  to write(_ :List[Element]) :???

OutStream backend
  to resolveRemaining(_ :IOCount) :void
  to setInnerAvailable(_ :IOCount) :void

---
Usage experiments

def join(in, out) {
  def loop() {
    in.whenAvailable(fn {
      out.write(in.read(out.available()))
      null
    })
  }
  loop()
}

def transformInput(f, factor, in) {
  def extraBuffer := [].diverge()
  def transformed {
    to close() { in.close() }
    to fail(p) { in.fail(p) }
    to whenAvailable(f) { in.whenAvailable(f) }
    to read(max) {
      if (extraBuffer.size().aboveZero()) {
        return extraBuffer.removeRun(0, extraBuffer.size())
      } else {
        def computed := f(in.read(factor * max))
        def overrun := computed.size() - max
        if (overrun > 0) {
          extraBuffer.append(computed(max))
          return computed(0, max)
        } else {
          return computed
        }
      }
    }
  }
  return transformed
}
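As a companion to the transformInput experiment, here is a minimal Python sketch of one possible treatment of the leftover buffer in a grow/shrink wrapper. It is an illustration under assumptions, not the notes' design: ListInStream, TransformedInStream, and double_up are hypothetical names, the 'factor' parameter is omitted, and unlike the experiment this version never returns more than max elements from a single read.

```python
from typing import Callable, List


class ListInStream:
    """Hypothetical stand-in for an InStream with an immediate read(max)."""

    def __init__(self, items: List[int]) -> None:
        self._items = list(items)

    def read(self, max_count: int) -> List[int]:
        chunk, self._items = self._items[:max_count], self._items[max_count:]
        return chunk


class TransformedInStream:
    """Wraps a stream and applies f to each chunk; f may grow or shrink it."""

    def __init__(self, f: Callable[[List[int]], List[int]], inner: ListInStream) -> None:
        self._f = f
        self._inner = inner
        self._extra: List[int] = []  # transformed elements not yet handed out

    def read(self, max_count: int) -> List[int]:
        if not self._extra:
            # Only touch the inner stream when the leftover buffer is empty.
            self._extra = list(self._f(self._inner.read(max_count)))
        out, self._extra = self._extra[:max_count], self._extra[max_count:]
        return out


def double_up(chunk: List[int]) -> List[int]:
    """Example growing transform: emit every element twice."""
    return [x for item in chunk for x in (item, item)]


stream = TransformedInStream(double_up, ListInStream([1, 2, 3]))
first = stream.read(2)
second = stream.read(2)
third = stream.read(5)
print(first, second, third)
```

The wrapper reads from the inner stream only when it has nothing buffered, which keeps eager reading to a minimum at the cost of sometimes returning short chunks.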
Version 2: Promises
This version focused on simplifying the stream interfaces sufficiently that there was no need for a generic supertype/wrapper implementation of InStream and OutStream (the 'impl'/'backend' entries in the Version 1 notes).
XXX Record what EoCL does here
Version 3a: Dividoids
In order to solve the problem of interacting with arbitrary chunk types, we replace the chunk type guard with an object which provides the interface for working with chunks (now renamed 'segments'). These objects are named dividoids, a name suggested by a friend with knowledge of category theory.
The Dividoid interface is designed to be highly generic; for example, it could be used equally well to chop up a function representing a continuously varying real value as to chop up collections of discrete objects. This is deliberate, to benefit the fuzzier cases such as text (which you might want to avoid breaking in the middle of, say, a UTF-8 character, or a combining sequence, or a word...).
Current draft dividoid interface:
/** A dividoid is ...XXX finish this
  *
  * The Size type should typically support addition, subtraction, and isZero.
  *
  * A dividoid should not be stateful.
  *
  * This abstraction was designed by Kevin Reid with help from Joe Geldart and ideas from
  * the Haskell Monoid type class and
  * <http://conal.net/blog/posts/sequences-segments-and-signals/>. */
interface "org.erights.e.elib.eio.dividoid"[Segment, Size] {
  to segmentType() :Same[Segment]
  to sizeType() :Same[Size]

  /** The empty, or identity, segment.
    *
    * Law: d.join([d.empty(), a]) is equivalent to a.
    * Law: d.join([a, d.empty()]) is equivalent to a. */
  to empty() :Segment

  /** The size of a segment. When possible, this operation should only be applied to
    * segments which are the left side of a d.split(); this permits infinite segments. */
  to size(e :Segment) :Size

  /** Join segments to make one segment.
    *
    * Law: d.join is associative.
    *
    * Law: d.split(d.join([a, b]), d.size(a))[0] is equivalent to a.
    *
    * Not a law: d.split(d.join([a, b]), d.size(a))[1] is equivalent to b. (For example, maps
    * and sets cannot implement this.) */
  to join(segments :List[Segment]) :Segment

  /** Break this segment up into two segments, where the size of the first is the minimum
    * of the given size and the size of the given segment.
    *
    * Law: def [a,b] := d.split(x, _); d.join([a, b]) is equivalent to x.
    *
    * Law: If EIO.getALL() is in the range of Size, then d.split(a, ALL) is equivalent to
    * [a, d.empty()]. */
  to split(e :Segment, size :Size) :Tuple[Segment, Segment]

  XXX split-off-one-element operation for sanity in parallel iteration?
}
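To make the laws concrete, the following Python sketch models a dividoid whose segments are plain lists and whose sizes are non-negative integers. ListDividoid is a hypothetical name used only for illustration; the real interface is the E draft above, and this sketch leaves out segmentType, sizeType, and ALL.

```python
from typing import List, Tuple


class ListDividoid:
    """Illustrative, stateless dividoid over Python lists with int sizes."""

    def empty(self) -> list:
        return []

    def size(self, segment: list) -> int:
        return len(segment)

    def join(self, segments: List[list]) -> list:
        # Taking the whole list of segments lets us accumulate in one pass
        # instead of building an intermediate result for every pair.
        joined: list = []
        for segment in segments:
            joined.extend(segment)
        return joined

    def split(self, segment: list, size: int) -> Tuple[list, list]:
        # The left part has the minimum of the requested and the actual size.
        n = min(size, len(segment))
        return segment[:n], segment[n:]


d = ListDividoid()
a, b, c = [1, 2], [3], [4, 5, 6]

# Identity laws for empty().
assert d.join([d.empty(), a]) == a
assert d.join([a, d.empty()]) == a

# join is associative.
assert d.join([d.join([a, b]), c]) == d.join([a, d.join([b, c])])

# Splitting a join at size(a) recovers a on the left.
assert d.split(d.join([a, b]), d.size(a))[0] == a

# Splitting and rejoining gives back the original segment.
left, right = d.split(c, 2)
assert d.join([left, right]) == c
```

For lists the right-hand side of the split also equals b, but the draft deliberately does not require that, since maps and sets cannot guarantee it.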
XXX Expand on this
Rationales:
- The join operation takes a list of segments so that it can use an efficient accumulation procedure and avoid redundant allocations.
Version 3b (not yet coded)
The problem with the Dividoid notion is that it is unnatural in E to have an external object providing the operations. The motivation for it was to allow this functionality to be added without adding many methods to the individual objects. Suppose we discard that motivation and add the operations to the objects themselves.
Disadvantages:
- Possible clutter in the already-large collection interfaces, and naming conflicts
- No natural object from which to get the empty segment
Advantages:
- Better promise pipelining behavior
- Possible utility as collection methods
- Generalize cdr-pattern and its map version's semantics?
Draft interface:
/** The Size type should typically support addition, subtraction, and isZero.
  *
  * An earlier version of this abstraction was designed by Kevin Reid with help from
  * Joe Geldart and ideas from the Haskell Monoid type class and
  * <http://conal.net/blog/posts/sequences-segments-and-signals/>. */
interface "org.erights.e.elib.eio.segment"[Size] {

  /** The size of a segment. When possible, this operation should only be applied to
    * segments which are the left side of a .segment(); this permits infinite segments. */
  to size() :Size

  /** Append the provided segments to this segment to make one segment.
    *
    * Law: desegment is associative among [this]+segments.
    *
    * Law: a.desegment([b]).segment(a.size())[0] is equivalent to a.
    *
    * Not a law: a.desegment([b]).segment(a.size())[1] is equivalent to b. (For example, maps
    * and sets cannot implement this.) */
  to desegment(segments :List[Segment[Size]]) :Segment[Size]

  /** Break this segment up into two segments, where the size of the first is the minimum of
    * the given size and the size of this.
    *
    * Law: def [a,b] := x.segment(_); a.desegment([b]) is equivalent to x.
    *
    * Law: x.segment(someSize - someSize)[0] = identity, such that identity.desegment([x]) is
    * equivalent to x.
    *
    * Law: If EIO.getALL() is in the range of Size, then a.segment(ALL) is equivalent to
    * [a, a.segment(zero)[0]]. */
  to segment(size :Size) :Tuple[Segment[Size], Segment[Size]]

  /** Return a segment of size zero.
    *
    * XXX This is redundant with .segment(someSize-someSize) and could be removed.
    *
    * This is an identity element. If a is a compatible segment,
    * Law: this.withoutAll().desegment([a]) is equivalent to a.
    * Law: this.desegment([a.withoutAll()]) is equivalent to this. */
  to withoutAll() :Segment[Size]
}
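The same laws can be checked with the operations moved onto the objects. This Python sketch uses a hypothetical ListSegment class with int sizes; without_all stands in for withoutAll, and equality is defined only so the laws can be asserted. It is an illustration of the draft, not an implementation of it.

```python
from typing import List, Tuple


class ListSegment:
    """Illustrative segment backed by a Python list (hypothetical, not the E API)."""

    def __init__(self, items) -> None:
        self.items = list(items)

    def __eq__(self, other) -> bool:
        return isinstance(other, ListSegment) and self.items == other.items

    def __repr__(self) -> str:
        return f"ListSegment({self.items!r})"

    def size(self) -> int:
        return len(self.items)

    def desegment(self, segments: List["ListSegment"]) -> "ListSegment":
        """Append the given segments to this one, producing a new segment."""
        joined = list(self.items)
        for segment in segments:
            joined.extend(segment.items)
        return ListSegment(joined)

    def segment(self, size: int) -> Tuple["ListSegment", "ListSegment"]:
        """Split so the left part has min(size, self.size()) elements."""
        n = min(size, len(self.items))
        return ListSegment(self.items[:n]), ListSegment(self.items[n:])

    def without_all(self) -> "ListSegment":
        """Return a size-zero segment, which acts as an identity."""
        return ListSegment([])


a = ListSegment([1, 2])
b = ListSegment([3, 4, 5])

# Splitting a.desegment([b]) at a.size() recovers a on the left.
assert a.desegment([b]).segment(a.size())[0] == a

# Splitting and rejoining gives back the original.
left, right = b.segment(1)
assert left.desegment([right]) == b

# A zero-size split yields an identity, as does without_all().
assert b.segment(0)[0].desegment([b]) == b
assert a.without_all().desegment([a]) == a
assert a.desegment([a.without_all()]) == a
```

Note how the empty segment has to be obtained from an existing segment, which is the "no natural object" disadvantage listed above.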
Plans:
- If an ordinary finite collection (List, Map, etc) is coerced to a Segment, have it return an object specialized for being an efficient iterator.
- But wait, then segments aren't ordinary collections. Implementation-switching, or coerce back?
Design TODO:
- Figure out how this supports parallel iteration. We need an analogue to segment(1) => [1-element-collection, rest-of-items] that produces [key, value, rest-of-items] (the key and value being what iterate/1 gives its AssocFunc). This could be done by splitting out a 1-element segment and getting the key and value from that, or by a method that actually yields that 3-tuple.
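One way the 3-tuple variant could look is sketched below in Python. Everything here is an assumption made for illustration: MapSegment, take_one, and zip_segments are hypothetical names, and take_one is built on segment(1), calling size() only on the left side of the split as the draft recommends.

```python
class MapSegment:
    """Hypothetical segment over a map, held as ordered (key, value) pairs."""

    def __init__(self, entries) -> None:
        self._entries = tuple(entries)

    def size(self) -> int:
        return len(self._entries)

    def segment(self, size: int):
        n = min(size, len(self._entries))
        return MapSegment(self._entries[:n]), MapSegment(self._entries[n:])

    def take_one(self):
        """Return (key, value, rest-of-items), or None when nothing is left."""
        head, rest = self.segment(1)
        if head.size() == 0:  # size() is only asked of the left side of a split
            return None
        (key, value), = head._entries
        return key, value, rest


def zip_segments(left, right):
    """Iterate two segments in parallel, stopping when either is exhausted."""
    rows = []
    while True:
        l, r = left.take_one(), right.take_one()
        if l is None or r is None:
            return rows
        left_key, left_value, left = l
        right_key, right_value, right = r
        rows.append((left_key, left_value, right_key, right_value))


rows = zip_segments(
    MapSegment({"a": 1, "b": 2}.items()),
    MapSegment(enumerate(["x", "y", "z"])),
)
print(rows)
```

The list-like segment here reports its indices as keys, mirroring what iterate/1 passes to its AssocFunc.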
Discussion
Voice discussion, kpreid/MarkM, 2011-02-08
- MarkM generally likes the Segment interface's design - we went over it in detail
- MarkM asks whether Streams can be replaced with Segment (given segment(s)[1] returning a promise)
- kpreid: InStream has close/fail which is upstream messaging
- kpreid: InStream has explicitly asking for input which is different from a promise for the rest (promises are expected to resolve whether or not you look at them)
- kpreid: Also supports different cooperation patterns: time sequence of read ops, rather than passing around the tail
- To be discussed later: parallel iteration.