图的三角形计数

实验背景

图的三角形计数问题是一个基本的图计算问题,是很多复杂网络分析(比如社 交网络分析)的基础。目前图的三角形计数问题已经成为了 Spark 系统中 GraphX 图计算库 所提供的一个算法级API。本次实验任务就是要在Hadoop系统上实现Twitter社交网络图的 三角形计数任务。 换句话说,这TM是mapreduce课程的作业啊,不做也得做啊!!!

实验要求

一个社交网络可以看做是一张图(离散数学中的图)。社交网络中的人对应于图的顶点;社 交网络中的人际关系对应于图中的边。在本次实验任务中,我们只考虑一种关系——用户之 间的关注关系。假设“王五”在 Twitter/微博中关注了“李四”,则在社交网络图中,有一条对 应的从“王五”指向“李四”的有向边。图 1 中展示了一个简单的社交网络图,人之间的关注关 系通过图中的有向边标识了出来。 graph 本次的实验任务就是在给定的社交网络图中,统计图中所有三角形的数量。 在统计前,需要先进行有向边到无向边的转换,依据如下逻辑转换:

1
IF ( A→B) OR (B→A) THEN A-B

“A→B”表示从顶点 A 到顶点 B 有一条有向边。A-B 表示顶点 A 和顶点 B 之间有一条无向边。 一个示例见图 1,图 1 右侧的图就是左侧的图去除边方向后对应的无向图。 请在无向图上统计三角形的个数。在图 1 的例子中,一共有 3 个三角形。 + 输入格式 输入数据仅一个文件。该文件由若干行组成,每一行由两个以空格分隔的整数组成: A B A,B 分别是两个顶点的 ID。这一行记录表示图中具有一条由 A 到 B 的有向边。整个图的结构由该文件唯一确定。 下面的框中是文件部分内容的示例:

1
2
3
4
5
6
7
8
87982906 17975898
17809581 35664799
524620711 270231980
247583674 230498574
348281617 255810948
159294262 230766095
14927205 5380672
......
  • 输出格式 请在报告中报告如下内容: 1.实验设计说明,包括主要设计思路、算法设计、程序和各个类的设计说明 2.最终统计出的三角形个数 3.程序运行和实验结果说明和分析 4.性能、扩展性等方面存在的不足和可能的改进之处 5.源程序,执行程序 6.在集群上运行 MapReduce 作业的截屏、JobTracker 上记录的截屏

实验思路

基本思路很简单,假如我们找到了 A->B 和 A->C 两个边,那么,我们只要去找有没有B->C这条边就可以了。 详细来说,我们使用了三个map reduce过程来实现这个东西。 - 第一个map reduce map用来调换顺序,意思就是,我们把 A->B 和 B->A 都变成 A->B,这样有什么好处呢?之前实验要求里也说了, A->B 和 B->A 这两条有向边应该化成一条无向边,这第一个map reduce就是做这个事情的。这里,把 A->B 作为key。 reduce过程是去重,就是假如有多个A->B,记做一个边,当然如果实验要求不同,可以不同。 - 第二个mapreduce map过程是把key A->B 拆开,把A作为key,把B作为value。这里是干什么呢?这里就是要寻找了,我们要找到需要的边,也就是上面举例中的B->C边存在不存在。 reduce过程,对value进行处理。比如key为A,那么,其value应该是所有和A有边的点。假如B,C,那么我们就要把B->C作为新的key,其value我们用&来代替,意思就是这是我们要找的边。另一方面,我们不能把原来的边直接删掉,因为我们还是要从这里面找边,那么,还要有A->B,A->C这样的key,其value我们用#来表示,代表这是存在的边。 - 第三个mapreduce map过程没有做上面事情,就是把第二部reduce的数据重新传到第三步的reduce。 reduce就是比对。我们找到每个key后面的value中,有没有#,如果有,就说明存在这个边,然后把除了#之外的&的个数进行统计,和就是我们所需要的个数。最后通过一个cleanup函数,打印最后的统计结果即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import java.io.IOException;
import java.util.ArrayList;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.conf.Configuration;


public class triangle {
  
  public static class sortMapper extends Mapper<Object, Text, Text, Text>{
      private Text a = new Text();
      private Text b = new Text();
      
      public void map(Object key,Text value,Context context)throws IOException,InterruptedException {

          StringTokenizer itr=new StringTokenizer(value.toString());
          long first = Integer.parseInt(itr.nextToken());
          long second = Integer.parseInt(itr.nextToken());
          if (first <= second){
              a.set(first+"#"+second);
              b.set("#");
          }
          else{
              a.set(second+"#"+first);
              b.set("#");
          }
          context.write(a, b);
      }
  }
  
  public static class sortReducer extends Reducer<Text, Text, Text, Text>{
      private Text b = new Text();    
      public void reduce(Text key,Iterable<Text> values,Context context)throws IOException,InterruptedException {
          //Text b = new Text();
          b.set("#");
          context.write(key, b);
      }
  }
  
  public static class findMap extends Mapper<LongWritable, Text, Text, Text>{
      public void map(LongWritable key, Text value, Context context)throws IOException,InterruptedException {
          StringTokenizer itr=new StringTokenizer(value.toString());
          String[] tokens = itr.nextToken().toString().split("#");
          //String[] tokens = value.toString().split("#");
          String a = tokens[0];
          String b = tokens[1];
          //System.out.println(a+"    "+b);
          Text left = new Text();
          Text right = new Text();
          left.set(a+"");
          right.set(b+"");
          context.write(left, right);
      }
  }
  
  public static class findReducer extends Reducer<Text, Text, Text, Text>{
      
       public void reduce(Text key,Iterable<Text> values,Context context)throws IOException,InterruptedException {
           ArrayList<String> array = new ArrayList<String>();
           Text left = new Text();
           Text right = new Text();
           right.set("#");
           for(Text value : values){
               array.add(value.toString());
               left.set(key.toString()+"#"+value.toString());
               context.write(left, right);
           }
           for(int i=0; i<array.size(); i++){
               for(int j=i+1; j<array.size(); j++){
                   Text a = new Text();
                   Text b = new Text();
                   if(Integer.parseInt(array.get(i)) < Integer.parseInt(array.get(j))){
                       a.set(array.get(i)+"#"+array.get(j));
                       b.set("&");
                   }
                   else{
                       a.set(array.get(j)+"#"+array.get(i));
                       b.set("&");
                   }
                   context.write(a, b);
               }
           }
       }
  }
  
  public static class combineMap extends Mapper<LongWritable, Text, Text, Text>{
      private Text a = new Text();
      private Text b = new Text();
      public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException {
          StringTokenizer itr=new StringTokenizer(value.toString());
          a.set(itr.nextToken().toString());
          b.set(itr.nextToken().toString());
          context.write(a, b);
      }
  }
  
  public static class combineReducer extends Reducer<Text, Text, Text, Text>{
      private static int result = 0;
      //private Text a = new Text();
      //private Text b = new Text();
      
      public void cleanup(Context context) throws IOException, InterruptedException{
          context.write(new Text("Result: "), new Text(""+result));
      }
      
      public void reduce(Text key,Iterable<Text> values,Context context)throws IOException,InterruptedException {
          int count = 0;
          int is_triangle = 0;
          for(Text value : values){
              if(value.toString().equalsIgnoreCase("#")){
                  is_triangle = 1;
              }else if(value.toString().equalsIgnoreCase("&")){
                  count ++;
              }else{
                  //System.out.println("wrong input number");
              }
          }
          if (is_triangle == 1){
              result += count;
          }
          //b.set(result+"");
          //context.write(key, new Text(count+""));
      }
  }
  
  public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub
        Configuration conf=new Configuration();
        String[] otherArgs=new GenericOptionsParser(conf,args).getRemainingArgs();
        if (otherArgs.length!=4) {
            System.err.println("Usage:invertedindex<in> <out1> <out2> <out3>");
            System.exit(2);
        }
        
        Job job1 = new Job(conf, "job1");
        job1.setJarByClass(triangle.class);
        job1.setMapperClass(sortMapper.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(Text.class);
        job1.setReducerClass(sortReducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);

        //job1.setInputFormatClass(TextInputFormat.class);
        //job1.setOutputFormatClass(TextOutputFormat.class);
        //job1.setMapOutputKeyClass(Text.class);
        //job1.setMapOutputValueClass(Text.class);
        
        FileInputFormat.addInputPath(job1, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job1, new Path(otherArgs[1]));
        
        job1.waitForCompletion(true);
        

        Job job2 = new Job(conf, "job2");
        job2.setJarByClass(triangle.class);
        job2.setMapperClass(findMap.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setReducerClass(findReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        
        //job2.setInputFormatClass(TextInputFormat.class);
        //job2.setOutputFormatClass(TextOutputFormat.class);
        //job2.setMapOutputKeyClass(Text.class);
        //job2.setMapOutputValueClass(Text.class);
        
        FileInputFormat.addInputPath(job2, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job2, new Path(otherArgs[2]));
        
        job2.waitForCompletion(job1.isComplete());
     
        Job job3 = new Job(conf, "job3");
        job3.setJarByClass(triangle.class);
        job3.setMapperClass(combineMap.class);
        job3.setMapOutputKeyClass(Text.class);
        job3.setMapOutputValueClass(Text.class);
        job3.setReducerClass(combineReducer.class);
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(Text.class);


        //job3.setInputFormatClass(TextInputFormat.class);
        //job3.setOutputFormatClass(TextOutputFormat.class);
        //job3.setMapOutputKeyClass(Text.class);
        //job3.setMapOutputValueClass(Text.class);
        
        FileInputFormat.addInputPath(job3, new Path(otherArgs[2]));
        FileOutputFormat.setOutputPath(job3, new Path(otherArgs[3]));
        
        job3.waitForCompletion(job2.isComplete());

    }
  
}
May 8th, 2015