forked from MicrosoftResearch/Naiad
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathProgram.cs
154 lines (125 loc) · 6.72 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
/*
* Naiad ver. 0.5
* Copyright (c) Microsoft Corporation
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* THIS CODE IS PROVIDED ON AN *AS IS* BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT
* LIMITATION ANY IMPLIED WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR
* A PARTICULAR PURPOSE, MERCHANTABLITY OR NON-INFRINGEMENT.
*
* See the Apache Version 2.0 License for specific language governing
* permissions and limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using Microsoft.Research.Naiad;
using Microsoft.Research.Naiad.Frameworks.Azure;
using Microsoft.Research.Naiad.Frameworks.DifferentialDataflow;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure;
namespace Microsoft.Research.Naiad.Examples
{
    public static class Program
    {
        /// <summary>
        /// Runs connected components on a randomly generated graph and reports the distribution of
        /// component sizes, either on the console or in an Azure blob container.
        /// </summary>
        /// <param name="args">
        /// Command-line arguments. They are first passed (by ref) to <c>NewController.FromArgs</c>;
        /// the remaining positional arguments must be <c>results_container_name node_count edge_count</c>.
        /// </param>
        public static void Main(string[] args)
        {
            using (var controller = NewController.FromArgs(ref args))
            {
                // Reject missing, malformed or out-of-range arguments with the usage message instead of
                // letting a FormatException / ArgumentOutOfRangeException escape later on.
                int nodeCount;
                int edgeCount;
                if (args.Length < 3
                    || !int.TryParse(args[1], out nodeCount) || nodeCount < 1
                    || !int.TryParse(args[2], out edgeCount) || edgeCount < 0)
                {
                    Console.Error.WriteLine("Usage: results_container_name node_count edge_count");
                    // A usage error is a failed run, so report a non-zero exit code.
                    System.Environment.Exit(1);
                    return; // not reached; keeps the compiler's definite-assignment analysis satisfied
                }

                string containerName = args[0];

                // Results are written to the console while connectionString is null.
                string connectionString = null;
                // For the Azure storage emulator, use this line instead:
                // string connectionString = "UseDevelopmentStorage=true;";
                // For Windows Azure Storage, use this line instead, substituting your account name and key:
                // string connectionString = "DefaultEndpointsProtocol=https;AccountName=[Account name];AccountKey=[Account key]";

                Console.Error.WriteLine("Running connected components on a random graph ({0} nodes, {1} edges).", nodeCount, edgeCount);
                if (connectionString != null)
                {
                    Console.Error.WriteLine("Writing results to container {0}", containerName);
                }
                else
                {
                    Console.Error.WriteLine("Writing results to console");
                }

                // Fixed seed so that repeated runs use the same graph.
                var graph = GenerateRandomGraph(nodeCount, edgeCount, 0);

                var stopwatch = Stopwatch.StartNew();
                using (var computation = controller.NewComputation())
                {
                    // Set up the connected-components computation.
                    var edges = computation.NewInputCollection<IntPair>();

                    // No prioritization: swap this line in for the one below to see how much slower it is.
                    // Func<IntPair, int> priorityFunction = node => 0;

                    // Introduce labels in priority order. Labels 0 through 9 inclusive are introduced
                    // one at a time, followed by exponentially-growing sets of labels.
                    Func<IntPair, int> priorityFunction =
                        node => 65536 * (node.t < 10 ? node.t : 10 + Convert.ToInt32(Math.Log(1 + node.t) / Math.Log(2.0)));

                    // Perform the connected components algorithm on the collection of edges.
                    var labeledVertices = edges.ConnectedComponents(priorityFunction);

                    // Count the number of vertices carrying each label (i.e. in each component).
                    var componentSizes = labeledVertices.Count(n => n.t, (l, c) => new Pair<int, long>(l, c));

                    // Drop the labels and consolidate, giving the number of components of each size.
                    var sizeDistribution = componentSizes.Select(x => x.Second).Consolidate();

                    if (connectionString != null)
                    {
                        var account = CloudStorageAccount.Parse(connectionString);
                        var container = account.CreateCloudBlobClient().GetContainerReference(containerName);
                        container.CreateIfNotExists();

                        // Write the results to blobs in the given container. The name pattern has two
                        // placeholders filled in by WriteTextToAzureBlobs.
                        // NOTE(review): presumably process id and worker id; confirm.
                        sizeDistribution.Output
                                        .WriteTextToAzureBlobs(container, "componentSizes-part-{0}-{1}");
                    }
                    else
                    {
                        // Write the results to the console.
                        sizeDistribution.Subscribe(xs =>
                        {
                            foreach (var x in xs)
                            {
                                Console.WriteLine(x);
                            }
                        });
                    }

                    computation.Activate();

                    // Only process 0 supplies the edges; every other process contributes an empty input.
                    edges.OnCompleted(controller.Configuration.ProcessID == 0 ? graph : Enumerable.Empty<IntPair>());

                    computation.Join();
                }

                // Wall-clock time of the dataflow computation (graph generation excluded).
                Console.Error.WriteLine("Elapsed time: {0}", stopwatch.Elapsed);

                controller.Join();
            }
        }

        /// <summary>
        /// Generates <paramref name="edgeCount"/> random (source, target) pairs whose endpoints are
        /// drawn uniformly from [0, <paramref name="nodeCount"/>).
        /// </summary>
        /// <param name="nodeCount">Number of vertices; must be at least 1.</param>
        /// <param name="edgeCount">Number of edges to generate; must be non-negative.</param>
        /// <param name="seed">Seed for the pseudo-random generator, making the graph reproducible.</param>
        /// <returns>An array of <paramref name="edgeCount"/> edges.</returns>
        private static IntPair[] GenerateRandomGraph(int nodeCount, int edgeCount, int seed)
        {
            var random = new Random(seed);
            var graph = new IntPair[edgeCount];
            for (int i = 0; i < edgeCount; i++)
            {
                // Arguments are evaluated left to right: source first, then target.
                graph[i] = new IntPair(random.Next(nodeCount), random.Next(nodeCount));
            }
            return graph;
        }

        /// <summary>
        /// Computes the connected components of the graph described by <paramref name="edges"/>,
        /// treating every edge as undirected. Each vertex is labelled with the smallest vertex id
        /// in its component.
        /// </summary>
        /// <remarks>
        /// Labels are introduced in waves ordered by <paramref name="priorityFunction"/>.
        /// Propagation of each wave completes before the next one starts.
        /// </remarks>
        /// <typeparam name="T">Time type of the input collection.</typeparam>
        /// <param name="edges">Edges as (source, target) pairs; their direction is ignored.</param>
        /// <param name="priorityFunction">
        /// Maps a (vertex, label) pair to the priority with which it enters the fixed-point loop.
        /// </param>
        /// <returns>(vertex, label) pairs, with the vertex in <c>s</c> and its component label in <c>t</c>.</returns>
        public static Collection<IntPair, T> ConnectedComponents<T>(this Collection<IntPair, T> edges, Func<IntPair, int> priorityFunction)
            where T : Time<T>
        {
            // Seed initial labels only at the smaller endpoint of each edge. The larger endpoint always
            // has a smaller neighbour, so it receives a label through propagation anyway.
            var nodes = edges.Select(x => new IntPair(Math.Min(x.s, x.t), Math.Min(x.s, x.t)))
                             .Consolidate();

            // Symmetrize the graph so labels flow both ways along every edge.
            var symmetricEdges = edges.Select(edge => new IntPair(edge.t, edge.s))
                                      .Concat(edges);

            // Start from an empty collection; labels enter the loop only through the prioritized
            // nodes.EnterLoop, which introduces them from small to large priority (in batches).
            return nodes.Where(x => false)
                        .GeneralFixedPoint(
                            // Push each vertex's label to its neighbours, add the seeded labels,
                            // and keep the minimum label per vertex.
                            (lc, x) => x.Join(symmetricEdges.EnterLoop(lc), n => n.s, e => e.s, (n, e) => new IntPair(e.t, n.t))
                                        .Concat(nodes.EnterLoop(lc, priorityFunction))
                                        .Min(n => n.s, n => n.t),
                            priorityFunction,
                            n => n.s,
                            Int32.MaxValue); // NOTE(review): presumably an iteration bound, i.e. effectively unbounded; confirm
        }
    }
}