当前位置:网站首页>Presto Event Listener开发

Presto Event Listener开发

2022-08-09 21:54:00 莱恩大数据

一、简介

同Hive Hook一样,Presto也支持自定义实现Event Listener,用于侦听Presto引擎执行查询时发生的事件,并作出相应的处理。我们可以利用该功能实现诸如自定义日志记录、调试和性能分析插件,帮助我们更好的运维Presto集群。但是不同于Hive Hook的是,在Presto集群中,一次只能有一个Event Listener处于活动状态。

Event Listener作为Plugin监听以下事件:

  • Query Creation(查询建立相关信息)
  • Query completion (success or failure)(查询执行相关信息,包含成功查询的细节信息,失败查询的错误码等信息)
  • Split completion (success or failure)(split执行信息,同理包含成功和失败的细节信息)

了解Hook及Listener模式的朋友对于其步骤应该很清楚了,我们只需要:

  1. 实现Presto Event Listener和EventListenerFactory接口。
  2. 正确的打包我们的jar。
  3. 部署,放到Presto指定目录,修改配置文件。

二、实现步骤

  1. 添加pom依赖,如下 
  2. 实现EventListener,该类是我们的核心逻辑所在,供包含上面所说的三个事件:
    public interface EventListener{
    	//query创建的详细信息
        default void queryCreated(QueryCreatedEvent queryCreatedEvent)
        {
        }
    	//query执行的详细信息
        default void queryCompleted(QueryCompletedEvent queryCompletedEvent)
        {
        }
    	//split执行的详细信息
        default void splitCompleted(SplitCompletedEvent splitCompletedEvent)
        {
        }
    }
  3. 实现EventListenerFactory创建我们自己实现的EventListener
    public interface EventListenerFactory {
        String getName();
    
        EventListener create(Map<String, String> config);
    }
    
  4. 实现Plugin接口,实现getEventListenerFactories()方法,获取我们自己实现的EventListenerFactory
    public class NwdEventListenerPlugin implements Plugin {
        @Override
        public Iterable<EventListenerFactory> getEventListenerFactories() {
            return ImmutableList.of(new NwdEventListenerFactory());
        }
    }
  5. 添加配置信息,为etc/event-listener.properties。其中event-listener.name为必备属性,其他属性为我们plugin所需要的信息。

代码实例: 

由于集群运维的需要,先需要将用户的查询历史、查询花费的时间等信息进行统计,以便于后续对各个业务的查询进行优先级分级和评分,方便后续Presto集群稳定性易用性的维护。这里给出一个简单的将这些信息存储到Mysql数据库的样例。

 <presto.version>0.273.3</presto.version>

<dependency>
    <groupId>com.facebook.presto</groupId>
    <artifactId>presto-spi</artifactId>
    <version>${presto.version}</version>
</dependency>
QueryEventListenerFactory
public class QueryEventListenerFactory implements EventListenerFactory {

  @Override
  public String getName() {
    return "query-event-listener";
  }

  @Override
  public EventListener create(Map<String, String> config) {
    if (!config.containsKey("jdbc.uri")) {
      throw new RuntimeException("/etc/event-listener.properties file missing jdbc.uri");
    }
    if (!config.containsKey("jdbc.user")) {
      throw new RuntimeException("/etc/event-listener.properties file missing jdbc.user");
    }
    if (!config.containsKey("jdbc.pwd")) {
      throw new RuntimeException("/etc/event-listener.properties file missing jdbc.pwd");
    }

    return new QueryEventListener(config);
  }
}

 QueryEventPlugin

public class QueryEventPlugin implements Plugin {

  @Override
  public Iterable<EventListenerFactory> getEventListenerFactories() {
    EventListenerFactory listenerFactory = new QueryEventListenerFactory();
    return Arrays.asList(listenerFactory);
  }
}

QueryEventListener

public class NwdEventListener implements EventListener {
    private static final Logger log = Logger.get(NwdEventListener.class);
    private static Properties props = new Properties();
    private static Producer<String, String> producer;

    @Inject
    public NwdEventListener(Map<String, String> config) {
        log.info("###########【PrestoListener】Start ############,config:" + JSON.toJSONString(config));
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        for (Map.Entry<String, String> entry : config.entrySet()) {
            if (entry.getKey().startsWith("kafka")) {
                props.put(entry.getKey().replace("kafka.", ""), entry.getValue());
            }
        }
        Thread.currentThread().setContextClassLoader(null);
        producer = new KafkaProducer<>(props);
    }

    /**
     * 校验内容
     * 1.user账号是否在盘古存在
     * 2.user和password是否匹配(正确)
     * 3.表权限是否通过
     * 4.传入的资源组名是否和使用者匹配(不能使用别的部门资源提交SQL)
     *
     * @param queryCreatedEvent
     */
    @Override
    public void queryCreated(QueryCreatedEvent queryCreatedEvent) {
        log.info("############【QueryCreatedEvent】start checking auth############");
    }

    @Override
    public void queryCompleted(QueryCompletedEvent queryCompletedEvent) {
        log.info("############【QueryCompletedEvent】start ############");
        try {
            long executionTime = queryCompletedEvent.getEndTime().toEpochMilli() - queryCompletedEvent.getExecutionStartTime().toEpochMilli();
            long scheduledTime = queryCompletedEvent.getExecutionStartTime().toEpochMilli() - queryCompletedEvent.getCreateTime().toEpochMilli();

            SQLEvent event = SQLEvent.builder()
                    .metadata(ParserUtils.getMetadata(queryCompletedEvent))
                    .context(ParserUtils.getContext(queryCompletedEvent))
                    .statistics(ParserUtils.getStatistics(queryCompletedEvent))
                    .inputs(ParserUtils.getInputs(queryCompletedEvent))
                    .output(ParserUtils.getOutput(queryCompletedEvent))
                    .failureInfo(ParserUtils.getFailureInfo(queryCompletedEvent))
                    .createTime(queryCompletedEvent.getCreateTime().toEpochMilli())
                    .executionStartTime(queryCompletedEvent.getExecutionStartTime().toEpochMilli())
                    .endTime(queryCompletedEvent.getEndTime().toEpochMilli())
                    .executionTime(executionTime)
                    .scheduledTime(scheduledTime)
                    .build();
            String json = JSONObject.toJSONString(event);
            ProducerRecord<String, String> recodrd = new ProducerRecord<>(props.getProperty("topic"), queryCompletedEvent.getMetadata().getQueryId(), json);
            producer.send(recodrd, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                    if (exception == null) {
                        log.info(queryCompletedEvent.getMetadata().getQueryId() + " 消息发送成功");
                    } else {
                        log.error(queryCompletedEvent.getMetadata().getQueryId() + " 消息发送失败");
                    }
                }
            });
        } catch (Exception e) {
            log.error(queryCompletedEvent.getMetadata().getQueryId() + " 发送消息发生异常", e);
        }
    }

    @Override
    public void splitCompleted(SplitCompletedEvent splitCompletedEvent) {
    }

}

三、打包

四、部署

  1. 创建配置文件etc/event-listener.properties
  2. 在presto根目录下创建query-event-listener目录,名称与我们上面event listener的name一致
  3. 将我们的jar包和mysql connector的jar包拷贝到上面创建的目录
  4. 重新启动Presto服务即可
原网站

版权声明
本文为[莱恩大数据]所创,转载请带上原文链接,感谢
https://blog.csdn.net/cxl_shelly/article/details/126253435