聊聊skywalking的HTTPAccessLog
2020/4/1 17:01:46
本文主要是介绍聊聊skywalking的HTTPAccessLog,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
序
本文主要研究一下skywalking的HTTPAccessLog
HTTPAccessLog
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/HTTPAccessLog.java
@ScopeDeclaration(id = HTTP_ACCESS_LOG, name = "HTTPAccessLog") public class HTTPAccessLog extends AbstractLog { @Override public int scope() { return HTTP_ACCESS_LOG; } } 复制代码
- HTTPAccessLog继承了AbstractLog,其scope方法返回的是HTTP_ACCESS_LOG
AbstractLog
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/AbstractLog.java
@Setter @Getter public abstract class AbstractLog extends Source { private long timeBucket; private long timestamp; private int serviceId; private int serviceInstanceId; private int endpointId; private String traceId; private int isError; private String statusCode; private ContentType contentType = ContentType.NONE; private String content; @Override public String getEntityId() { throw new UnexpectedException("getEntityId is not supported in AbstractLog source"); } } 复制代码
- AbstractLog继承了Source,它定义了timeBucket、timestamp、serviceId、serviceInstanceId、endpointId、traceId、isError、statusCode、contentType、content属性
HTTPAccessLogDispatcher
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/HTTPAccessLogDispatcher.java
public class HTTPAccessLogDispatcher implements SourceDispatcher<HTTPAccessLog> { @Override public void dispatch(HTTPAccessLog source) { HTTPAccessLogRecord record = new HTTPAccessLogRecord(); record.setTimestamp(source.getTimestamp()); record.setTimeBucket(source.getTimeBucket()); record.setServiceId(source.getServiceId()); record.setServiceInstanceId(source.getServiceInstanceId()); record.setEndpointId(source.getEndpointId()); record.setTraceId(source.getTraceId()); record.setIsError(source.getIsError()); record.setStatusCode(source.getStatusCode()); record.setContentType(source.getContentType().value()); record.setContent(source.getContent()); RecordStreamProcessor.getInstance().in(record); } } 复制代码
- HTTPAccessLogDispatcher实现了SourceDispatcher接口,其dispatch将HTTPAccessLog转换为HTTPAccessLogRecord,然后执行RecordStreamProcessor.getInstance().in(record)
RecordStreamProcessor
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
public class RecordStreamProcessor implements StreamProcessor<Record> { private final static RecordStreamProcessor PROCESSOR = new RecordStreamProcessor(); private Map<Class<? extends Record>, RecordPersistentWorker> workers = new HashMap<>(); public static RecordStreamProcessor getInstance() { return PROCESSOR; } public void in(Record record) { RecordPersistentWorker worker = workers.get(record.getClass()); if (worker != null) { worker.in(record); } } @SuppressWarnings("unchecked") public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Record> recordClass) { if (DisableRegister.INSTANCE.include(stream.name())) { return; } StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class); IRecordDAO recordDAO; try { recordDAO = storageDAO.newRecordDao(stream.builder().newInstance()); } catch (InstantiationException | IllegalAccessException e) { throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " record DAO failure.", e); } IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class); Model model = modelSetter.putIfAbsent(recordClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second), true); RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, recordDAO); workers.put(recordClass, persistentWorker); } } 复制代码
- RecordStreamProcessor实现了StreamProcessor接口,其in方法从workers中找出record.getClass()对应的RecordPersistentWorker,然后执行其in方法
RecordPersistentWorker
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
public class RecordPersistentWorker extends AbstractWorker<Record> { private static final Logger logger = LoggerFactory.getLogger(RecordPersistentWorker.class); private final Model model; private final IRecordDAO recordDAO; private final IBatchDAO batchDAO; RecordPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IRecordDAO recordDAO) { super(moduleDefineHolder); this.model = model; this.recordDAO = recordDAO; this.batchDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(IBatchDAO.class); } @Override public void in(Record record) { try { InsertRequest insertRequest = recordDAO.prepareBatchInsert(model, record); batchDAO.asynchronous(insertRequest); } catch (IOException e) { logger.error(e.getMessage(), e); } } } 复制代码
- RecordPersistentWorker继承了AbstractWorker,其in方法执行recordDAO.prepareBatchInsert(model, record),然后用返回的insertRequest执行batchDAO.asynchronous(insertRequest)
小结
HTTPAccessLog继承了AbstractLog,其scope方法返回的是HTTP_ACCESS_LOG;HTTPAccessLogDispatcher实现了SourceDispatcher接口,其dispatch将HTTPAccessLog转换为HTTPAccessLogRecord,然后执行RecordStreamProcessor.getInstance().in(record)
doc
这篇关于聊聊skywalking的HTTPAccessLog的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-05-04安装 VPrix Desktop 的系统要求-icode9专业技术文章分享
- 2024-05-01巧用 TiCDC Syncpoint 构建银行实时交易和准实时计算一体化架构
- 2024-05-01银行核心背后的落地工程体系丨Oracle - TiDB 数据迁移详解
- 2024-04-26高性能表格工具VTable总体构成-icode9专业技术文章分享
- 2024-04-16软路由代理问题, tg 无法代理问题-icode9专业技术文章分享
- 2024-04-16程序猿用什么锅-icode9专业技术文章分享
- 2024-04-16自建 NAS 的方案-icode9专业技术文章分享
- 2024-04-14ansible 在远程主机上执行脚本,并传入参数-icode9专业技术文章分享
- 2024-04-14ansible 在远程主机上执行脚本,并传入参数, 加上remote_src: yes 配置-icode9专业技术文章分享
- 2024-04-14ansible 检测远程主机的8080端口,如果关闭,则echo 进程已关闭-icode9专业技术文章分享