Timestamp tokens: a better coordination primitive
for data-processing systems
Andrea Lattuada
ETH Zurich
Frank McSherry
Materialize Inc.
Abstract

Distributed data processing systems have advanced through models that expose more and more opportunities for concurrency within a computation. The scheduling of these increasingly sophisticated models has become the bottleneck for improved throughput and reduced latency.

We present a new coordination primitive for dataflow systems, the timestamp token, which minimizes the volume of information shared between the computation and host system, without surrendering precision about concurrency. Several projects have now used timestamp tokens, and were able to explore computational idioms that could not be expressed easily, if at all, in other platforms. Importantly, these projects did not need to design and implement whole systems to support their research.
1 Introduction

Systems for data-intensive computation have advanced through programming models that allow programs to reveal progressively more opportunities for concurrency. Frameworks like MPI [2] allow programmers only to explicitly sequence data-parallel computations. Systems like DryadLINQ [24] and Spark [25] use data-dependence graphs to allow programs to express task parallelism. Stream processors like Flink [9] and Naiad [22] (following [1, 7, 10]) add a temporal dataflow dimension to represent pipeline parallelism. In each case, new runtimes extract more detailed information about the computations, allowing them greater flexibility in their execution. Figure 1 demonstrates the forms of parallelism that can be expressed in these systems.
Dataflow systems have become limited by the complexity of the boundary between system and computation. Specifically, as computations provide progressively more fine-grained and detailed information about concurrency opportunities, the scalability and sophistication of the system schedulers must increase. In our experience, system complexity has increased to the point that scheduling rather than computation becomes the bottleneck that prevents higher throughputs and lower latencies.
System designers have the opportunity to reduce the volume of coordination by reconsidering the interface between system and operator. For example, where Spark Streaming [26] must schedule distinct events to implement distinct logical times, Flink (and other stream processors) allow operators to retire batches of events corresponding to blocks of logical times, substantially improving throughput. Where Flink (and other stream processors) requires continual interaction with operators to confirm that they have no output at a logical time, Naiad asks operators to explicitly identify future times at which the operator should be notified, which is necessary to support cyclic dataflows. These interfaces reduce the volume of coordination, but require a deeper involvement of the system itself: continually invoking operators in Flink and sequencing notifications in Naiad.
We propose a simple dataflow coordination primitive, timestamp tokens, which can dramatically simplify the design of advanced dataflow systems. Drawing inspiration from work on capability systems, a timestamp token is an in-memory object that can be held by an operator and provides the ability to produce timestamped data messages on a specific dataflow edge. A timestamp token does not require repeated interaction between system and operator to confirm, exercise, or release this ability. Instead, an operator accumulates and summarizes its interactions with its timestamp tokens. The system collects this information when most convenient, maintains a view of outstanding timestamp tokens, and provides summaries of potential input timestamps to each operator.
Timestamp tokens make it relatively simple to introduce dataflow idioms that would be complicated or impossible in other systems. Although we have not previously reported on them since we designed and implemented them in an open-source data-processor, timestamp tokens have been in use for several years in various research and production projects.
arXiv:2210.06113v1 [cs.DC] 12 Oct 2022

[Figure 1: Data, task, and pipeline parallelism in dataflow systems with a temporal dimension. Data for each key is represented with different shapes. map transforms the tuples from input A: independent input tuples can be processed in a data-parallel fashion. tokenize acts on strings from input B, and count groups and tallies the tokens. map and count do not share a dataflow edge and are a candidate for task parallelism. count can process tokens at timestamp 4 while counts for timestamp 3 are joined with the outputs of map: the temporal dimension enables pipeline parallelism between inter-dependent operators.]

Faucet [18] uses timestamp tokens to allow operators (and dataflow fragments) to implement their own flow control, without modifying system code. DD [20] uses timestamp tokens to provide arbitrary-granularity timestamps for differential dataflow, dramatically improving the throughput over the corresponding Naiad implementation. Megaphone [16] uses timestamp tokens to specialize the implementations of operator-internal schedulers, for example using priority queues in operators that support them, without requiring system-wide support. In each case, timestamp tokens’ separation between system and operators provided the flexibility to introduce behavior that would otherwise require the implementation of a specialized system.
2 Coordination in dataflow systems

A dataflow program is expressed as a directed graph (V, E) where nodes V represent data transformations and edges E represent the communication channels between the nodes. A dataflow system instantiates multiple workers and provides each with the dataflow graph. At runtime, the system exchanges data messages between workers, as the messages cross dataflow edges, and each worker independently applies data transformations in response to received data, producing output messages that are further exchanged and processed.
In modern dataflow systems, messages bear a logical timestamp t, and dataflow operators maintain or advance timestamps as they process messages. The system and operators collaborate to track outstanding messages by timestamp, so that operators can learn when certain input timestamps are complete and it is appropriate to produce the corresponding output. Most commonly, the system provides the operator with a “watermark” or “frontier” indicating a lower bound on future timestamps the operator may observe, and the operator communicates to the system a lower bound on the timestamps it might still need to produce as output. The system is responsible for collecting and integrating the information from all operators, as well as the messages produced and retired, to provide correct lower bounds to the operators.
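To make the frontier idea concrete, the following is a minimal sketch, not the API of any system named here, of how a system might tally outstanding messages per timestamp and derive the lower bound it reports to operators. Timestamps are totally ordered integers for simplicity; the type and method names are illustrative.

```rust
use std::collections::BTreeMap;

/// Tracks the count of outstanding messages (or tokens) per timestamp,
/// and derives the "frontier": a lower bound on timestamps that may
/// still appear at an operator's input.
#[derive(Default)]
struct FrontierTracker {
    counts: BTreeMap<u64, i64>, // timestamp -> net outstanding count
}

impl FrontierTracker {
    /// Record a net change in the number of outstanding items at `time`.
    fn update(&mut self, time: u64, delta: i64) {
        let count = self.counts.entry(time).or_insert(0);
        *count += delta;
        if *count == 0 {
            self.counts.remove(&time); // fully retired timestamps vanish
        }
    }

    /// The least timestamp that may still appear downstream.
    /// `None` means the input is complete.
    fn frontier(&self) -> Option<u64> {
        self.counts.keys().next().copied() // BTreeMap keys are sorted
    }
}

fn main() {
    let mut tracker = FrontierTracker::default();
    tracker.update(3, 2); // two messages outstanding at t=3
    tracker.update(4, 1); // one at t=4
    assert_eq!(tracker.frontier(), Some(3));
    tracker.update(3, -2); // both t=3 messages retired
    assert_eq!(tracker.frontier(), Some(4)); // the frontier advances
}
```

Real systems must additionally account for the dataflow graph itself (a token at one location implies possible future messages at downstream locations), but the counting core is as above.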
2.1 Representative dataflow systems

We now walk through several representative systems and relate their moving parts to dataflow coordination.

Spark models a computation as an acyclic dataflow graph, but without distinct logical times: inputs in Spark are either “complete” or “not yet complete”. The Spark system tracks which inputs are complete and signals operators when their inputs are all complete and the operator can run to completion. Operators report back to the system as they complete their outputs.
Flink models computations as an acyclic dataflow graph, with integer logical times. Flink streams (dataflow edges) report an increasing integer “watermark” lower-bounding the timestamps the stream may yet produce. These watermarks are interleaved in the stream of data itself, and each operator is required to produce them in their output streams as well. Flink does not have a centralized scheduler, and maintains a fresh view of its outputs only through the continued introduction of new watermarks in the dataflow inputs.
Naiad models computations as a potentially cyclic dataflow graph, with partially ordered logical times. Naiad operators request “notifications” at specified logical times, and Naiad invokes a callback only once it determines that all messages bearing that logical time have been delivered. Naiad does not present operators with lower bounds for their inputs, and instead requires operators to defer the responsibility of scheduling to the system itself, in part because the logic for doing so requires a holistic view of the dataflow graph and all other pending notifications.
Each of these systems introduces new opportunities for concurrency, and corresponding performance gains on important tasks. However, no one system unifies the work of the others. We believe that unifying this work, and laying the groundwork for more advanced behaviors, requires a simplification of the interface between system and operator, rather than further sophistication.
3 Timestamp tokens

We propose that dataflow systems and operator logic can coordinate precisely, efficiently, and ergonomically by explicitly handling in-memory tokens that represent their ability to produce outgoing data in the future. We borrow and adapt this idiom from capability systems (e.g. object-capability systems [13, 15], capability-based protection and security [12, 21], hardware capabilities [5, 11]). Similarly to capabilities¹, a timestamp token represents a computing object – an operator output – and the actions that can be performed with respect to that object: the production of data at timestamp t and dataflow location l.

Following Naiad we refer to the pair of timestamp t and location l as a pointstamp (t, l). A location can be either a node in V or an edge in E.
Definition. A timestamp token is a coordination primitive that names an associated pointstamp (t, l), and which gives its holder the ability to produce messages with timestamp t at location l.

The location for a timestamp token is typically one of the output edges of the operator that holds it.
Notwithstanding any other similarities to capabilities, our interest is in the information that holding timestamp tokens communicates to others. The system tracks the set of live timestamp tokens and summarizes this information to operators as frontiers: lower bounds on the timestamps that operators may yet observe in their inputs. By downgrading (to future timestamps) or discarding their held timestamp tokens, operators allow frontiers to advance and the computation as a whole to make forward progress.
3.1 The timestamp token life-cycle

Each dataflow operator is initially provided with a timestamp token for each of its output edges, each bearing some minimal “zero” timestamp. This gives each operator the opportunity to be a source of timestamped messages, even without receiving input messages. For many operators, their first actions will be to discard these timestamp tokens, by which they release their ability to produce output messages unprompted, and unblock the dataflow system at the same time.

¹ “Each capability [...] locates by means of a pointer some computing object, and indicates the actions that the computation may perform with respect to that object.” [13]
[Figure 2: Timestamp token life-cycle. An operator (1) receives timestamp tokens with its timestamped input messages, (2) exercises them to produce output messages, and (3) downgrades them to later timestamps.]
As a dataflow operator executes, it can receive, exercise, downgrade, and discard timestamp tokens (Figure 2). Operators receive timestamped input messages, each of which provides a timestamp token at that timestamp for each of the operator’s outputs. Operators can produce timestamped output messages as long as they hold a timestamp token with the corresponding timestamp and output edge. Lastly, operators can arbitrarily hold, downgrade (to future timestamps), and discard their timestamp tokens as their logic dictates.

The dataflow system is informed of the net changes to the number of timestamp tokens for each pointstamp, but only passively in response to operator actions, rather than actively as a gatekeeper. Through this information the system can inform dataflow operators about the consequences of operator actions, without the specific details of the reasons for those actions.
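One way to picture this passive reporting is a per-operator batch of net changes that the system drains when convenient. The sketch below is an illustrative assumption about how such a batch might look (the `ChangeBatch` name and its methods are hypothetical, and locations are elided for brevity); its key property is that token actions that cancel out before the system looks never generate coordination traffic at all.

```rust
use std::collections::HashMap;

/// Accumulates an operator's net token-count changes per timestamp.
/// The system drains the batch when convenient, rather than mediating
/// every receive, downgrade, and discard individually.
#[derive(Default)]
struct ChangeBatch {
    updates: HashMap<u64, i64>, // timestamp -> net change in token count
}

impl ChangeBatch {
    fn record(&mut self, time: u64, delta: i64) {
        *self.updates.entry(time).or_insert(0) += delta;
        // Changes that cancel out never reach the system.
        if self.updates[&time] == 0 {
            self.updates.remove(&time);
        }
    }

    /// The system collects the accumulated net changes.
    fn drain(&mut self) -> Vec<(u64, i64)> {
        let mut changes: Vec<_> = self.updates.drain().collect();
        changes.sort();
        changes
    }
}

fn main() {
    let mut batch = ChangeBatch::default();
    batch.record(3, 1);  // received a token at t=3 with an input message
    batch.record(3, -1); // ...and discarded it before the system looked
    batch.record(4, 1);  // downgraded another token to t=4
    assert_eq!(batch.drain(), vec![(4, 1)]); // only the net effect is reported
}
```

The system feeds drained changes into its view of outstanding pointstamps, from which it computes the frontiers it reports back to operators.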
3.2 Coordination

The coordination state of the dataflow system is the set of timestamp tokens, which when combined with the dataflow graph determines lower bounds for the timestamps at each operator input. As the set of timestamp tokens evolves these lower bounds advance, and the dataflow system has the responsibility of informing operators as this happens. The difference with timestamp tokens is that operators drive the production of this information, instead of the system itself.²

Operators have a great deal of flexibility in how (or even if) they respond to changes in their input frontiers (timestamp lower bounds). Certain streaming operators like map and filter can be oblivious to this information and process data as it arrives. Synchronous reduction operators like reduce should await the indication that they have received all inputs for a timestamp before they apply their reduction function and produce output. Hybrid operators like count may per-

² For example, Naiad does not allow operators to hold tokens across invocations; Timely Dataflow (without timestamp tokens) does, by allowing operators to participate directly (and often incorrectly) in the coordination protocol. Here, timestamp tokens are respectively more expressive, and safer.
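The reduce-style behavior described above, buffer inputs and emit only once the frontier confirms a timestamp is complete, can be sketched as follows. This is a hedged illustration, not any system's actual operator API: the `Reduce` type and its methods are hypothetical, and the reduction is a simple sum.

```rust
use std::collections::BTreeMap;

/// A reduce-like operator: buffers inputs by timestamp and only emits
/// a result once the input frontier has passed that timestamp,
/// guaranteeing no further inputs for it can arrive.
struct Reduce {
    pending: BTreeMap<u64, Vec<i64>>, // buffered inputs by timestamp
}

impl Reduce {
    /// Buffer an input record at its timestamp.
    fn input(&mut self, time: u64, value: i64) {
        self.pending.entry(time).or_default().push(value);
    }

    /// On learning the input frontier, emit sums for every complete
    /// timestamp (those strictly less than the frontier).
    fn frontier_advanced(&mut self, frontier: u64) -> Vec<(u64, i64)> {
        let ready: Vec<u64> =
            self.pending.keys().copied().filter(|&t| t < frontier).collect();
        ready
            .iter()
            .map(|&t| (t, self.pending.remove(&t).unwrap().iter().sum()))
            .collect()
    }
}

fn main() {
    let mut reduce = Reduce { pending: BTreeMap::new() };
    reduce.input(3, 10);
    reduce.input(3, 5);
    reduce.input(4, 7);
    // Frontier at 4: timestamp 3 is complete, timestamp 4 is not.
    assert_eq!(reduce.frontier_advanced(4), vec![(3, 15)]);
    assert_eq!(reduce.frontier_advanced(5), vec![(4, 7)]);
}
```

A map-like operator, by contrast, needs no such buffering: it can ignore frontier changes entirely and transform each record as it arrives.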