Go backward to B.2 Algorithm Go up to B Distributed Snapshots Go forward to B.4 State Program |
Embedded into this page is a DAJ program that implements the algorithm described in the previous subsection.
You should see a button labelled "Press Me" embedded below. If you do not see this button but the string "Please enable Java!" instead, you must enable Java on your Web browser and then reload thiis page.
In case your browser does not support Java, we include a snapshot of the visualization window below.
Below you can find the full source code of the program.
// -------------------------------------------------------------------------- // $Id: snapshot.tex,v 1.3 1997/11/12 21:59:23 schreine Exp $ // the Chandy-Lamport algorithm for taking distributed snapshots // // (c) 1997, Wolfgang Schreiner <Wolfgang.Schreiner@risc.uni-linz.ac.at> // http://www.risc.uni-linz.ac.at/software/daj // -------------------------------------------------------------------------- import daj.*; import java.util.*; // ---------------------------------------------------------------------------- // // the application itself // // ---------------------------------------------------------------------------- public class Main extends Application { // -------------------------------------------------------------------------- // main function of application // -------------------------------------------------------------------------- public static void main(String[] args) { new Main().run(); } // -------------------------------------------------------------------------- // constructor for application // -------------------------------------------------------------------------- public Main() { super("Distributed Snapshots", 580, 450); } // -------------------------------------------------------------------------- // construction of a hexagon network (just for fun) // -------------------------------------------------------------------------- public void construct() { // length of first row and half height of hexagon int w = 3; int h = 2; int size = (w+h-1)*(w+h)-w*(w-1)+(w+h); Node nodes[][] = new Node[w+h][2*h+1]; // left upper corner and vertical/horizontal distance int x0 = 170; int y0 = 50; int x = 100; int y = 86; // random generator for initialization of deposit Random random = new Random(); // create and place nodes int number = 0; for (int j = 0; j < h+1; j++) { for (int i = 0; i < w+j; i++) { int deposit = Math.abs(random.nextInt())%1000; nodes[i][j] = node(new Prog(size, number, deposit), String.valueOf(number), x0+i*x-j*x/2, y0+j*y); number++; } } for (int j = h+1; j < 2*h+1; j++) { for (int i = 0; i < w+2*h-j; i++) { int deposit = Math.abs(random.nextInt())%1000; nodes[i][j] = node(new Prog(size, number, deposit), String.valueOf(number), x0+i*x-(2*h-j)*x/2, y0+j*y); number++; } } // link nodes for (int j = 0; j < h+1; j++) { for (int i = 0; i < w+j; i++) { if (i < w+j-1) link(nodes[i][j], nodes[i+1][j]); if (i > 0) link(nodes[i][j], nodes[i-1][j]); if (j < h) { link(nodes[i][j], nodes[i][j+1]); link(nodes[i][j], nodes[i+1][j+1]); } else { if (i < w+j-1) link(nodes[i][j], nodes[i][j+1]); if (i> 0) link(nodes[i][j], nodes[i-1][j+1]); } if (j > 0) { if (i < w+j-1) link(nodes[i][j], nodes[i][j-1]); if (i > 0) link(nodes[i][j], nodes[i-1][j-1]); } } } for (int j = h+1; j < 2*h+1; j++) { for (int i = 0; i < w+2*h-j; i++) { if (i < w+2*h-j-1) link(nodes[i][j], nodes[i+1][j]); if (i > 0) link(nodes[i][j], nodes[i-1][j]); if (j < 2*h) { if (i > 0) link(nodes[i][j], nodes[i-1][j+1]); if (i < w+2*h-j-1) link(nodes[i][j], nodes[i][j+1]); } if (j > 0) { link(nodes[i][j], nodes[i][j-1]); link(nodes[i][j], nodes[i+1][j-1]); } } } } // -------------------------------------------------------------------------- // informative message // -------------------------------------------------------------------------- public String getText() { return "Distributed Snapshots\n \n" + "The Chandy-Lamport algorithm\n" + "for taking consistent global snapshots\n" + "of a running network."; } } // ---------------------------------------------------------------------------- // // a program class // // ---------------------------------------------------------------------------- class Prog extends Program { // state variables public int size; // network size public int number; // current node number public int deposit; // current deposit public Message message; // message currently being processed public int index; // index of channel from which message was received public int mode; // current execution mode // snapped local state public int snapValue; // current snap value public boolean snapped[]; // snap state of input channels public int missingSnaps; // number of missing snaps of input channels // snapped total state public int totalValue; // current network value public boolean done[]; // true iff node has reported its snap value public int missingValues; // number of outstanding node reports // execution modes private final int RUNNING = 0; // initial state private final int SNAPPING = 1; // constructing snapshot private final int BROADCASTING = 2; // broadcasting result private final int TERMINATED = 3; // terminal state // -------------------------------------------------------------------------- // construct program in network of size `s` at node `n` with deposit `d` // -------------------------------------------------------------------------- public Prog(int s, int n, int d) { size = s; number = n; deposit = d; message = null; mode = RUNNING; } // -------------------------------------------------------------------------- // return random integer `r` with 0 <= `r` < `n` // -------------------------------------------------------------------------- private Random rand = new Random(); private int random(int n) { return Math.abs(rand.nextInt())%n; } // -------------------------------------------------------------------------- // go into snapping mode // -------------------------------------------------------------------------- public void snap() { // go into snap mode, store local deposit, reset total value mode = SNAPPING; snapValue = deposit; totalValue = 0; // initialize snapping state of channels int s = in().getSize(); snapped = new boolean[s]; for (int i = 0; i < s; i++) snapped[i] = false; missingSnaps = s; // initialize report state of nodes done = new boolean[size]; for (int i = 0; i < size; i++) done[i] = false; missingValues = size; } // -------------------------------------------------------------------------- // collect `value` from `sender` // -------------------------------------------------------------------------- public void collect(int sender, int value) { totalValue += value; done[sender] = true; missingValues--; if (missingValues == 0) mode = TERMINATED; } // -------------------------------------------------------------------------- // main function of program // -------------------------------------------------------------------------- public void main() { while (mode != TERMINATED) { // node 0 triggers snapping at its own will if (number == 0 && mode == RUNNING && random(10) == 0) { snap(); interrupt(); out().send(new SnapMessage()); } // wait some time for message index = in().select(random(5)); if (index == -1 && deposit > 0) { // --------------------------------------------------------------- // send some money from deposit to some neighbor node // --------------------------------------------------------------- int value = random(deposit); deposit -= value; message = new ValueMessage(value); index = random(out().getSize()); out(index).send(message); message = null; continue; } // message was received message = in(index).receive(); // --------------------------------------------------------------- // value message was received // --------------------------------------------------------------- if (message instanceof ValueMessage) { ValueMessage valueMessage = (ValueMessage)message; int value = valueMessage.getValue(); deposit += value; // snap message if (mode == SNAPPING && !snapped[index]) { snapValue += value; } } // --------------------------------------------------------------- // snap message was received // --------------------------------------------------------------- else if (message instanceof SnapMessage) { SnapMessage snapMessage = (SnapMessage)message; if (mode == RUNNING) { snap(); out().send(snapMessage); } if (!snapped[index]) { snapped[index] = true; missingSnaps--; if (missingSnaps == 0) { mode = BROADCASTING; collect(number, snapValue); if (number == 0) interrupt(); out().send(new ResultMessage(number, snapValue)); } } } // --------------------------------------------------------------- // result message was received // --------------------------------------------------------------- else if (message instanceof ResultMessage) { ResultMessage resultMessage = (ResultMessage)message; int sender = resultMessage.getSender(); if (!done[sender]) { collect(sender, resultMessage.getValue()); out().send(resultMessage); } } // message was handled message = null; } // assert that total amount of money equals the determined value assert(new TotalValue(totalValue)); } // -------------------------------------------------------------------------- // called for display of program state // -------------------------------------------------------------------------- public String getText() { String messageString; if (message == null) messageString = "(null)"; else messageString = message.getText(); String modeString; switch (mode) { case RUNNING: { modeString = "RUNNING"; break; } case SNAPPING: { modeString = "SNAPPING"; break; } case BROADCASTING: { modeString = "BROADCASTING"; break; } case TERMINATED: { modeString = "TERMINATED"; break; } default: { modeString = "(unknown)"; break; } } if (message == null) messageString = "(null)"; else messageString = message.getText(); if (mode == RUNNING) return "mode: " + modeString + "\ndeposit: " + String.valueOf(deposit); else if (mode == SNAPPING) return "mode: " + modeString + "\ndeposit: " + String.valueOf(deposit) + "\nsnapValue: " + String.valueOf(snapValue) + "\nmissingSnaps: " + String.valueOf(missingSnaps); else if (mode == BROADCASTING) return "mode: " + modeString + "\ndeposit: " + String.valueOf(deposit) + "\nsnapValue: " + String.valueOf(snapValue) + "\ntotalValue: " + String.valueOf(totalValue) + "\nmissingValues: " + String.valueOf(missingValues); else return "mode: " + modeString + "\ndeposit: " + String.valueOf(deposit) + "\nsnapValue: " + String.valueOf(snapValue) + "\ntotalValue: " + String.valueOf(totalValue); } } // ---------------------------------------------------------------------------- // // the message classes // // ---------------------------------------------------------------------------- // ---------------------------------------------------------------------------- // carries `value` // ---------------------------------------------------------------------------- class ValueMessage extends Message { private int value; public ValueMessage(int v) { value = v; } public int getValue() { return value; } public String getText() { return "VALUE[" + String.valueOf(value) + "]"; } } // ---------------------------------------------------------------------------- // carries no information // ---------------------------------------------------------------------------- class SnapMessage extends Message { public String getText() { return "SNAP"; } } // ---------------------------------------------------------------------------- // carries `value` of `sender` // ---------------------------------------------------------------------------- class ResultMessage extends Message { private int sender; private int value; public ResultMessage(int s, int v) { sender = s; value = v; } public int getSender() { return sender; } public int getValue() { return value; } public String getText() { return "RESULT[" + String.valueOf(sender) + ", " + String.valueOf(value) + "]"; } } // ---------------------------------------------------------------------------- // // global assertion stating that total values of network equals `value` // // ---------------------------------------------------------------------------- class TotalValue extends GlobalAssertion { int value; // expected value int sum; // counted value public TotalValue(int v) { value = v; } public String getText() { return "invalid value: " + String.valueOf(sum) + "(" + String.valueOf(value) + " expected)"; } // -------------------------------------------------------------------------- // add values of all messages in `msg` // -------------------------------------------------------------------------- private int add(Message msg[]) { int s = 0; for (int i = 0; i < msg.length; i++) { if (msg[i] instanceof ValueMessage) { s += ((ValueMessage)msg[i]).getValue(); } } return s; } // -------------------------------------------------------------------------- // check whether total value of network equals `value` // -------------------------------------------------------------------------- public boolean assert(Program prog[]) { sum = 0; for (int j = 0; j < prog.length; j++) { Prog program = (Prog)prog[j]; // value deposited in node sum += program.deposit; // value in output channels int n = program.out().getSize(); for (int i = 0; i < n; i++) { Message msg[] = getMessages(program.out(i)); sum += add(msg); } // value in pending message if (program.message != null && program.message instanceof ValueMessage) { sum += ((ValueMessage)program.message).getValue(); } } return sum == value; } }