Saya pernah menggunakan sebuah contoh dari web dan berhasil mentransfer data dari MongoDB ke HDFS maupun sebaliknya. Saya tidak dapat menemukan lagi halaman web aslinya, tetapi programnya kurang lebih seperti di bawah ini (program di bawah hanya menunjukkan arah MongoDB → HDFS).
Anda bisa menjadikannya titik awal lalu mengembangkannya lebih lanjut.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.bson.BSONObject;
import org.bson.types.ObjectId;

import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.util.MongoConfigUtil;
public class CopyFromMongodbToHDFS {
public static class ImportWeblogsFromMongo extends
Mapper<LongWritable, Text, Text, Text> {
public void map(Object key, BSONObject value, Context context)
throws IOException, InterruptedException {
System.out.println("Key: " + key);
System.out.println("Value: " + value);
String md5 = value.get("md5").toString();
String url = value.get("url").toString();
String date = value.get("date").toString();
String time = value.get("time").toString();
String ip = value.get("ip").toString();
String output = "\t" + url + "\t" + date + "\t" + time + "\t" + ip;
context.write(new Text(md5), new Text(output));
}
}
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
MongoConfigUtil.setInputURI(conf,
"mongodb://127.0.0.1:27017/test.mylogs");
System.out.println("Configuration: " + conf);
@SuppressWarnings("deprecation")
Job job = new Job(conf, "Mongo Import");
Path out = new Path("/user/cloudera/test1/logs.txt");
FileOutputFormat.setOutputPath(job, out);
job.setJarByClass(CopyFromMongodbToHDFS.class);
job.setMapperClass(ImportWeblogsFromMongo.class);
job.setOutputKeyClass(ObjectId.class);
job.setOutputValueClass(BSONObject.class);
job.setInputFormatClass(MongoInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setNumReduceTasks(0);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}