【Spring Boot + Datahub】阿里云流数据处理平台 基于2.15版本的数据读写【上】_帽子屋先生的博客-CSDN博客_spring消费datahub

2022-7-3 15:29:26 Author: webcache.googleusercontent.com(查看原文) 阅读量:33728 收藏

DataHub基本介绍
阿里云流数据处理平台DataHub是流式数据(Streaming Data)的处理平台,提供对流式数据的发布 (Publish),订阅 (Subscribe)和分发功能,让您可以轻松构建基于流式数据的分析和应用。DataHub服务可以对各种移动设备,应用软件,网站服务,传感器等产生的大量流式数据进行持续不断的采集,存储和处理。用户可以编写应用程序或者使用流计算引擎来处理写入到DataHub的流式数据比如实时web访问日志、应用日志、各种事件等,并产出各种实时的数据处理结果比如实时图表、报警信息、实时统计等。

DataHub服务基于阿里云自研的飞天平台,具有高可用,低延迟,高可扩展,高吞吐的特点。DataHub与阿里云流计算引擎StreamCompute无缝连接,用户可以轻松使用SQL进行流数据分析。

DataHub服务也提供分发流式数据到各种云产品的功能,目前支持分发到MaxCompute(原ODPS),OSS等。

系统整体功能图
在这里插入图片描述

产品优势

高吞吐
最高支持单shard每日8000万Record级别的写入量。

实时性
通过 DataHub ,您可以实时的收集各种方式生成的数据并进行实时的处理,对您的业务产生快速的响应。

易用性
DataHub 提供丰富的SDK包,包括C++, JAVA, Pyhon, Ruby, Go等语言。
DataHub服务也提供Restful API规范,您可以用自己的方式实现访问接口。
除了SDK以外,DataHub 还提供一些常用的客户端插件,包括:Fluentd,LogStash,Flume等。您可以使用这些客户端工具往 DataHub 里面写入流式数据。
DataHub 同时支持强Schema的结构化数据(创建Tuple类型的Topic)和无类型的非结构化数据(创建Blob类型的Topic),您可以自由选择。
高可用
服务可用性不低于99.9%。
规模自动扩展,不影响对外服务;数据持久性不低于99.999%。
数据自动多重冗余备份。
动态伸缩
每个主题(Topic)的数据流吞吐能力可以动态扩展和减少,最高可达到每主题256000 Records/s的吞吐量。

高安全性
提供企业级多层次安全防护,多用户资源隔离机制;
提供多种鉴权和授权机制及白名单、主子账号功能。

使用场景

在这里插入图片描述
DataHub作为一个流式数据处理服务,结合阿里云众多云产品,可以构建一站式的数据处理服务。

流计算StreamCompute
StreamCompute是阿里云提供的流计算引擎,提供使用类SQL的语言来进行流式计算。DataHub 和StreamCompute无缝结合,可以作为StreamCompute的数据源和输出源,具体可参考实时计算文档

aliyun-stream-dataflow

流处理应用
用户可以编写应用订阅DataHub中的数据,并进行实时的加工,把加工后的结果输出。用户可以把应用计算产生的结果输出到DataHub中,并使用另外一个应用来处理上一个应用生成的流式数据,来构建数据处理流程的DAG。

流式数据归档
用户的流式数据可以归档到 MaxCompute(原ODPS)中。用户通过创建DataHub Connector,指定相关配置,即可创建将Datahub中流式数据定期归档的同步任务。

以上来自于阿里云平台 Datahub在线文档

好了,先进入正题,Datahub 2.15版本是支持协同消费的,单是要保证阿里云数据总线服务器是最新版本的,否则是不支持协同消费类的,但是低版本数据总线服务器是支持高版本sdk向下兼容的

在这里插入图片描述
一开始我们新建一个项目
在这里插入图片描述
项目新建完成以后,再新建这个项目下的topic
在这里插入图片描述

pom依赖

		<dependency>
            <groupId>com.aliyun.datahubgroupId>
            <artifactId>aliyun-sdk-datahubartifactId>
            <version>2.15.0-publicversion>
        dependency>
        <dependency>
            <groupId>com.aliyun.datahubgroupId>
            <artifactId>datahub-client-libraryartifactId>
            <version>1.1.8-publicversion>
        dependency>

yml 配置 (保密原因自我补全)

# datahub配置文件
public:
  datahub:
    # 配置信息
    config:
      #控制器
      startup: false
      #应用服务器地址
      endpoint:  XXXXXXXXXXXXXXXXXXXXXXXXXXXX
      #用户权限accessId
      accessId: XXXXXXXXXXXXXXXXX
      #用户权限accessKey
      accessKey: XXXXXXXXXXXXXXXx
      serivce:
        #应用名称
      - projectName: XXXXXXXX
        #应用消费——topic名称
        topicGet: XXXXXXXXXXXXXXXXXX
        #应用生产——topic名称
        topicSet: XXXXXXXXXXXXXXXXXX
        #topicid的订阅ID
        subId: XXXXXXXXXXXXXXXXXX

config配置类

package com.encdata.oss.datahubClient;

import com.encdata.oss.system.domain.dto.DataHubConfigDTO;
import com.encdata.oss.system.handler.CustomerThreadPool;
import com.encdata.oss.system.handler.task.DatahubConsumerTask;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static com.encdata.oss.system.controller.BaseController.executorService;

/**
 * Created by IntelliJ IDEA.
 * datahub 配置文件类
 * @author liyiq
 * @date 2020/06/04
 */
@Data
@Component
@ConfigurationProperties(prefix = "public.datahub.config")
public class DatahubConfig {
    /**
     * datahub 控制器
     */
    private Boolean startup;

    /**
     * datahub应用服务器地址
     */
    private String endpoint;
    public static String endpoints;

    /**
     * datahub 用户权限accessId
     */
    private String accessId;
    public static String accessIds;

    /**
     * datahub 用户权限accessKey
     */
    private String accessKey;
    public static String accessKeys;

    /**
     * 应用配置信息集合
     */
    private List<Map> serivce;
    public static List<Map> serivces = new ArrayList<>();

    /**
     * datahub 应用名称
     */
    private String projectName;

    /**
     * datahub 应用消费——topic名称
     */
    private String topicGet;

    /**
     * datahub topicid的订阅ID
     */
    private String subId;

    /**
     * 初始化datahub协同消费类
     * 支持订阅多个topic
     */
    @PostConstruct
    public void datahubRecycling(){

        //todo: 静态变量赋值
        endpoints = this.endpoint;
        accessIds = this.accessId;
        accessKeys = this.accessKey;
        serivces = this.serivce;

        //todo:开启关闭datahub消费控制器
        if(startup){
            if(executorService == null){
                executorService = CustomerThreadPool.createDefaultThreadPool();
            }

            //todo: 多应用分割
            for(int s=0;s<serivce.size();s++){
                Map map = serivce.get(s);
                projectName = map.get("projectName").toString();
                topicGet = map.get("topicGet").toString();
                subId = map.get("subId").toString();
                String[] topics = topicGet.split(",");
                //todo: 多topic循环监听
                for(int i=0;i<topics.length;i++){
                    //todo:封装datahub配置DTO
                    DataHubConfigDTO dataHubConfigDTO = new DataHubConfigDTO();
                    dataHubConfigDTO
                            .setEndpoint(endpoint)
                            .setAccessId(accessId)
                            .setAccessKey(accessKey)
                            .setProjectName(projectName)
                            .setTopicName(topics[i])
                            .setSubId(subId);
                    //todo:初始化监听线程
                    executorService.execute(new DatahubConsumerTask(dataHubConfigDTO));
                }
            }


        }

    }
}

核心线程池+主线程+工作线程

package com.encdata.oss.system.handler;

import java.util.concurrent.*;

/**
 * 自定义线程池
 *
 * Created by IntelliJ IDEA.
 *
 * @author yangyi
 * @date 2020/06/04
 */
public class CustomerThreadPool {
    /** 线程池核心线程数,即线程池中常驻的线程数量 **/
    private static final int DEFAULT_CORE_POLL_SIZE = 8;

    /** 线程池允许的最大线程数,非核心线程在超时之后会被清除,受限于 CAPACITY,需要根据实际的物理机配置去计算 **/
    private static final int DEFAULT_MAXIMUM_POOL_SIZE = 1024;

    /** 线程没有任务执行时可以保持的时间【非核心线程】 **/
    private static final long DEFAULT_KEEP_ALIVE_TIME = 0;

    /** keepAliveTime 的时间单位 **/
    private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MICROSECONDS;

    /** 阻塞队列的最大容量 **/
    private static final Integer MAX_WORK_QUEUE_CAPACITY = 1024*10;

    /** 任务阻塞队列,用于存储等待执行的任务,默认采用有界队列 **/
    private static final BlockingQueue<Runnable> DEFAULT_WORK_QUEUE = new ArrayBlockingQueue<Runnable>(MAX_WORK_QUEUE_CAPACITY);

    /** 线程工厂,用来创建线程,可自定义对线程的控制**/
    private static final ThreadFactory DEFAULT_THREAD_FACTORY = Executors.defaultThreadFactory();

    /**
     * rejectHandler:当任务队列已满时,拒绝任务提交时的策略:
             AbortPolicy【默认】:丢掉任务,并抛RejectedExecutionException异常。
             DiscardPolicy:直接丢掉任务,不抛异常。
             DiscardOldestPolicy:丢掉最老的任务,然后调用execute立刻执行该任务(新进来的任务)。
             CallerRunsPolicy【推荐】:在调用者的当前线程去执行这个任务。
     */
    private static final RejectedExecutionHandler DEFAULT_HANDLER = new ThreadPoolExecutor.CallerRunsPolicy();

    /**
     * 创建线程池
     * @param corePoolSize 核心线程数
     * @param maximumPoolSize 最大线程数
     * @param keepAliveTime 没有任务执行时非核心线程可以保持的时间
     * @param unit keepAliveTime 的时间单位
     * @param workQueue 任务阻塞队列,用于存储等待执行的任务
     * @param threadFactory 线程工厂
     * @param handler 当任务队列已满时,拒绝任务提交时的策略
     * @return ThreadPoolExecutor
     */
    public static ThreadPoolExecutor createThreadPool(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                threadFactory, handler);
    }

    /***
     * 创建线程池
     * @param corePoolSize 核心线程数
     * @param maximumPoolSize 最大线程数
     * @param keepAliveTime 没有任务执行时非核心线程可以保持的时间
     * @param unit keepAliveTime 的时间单位
     * @param workQueue 任务阻塞队列,用于存储等待执行的任务
     * @param threadFactory 线程工厂
     * @return ThreadPoolExecutor
     */
    public static ThreadPoolExecutor createThreadPool (int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        return createThreadPool(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, DEFAULT_HANDLER);
    }

    /***
     * 创建线程池
     * @param corePoolSize 核心线程数
     * @param maximumPoolSize 最大线程数
     * @param keepAliveTime 没有任务执行时非核心线程可以保持的时间
     * @param unit keepAliveTime 的时间单位
     * @param workQueue 任务阻塞队列,用于存储等待执行的任务
     * @return ThreadPoolExecutor
     */
    public static ThreadPoolExecutor createThreadPool(int corePoolSize,
                                                   int maximumPoolSize,
                                                   long keepAliveTime,
                                                   TimeUnit unit,
                                                   BlockingQueue<Runnable> workQueue) {
        return createThreadPool(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, DEFAULT_THREAD_FACTORY);
    }

    /***
     * 创建线程池
     * @param corePoolSize 核心线程数
     * @param maximumPoolSize 最大线程数
     * @param keepAliveTime 没有任务执行时非核心线程可以保持的时间
     * @param unit keepAliveTime 的时间单位
     * @return ThreadPoolExecutor
     */
    public static ThreadPoolExecutor createThreadPool(int corePoolSize,
                                                   int maximumPoolSize,
                                                   long keepAliveTime,
                                                   TimeUnit unit) {
        return createThreadPool(corePoolSize, maximumPoolSize, keepAliveTime, unit, DEFAULT_WORK_QUEUE);
    }

    /**
     * 创建默认的线程池,配置如下:
     * corePoolSize = 8;
     * maximumPoolSize = 1024;
     * keepAliveTime = 0;
     * unit = TimeUnit.MICROSECONDS
     * workQueue = new ArrayBlockingQueue(10240);
     * threadFactory = Executors.defaultThreadFactory();
     * handler = new ThreadPoolExecutor.CallerRunsPolicy();
     */
    public static ThreadPoolExecutor createDefaultThreadPool() {
        return createThreadPool(DEFAULT_CORE_POLL_SIZE, DEFAULT_MAXIMUM_POOL_SIZE, DEFAULT_KEEP_ALIVE_TIME, DEFAULT_TIME_UNIT);
    }
}

package com.encdata.oss.system.handler.task;


import com.encdata.oss.system.exception.BaseException;

/**
 * 抽象工作线程,若想进行多线程工作,可以继承此抽象类,再重写相关方法实现具体的业务逻辑
 * 

* Created by IntelliJ IDEA. * * @author yangyi * @date 2020/06/04 */ public abstract class BaseWorkTask implements Runnable { //工作任务线程 /*@Override public void run() { // 工作示例 System.out.println(Thread.currentThread().getId() + " is start"); process(); System.out.println(Thread.currentThread().getId() + " is over"); }*/ /** * 业务处理方法 * * @throws BaseException 业务处理异常 */ protected abstract void process() throws BaseException; }

package com.encdata.oss.system.handler.task;

import com.encdata.oss.datahubClient.singleSubscription.SubscriptionExample;
import com.encdata.oss.system.domain.dto.DataHubConfigDTO;
import com.encdata.oss.system.exception.BaseException;
import lombok.extern.slf4j.Slf4j;

import static com.encdata.oss.system.controller.BaseController.executorService;

/**
 * Created by IntelliJ IDEA.
 * 多应用,多topic 数据订阅线程
 * @author liyiq
 * @date 2020/06/08
 */
@Slf4j
public class DatahubConsumerTask extends BaseWorkTask {

    /**
     * datahub配置类dto
     */
    private DataHubConfigDTO dataHubConfigDTO;

    /**
     * datahub消费类
     */
    private SubscriptionExample subscriptionExample;

    public DatahubConsumerTask(DataHubConfigDTO dataHubConfigDTO){
        this.dataHubConfigDTO = dataHubConfigDTO;
    }

    @Override
    public void run() {
        if (log.isDebugEnabled()) {
            log.debug("DataHub数据消费信息处理线程【开始】");
        }

        process();

        if (log.isDebugEnabled()) {
            log.debug("DataHub数据消费信息处理线程【结束】=====重新请求");
            executorService.execute(new DatahubConsumerTask(dataHubConfigDTO));
        }
    }




    /**
     * 创建数据消费线程
     * @throws BaseException
     */
    @Override
    protected void process() throws BaseException {

        new SubscriptionExample(dataHubConfigDTO).Start();

    }
}

消费类

package com.encdata.oss.datahubClient.singleSubscription;

import com.aliyun.datahub.DatahubClient;
import com.aliyun.datahub.DatahubConfiguration;
import com.aliyun.datahub.auth.AliyunAccount;
import com.aliyun.datahub.common.data.RecordSchema;
import com.aliyun.datahub.exception.DatahubClientException;
import com.aliyun.datahub.exception.OffsetResetedException;
import com.aliyun.datahub.exception.OffsetSessionChangedException;
import com.aliyun.datahub.exception.SubscriptionOfflineException;
import com.aliyun.datahub.model.*;
import com.encdata.oss.datahubClient.TaskReleaseConsumer;
import com.encdata.oss.system.domain.dto.DataHubConfigDTO;

import java.util.List;

class Consumer{
	private String projectName = null;
	private String topicName = null;
	private String subId = null;
	private String shardId = null;
	private RecordSchema schema = null;
	private DatahubClient client = null;

	public Consumer(String projectName, String topicName, String subId, String shardId, RecordSchema schema,
			DatahubConfiguration conf) {
		this.projectName = projectName;
		this.topicName = topicName;
		this.subId = subId;
		this.shardId = shardId;
		this.schema = schema;
		this.client = new DatahubClient(conf);
	}

	private void commit(OffsetContext offsetCtx) {
		client.commitOffset(offsetCtx);
		//System.out.println("commit offset suc! offset context: " + offsetCtx.toObjectNode().toString());
	}

	public void run() {
		try {
			boolean bExit = false;
			// 首先初始化offset上下文
			OffsetContext offsetCtx = client.initOffsetContext(projectName, topicName, subId, shardId);
			String cursor = null; // 开始消费的cursor
			if (!offsetCtx.hasOffset()) {
				// 之前没有存储过点位,先获取初始点位,比如这里获取当前该shard最早的数据
				GetCursorResult cursorResult = client.getCursor(projectName, topicName, shardId, GetCursorRequest.CursorType.OLDEST);
				cursor = cursorResult.getCursor();
			} else {
				// 否则,获取当前已消费点位的下一个cursor
				GetCursorResult cursorResult = client.getCursor(projectName, topicName, shardId, GetCursorRequest.CursorType.SEQUENCE,
						(offsetCtx.getOffset().getSequence() + 1));
				cursor = cursorResult.getCursor();
			}

			/*System.out.println("Start consume shard:" + shardId + ", start offset:" + offsetCtx.toObjectNode().toString()
					+ ", cursor:" + cursor);*/

			long recordNum = 0L;
			while (!bExit) {
				try {
					GetRecordsResult recordResult = client.getRecords(projectName, topicName, shardId, cursor, 10,
							schema);
					List<RecordEntry> records = recordResult.getRecords();
					if (records.size() == 0) {
						// 将最后一次消费点位上报
						commit(offsetCtx);
						// 可以先休眠一会,再继续消费新记录
						Thread.sleep(1000);
						//System.out.println("sleep 1s and continue consume records! shard id:" + shardId);
					} else {
						for (RecordEntry record : records) {
							// 处理记录逻辑
							/*System.out.println("Consume shard:" + shardId + " thread process record:"
									+ record.toJsonNode().toString());*/

							new TaskReleaseConsumer(topicName,record).DataParsing();
							// 上报点位,该示例是每处理100条记录上报一次点位
							offsetCtx.setOffset(record.getOffset());
							recordNum++;
							if (recordNum % 100 == 0) {
								commit(offsetCtx);
							}
						}
						cursor = recordResult.getNextCursor();
					}
				} catch (SubscriptionOfflineException e) {
					// 订阅下线,退出
					bExit = true;
					e.printStackTrace();
				} catch (OffsetResetedException e) {
					// 点位被重置,更新offset上下文
					client.updateOffsetContext(offsetCtx);
					cursor = client.getCursor(projectName, topicName, shardId,
							GetCursorRequest.CursorType.SEQUENCE, (offsetCtx.getOffset().getSequence() + 1)).getCursor();
					System.err.println("Restart consume shard:" + shardId + ", reset offset:"
							+ offsetCtx.toObjectNode().toString() + ", cursor:" + cursor);
				} catch (OffsetSessionChangedException e) {
					// 其他consumer同时消费了该订阅下的相同shard,退出
					bExit = true;
					e.printStackTrace();
				} catch (Exception e) {
					bExit = true;
					e.printStackTrace();
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

public class SubscriptionExample {

	private String endpoint;
	private String accessId;
	private String accessKey;
	private String projectName;
	private String topicName;
	private String subId;
	private DatahubConfiguration conf;
	private DatahubClient client;

	public SubscriptionExample(DataHubConfigDTO dataHubConfigDTO) {
		this.endpoint=dataHubConfigDTO.getEndpoint();
		this.accessId=dataHubConfigDTO.getAccessId();
		this.accessKey=dataHubConfigDTO.getAccessKey();
		this.projectName=dataHubConfigDTO.getProjectName();
		this.topicName=dataHubConfigDTO.getTopicName();
		this.subId=dataHubConfigDTO.getSubId();
		this.conf = new DatahubConfiguration(new AliyunAccount(accessId, accessKey), endpoint);
		this.client = new DatahubClient(conf);
	}

	public void Start() {
		GetTopicResult topicResult = client.getTopic(projectName, topicName);
		ListShardResult shardResult = client.listShard(projectName, topicName);
		for (int i = 0; i < shardResult.getShards().size(); ++i) {
			new Consumer(projectName, topicName, subId, shardResult.getShards().get(i).getShardId(),
					topicResult.getRecordSchema(), conf).run();

		}

	}

	public static void main(String[] args) {

	String endpoint = "";
	String accessId = "";
	String accessKey = "";
	String projectName = "";
	String topicName = "";
	String subId = "";
		DataHubConfigDTO dataHubConfigDTO = new DataHubConfigDTO();
		dataHubConfigDTO
				.setEndpoint(endpoint)
				.setAccessId(accessId)
				.setAccessKey(accessKey)
				.setProjectName(projectName)
				.setTopicName(topicName)
				.setSubId(subId);
		SubscriptionExample example = new SubscriptionExample(dataHubConfigDTO);
		try {
			example.Start();
		} catch (DatahubClientException e) {
			e.printStackTrace();
		}
	}
}

生产者工具类

package com.encdata.oss.datahubClient;


import com.aliyun.datahub.common.data.Field;
import com.aliyun.datahub.common.data.FieldType;
import com.aliyun.datahub.common.data.RecordSchema;
import com.aliyun.datahub.exception.DatahubClientException;
import com.encdata.oss.datahubClient.singleSubscription.DatahubExample;
import com.encdata.oss.system.domain.dto.DataHubConfigDTO;
import com.encdata.oss.system.domain.dto.TaskCenterPRQDTO;
import com.encdata.oss.util.ObjectParseUtils;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;

import static com.encdata.oss.datahubClient.DatahubConfig.*;

/**
 * Created by IntelliJ IDEA.
 * 任务发布状态回调类
 * @author liyiq
 * @date 2020/06/08
 */
@Slf4j
public class TaskReleaseProduction<T>{

    private static String projectName;

    private static String topicSet;

    private static DataHubConfigDTO dataHubConfigDTO;

    /**
     * 状态回调成dto
     */
    private T dto;

    public TaskReleaseProduction(T dto){
        this.dto=dto;
    }

    /**
     * 向Datahub发送数据
     */
    public void DataProduction(String topic){
        //todo: 通过不同的应用获取不同的topic
        for (int s = 0; s < serivces.size(); s++) {
            Map map = serivces.get(s);
            projectName = map.get("projectName").toString();
            topicSet = map.get("topicSet").toString();
            dataHubConfigDTO = new DataHubConfigDTO();
            this.dataHubConfigDTO
                    .setEndpoint(endpoints)
                    .setAccessId(accessIds)
                    .setAccessKey(accessKeys)
                    .setProjectName(projectName)
                    .setTopicName(topicSet);
            if (topic.equals(topicSet)) {
                datahubWriter(topic);
            }
        }
    }


    public void datahubWriter(String topic){
        // Endpoint以Region: 华东1为例,其他Region请按实际情况填写
        RecordSchema schema = new RecordSchema();

        Map map = ObjectParseUtils.objectToMap(dto);
        for(Object key : map.keySet()){
            schema.addField(new Field(key.toString(), FieldType.STRING));
        }
        //todo: 根据泛型dto定义schema
//        switch (topic){
//            case "task_center_platform_response":
//                schema.addField(new Field("ref_id", FieldType.STRING));
//                schema.addField(new Field("biz_sys_code", FieldType.STRING));
//                schema.addField(new Field("biz_sys_name", FieldType.STRING));
//                schema.addField(new Field("title", FieldType.STRING));
//                schema.addField(new Field("result_code", FieldType.STRING));
//                schema.addField(new Field("result_msg", FieldType.STRING));
//                break;
//        }

        //todo:单独写入数据方法
        DatahubExample example = new DatahubExample(dataHubConfigDTO);
        try {
            example.init(schema);
            example.putRecords(dto);
            //example.getRecords();
        } catch (DatahubClientException e) {
            e.printStackTrace();
        }

        //todo:协同消费写入数据方法
//        ProducerConfig config = new ProducerConfig(endpoints, accessIds, accessKeys);
//        Producer producer = new Producer(projectName, topicSet, config);
//        try {
//                List recordEntries = genRecords(schema);
//                DatahubWriter.sendRecords(producer, recordEntries);
//        } finally {
//            // 确保资源正确释放
//            producer.close();
//        }
    }

    /*private List genRecords(RecordSchema schema) {
        List recordEntries = new ArrayList<>();
            RecordEntry entry = new RecordEntry();
            //entry.addAttribute("key1", "value1");
            //entry.addAttribute("key2", "value2");
            TupleRecordData data = new TupleRecordData(schema);
            //todo: 通过key值定义同步dto数据到data
            Map map = ObjectParseUtils.objectToMap(dto);
            for (Object key : map.keySet()) {
                data.setField(key.toString(),map.get(key));
            }
            entry.setRecordData(data);
            recordEntries.add(entry);
        return recordEntries;
    }*/


    /*public static void main(String[] args) {
        // Endpoint以Region: 华东1为例,其他Region请按实际情况填写
        String endpoint = "https://datahub.cn-shanghai-shga-d01.dh.alicloud.ga.sh";
        String accessId = "0iWV0NCs805VuAAu";
        String accessKey = "iEwlgpCnXDwT93YMVDb2G60my9ne81";
        String projectName = "sjc_rwzx";
        String topicName = "task_center_platform_request";
        RecordSchema schema = new RecordSchema();

        *//*TaskCenterPRPDTO dto = new TaskCenterPRPDTO();
        dto.setBiz_sys_code("")
                .setBiz_sys_name("")
                .setRef_id("")
                .setResult_code("")
                .setResult_msg("")
                .setTitle("");*//*

        TaskCenterPRQDTO dto = new TaskCenterPRQDTO();
        dto.setBiz_sys_code("test")
            .setBiz_sys_name("测试系统")
            .setBrief("测试任务")
            .setClose_time("")
            .setFlow_start_param("")
            .setRef_id("123")
            .setRemark("备注")
            .setSubmit_by("张三")
            .setTc_flow_id("")
            .setTitle("测试标题");
        Map map = ObjectParseUtils.objectToMap(dto);
//        for(Object key : map.keySet()){
//            schema.addField(new Field(key.toString(), FieldType.STRING));
//        }
        schema.addField(new Field("ref_id", FieldType.STRING));
        schema.addField(new Field("biz_sys_code", FieldType.STRING));
        schema.addField(new Field("biz_sys_name", FieldType.STRING));
        schema.addField(new Field("title", FieldType.STRING));
        schema.addField(new Field("brief", FieldType.STRING));
        schema.addField(new Field("submit_by", FieldType.STRING));
        schema.addField(new Field("close_time", FieldType.STRING));
        schema.addField(new Field("remark", FieldType.STRING));
        schema.addField(new Field("tc_flow_id", FieldType.STRING));
        schema.addField(new Field("flow_start_param", FieldType.STRING));



        ProducerConfig config = new ProducerConfig(endpoint, accessId, accessKey);
        Producer producer = new Producer(projectName, topicName, config);
        // 根据场景控制循环
        boolean stop = false;
        try {
            while (!stop) {
                List recordEntries = new ArrayList<>();
                    RecordEntry entry = new RecordEntry();
                    //entry.addAttribute("key1", "value1");
                    //entry.addAttribute("key2", "value2");
                    TupleRecordData data = new TupleRecordData(schema);
                    //todo: 通过key值定义同步dto数据到data

                    Map map1 = ObjectParseUtils.objectToMap(dto);
                    for (Map.Entry entry1: map1.entrySet()) {
                        data.setField(entry1.getKey(),entry1.getValue());
                    }

                    entry.setRecordData(data);
                    recordEntries.add(entry);
                DatahubWriter.sendRecords(producer, recordEntries);
            }
        } finally {
            // 确保资源正确释放
            producer.close();
        }
    }*/


    public static void datahubSetmessage(TaskCenterPRQDTO dto) {
        String endpoint = "";
        String accessId = "";
        String accessKey = "";
        String projectName = "";
        String topicName = "";
        DataHubConfigDTO dataHubConfigDTO = new DataHubConfigDTO();
        dataHubConfigDTO
                .setEndpoint(endpoint)
                .setAccessId(accessId)
                .setAccessKey(accessKey)
                .setProjectName(projectName)
                .setTopicName(topicName);
        DatahubExample example = new DatahubExample(dataHubConfigDTO);

        RecordSchema schema = new RecordSchema();
//        schema.addField(new Field("ref_id", FieldType.STRING));
//        schema.addField(new Field("biz_sys_code", FieldType.STRING));
//        schema.addField(new Field("biz_sys_name", FieldType.STRING));
//        schema.addField(new Field("title", FieldType.STRING));
//        schema.addField(new Field("brief", FieldType.STRING));
//        schema.addField(new Field("submit_by", FieldType.STRING));
//        schema.addField(new Field("close_time", FieldType.STRING));
//        schema.addField(new Field("remark", FieldType.STRING));
//        schema.addField(new Field("tc_flow_id", FieldType.STRING));
//        schema.addField(new Field("flow_start_param", FieldType.STRING));

        Map map = ObjectParseUtils.objectToMap(dto);
        for(Object key : map.keySet()){
            schema.addField(new Field(key.toString(), FieldType.STRING));
        }
        try {
            example.init(schema);
            example.putRecords(dto);
            //example.getRecords();
            //example.createADSDataConnector();
        } catch (DatahubClientException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        String endpoint = "";
        String accessId = "";
        String accessKey = "";
        String projectName = "";
        String topicName = "";
        DataHubConfigDTO dataHubConfigDTO = new DataHubConfigDTO();
        dataHubConfigDTO
                .setEndpoint(endpoint)
                .setAccessId(accessId)
                .setAccessKey(accessKey)
                .setProjectName(projectName)
                .setTopicName(topicName);
        DatahubExample example = new DatahubExample(dataHubConfigDTO);

        RecordSchema schema = new RecordSchema();
//        schema.addField(new Field("ref_id", FieldType.STRING));
//        schema.addField(new Field("biz_sys_code", FieldType.STRING));
//        schema.addField(new Field("biz_sys_name", FieldType.STRING));
//        schema.addField(new Field("title", FieldType.STRING));
//        schema.addField(new Field("brief", FieldType.STRING));
//        schema.addField(new Field("submit_by", FieldType.STRING));
//        schema.addField(new Field("close_time", FieldType.STRING));
//        schema.addField(new Field("remark", FieldType.STRING));
//        schema.addField(new Field("tc_flow_id", FieldType.STRING));
//        schema.addField(new Field("flow_start_param", FieldType.STRING));


        TaskCenterPRQDTO dto = new TaskCenterPRQDTO();
        dto.setBiz_sys_code("sys_task_center")
                .setBiz_sys_name("任务中心")
                .setBrief("任务发起测试")
                .setClose_time("2020-06-21 00:00:00")
                .setFlow_start_param("")
                .setRef_id("1")
                .setRemark("备注")
                .setSubmit_by("test")
                .setTc_flow_id("1")
                .setTitle("测试标题");

        Map map = ObjectParseUtils.objectToMap(dto);
        for(Object key : map.keySet()){
            schema.addField(new Field(key.toString(), FieldType.STRING));
        }
        try {
            example.init(schema);
            example.putRecords(dto);
            //example.getRecords();
            //example.createADSDataConnector();
        } catch (DatahubClientException e) {
            e.printStackTrace();
        }
    }

}


文章来源: https://blog.csdn.net/lyxx1021/article/details/107077579
如有侵权请联系:admin#unsafe.sh