previous up next
Go backward to B.2 Algorithm
Go up to B Distributed Snapshots
Go forward to B.4 State Program
RISC-Linz logo

B.3 Program

Embedded into this page is a DAJ program that implements the algorithm described in the previous subsection.

You should see a button labelled "Press Me" embedded below. If you do not see this button but the string "Please enable Java!" instead, you must enable Java on your Web browser and then reload thiis page.

Please enable Java!

In case your browser does not support Java, we include a snapshot of the visualization window below.

Distributed Snapshots: Visualization
 

Below you can find the full source code of the program.

// --------------------------------------------------------------------------
// $Id: snapshot.tex,v 1.3 1997/11/12 21:59:23 schreine Exp $
// the Chandy-Lamport algorithm for taking distributed snapshots
//
// (c) 1997, Wolfgang Schreiner <Wolfgang.Schreiner@risc.uni-linz.ac.at>
// http://www.risc.uni-linz.ac.at/software/daj
// --------------------------------------------------------------------------
import daj.*;
import java.util.*;

// ----------------------------------------------------------------------------
//
// the application itself
//
// ----------------------------------------------------------------------------
public class Main extends Application
{
  // --------------------------------------------------------------------------
  // main function of application
  // --------------------------------------------------------------------------
  public static void main(String[] args)
  {
    new Main().run();
  }

  // --------------------------------------------------------------------------
  // constructor for application
  // --------------------------------------------------------------------------
  public Main()
  {
    super("Distributed Snapshots", 580, 450);
  }

  // --------------------------------------------------------------------------
  // construction of a hexagon network (just for fun)
  // --------------------------------------------------------------------------
  public void construct()
  {
    // length of first row and half height of hexagon
    int w = 3;
    int h = 2;
    int size = (w+h-1)*(w+h)-w*(w-1)+(w+h);
    Node nodes[][] = new Node[w+h][2*h+1];

    // left upper corner and vertical/horizontal distance
    int x0 = 170;
    int y0 = 50;
    int x = 100;
    int y = 86;

    // random generator for initialization of deposit
    Random random = new Random();

    // create and place nodes
    int number = 0;
    for (int j = 0; j < h+1; j++)
      {
        for (int i = 0; i < w+j; i++)
          {
            int deposit = Math.abs(random.nextInt())%1000;
            nodes[i][j] = 
              node(new Prog(size, number, deposit), String.valueOf(number), 
                   x0+i*x-j*x/2, y0+j*y);
            number++;
        
          }
      }
    for (int j = h+1; j < 2*h+1; j++)
      {
        for (int i = 0; i < w+2*h-j; i++)
          {
            int deposit = Math.abs(random.nextInt())%1000;
            nodes[i][j] = 
              node(new Prog(size, number, deposit), String.valueOf(number), 
                   x0+i*x-(2*h-j)*x/2, y0+j*y);
            number++;
          }
      }

    // link nodes
    for (int j = 0; j < h+1; j++)
      {
        for (int i = 0; i < w+j; i++)
          {
            if (i < w+j-1) link(nodes[i][j], nodes[i+1][j]);
            if (i > 0) link(nodes[i][j], nodes[i-1][j]);
            if (j < h) 
              {
                link(nodes[i][j], nodes[i][j+1]);
                link(nodes[i][j], nodes[i+1][j+1]);
              }
            else
              {
                if (i < w+j-1) link(nodes[i][j], nodes[i][j+1]);
                if (i> 0) link(nodes[i][j], nodes[i-1][j+1]);
              }
            if (j > 0) 
              {
                if (i < w+j-1) link(nodes[i][j], nodes[i][j-1]);
                if (i > 0) link(nodes[i][j], nodes[i-1][j-1]);
              }
          }
      }
    for (int j = h+1; j < 2*h+1; j++)
      {
        for (int i = 0; i < w+2*h-j; i++)
          {
            if (i < w+2*h-j-1) link(nodes[i][j], nodes[i+1][j]);
            if (i > 0) link(nodes[i][j], nodes[i-1][j]);
            if (j < 2*h) 
              {
                if (i > 0) link(nodes[i][j], nodes[i-1][j+1]);
                if (i < w+2*h-j-1) link(nodes[i][j], nodes[i][j+1]);
              }     
            if (j > 0) 
              {
                link(nodes[i][j], nodes[i][j-1]);
                link(nodes[i][j], nodes[i+1][j-1]);
              }
          }
      }
  }

  // --------------------------------------------------------------------------
  // informative message
  // --------------------------------------------------------------------------
  public String getText()
  {
    return "Distributed Snapshots\n \n" +
      "The Chandy-Lamport algorithm\n" +
      "for taking consistent global snapshots\n" +
      "of a running network.";
  }
}

// ----------------------------------------------------------------------------
//
// a program class
//
// ----------------------------------------------------------------------------
class Prog extends Program
{
  // state variables
  public int size;        // network size
  public int number;      // current node number
  public int deposit;     // current deposit
  public Message message; // message currently being processed
  public int index;       // index of channel from which message was received
  public int mode;        // current execution mode

  // snapped local state
  public int snapValue;      // current snap value
  public boolean snapped[];  // snap state of input channels
  public int missingSnaps;   // number of missing snaps of input channels

  // snapped total state
  public int totalValue;     // current network value
  public boolean done[];     // true iff node has reported its snap value
  public int missingValues;  // number of outstanding node reports

  // execution modes
  private final int RUNNING = 0;      // initial state
  private final int SNAPPING = 1;     // constructing snapshot
  private final int BROADCASTING = 2; // broadcasting result
  private final int TERMINATED = 3;   // terminal state

  // --------------------------------------------------------------------------
  // construct program in network of size `s` at node `n` with deposit `d`
  // --------------------------------------------------------------------------
  public Prog(int s, int n, int d)
  { 
    size = s;
    number = n;
    deposit = d;
    message = null;    
    mode = RUNNING;
  } 
 
  // --------------------------------------------------------------------------
  // return random integer `r` with 0 <= `r` < `n`
  // --------------------------------------------------------------------------
  private Random rand = new Random();
  private int random(int n)
  {
    return Math.abs(rand.nextInt())%n;
  }

  // --------------------------------------------------------------------------
  // go into snapping mode
  // --------------------------------------------------------------------------
  public void snap()
  {
    // go into snap mode, store local deposit, reset total value
    mode = SNAPPING;
    snapValue = deposit;
    totalValue = 0;

    // initialize snapping state of channels
    int s = in().getSize();
    snapped = new boolean[s];
    for (int i = 0; i < s; i++)
      snapped[i] = false;    
    missingSnaps = s;

    // initialize report state of nodes
    done = new boolean[size];
    for (int i = 0; i < size; i++)
      done[i] = false;
    missingValues = size;
  }

  // --------------------------------------------------------------------------
  // collect `value` from `sender`
  // --------------------------------------------------------------------------
  public void collect(int sender, int value)
  {
    totalValue += value;
    done[sender] = true;
    missingValues--;
    if (missingValues == 0) 
      mode = TERMINATED;
  }

  // --------------------------------------------------------------------------
  // main function of program
  // --------------------------------------------------------------------------
  public void main()
  { 
    while (mode != TERMINATED)
      {
        // node 0 triggers snapping at its own will
        if (number == 0 && mode == RUNNING && random(10) == 0)
          {
            snap();
            interrupt();
            out().send(new SnapMessage());
          }

        // wait some time for message
        index = in().select(random(5));
        if (index == -1 && deposit > 0)
          {
            // ---------------------------------------------------------------
            // send some money from deposit to some neighbor node
            // ---------------------------------------------------------------
            int value = random(deposit);
            deposit -= value;
            message = new ValueMessage(value);
            index = random(out().getSize());
            out(index).send(message);
            message = null;
            continue;
          }

        // message was received
        message = in(index).receive();

        // ---------------------------------------------------------------
        // value message was received
        // ---------------------------------------------------------------
        if (message instanceof ValueMessage)
          {
            ValueMessage valueMessage = (ValueMessage)message;
            int value = valueMessage.getValue();
            deposit += value;
            
            // snap message
            if (mode == SNAPPING && !snapped[index])
              {
                snapValue += value;
              }
          }

        // ---------------------------------------------------------------
        // snap message was received
        // ---------------------------------------------------------------
        else if (message instanceof SnapMessage)
          {
            SnapMessage snapMessage = (SnapMessage)message;
            if (mode == RUNNING) 
              {
                snap();
                out().send(snapMessage);
              }
            if (!snapped[index])
              {
                snapped[index] = true;
                missingSnaps--;
                if (missingSnaps == 0)
                  {
                    mode = BROADCASTING;
                    collect(number, snapValue);
                    if (number == 0) interrupt();
                    out().send(new ResultMessage(number, snapValue));
                  }
              }
          }

        // ---------------------------------------------------------------
        // result message was received
        // ---------------------------------------------------------------
        else if (message instanceof ResultMessage)
          {
            ResultMessage resultMessage = (ResultMessage)message;
            int sender = resultMessage.getSender();
            if (!done[sender])
              {
                collect(sender, resultMessage.getValue());
                out().send(resultMessage);
              }
          }
        
        // message was handled
        message = null;
      }
    // assert that total amount of money equals the determined value
    assert(new TotalValue(totalValue));
  }

  // --------------------------------------------------------------------------
  // called for display of program state
  // --------------------------------------------------------------------------
  public String getText()
  {
    String messageString;
    if (message == null)
      messageString = "(null)";
    else
      messageString = message.getText();
    String modeString;
    switch (mode)
      {
      case RUNNING:
        {
          modeString = "RUNNING";
          break;
        }
      case SNAPPING:
        {
          modeString = "SNAPPING";
          break;
        }
      case BROADCASTING:
        {
          modeString = "BROADCASTING";
          break;
        }
      case TERMINATED:
        {
          modeString = "TERMINATED";
          break;
        }
      default:
        {         
          modeString = "(unknown)";
          break;
        }
      }
    if (message == null)
      messageString = "(null)";
    else
      messageString = message.getText();

    if (mode == RUNNING)      
      return 
        "mode: " + modeString +
        "\ndeposit: " + String.valueOf(deposit);
    else if (mode == SNAPPING)
      return 
        "mode: " + modeString +
        "\ndeposit: " + String.valueOf(deposit) +
        "\nsnapValue: "  + String.valueOf(snapValue) +
        "\nmissingSnaps: "  + String.valueOf(missingSnaps);
    else if (mode == BROADCASTING)
      return 
        "mode: " + modeString +
        "\ndeposit: " + String.valueOf(deposit) +
        "\nsnapValue: "  + String.valueOf(snapValue) +
        "\ntotalValue: " + String.valueOf(totalValue) +
        "\nmissingValues: "  + String.valueOf(missingValues);
    else
      return 
        "mode: " + modeString +
        "\ndeposit: " + String.valueOf(deposit) +
        "\nsnapValue: "  + String.valueOf(snapValue) +
        "\ntotalValue: " + String.valueOf(totalValue);
  }
}

// ----------------------------------------------------------------------------
//
// the message classes
//
// ----------------------------------------------------------------------------


// ----------------------------------------------------------------------------
// carries `value`
// ----------------------------------------------------------------------------
class ValueMessage extends Message
{
  private int value;  
  public ValueMessage(int v)
  {
    value = v;
  }
  public int getValue()
  {
    return value;
  }
  public String getText()
  {
    return "VALUE[" + String.valueOf(value) + "]";
  }
}

// ----------------------------------------------------------------------------
// carries no information
// ----------------------------------------------------------------------------
class SnapMessage extends Message
{
  public String getText()
  {
    return "SNAP";
  }
}

// ----------------------------------------------------------------------------
// carries `value` of `sender`
// ----------------------------------------------------------------------------
class ResultMessage extends Message
{
  private int sender;
  private int value;  
  public ResultMessage(int s, int v)
  {
    sender = s;
    value = v;
  }
  public int getSender()
  {
    return sender;
  }
  public int getValue()
  {
    return value;
  }
  public String getText()
  {
    return "RESULT[" + String.valueOf(sender) + ", " + 
      String.valueOf(value) + "]";
  }
}

// ----------------------------------------------------------------------------
//
// global assertion stating that total values of network equals `value`
//
// ----------------------------------------------------------------------------
class TotalValue extends GlobalAssertion
{
  int value; // expected value
  int sum;   // counted value
  public TotalValue(int v)
  {
    value = v;
  }
  public String getText()
  {
    return "invalid value: " + String.valueOf(sum) + 
      "(" + String.valueOf(value) + " expected)";
  }

  // --------------------------------------------------------------------------
  // add values of all messages in `msg`
  // --------------------------------------------------------------------------
  private int add(Message msg[])
  {
    int s = 0;
    for (int i = 0; i < msg.length; i++)
      { 
        if (msg[i] instanceof ValueMessage)
          {
            s += ((ValueMessage)msg[i]).getValue();
          }
      }
    return s;
  }

  // --------------------------------------------------------------------------
  // check whether total value of network equals `value`
  // --------------------------------------------------------------------------
  public boolean assert(Program prog[])
  {
    sum = 0;
    for (int j = 0; j < prog.length; j++)
      { 
        Prog program = (Prog)prog[j];
        // value deposited in node
        sum += program.deposit;
        // value in output channels
        int n = program.out().getSize();
        for (int i = 0; i < n; i++)
          {
            Message msg[] = getMessages(program.out(i));
            sum += add(msg);
          }
        // value in pending message
        if (program.message != null && program.message instanceof ValueMessage)
          {
            sum += ((ValueMessage)program.message).getValue();
          }
      }
    return sum == value;
  }
}

Maintainer: Wolfgang Schreiner
Last Modification: November 14, 1997

previous up next