apache-atlas-hive-hook source code analysis
Founder
2024-01-25 17:14:27

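Atlas Hook class diagram

There are two ways to capture Hive metadata changes:
1) Through a Hive hook function; the implementation class is HiveHook
2) Through a MetaStoreEventListener; the implementation class is HiveMetastoreHookImpl
Atlas therefore offers two configurations: register the hook function or register the listener. We currently use the second approach. The Atlas hook class diagram is shown below.
[Figure: Atlas hook class diagram]

Steps for developing a Hive hook
1) Create a class that implements ExecuteWithHookContext
2) Package it and upload the jar to HIVE_HOME/lib
3) Configure it, either temporarily in the Hive client session or permanently in hive-site.xml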

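Hive Hook source code analysis

HiveHook extends AtlasHook and implements the run method of ExecuteWithHookContext. Based on the type of Hive operation, it creates the matching event and sends the event's messages through AtlasHook#notifyEntities.

HiveHook#run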

public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
    @Override
    public void run(HookContext hookContext) throws Exception {
        //...
        try {
            // Look up the HiveOperation for the operation name in hookContext
            HiveOperation        oper    = OPERATION_MAP.get(hookContext.getOperationName());
            AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext, getKnownObjects());
            BaseHiveEvent        event   = null;

            // Create the event that matches the HiveOperation
            switch (oper) {
                case CREATEDATABASE:
                    event = new CreateDatabase(context);
                    break;
                case DROPDATABASE:
                    event = new DropDatabase(context);
                    break;
                //...
                case ALTERTABLE_RENAME:
                case ALTERVIEW_RENAME:
                    event = new AlterTableRename(context);
                    break;
                case ALTERTABLE_RENAMECOL:
                    event = new AlterTableRenameCol(context);
                    break;
                default:
                    //...
                    break;
            }

            if (event != null) {
                // User and group information
                final UserGroupInformation ugi = hookContext.getUgi() == null ? Utils.getUGI() : hookContext.getUgi();

                // Delegate to AtlasHook#notifyEntities
                super.notifyEntities(event.getNotificationMessages(), ugi);
            }
        } catch (Throwable t) {
            LOG.error("HiveHook.run(): failed to process operation {}", hookContext.getOperationName(), t);
        }
        //...
    }
}
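The dispatch step in HiveHook#run (operation name, then enum lookup, then a switch that picks the event) can be tried out without Hive or Atlas on the classpath. The following is only a minimal sketch: HookDispatchSketch, its Operation enum, and eventFor are hypothetical stand-ins invented for illustration, and event names are returned as plain strings instead of BaseHiveEvent objects.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the hook's dispatch logic; none of these types are Atlas classes.
class HookDispatchSketch {
    enum Operation { CREATEDATABASE, DROPDATABASE, ALTERTABLE_RENAME, ALTERVIEW_RENAME, QUERY }

    // Plays the role of OPERATION_MAP: operation name -> enum constant.
    static final Map<String, Operation> OPERATION_MAP = new HashMap<>();

    static {
        for (Operation op : Operation.values()) {
            OPERATION_MAP.put(op.name(), op);
        }
    }

    // Returns the name of the event to create, or null when the operation is ignored.
    static String eventFor(String operationName) {
        Operation oper = OPERATION_MAP.get(operationName);
        if (oper == null) {
            return null; // unknown name: a switch on null would throw a NullPointerException
        }
        switch (oper) {
            case CREATEDATABASE:
                return "CreateDatabase";
            case DROPDATABASE:
                return "DropDatabase";
            case ALTERTABLE_RENAME:
            case ALTERVIEW_RENAME:
                return "AlterTableRename"; // two operations share one event type
            default:
                return null; // known operation that produces no event in this sketch
        }
    }

    public static void main(String[] args) {
        System.out.println(eventFor("ALTERVIEW_RENAME"));
        System.out.println(eventFor("QUERY"));
    }
}
```

The explicit null check is a choice made for this sketch. In the excerpt above, an unmapped operation name would instead surface as an exception caught by the surrounding catch (Throwable t) block.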

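Hive event class diagram
[Figure: Hive event class diagram]

BaseHiveEvent groups its constants by prefix into the following categories
1) Hive types: HIVE_TYPE_
2) HBase types: HBASE_TYPE_
3) Hive attributes: ATTRIBUTE_
4) HBase attributes: HBASE_
5) Hive relationship attributes: RELATIONSHIP_
6) Others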

AtlasHook#notifyEntities

notifyEntities supports two delivery strategies, synchronous and asynchronous. In both cases the actual work is delegated to notifyEntitiesInternal.

public static void notifyEntities(List messages, UserGroupInformation ugi, int maxRetries) {
    if (executor == null) { // send synchronously
        // Synchronous notification on the calling thread
        notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger);
    } else {
        // Asynchronous notification through the executor
        executor.submit(new Runnable() {
            @Override
            public void run() {
                notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger);
            }
        });
    }
}
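The choice between synchronous and asynchronous delivery comes down to whether an executor is present. A minimal, self-contained sketch of that pattern follows. NotifyDispatchSketch, deliver, and drain are hypothetical names, the executor is passed as a parameter rather than held in a static field, and strings stand in for the notification messages.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "null executor means synchronous"; not the Atlas implementation.
class NotifyDispatchSketch {
    // Records delivered messages so the effect of each path is observable.
    static final List<String> delivered = Collections.synchronizedList(new ArrayList<>());

    // Stand-in for notifyEntitiesInternal.
    static void deliver(List<String> messages) {
        delivered.addAll(messages);
    }

    // With no executor the caller's thread does the work; otherwise the work is handed off.
    static void notifyEntities(List<String> messages, ExecutorService executor) {
        if (executor == null) {
            deliver(messages);
        } else {
            executor.submit(() -> deliver(messages));
        }
    }

    // Shuts the executor down and waits for queued work; returns false on timeout or interrupt.
    static boolean drain(ExecutorService executor) {
        executor.shutdown();
        try {
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        notifyEntities(Collections.singletonList("sync-message"), null);

        ExecutorService executor = Executors.newSingleThreadExecutor();
        notifyEntities(Collections.singletonList("async-message"), executor);
        drain(executor);

        System.out.println(delivered);
    }
}
```

On the asynchronous path the caller returns before the message is sent, so a failed delivery can only be observed through logging or whatever failure handling the worker performs.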

AtlasHook#notifyEntitiesInternal

notifyEntitiesInternal implements the retry strategy and calls notificationInterface.send to send the messages.

@VisibleForTesting
static void notifyEntitiesInternal(List messages, int maxRetries, UserGroupInformation ugi,
                                   NotificationInterface notificationInterface,
                                   boolean shouldLogFailedMessages, FailedMessagesLogger logger) {
    // ...
    final int maxAttempts         = maxRetries < 1 ? 1 : maxRetries;
    Exception notificationFailure = null;

    // Make up to maxAttempts attempts
    for (int numAttempt = 1; numAttempt <= maxAttempts; numAttempt++) {
        if (numAttempt > 1) { // retry attempt
            // Sleep before retrying
            try {
                LOG.debug("Sleeping for {} ms before retry", notificationRetryInterval);
                Thread.sleep(notificationRetryInterval);
            } catch (InterruptedException ie) {
                LOG.error("Notification hook thread sleep interrupted");
                break;
            }
        }

        try {
            // Send the notification; the Atlas server consumes it from Kafka
            if (ugi == null) {
                notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages);
            } else {
                PrivilegedExceptionAction privilegedNotify = new PrivilegedExceptionAction() {
                    @Override
                    public Object run() throws Exception {
                        notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages);
                        return messages;
                    }
                };
                ugi.doAs(privilegedNotify);
            }
            notificationFailure = null; // notification sent successfully, reset error
            break;
        } catch (Exception e) {
            notificationFailure = e;
            LOG.error("Failed to send notification - attempt #{}; error={}", numAttempt, e.getMessage());
        }
    }

    // Handle notification failure
    // ...
}
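The retry loop above can be reduced to a small reusable shape: at least one attempt, a fixed pause before each retry, and the last failure kept for the caller. The following sketch assumes nothing about Atlas. RetrySketch, Sender, and sendWithRetry are hypothetical names. Unlike the excerpt, it restores the thread's interrupt flag when the sleep is interrupted.

```java
// Hypothetical sketch of a fixed-interval retry loop; not the Atlas implementation.
class RetrySketch {
    // Stand-in for the send call, which may throw.
    interface Sender {
        void send() throws Exception;
    }

    // Tries up to max(1, maxRetries) times. Returns null on success, otherwise the last failure.
    static Exception sendWithRetry(Sender sender, int maxRetries, long retryIntervalMs) {
        final int maxAttempts = Math.max(1, maxRetries);
        Exception failure = null;

        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (attempt > 1) {
                // Pause only before retries, never before the first attempt.
                try {
                    Thread.sleep(retryIntervalMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break; // give up and report the previous failure
                }
            }
            try {
                sender.send();
                failure = null; // sent successfully, clear any earlier error
                break;
            } catch (Exception e) {
                failure = e;
            }
        }
        return failure;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails on the first two attempts and succeeds on the third.
        Exception result = sendWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("broker unavailable");
            }
        }, 3, 10L);
        System.out.println("attempts=" + calls[0] + ", failure=" + result);
    }
}
```

Note that, as in the excerpt, a maxRetries value below 1 still results in exactly one attempt, so the parameter behaves as a maximum attempt count rather than a count of additional retries.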
 

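The notification messages are ultimately sent to Kafka by KafkaNotification, which is created by NotificationProvider in the static initializer block of AtlasHook.

Initialization of notificationInterface

notificationInterface is initialized in the static initializer block of AtlasHook. Its implementation class is KafkaNotification, so messages are sent to Kafka; the Atlas server consumes them and persists the metadata.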

public abstract class AtlasHook {
    //...
    protected static Configuration         atlasProperties;
    protected static NotificationInterface notificationInterface;
    //...
    private static       ExecutorService      executor = null;

    static {
        try {
            atlasProperties = ApplicationProperties.get();
        } catch (Exception e) {
            LOG.info("Failed to load application properties", e);
        }
        //...
        metadataNamespace         = getMetadataNamespace(atlasProperties);
        placeCode                 = getPlaceCode(atlasProperties);
        notificationMaxRetries    = atlasProperties.getInt(ATLAS_NOTIFICATION_MAX_RETRIES, 3);
        notificationRetryInterval = atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000);

        // Implementation of the notification interface
        notificationInterface     = NotificationProvider.get();
        //...
    }
}

HiveMetastoreHookImpl source code analysis

HiveMetastoreHookImpl

HiveMetastoreHookImpl extends MetaStoreEventListener, overrides onCreateDatabase, onDropDatabase, and similar methods, and delegates the handling to HiveMetastoreHook#handleEvent.

public class HiveMetastoreHookImpl extends MetaStoreEventListener {
    private static final Logger            LOG = LoggerFactory.getLogger(HiveMetastoreHookImpl.class);
    private        final HiveHook          hiveHook;
    private        final HiveMetastoreHook hook;

    public HiveMetastoreHookImpl(Configuration config) {
        super(config);

        this.hiveHook = new HiveHook();
        this.hook     = new HiveMetastoreHook();
    }
    //...
    @Override
    public void onCreateDatabase(CreateDatabaseEvent dbEvent) {
        HiveOperationContext context = new HiveOperationContext(CREATEDATABASE, dbEvent);

        // Delegate to HiveMetastoreHook
        hook.handleEvent(context);
    }

    @Override
    public void onDropDatabase(DropDatabaseEvent dbEvent) {
        HiveOperationContext context = new HiveOperationContext(DROPDATABASE, dbEvent);

        hook.handleEvent(context);
    }
    //...
}

HiveMetastoreHook#handleEvent

handleEvent passes the change messages to AtlasHook#notifyEntities. The messages are sent to Kafka, and the Atlas server consumes them and persists the metadata.

public void handleEvent(HiveOperationContext operContext) {
    ListenerEvent listenerEvent = operContext.getEvent();
    //...
    try {
        HiveOperation        oper    = operContext.getOperation();
        AtlasHiveHookContext context = new AtlasHiveHookContext(hiveHook, oper, hiveHook.getKnownObjects(), this, listenerEvent);
        BaseHiveEvent        event   = null;

        switch (oper) {
            case CREATEDATABASE:
                event = new CreateDatabase(context);
                break;
            //...
            case ALTERTABLE_RENAMECOL:
                FieldSchema columnOld = operContext.getColumnOld();
                FieldSchema columnNew = operContext.getColumnNew();

                event = new AlterTableRenameCol(columnOld, columnNew, context);
                break;
            default:
                if (LOG.isDebugEnabled()) {
                    LOG.debug("HiveMetastoreHook.handleEvent({}): operation ignored.", listenerEvent);
                }
                break;
        }

        if (event != null) {
            final UserGroupInformation ugi = SecurityUtils.getUGI() == null ? Utils.getUGI() : SecurityUtils.getUGI();

            // Send the notification through AtlasHook#notifyEntities
            super.notifyEntities(event.getNotificationMessages(), ugi);
        }
    } catch (Throwable t) {
        LOG.error("HiveMetastoreHook.handleEvent({}): failed to process operation {}", listenerEvent, t);
    }
}
