头条|在MaxCompute中利用bitmap进行数据处理
摘要:摘要:很多数据开发者使用bitmap技术对用户数据进行编码和压缩,然后利用bitmap的与/或/非的极速处理速度,实现类似用户画像标签的人群筛选、运营分析的7日
摘要:很多数据开发者使用bitmap技术对用户数据进行编码和压缩,然后利用bitmap的与/或/非的极速处理速度,实现类似用户画像标签的人群筛选、运营分析的7日活跃等分析。本文给出了一个使用MaxComp很
很多数据开发者使用bitmap技术对用户数据进行编码和压缩,然后利用bitmap的与/或/非的极速处理速度,实现类似用户画像标签的人群筛选、运营分析的7日活跃等分析。本文给出了一个使用MaxComp
很多数据开发者使用bitmap技术对用户数据进行编码和压缩,然后利用bitmap的与/或/非的极速处理速度,实现类似用户画像标签的人群筛选、运营分析的7日活跃等分析。
本文给出了一个使用MaxCompute MapReduce开发一个对不同日期活跃用户ID进行bitmap编码和计算的样例。供感兴趣的用户进一步了解、分析,并应用在自己的场景下。
import com.aliyun.odps.OdpsException;import com.aliyun.odps.data.Record;import com.aliyun.odps.data.TableInfo;import com.aliyun.odps.mapred.JobClient;import com.aliyun.odps.mapred.MapperBase;import com.aliyun.odps.mapred.ReducerBase;import com.aliyun.odps.mapred.conf.JobConf;import com.aliyun.odps.mapred.utils.InputUtils;import com.aliyun.odps.mapred.utils.OutputUtils;import com.aliyun.odps.mapred.utils.SchemaUtils;import org.roaringbitmap.RoaringBitmap;import org.roaringbitmap.buffer.ImmutableRoaringBitmap;import java.io.DataOutputStream;import java.io.IOException;import java.io.OutputStream;import java.nio.ByteBuffer;import java.util.Base64;import java.util.Iterator;public class bitmapDemo2{ public static class BitMapper extends MapperBase { Record key; Record value; @Override public void setup(TaskContext context) throws IOException { key = context.createMapOutputKeyRecord(); value = context.createMapOutputValueRecord(); } @Override public void map(long recordNum, Record record, TaskContext context) throws IOException { RoaringBitmap mrb=new RoaringBitmap(); long AID=0; { { { { AID=record.getBigint("id"); mrb.add((int) AID); //获取key key.set(new Object[] {record.getString("active_date")}); } } } } ByteBuffer outbb = ByteBuffer.allocate(mrb.serializedSizeInBytes()); mrb.serialize(new DataOutputStream(new OutputStream(){ ByteBuffer mBB; OutputStream init(ByteBuffer mbb) {mBB=mbb; return this;} public void close() {} public void flush() {} public void write(int b) { mBB.put((byte) b);} public void write(byte[] b) {mBB.put(b);} public void write(byte[] b, int off, int l) {mBB.put(b,off,l);} }.init(outbb))); String serializedstring = Base64.getEncoder().encodeToString(outbb.array()); value.set(new Object[] {serializedstring}); context.write(key, value); } } public static class BitReducer extends ReducerBase { private Record result = null; public void setup(TaskContext context) throws IOException { result = context.createOutputRecord(); } public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException { long fcount = 0; RoaringBitmap rbm=new RoaringBitmap(); while (values.hasNext()) { Record val = values.next(); ByteBuffer newbb = ByteBuffer.wrap(Base64.getDecoder().decode((String)val.get(0))); ImmutableRoaringBitmap irb = new ImmutableRoaringBitmap(newbb); RoaringBitmap p= new RoaringBitmap(irb); rbm.or(p); } ByteBuffer outbb = ByteBuffer.allocate(rbm.serializedSizeInBytes()); rbm.serialize(new DataOutputStream(new OutputStream(){ ByteBuffer mBB; OutputStream init(ByteBuffer mbb) {mBB=mbb; return this;} public void close() {} public void flush() {} public void write(int b) { mBB.put((byte) b);} public void write(byte[] b) {mBB.put(b);} public void write(byte[] b, int off, int l) {mBB.put(b,off,l);} }.init(outbb))); String serializedstring = Base64.getEncoder().encodeToString(outbb.array()); result.set(0, key.get(0)); result.set(1, serializedstring); context.write(result); } } public static void main( String[] args ) throws OdpsException { System.out.println("begin........."); JobConf job = new JobConf(); job.setMapperClass(BitMapper.class); job.setReducerClass(BitReducer.class); job.setMapOutputKeySchema(SchemaUtils.fromString("active_date:string")); job.setMapOutputValueSchema(SchemaUtils.fromString("id:string")); InputUtils.addTable(TableInfo.builder().tableName("bitmap_source").cols(new String[] {"id","active_date"}).build(), job);// +------------+-------------+// | id | active_date |// +------------+-------------+// | 1 | 20190729 |// | 2 | 20190729 |// | 3 | 20190730 |// | 4 | 20190801 |// | 5 | 20190801 |// +------------+-------------+ OutputUtils.addTable(TableInfo.builder().tableName("bitmap_target").build(), job);// +-------------+------------+// | active_date | bit_map |// +-------------+------------+// 20190729,OjAAAAEAAAAAAAEAEAAAAAEAAgA=3D// 20190730,OjAAAAEAAAAAAAAAEAAAAAMA// 20190801,OjAAAAEAAAAAAAEAEAAAAAQABQA=3D JobClient.runJob(job); }}
对Java应用打包后,上传到MaxCompute项目中,即可在MaxCompute中调用该MR作业,对输入表的数据按日期作为key进行用户id的编码,同时按照相同日期对bitmap后的用户id取OR操作(根据需要可以取AND,例如存留场景),并将处理后的数据写入目标结构表当中供后续处理使用。
原文链接:https://yq.aliyun.com/articles/712519?utm_content=g_1000070289
本文为云栖社区原创内容,未经允许不得转载。
- 免责声明
- 世链财经作为开放的信息发布平台,所有资讯仅代表作者个人观点,与世链财经无关。如文章、图片、音频或视频出现侵权、违规及其他不当言论,请提供相关材料,发送到:2785592653@qq.com。
- 风险提示:本站所提供的资讯不代表任何投资暗示。投资有风险,入市须谨慎。
- 世链粉丝群:提供最新热点新闻,空投糖果、红包等福利,微信:juu3644。

币圈观察



