+ public long optimalMemoryAllocation;

- public OpProfileDef(int operatorId, int operatorType, int incomingCount) {
+ public OpProfileDef(int operatorId, int operatorType, int incomingCount, long optimalMemoryAllocation) {
Will all creators of OpProfileDef always pass MaxAllocation for optimalMemoryAllocation?
  @VisibleForTesting
- public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAllocator allocator) {
+ public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAllocator allocator, long initialAllocation) {
Suggest renaming initialAllocation to optimalMemoryAllocation.
- private final Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> bufferedOperators;
+ private final Map<DrillNode, List<Pair<PhysicalOperator, Long>>> bufferedOperators;
  private final QueryContext queryContext;
+ private final long MINIMUM_MEMORY_FOR_BUFFER_OPERS;
Upper-case names should be used only for constants; please change this to lower case.
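For illustration, a minimal sketch of the suggested rename, assuming the value is computed per query rather than being a true compile-time constant (the 40 MB figure below is an arbitrary placeholder):

```java
// Per-query instance field: lower camel case.
private final long minimumMemoryForBufferOpers;

// A true compile-time constant would keep the upper-case convention, e.g.:
private static final long DEFAULT_MINIMUM_MEMORY_FOR_BUFFER_OPERS = 40L * 1024 * 1024; // placeholder value
```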
+   endpoint.getUserPort() == otherEndpoint.getUserPort() &&
+   endpoint.getControlPort() == otherEndpoint.getControlPort() &&
+   endpoint.getDataPort() == otherEndpoint.getDataPort() &&
+   endpoint.getVersion().equals(otherEndpoint.getVersion());
It looks like all the fields in DrillbitEndpoint are optional, so we should check whether a field is present before calling equals on it, just like hashCode() does below; or refer to equals in the generated file for DrillbitEndpoint.
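For illustration, a minimal sketch of the presence-checked comparison, assuming the usual protobuf-style has*() accessors on DrillbitEndpoint (accessor names may differ in the generated code):

```java
// Compare each optional field only when it is set on both sides, mirroring
// the generated protobuf equals()/hashCode() behaviour.
boolean equal =
    endpoint.hasAddress() == otherEndpoint.hasAddress()
        && (!endpoint.hasAddress() || endpoint.getAddress().equals(otherEndpoint.getAddress()))
    && endpoint.hasUserPort() == otherEndpoint.hasUserPort()
        && (!endpoint.hasUserPort() || endpoint.getUserPort() == otherEndpoint.getUserPort())
    && endpoint.hasControlPort() == otherEndpoint.hasControlPort()
        && (!endpoint.hasControlPort() || endpoint.getControlPort() == otherEndpoint.getControlPort())
    && endpoint.hasDataPort() == otherEndpoint.hasDataPort()
        && (!endpoint.hasDataPort() || endpoint.getDataPort() == otherEndpoint.getDataPort())
    && endpoint.hasVersion() == otherEndpoint.hasVersion()
        && (!endpoint.hasVersion() || endpoint.getVersion().equals(otherEndpoint.getVersion()));
```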
    .append(endpoint.getAddress())
    .append("endpoint user port: ")
    .append(endpoint.getUserPort()).toString();
  }
Check whether the field is present before accessing it.
  maxAllocPerNode = Math.min(maxAllocPerNode, perQueryMemory);
  return maxAllocPerNode;
}
A lot of the code here and in DefaultMemoryAllocationUtilities is duplicated. Maybe create a separate MemoryAllocationUtilities to hold the common code. Also, the same BufferedOpFinder is defined in the Fragment class as well; it would be good to define it in one place only and reuse it wherever needed. One possible shape is sketched below.
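A rough sketch of hoisting the shared pieces; class and method names are suggestions, not the existing API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical shared home for the duplicated logic.
public final class MemoryAllocationUtilities {

  private MemoryAllocationUtilities() { }

  // Single definition of the buffered-operator visitor, reused by Fragment and
  // by both the ZK-queue and default allocation utilities instead of three copies.
  public static List<PhysicalOperator> findBufferedOperators(PhysicalOperator root) {
    final List<PhysicalOperator> buffered = new ArrayList<>();
    root.accept(new BufferedOpFinder(), buffered);
    return buffered;
  }

  // Common clamp shared by the allocation utilities: never hand a query more
  // than the per-query memory limit on a node.
  public static long capPerNodeMemory(long maxAllocPerNode, long perQueryMemory) {
    return Math.min(maxAllocPerNode, perQueryMemory);
  }
}
```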
- private final long memoryInBytes;
+ private long memoryInBytes;

+ private long numVirtualCpu;
Please move numVirtualCpu back to int type.
  }

  // Helper method to compute the minor fragment count per drillbit. This method returns
  // a map with key as DrillbitEndpoint and value as the width (i.e #minorFragments)
The key is DrillNode now; please update the comment.
+     queryId, queryContext.getOnlineEndpointUUIDs(), rootFragment,
      queryContext.getSession(), queryContext.getQueryContextInfo());
-   planner.visitPhysicalPlan(queryWorkUnit);
+   // planner.visitPhysicalPlan(queryWorkUnit);
Remove this commented-out line.
  optional string options_json = 15;
  optional QueryContextInformation context = 16;
  repeated Collector collector = 17;
+ optional string endpointUUID = 18;
Suggest renaming this to assignedEndpointUUID, since there are two endpoints as part of PlanFragment: assignedEndpoint and foremanEndpoint.
This PR contains changes to support the RM framework on both the execution and planning sides, tracked by JIRAs DRILL-7191 and DRILL-7026.
Refactored the existing ZK-based queue to accommodate the new distributed queue for RM. Moved the QueryResourceAllocators memory allocation code to utility classes such as ZKQueueMemoryAllocationUtilities and DefaultMemoryAllocationUtilities. Refactored the Parallelizer code to accommodate memory adjustment for operators during the parallelization phase. There are three implementations of SimpleParallelizer: ZKQueueParallelizer, DistributedQueueParallelizer and DefaultParallelizer, used by the ZK-based RM, distributed RM and non-RM configurations respectively.
Planner integration with RM to select a queue and reduce query-level memory to stay within queue limits. Changes to handle scenarios where buffered operators get at least the minimum required memory allocation. Based on the memory calculated for each operator within each fragment, its initial and maximum memory allocation is set, which is later consumed by the execution layer to enforce memory limits.
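Roughly, the clamping described above could look like the sketch below (assumed names throughout; OperatorMemoryRequest stands in for the planner's per-operator state and is not the actual class):

```java
import java.util.List;

// Hypothetical helper: scale buffered operators' optimal memory so the query
// total fits within the queue limit, but never below the configured minimum.
static void fitWithinQueueLimit(List<OperatorMemoryRequest> bufferedOps,
                                long queueMemoryLimit, long minimumOperatorMemory) {
  long totalOptimal = 0;
  for (OperatorMemoryRequest op : bufferedOps) {
    totalOptimal += op.getOptimalMemory();
  }
  double scale = totalOptimal > queueMemoryLimit
      ? (double) queueMemoryLimit / totalOptimal : 1.0;
  for (OperatorMemoryRequest op : bufferedOps) {
    long assigned = Math.max(minimumOperatorMemory, (long) (op.getOptimalMemory() * scale));
    op.setMaxAllocation(assigned); // later enforced by the execution layer
  }
}
```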
Introduced a new DrillNode class to deal with issues when a DrillbitEndpoint is looked up in a map using only some of its fields.
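A loose sketch of the idea behind such a key class (field choice and shape are assumptions, not the actual DrillNode):

```java
import java.util.Objects;

// Hypothetical minimal shape: identity is the address plus ports only, so map
// lookups do not fail when fields that change at run time (e.g. state) differ
// between otherwise-equal endpoints.
public final class DrillNode {
  private final String address;
  private final int userPort;
  private final int controlPort;
  private final int dataPort;

  public DrillNode(DrillbitEndpoint endpoint) {
    this.address = endpoint.getAddress();
    this.userPort = endpoint.getUserPort();
    this.controlPort = endpoint.getControlPort();
    this.dataPort = endpoint.getDataPort();
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof DrillNode)) {
      return false;
    }
    DrillNode other = (DrillNode) o;
    return address.equals(other.address) && userPort == other.userPort
        && controlPort == other.controlPort && dataPort == other.dataPort;
  }

  @Override
  public int hashCode() {
    return Objects.hash(address, userPort, controlPort, dataPort);
  }
}
```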
Changes to support storing the UUID for each Drillbit service instance locally, to be used by the planner and execution layers. This UUID uniquely identifies a Drillbit and is used to register Drillbit information in the RM state blobs. Introduced a PersistentStore named ZookeeperTransactionalPersistenceStore with transactional capabilities using the Zookeeper transactional APIs. This is used for updating the RM state blobs, since all the updates need to happen in a transactional manner. Added the RMStateBlobs definition and support for serde to Zookeeper. Implemented DistributedRM and its corresponding QueryRM APIs.
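A minimal sketch of the transactional-update idea using the raw ZooKeeper multi() API; the actual store may sit behind Curator, and the paths and blob names below are made up:

```java
import java.util.Arrays;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.ZooKeeper;

// Hypothetical: update two RM state blobs atomically so a partial write can
// never be observed by other Drillbits. Version -1 means "any version".
void updateBlobsAtomically(ZooKeeper zk, byte[] queueBlob, byte[] nodeBlob) throws Exception {
  zk.multi(Arrays.asList(
      Op.setData("/drill/rm/state/queue_usage", queueBlob, -1),
      Op.setData("/drill/rm/state/node_usage", nodeBlob, -1)));
}
```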
Updated the query state management in Foreman so that the same Foreman object can be submitted multiple times. Also introduced two maps that keep track of waiting and running queries. These changes support the async admit protocol which will be needed with distributed RM.
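The two-map idea might look roughly like this (a sketch with assumed names; the real Foreman bookkeeping is more involved):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical bookkeeping: a query waits here until the RM admits it, then
// moves to the running map; the admit callback may arrive asynchronously.
class QueryTracker {
  private final ConcurrentMap<String, Foreman> waitingQueries = new ConcurrentHashMap<>();
  private final ConcurrentMap<String, Foreman> runningQueries = new ConcurrentHashMap<>();

  void onAdmit(String queryId) {
    Foreman foreman = waitingQueries.remove(queryId);
    if (foreman != null) {
      runningQueries.put(queryId, foreman);
      // resume query execution on this foreman here
    }
  }
}
```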
Support for serde of optimalMemoryAllocation for each operator in each minor fragment in the QueryProfile. This is needed to verify that the optimal memory calculated by the planner is correct.